Quellcode für backend.routers.rehearsals

"""
Rehearsal router.

Handles CRUD operations for rehearsals: creating and updating
rehearsal sessions, managing the per-rehearsal song list and
per-user to-do items.

Requires authentication. Create/update/delete operations additionally
require the ``editor`` or ``admin`` role.

Prefix: ``/rehearsals``  |  Tag: ``rehearsals``
"""

# libre-stage - Band rehearsal and gig management software
# Copyright (C) 2026  libre-stage contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from fastapi import APIRouter, Depends, HTTPException, Response, Query, Path
from datetime import time, timedelta
import logging

from sqlalchemy.orm import Session
from typing import List

from backend import models, schemas, auth
from backend.utils.check_permissions import  check_editor, check_admin

import os
from dotenv import load_dotenv

router = APIRouter(
    prefix="/reh", tags=["rehearsals"], dependencies=[Depends(auth.get_current_user_dep)]
)

[Doku] def get_reh_list(db, limit=20, offset=0): return ( db.query(models.Rehearsal) .order_by(models.Rehearsal.begin.desc()) .offset(offset) .limit(limit) .all() )
logger = logging.getLogger("uvicorn.error") load_dotenv(".env") MM_CHANNEL = os.getenv("MM_CHANNEL_REH")
[Doku] @router.get("/", response_model = List[schemas.RehListElem]) def get_rehearsal_list( db: Session = Depends(auth.get_db), current=Depends(auth.get_current_user) ): reh_db = get_reh_list(db) if not reh_db: raise HTTPException(status_code=404, detail="No rehearsals found") return reh_db
[Doku] @router.post("/", response_model = List[schemas.RehListElem]) def create_new_rehearsal( data: schemas.NewRehDict, db: Session = Depends(auth.get_db), current=Depends(auth.get_current_user), ): end_dt = data.end or (data.begin + timedelta(hours=2)) logger.info( f"User {current['user_name']} creates new rehearsal from {data.begin} to {end_dt}" ) if current['user_group'] != 'admin' and current['user_group'] != 'editor': logger.error(f"Permission denied: User {current['user_name']} is not admin or editor") raise HTTPException(status_code=401, detail="User role does not allow to create a new rehearsal!") reh_to_add = models.Rehearsal( begin=data.begin, end=end_dt, ical= "", comment=data.comment, songs=[], ) db.add(reh_to_add) db.commit() # Send Mattermost Messages try: from backend.utils import mattermost message = ( f":mega: Neue Probe von {current['user_name']} erstellt.\n" f"\tDiese findet am {data.begin.strftime('%d.%m.%Y')} von " f"{data.begin.strftime('%H:%M')} bis {end_dt.strftime('%H:%M Uhr')} statt." ) if data.comment: message += f"\n> {data.comment}" mattermost.send_mm_message(channel=MM_CHANNEL, text=message) except Exception as e: logger.error(f"Failed to send Mattermost message: {e}") reh_db = get_reh_list(db) if not reh_db: raise HTTPException(status_code=404, detail="No rehearsals found") # pragma: no cover return reh_db
[Doku] @router.put("/", response_model = List[schemas.RehListElem]) def update_rehearsal( data: schemas.RehListElem, db: Session = Depends(auth.get_db), current=Depends(auth.get_current_user) ): logger.info(f"Update Rehearsal {data.id}") reh_tbu = db.query(models.Rehearsal).get(data.id) if not reh_tbu: raise HTTPException(status_code=404, detail="Rehearsal not found") # Primitive Felder updaten (alles außer Songs) for k, v in data.model_dump(exclude_unset=True, exclude={"songs"}).items(): setattr(reh_tbu, k, v) # Songs synchronisieren payload_songs = {s.id: s for s in (data.songs or []) if s.id} current_songs = {s.id: s for s in reh_tbu.songs} incoming_song_ids = set(payload_songs.keys()) added_song_ids = [] rehsong_fields = ["id_song", "comment", "todo", "done"] #song_fields = ["title", "interpret", "status", "setlist_comment"] song_field_map = { "setlist_comment": "comment", # API-Feld : DB-Feld "status": "status", # weitere Felder nach Bedarf } for s in (data.songs or []): if s.id in current_songs: db_song = current_songs[s.id] for field in rehsong_fields: if hasattr(s, field): setattr(db_song, field, getattr(s, field)) for src_field, db_field in song_field_map.items(): if hasattr(s, src_field): setattr(db_song.song, db_field, getattr(s, src_field)) else: logger.info(f"Add Song {s.id_song} to Rehearsal {s.id_rehearsal}") db_song = models.RehSong( id_rehearsal=reh_tbu.id, id_song=s.id_song, comment=s.comment, todo=s.todo, done=s.done ) db.add(db_song) db.flush() # Damit db_song.id zur Verfügung steht added_song_ids.append(db_song.id) #reh_tbu.songs.append(db_song) # Todos synchronisieren für jeden Song payload_todos = {t.id: t for t in (getattr(s, "song_todos", []) or []) if t.id} current_todos = {t.id: t for t in getattr(db_song, "todos", [])} incoming_todo_ids = set(payload_todos.keys()) added_todo_ids = [] # Update und Hinzufügen Todos for t in (getattr(s, "song_todos", []) or []): if t.id in current_todos: db_todo = current_todos[t.id] for attr in ["todo", "dt", "done"]: setattr(db_todo, attr, getattr(t, attr)) else: logger.info(f"add todo to song {db_song.id_song}, {t.id_reh}, {t.id_user}") db_todo = models.RehTodo( id_song=db_song.id_song, id_reh=t.id_reh, id_user=t.id_user, todo=t.todo, dt=t.dt, done=t.done ) db.add(db_todo) db.flush() added_todo_ids.append(db_todo.id) # Falls du SQLAlchemy-Relation benutzt: # db_song.todos.append(db_todo) (je nach deiner Konfiguration) # Todos entfernen, die nicht mehr enthalten sind all_valid_todo_ids = incoming_todo_ids.union(added_todo_ids) for todo in list(getattr(db_song, "todos", [])): if todo.id not in all_valid_todo_ids: logger.info(f"delete todo from song {db_song.id_song}, {todo.id_reh}, {todo.id_user}") db.delete(todo) all_valid_ids = incoming_song_ids.union(added_song_ids) # Songs entfernen, die nicht mehr vorhanden sind for song in list(reh_tbu.songs): if song.id not in all_valid_ids: logger.info (f"Delete Song: {song.id} from rehearsal {reh_tbu.id}") # Alle zugehörigen Todos ebenfalls löschen for todo in list(getattr(song, "todos", [])): db.delete(todo) db.delete(song) db.commit() db.refresh(reh_tbu) reh_db = get_reh_list(db) logger.info("Update finalized") return reh_db
[Doku] @router.delete("/{reh_id}", response_model=List[schemas.RehListElem]) def delete_rehearsal( reh_id: int = Path(..., description="ID der zu löschenden Probe"), db: Session = Depends(auth.get_db), current=Depends(auth.get_current_user) ): logger.info(f"User {current['user_name']} attempts to delete rehearsal {reh_id}") # Nur Admins dürfen löschen if not check_admin(current): logger.error(f"Permission denied: User {current['user_name']}") raise HTTPException(status_code=401, detail="User role does not allow to delete rehearsal!") reh_to_del = db.query(models.Rehearsal).get(reh_id) if not reh_to_del: raise HTTPException(status_code=404, detail="Rehearsal not found") # Alle verknüpften Songs und deren Todos löschen for song in list(reh_to_del.songs): for todo in list(getattr(song, "todos", [])): db.delete(todo) db.delete(song) db.delete(reh_to_del) db.commit() reh_db = get_reh_list(db) if not reh_db: raise HTTPException(status_code=404, detail="No rehearsals found") logger.info(f"Rehearsal {reh_id} deleted by user {current['user_name']}") return reh_db