from datetime import datetime from itertools import groupby from statistics import mean from sqlalchemy import func from sqlmodel import Session, select from models.measurement import IndoorMeasurement, IndoorMeasurementCreateRequest, OutdoorMeasurementCreateRequest, \ OutdoorMeasurement, MeasurementListResponse, StationMeasurementResponse, MeasurementResponse, MeasurementResolution from models.station import Station, StationCreateRequest, StationListResponse from services import stationService from coolname import generate_slug def push_indoor_measurement(session: Session, raw_measurement: IndoorMeasurementCreateRequest): statement = select(Station).where(Station.mac == raw_measurement.mac) station = session.exec(statement).first() if not station: station = stationService.create_station(session, StationCreateRequest( mac=raw_measurement.mac, name=generate_slug(2) )) measurement = IndoorMeasurement( station_id=station.id, temperature=raw_measurement.temperature, humidity=raw_measurement.humidity ) session.add(IndoorMeasurement.model_validate(measurement)) session.commit() def push_outdoor_measurement(session: Session, raw_measurement: OutdoorMeasurementCreateRequest): statement = select(Station).where(Station.mac == raw_measurement.mac) station = session.exec(statement).first() if not station: station = stationService.create_station(session, StationCreateRequest( mac=raw_measurement.mac, name=generate_slug(2) )) measurement = OutdoorMeasurement( station_id=station.id, temperature=raw_measurement.temperature, humidity=raw_measurement.humidity, pressure=raw_measurement.pressure ) session.add(OutdoorMeasurement.model_validate(measurement)) session.commit() from typing import Type, Union def aggregate(measurements: list[MeasurementResponse], resolution: MeasurementResolution) -> list[MeasurementResponse]: def period_key(m: MeasurementResponse) -> str: formats = { MeasurementResolution.hourly: "%Y-%m-%d %H", MeasurementResolution.daily: "%Y-%m-%d", MeasurementResolution.weekly: "%Y-%W", MeasurementResolution.monthly: "%Y-%m", MeasurementResolution.yearly: "%Y", } return m.timestamp.strftime(formats[resolution]) result = [] sorted_measurements = sorted(measurements, key=period_key) for _, group in groupby(sorted_measurements, key=period_key): group_list = list(group) result.append(MeasurementResponse( timestamp=group_list[0].timestamp, temperature=round(mean(m.temperature for m in group_list if m.temperature is not None), 1), humidity=round(mean(m.humidity for m in group_list if m.humidity is not None), 1) if any(m.humidity for m in group_list) else None, pressure=round(mean(m.pressure for m in group_list if m.pressure is not None), 1) if any(m.pressure for m in group_list) else None, )) return result def _query_measurements( session: Session, model: Type[Union[IndoorMeasurement, OutdoorMeasurement]], indoor: bool, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None, resolution: MeasurementResolution = MeasurementResolution.hourly, ) -> list[StationMeasurementResponse]: statement = select(model) if station_ids: statement = statement.where(model.station_id.in_(station_ids)) if from_timestamp: statement = statement.where(model.timestamp >= from_timestamp) if to_timestamp: statement = statement.where(model.timestamp <= to_timestamp) statement = statement.order_by(model.timestamp.desc()) results = session.exec(statement).all() grouped: dict[int, list[MeasurementResponse]] = {} for m in results: if m.station_id not in grouped: grouped[m.station_id] = [] grouped[m.station_id].append(MeasurementResponse.model_validate(m)) if resolution != MeasurementResolution.raw: grouped = { station_id: aggregate(measurements, resolution) for station_id, measurements in grouped.items() } if limit: grouped = { station_id: measurements[:limit] for station_id, measurements in grouped.items() } return [ StationMeasurementResponse( station=StationListResponse.model_validate(session.get(Station, station_id)), measurements=measurements, indoor=indoor ) for station_id, measurements in grouped.items() ] def get_indoor_measurements(session: Session, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None) -> list[StationMeasurementResponse]: return _query_measurements(session, IndoorMeasurement, True, station_ids, from_timestamp, to_timestamp, limit) def get_outdoor_measurements(session: Session, station_ids: list[int] | None, from_timestamp: datetime | None = None, to_timestamp: datetime | None = None, limit: int | None = None) -> list[StationMeasurementResponse]: return _query_measurements(session, OutdoorMeasurement, False, station_ids, from_timestamp, to_timestamp, limit) def get_measurements(session: Session, station_ids: list[int] | None, indoor: bool, from_timestamp: datetime | None, to_timestamp: datetime | None,limit: int, resolution: MeasurementResolution ): if indoor is None: indoor_results = get_indoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit) outdoor_results = get_outdoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit) return MeasurementListResponse( stations=[ *[StationMeasurementResponse(station=indoor_result.station, measurements=indoor_result.measurements, indoor=True) for indoor_result in indoor_results], *[StationMeasurementResponse(station=outdoor_result.station, measurements=outdoor_result.measurements, indoor=False) for outdoor_result in outdoor_results], ] ) else: if indoor: indoor_results = get_indoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit) return MeasurementListResponse( stations=[ *[StationMeasurementResponse(station=indoor_result.station, measurements=indoor_result.measurements, indoor=True) for indoor_result in indoor_results], ] ) else: outdoor_results = get_outdoor_measurements(session, station_ids, from_timestamp, to_timestamp, limit) return MeasurementListResponse( stations=[ *[StationMeasurementResponse(station=outdoor_result.station, measurements=outdoor_result.measurements, indoor=False) for outdoor_result in outdoor_results], ] )