WetterstationServer/main.py
2026-06-28 16:55:51 +02:00

57 lines
2.7 KiB
Python

import functools
from datetime import datetime
from fastapi import FastAPI, Depends, Query
from enum import Enum
from sqlmodel import Session
from db import get_session
from models.measurement import IndoorMeasurementCreateRequest, MeasurementListResponse, OutdoorMeasurementCreateRequest, \
MeasurementResolution
from models.station import StationCreateRequest, StationCreateResponse, Station, StationListResponse, \
StationUpdateResponse, StationUpdateRequest
from services import stationService, measurementService
# FastAPI application instance; the route handlers below register themselves
# on it through the @app.<method> decorators.
app = FastAPI()
# The response model is declared explicitly (as on the other station routes)
# so the payload shape shows up in the generated OpenAPI schema.
@app.get("/stations/list", response_model=list[StationListResponse], status_code=200)
async def list_stations(session: Session = Depends(get_session)) -> list[StationListResponse]:
    """List stations.

    Fetches the stations from ``stationService.list_stations`` and converts
    each one into a ``StationListResponse``.

    Args:
        session: Database session injected through ``get_session``.

    Returns:
        One ``StationListResponse`` per station returned by the service.
    """
    stations = stationService.list_stations(session)
    return [StationListResponse.model_validate(station) for station in stations]
@app.post("/stations/create", response_model=StationCreateResponse, status_code=200)
async def create_station(
    station_data: StationCreateRequest,
    session: Session = Depends(get_session),
) -> StationCreateResponse:
    """Create a station.

    Hands the request payload to ``stationService.create_station`` and returns
    the result validated as a ``StationCreateResponse``.

    Args:
        station_data: Request body describing the station.
        session: Database session injected through ``get_session``.
    """
    return StationCreateResponse.model_validate(
        stationService.create_station(session, station_data)
    )
@app.patch("/stations/update", response_model=StationUpdateResponse, status_code=200)
async def update_station(
    station_data: StationUpdateRequest,
    session: Session = Depends(get_session),
):
    """Update a station.

    Hands the request payload to ``stationService.update_station`` and returns
    the result validated as a ``StationUpdateResponse``.

    Args:
        station_data: Request body carrying the update.
        session: Database session injected through ``get_session``.
    """
    return StationUpdateResponse.model_validate(
        stationService.update_station(session, station_data)
    )
@app.delete("/stations/{station_id}", status_code=204)
async def delete_station(
    station_id: int,
    session: Session = Depends(get_session),
):
    """Delete a station.

    Passes ``station_id`` to ``stationService.delete_station``. The route is
    declared with status 204, so no response body is produced.

    Args:
        station_id: Identifier taken from the URL path.
        session: Database session injected through ``get_session``.
    """
    stationService.delete_station(session, station_id)
    # Whatever the service returns is intentionally not sent back.
    return None
@app.post("/measurements/indoor", status_code=204)
async def create_indoor_measurement(
    data: IndoorMeasurementCreateRequest,
    session: Session = Depends(get_session),
):
    """Submit an indoor measurement.

    Forwards the request body to ``measurementService.push_indoor_measurement``.
    The route is declared with status 204, so no response body is produced.

    Args:
        data: Request body with the indoor measurement.
        session: Database session injected through ``get_session``.
    """
    measurementService.push_indoor_measurement(session, data)
    # Whatever the service returns is intentionally not sent back.
    return None
@app.post("/measurements/outdoor", status_code=204)
async def create_outdoor_measurement(
    data: OutdoorMeasurementCreateRequest,
    session: Session = Depends(get_session),
):
    """Submit an outdoor measurement.

    Forwards the request body to ``measurementService.push_outdoor_measurement``.
    The route is declared with status 204, so no response body is produced.

    Args:
        data: Request body with the outdoor measurement.
        session: Database session injected through ``get_session``.
    """
    measurementService.push_outdoor_measurement(session, data)
    # Whatever the service returns is intentionally not sent back.
    return None
@app.get("/measurements", response_model=MeasurementListResponse, status_code=200)
async def get_measurements(
    station_ids: list[int] | None = Query(default=None),
    indoor: bool | None = Query(default=None),
    from_timestamp: datetime | None = None,
    to_timestamp: datetime | None = None,
    resolution: MeasurementResolution = MeasurementResolution.hourly,
    # Reject negative limits at the API boundary (422) instead of handing a
    # client-controlled negative number to the service layer.
    limit: int = Query(default=100, ge=0),
    session: Session = Depends(get_session),
):
    """Query measurements.

    All filters are optional query parameters and are forwarded unchanged to
    ``measurementService.get_measurements``.

    Args:
        station_ids: Repeated query parameter with station ids; omitted means
            ``None`` is passed to the service.
        indoor: Optional boolean filter; omitted means ``None`` is passed.
        from_timestamp: Optional lower timestamp bound.
        to_timestamp: Optional upper timestamp bound.
        resolution: Requested resolution; defaults to
            ``MeasurementResolution.hourly``.
        limit: Non-negative result limit; defaults to 100.
        session: Database session injected through ``get_session``.

    Returns:
        The service result, serialized as ``MeasurementListResponse``.
    """
    # NOTE: the service is called positionally with ``limit`` *before*
    # ``resolution``, which is the reverse of this handler's parameter order.
    return measurementService.get_measurements(
        session,
        station_ids,
        indoor,
        from_timestamp,
        to_timestamp,
        limit,
        resolution,
    )
@app.get("/hello/{name}")
async def say_hello(name: str):
    """Return a greeting for ``name`` under the ``message`` key."""
    greeting = f"Hello {name}"
    return {"message": greeting}