WetterstationServer/services/measurementService.py
2026-06-28 17:04:28 +02:00

155 lines
7.0 KiB
Python

from datetime import datetime
from itertools import groupby
from statistics import mean
from sqlalchemy import func
from sqlmodel import Session, select
from models.measurement import IndoorMeasurement, IndoorMeasurementCreateRequest, OutdoorMeasurementCreateRequest, \
OutdoorMeasurement, MeasurementListResponse, StationMeasurementResponse, MeasurementResponse, MeasurementResolution
from models.station import Station, StationCreateRequest, StationListResponse
from services import stationService
from coolname import generate_slug
def push_indoor_measurement(session: Session, raw_measurement: IndoorMeasurementCreateRequest):
statement = select(Station).where(Station.mac == raw_measurement.mac)
station = session.exec(statement).first()
if not station:
station = stationService.create_station(session, StationCreateRequest(
mac=raw_measurement.mac,
name=generate_slug(2)
))
measurement = IndoorMeasurement(
station_id=station.id,
temperature=raw_measurement.temperature,
humidity=raw_measurement.humidity
)
session.add(IndoorMeasurement.model_validate(measurement))
session.commit()
def push_outdoor_measurement(session: Session, raw_measurement: OutdoorMeasurementCreateRequest):
statement = select(Station).where(Station.mac == raw_measurement.mac)
station = session.exec(statement).first()
if not station:
station = stationService.create_station(session, StationCreateRequest(
mac=raw_measurement.mac,
name=generate_slug(2)
))
measurement = OutdoorMeasurement(
station_id=station.id,
temperature=raw_measurement.temperature,
humidity=raw_measurement.humidity,
pressure=raw_measurement.pressure
)
session.add(OutdoorMeasurement.model_validate(measurement))
session.commit()
from typing import Type, Union
def aggregate(measurements: list[MeasurementResponse], resolution: MeasurementResolution) -> list[MeasurementResponse]:
def period_key(m: MeasurementResponse) -> str:
formats = {
MeasurementResolution.hourly: "%Y-%m-%d %H",
MeasurementResolution.daily: "%Y-%m-%d",
MeasurementResolution.weekly: "%Y-%W",
MeasurementResolution.monthly: "%Y-%m",
MeasurementResolution.yearly: "%Y",
}
return m.timestamp.strftime(formats[resolution])
result = []
sorted_measurements = sorted(measurements, key=period_key)
for _, group in groupby(sorted_measurements, key=period_key):
group_list = list(group)
result.append(MeasurementResponse(
timestamp=group_list[0].timestamp,
temperature=round(mean(m.temperature for m in group_list if m.temperature is not None), 1),
humidity=round(mean(m.humidity for m in group_list if m.humidity is not None), 1) if any(m.humidity for m in group_list) else None,
pressure=round(mean(m.pressure for m in group_list if m.pressure is not None), 1) if any(m.pressure for m in group_list) else None,
))
return result
def _query_measurements(
session: Session,
model: Type[Union[IndoorMeasurement, OutdoorMeasurement]],
indoor: bool,
station_ids: list[int] | None,
from_timestamp: datetime | None = None,
to_timestamp: datetime | None = None,
limit: int | None = None,
resolution: MeasurementResolution = MeasurementResolution.hourly,
) -> list[StationMeasurementResponse]:
statement = select(model)
if station_ids:
statement = statement.where(model.station_id.in_(station_ids))
if from_timestamp:
statement = statement.where(model.timestamp >= from_timestamp)
if to_timestamp:
statement = statement.where(model.timestamp <= to_timestamp)
statement = statement.order_by(model.timestamp.desc())
results = session.exec(statement).all()
grouped: dict[int, list[MeasurementResponse]] = {}
for m in results:
if m.station_id not in grouped:
grouped[m.station_id] = []
grouped[m.station_id].append(MeasurementResponse.model_validate(m))
if resolution != MeasurementResolution.raw:
grouped = {
station_id: aggregate(measurements, resolution)
for station_id, measurements in grouped.items()
}
if limit:
grouped = {
station_id: measurements[:limit]
for station_id, measurements in grouped.items()
}
return [
StationMeasurementResponse(
station=StationListResponse.model_validate(session.get(Station, station_id)),
measurements=measurements,
indoor=indoor
)
for station_id, measurements in grouped.items()
]
def get_indoor_measurements(session: Session, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None) -> list[StationMeasurementResponse]:
return _query_measurements(session, IndoorMeasurement, True, station_ids, from_timestamp, to_timestamp, limit)
def get_outdoor_measurements(session: Session, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None) -> list[StationMeasurementResponse]:
return _query_measurements(session, OutdoorMeasurement, False, station_ids, from_timestamp, to_timestamp, limit)
def get_measurements(session: Session, station_ids: list[int] | None, indoor: bool, from_timestamp: datetime | None, to_timestamp: datetime | None,limit: int, resolution: MeasurementResolution ):
if indoor is None:
indoor_results = get_indoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
outdoor_results = get_outdoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
return MeasurementListResponse(
stations=[
*[StationMeasurementResponse(station=indoor_result.station, measurements=indoor_result.measurements, indoor=True) for indoor_result in indoor_results],
*[StationMeasurementResponse(station=outdoor_result.station, measurements=outdoor_result.measurements, indoor=False) for outdoor_result in outdoor_results],
]
)
else:
if indoor:
indoor_results = get_indoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
return MeasurementListResponse(
stations=[
*[StationMeasurementResponse(station=indoor_result.station, measurements=indoor_result.measurements, indoor=True) for indoor_result in indoor_results],
]
)
else:
outdoor_results = get_outdoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit)
return MeasurementListResponse(
stations=[
*[StationMeasurementResponse(station=outdoor_result.station, measurements=outdoor_result.measurements, indoor=False) for outdoor_result in outdoor_results],
]
)