WetterstationServer/services/measurementService.py
2026-06-28 16:06:26 +02:00

116 lines
5.3 KiB
Python

from datetime import datetime
from sqlmodel import Session, select
from models.measurement import IndoorMeasurement, IndoorMeasurementCreateRequest, OutdoorMeasurementCreateRequest, \
OutdoorMeasurement, MeasurementListResponse, StationMeasurementResponse, MeasurementResponse
from models.station import Station, StationCreateRequest, StationListResponse
from services import stationService
from coolname import generate_slug
def push_indoor_measurement(session: Session, raw_measurement: IndoorMeasurementCreateRequest):
statement = select(Station).where(Station.mac == raw_measurement.mac)
station = session.exec(statement).first()
if not station:
station = stationService.create_station(session, StationCreateRequest(
mac=raw_measurement.mac,
name=generate_slug(2)
))
measurement = IndoorMeasurement(
station_id=station.id,
temperature=raw_measurement.temperature,
humidity=raw_measurement.humidity
)
session.add(IndoorMeasurement.model_validate(measurement))
session.commit()
def push_outdoor_measurement(session: Session, raw_measurement: OutdoorMeasurementCreateRequest):
statement = select(Station).where(Station.mac == raw_measurement.mac)
station = session.exec(statement).first()
if not station:
station = stationService.create_station(session, StationCreateRequest(
mac=raw_measurement.mac,
name=generate_slug(2)
))
measurement = OutdoorMeasurement(
station_id=station.id,
temperature=raw_measurement.temperature,
humidity=raw_measurement.humidity,
pressure=raw_measurement.pressure
)
session.add(OutdoorMeasurement.model_validate(measurement))
session.commit()
from typing import Type, Union
def _query_measurements(
session: Session,
model: Type[Union[IndoorMeasurement, OutdoorMeasurement]],
indoor: bool,
station_ids: list[int] | None,
from_timestamp: datetime | None = None,
to_timestamp: datetime | None = None,
limit: int | None = None
) -> list[StationMeasurementResponse]:
statement = select(model)
if station_ids:
statement = statement.where(model.station_id.in_(station_ids))
if from_timestamp:
statement = statement.where(model.timestamp >= from_timestamp)
if to_timestamp:
statement = statement.where(model.timestamp <= to_timestamp)
statement = statement.order_by(model.timestamp.desc()).limit(limit)
results = session.exec(statement).all()
grouped: dict[int, list[MeasurementResponse]] = {}
for m in results:
if m.station_id not in grouped:
grouped[m.station_id] = []
grouped[m.station_id].append(MeasurementResponse.model_validate(m))
return [
StationMeasurementResponse(
station=StationListResponse.model_validate(session.get(Station, station_id)),
measurements=measurements,
indoor=indoor
)
for station_id, measurements in grouped.items()
]
def get_indoor_measurements(session: Session, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None) -> list[StationMeasurementResponse]:
return _query_measurements(session, IndoorMeasurement, True, station_ids, from_timestamp, to_timestamp, limit)
def get_outdoor_measurements(session: Session, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None) -> list[StationMeasurementResponse]:
return _query_measurements(session, OutdoorMeasurement, False, station_ids, from_timestamp, to_timestamp, limit)
def get_measurements(session: Session, station_ids: list[int] | None, indoor: bool | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None):
if indoor is None:
indoor_results = get_indoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
outdoor_results = get_outdoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
return MeasurementListResponse(
stations=[
*[StationMeasurementResponse(station=indoor_result.station, measurements=indoor_result.measurements, indoor=True) for indoor_result in indoor_results],
*[StationMeasurementResponse(station=outdoor_result.station, measurements=outdoor_result.measurements, indoor=False) for outdoor_result in outdoor_results],
]
)
else:
if indoor:
indoor_results = get_indoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
return MeasurementListResponse(
stations=[
*[StationMeasurementResponse(station=indoor_result.station, measurements=indoor_result.measurements, indoor=True) for indoor_result in indoor_results],
]
)
else:
outdoor_results = get_outdoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
return MeasurementListResponse(
stations=[
*[StationMeasurementResponse(station=outdoor_result.station, measurements=outdoor_result.measurements, indoor=False) for outdoor_result in outdoor_results],
]
)