Quellcode für backend.main

# libre-stage - Band rehearsal and gig management software
# Copyright (C) 2026  libre-stage contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
FastAPI application entry point.

Creates the :class:`fastapi.FastAPI` instance, registers all middleware
(CORS, GZip, rate limiting), mounts all API routers and defines the
core endpoints that do not belong to a specific sub-resource:

- ``POST /login`` – password-based login, returns access + refresh tokens
- ``POST /refresh`` – exchange a refresh token for a new access token
- ``POST /logout`` – revoke tokens and clear cookies
- ``GET  /me`` – return the authenticated user's profile
- ``PUT  /update_user`` – update own user profile
- ``PUT  /change_password`` – change own password
- ``GET  /user_list`` – list all users (authenticated)
- ``GET  /user_todo_list`` – personal to-do and feedback list
- ``GET  /app_info`` – application metadata
- ``GET  /db_health`` – database connectivity probe
"""

import logging

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import json

#slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

from sqlalchemy import func, text
from sqlalchemy.orm import Session
from backend import database, models, schemas, auth
from backend.app_config import app_config  # Validiert appConfig.json beim Startup
from datetime import datetime, timezone
from dotenv import load_dotenv

from typing import List
import os
from pathlib import Path


from backend.routers import gigs, songs, rehearsals, surveys, cal, admin, password_reset, public, gigs_livemode
from backend.utils.token_cleanup import cleanup_expired_tokens
from backend.utils.password_validator import validate_password

import asyncio

# Limiter initialisieren
limiter = Limiter(key_func=get_remote_address)

api_prefix = ""
assert type(api_prefix) is str

logger = logging.getLogger("uvicorn")
load_dotenv()

log_level_name = os.getenv("LOG_LEVEL", "INFO").upper()
log_level = getattr(logging, log_level_name, None)
if not isinstance(log_level, int):
    logger.warning("Invalid LOG_LEVEL '%s'. Falling back to INFO.", log_level_name)
    log_level = logging.INFO

# Ensure LOG_LEVEL consistently applies to server and app loggers.
for logger_name in (
    "uvicorn",
    "uvicorn.error",
    "uvicorn.access",
    "gunicorn.error",
    "gunicorn.access",
    "granian",
    "granian.access",
):
    logging.getLogger(logger_name).setLevel(log_level)

logging.getLogger().setLevel(log_level)

docs_url = os.getenv("DOCS_URL", None)
openapi_url = os.getenv("OPENAPI_URL", None)

version_path = Path(__file__).parent.parent / "version.json"
try:
    logger.info(f"Searching for version dict in:\n\t {version_path}")
    with open(version_path, "r") as f:
        version_dict = json.load(f) # pragma: no cover
except FileNotFoundError:
    logger.info("Version file not found.")
    version_dict = {
        "release": "0.0.0",
        "date": "1970-01-01T12:00:00Z",
        "title": "Band Manager",
        "Description": "Internal band management platform.",
    }


app = FastAPI(
    root_path=api_prefix,
    redoc_url=None,
    docs_url=docs_url,
    openapi_url=openapi_url,
    version=version_dict.get("release", "0.0.0"),
    title=version_dict.get("title", "Band Manager"),
    description=version_dict.get("Description", "")
)

app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# ===== EXCEPTION HANDLERS =====
from fastapi.responses import JSONResponse

[Doku] @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): """Excepts all unhandled exceptions, logs them and returns a generic error message""" logger.error(f"Unhandled error on {request.url.path}: {exc}", exc_info=True) return JSONResponse( status_code=500, content={"detail": "Ein interner Fehler ist aufgetreten"} )
[Doku] @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """Specific handler for HTTPExceptions with logging""" logger.warning(f"HTTP {exc.status_code} on {request.url.path}: {exc.detail}") return JSONResponse( status_code=exc.status_code, content={"detail": exc.detail} )
origins_env = os.getenv("CORS_ORIGINS", "") origins = [origin.strip() for origin in origins_env.split(",") if origin.strip()] print (origins) # CORS für Entwicklung (Frontend <-> Backend) app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.add_middleware(GZipMiddleware, minimum_size=1000, compresslevel=5)
[Doku] @app.on_event("startup") async def startup_event(): """Führe Token-Cleanup bei Startup aus""" logger.info("Running token cleanup on startup...") db = database.SessionLocal() try: cleanup_expired_tokens(db) except Exception as e: logger.error(f"Token cleanup failed: {e}") finally: db.close()
[Doku] async def periodic_token_cleanup(): """Background Task für periodisches Token-Cleanup (alle 24h)""" while True: await asyncio.sleep(86400) # 24 Stunden logger.info("Running periodic token cleanup...") db = database.SessionLocal() try: cleanup_expired_tokens(db) except Exception as e: logger.error(f"Periodic token cleanup failed: {e}") finally: db.close()
[Doku] @app.on_event("startup") async def start_background_tasks(): """Starte Background Tasks""" asyncio.create_task(periodic_token_cleanup())
[Doku] def get_todo_list(user_name: str, db: Session): user = db.query(models.User).filter_by(user_name=user_name).first() db_todos = (db.query(models.RehTodo, models.Song.title, models.Song.interpret) .filter_by(id_user=user.id) .join(models.Song, models.RehTodo.id_song == models.Song.id) .all() ) db_songs_to_todo = ( db.query(models.Song) .filter(models.Song.status == 'vorschlag') .outerjoin( models.SongCandidateFeedback, (models.SongCandidateFeedback.song_id == models.Song.id) & (models.SongCandidateFeedback.user_id == user.id) ) .filter(models.SongCandidateFeedback.id == None) .all() ) db_surveys_to_todo = ( db.query(models.Surveys) .filter(models.Surveys.released == True) .filter(models.Surveys.closed == False) .outerjoin( models.SurveyFields, models.SurveyFields.id_survey == models.Surveys.id ) .outerjoin( models.SurveyFeedback, (models.SurveyFeedback.id_sv_field == models.SurveyFields.id) & (models.SurveyFeedback.id_user == user.id) ) .group_by(models.Surveys.id) .having(func.count(models.SurveyFeedback.id) == 0) .all() ) result_list = { "todo": [ { "id": todo.id, "todo": todo.todo, "user_name": user.user_name, "done": todo.done, "song_title": title, "song_interpret": interpret, "dt": todo.dt } for todo, title, interpret in db_todos ], "songs_to_feedback": [ { "id": song.id, "title": song.title, "interpret": song.interpret, "status": song.status } for song in db_songs_to_todo ], "surveys_to_feedback": [ { "id": survey.id, "kind_of_survey": survey.kind_of_survey, "rf_survey": survey.rf_survey, "release_date": survey.release_date.isoformat() if survey.release_date else None } for survey in db_surveys_to_todo ] } return result_list
[Doku] @app.post("/login") @limiter.limit("10/minute") def login( request: Request, data: schemas.LoginRequest, db: Session = Depends(auth.get_db) ): user = auth.authenticate_user(db, data.username, data.password) if not user: raise HTTPException(status_code=401, detail="Invalid credentials") # Access Token mit kurzer Laufzeit (15 Min) access_token = auth.create_access_token({ "sub": user.user_name, "role": user.user_group }) # Refresh Token mit langer Laufzeit (30 Tage) refresh_token = auth.create_refresh_token(user.id, db) # Tokens im Response Body zurückgeben (nicht in Cookies) # Client speichert diese im localStorage return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "expires_in": auth.ACCESS_TOKEN_EXPIRE_MINUTES * 60, "message": "Login successful" }
[Doku] @app.post("/refresh") @limiter.limit("30/minute") def refresh_token( request: Request, refresh_data: schemas.RefreshRequest, db: Session = Depends(auth.get_db) ): """ Refresh Token aus Request Body lesen und neuen Access Token generieren. Erwartet: { "refresh_token": "..." } Multi-Device Support: Der Refresh Token wird NICHT rotiert/revoked, damit mehrere Geräte gleichzeitig eingeloggt bleiben können. """ refresh_token = refresh_data.refresh_token if not refresh_token: raise HTTPException(status_code=401, detail="No refresh token provided") # Verifiziere Refresh Token und hole User user = auth.verify_refresh_token(refresh_token, db) # Erstelle nur neuen Access Token - behalte Refresh Token bei new_access_token = auth.create_access_token({ "sub": user.user_name, "role": user.user_group }) # Gebe den gleichen Refresh Token zurück (kein neuer Token) return { "access_token": new_access_token, "refresh_token": refresh_token, # Gleicher Token "token_type": "bearer", "expires_in": auth.ACCESS_TOKEN_EXPIRE_MINUTES * 60, "message": "Token refreshed" }
[Doku] @app.post("/logout") def logout( request: Request, logout_data: schemas.LogoutRequest = None, db: Session = Depends(auth.get_db) ): """ Logout: Blacklist Access Token und revoke Refresh Token. Token wird aus Authorization Header gelesen. """ # Token aus Header extrahieren token = auth.get_token_from_cookie_or_header(request) # Blacklist Access Token if token: auth.blacklist_access_token(token, db) # Revoke Refresh Token wenn im Body mitgeschickt if logout_data and logout_data.refresh_token: try: auth.revoke_refresh_token(logout_data.refresh_token, db) except Exception as e: logger.warning(f"Could not revoke refresh token during logout: {e}") return {"message": "Logged out successfully"}
[Doku] @app.get("/me", response_model=schemas.UserOut) def get_me(current = Depends(auth.get_current_user), db: Session = Depends(auth.get_db)): user = db.query(models.User).filter(models.User.user_name == current["user_name"]).first() if not user: raise HTTPException(status_code=404, detail="User not found") if user.musician == None: user.musician = 0 return user
[Doku] @app.put("/update_user", response_model=schemas.UserOut) @limiter.limit("10/minute") def update_user( request: Request, user: schemas.UserOut, current=Depends(auth.get_current_user), db: Session = Depends(auth.get_db)): user_db = db.query(models.User).filter(models.User.user_name == current["user_name"]).first() if not user_db.id == user.id: raise HTTPException(status_code=403, detail="Your are not allowed to update this user!") # Felder, die niemals verändert werden dürfen: forbidden_fields = {"user_name", "user_role"} for k, v in user.model_dump(exclude_unset=True).items(): if getattr(user_db, k) == v: continue # Wert unterschiedlich und Feld ist verboten -> Abbruch if k in forbidden_fields: raise HTTPException(status_code=403, detail=f"Not allowed to update field '{k}'") setattr(user_db, k, v) db.commit() db.refresh(user_db) return user_db
[Doku] @app.put("/change_password") @limiter.limit("5/minute") def change_password( request: Request, data: schemas.PasswordUpdateRequest, current=Depends(auth.get_current_user), db: Session = Depends(auth.get_db)): user_db = db.query(models.User).filter(models.User.user_name == current["user_name"]).first() if not user_db: raise HTTPException(status_code=404, detail="User not found") if not user_db.id == data.user_id: raise HTTPException(status_code=403, detail="Your are not allowed to update this user!") if not auth.verify_password(data.old_password, user_db.user_pw): raise HTTPException(status_code=403, detail="Wrong password!") is_valid, error_msg = validate_password(data.new_password) if not is_valid: raise HTTPException(status_code=400, detail=error_msg) if auth.verify_password(data.new_password, user_db.user_pw): raise HTTPException(status_code=403, detail="Old and new passwords are identical") user_db.user_pw = auth.hash_pw(data.new_password) db.commit() return {"msg": "Password updates successfully"}
[Doku] @app.get("/user_list", response_model=List[schemas.UserListElem]) def get_users_list( current=Depends(auth.get_current_user), db: Session = Depends(auth.get_db) ): users = db.query(models.User).filter(models.User.musician==1, models.User.status=="active") return users
[Doku] @app.get("/user_todos", response_model=schemas.UserTodoList) def get_user_todo( current=Depends(auth.get_current_user), db: Session = Depends(auth.get_db) ): return get_todo_list(current["user_name"], db)
[Doku] @app.put("/user_todos_done", response_model=schemas.UserTodoList) @limiter.limit("30/minute") def set_user_todo_done( request: Request, todo: schemas.UserTodo, current= Depends(auth.get_current_user), db: Session = Depends(auth.get_db) ): db_todo = db.get(models.RehTodo, todo.id) db_todo.done = True db.commit() return get_todo_list(current["user_name"], db)
[Doku] @app.get("/version", response_model=dict) def get_version_dict(): return version_dict
[Doku] @app.get("/health") def health_check(db: Session = Depends(auth.get_db)): """Überprüft ob API und DB erreichbar sind""" try: # Einfache DB-Query zum Testen db.execute(text("SELECT 1")) return { "status": "ok", "database": "connected", "version": version_dict.get("release", "0.0.0") } except Exception as e: logger.error(f"Health check failed: {e}") raise HTTPException(status_code=503, detail="Service temporarily unavailable")
app.include_router(gigs.router) app.include_router(songs.router) app.include_router(rehearsals.router) app.include_router(surveys.router) app.include_router(cal.router) app.include_router(admin.router) app.include_router(password_reset.router) app.include_router(public.router) app.include_router(gigs_livemode.router)