# libre-stage - Band rehearsal and gig management software
# Copyright (C) 2026 libre-stage contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Song catalogue router.
Handles CRUD operations for songs in the band's repertoire, song
candidate proposals (member submissions), voting/feedback on
candidates, and per-song rehearsal and gig statistics.
Requires authentication. Create/update/delete operations additionally
require the ``editor`` or ``admin`` role.
Prefix: ``/songs`` | Tag: ``songs``
"""
from fastapi import APIRouter, Depends, HTTPException, Response, Query
from datetime import time, datetime
import logging
import os
from sqlalchemy.orm import Session, joinedload
from typing import List
from backend import models, schemas, auth
from backend.utils import mattermost
from backend.utils import audioscrawler
from dotenv import load_dotenv
from backend.utils.check_permissions import check_editor, check_admin
# Router for all song endpoints. Every route is mounted below "/songs", is
# tagged "songs", and runs auth.get_current_user_dep as a router-level
# dependency.
router = APIRouter(
prefix="/songs", tags=["songs"], dependencies=[Depends(auth.get_current_user_dep)]
)
# Log through the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")
# suppress progress polls to reduce log clutter
# NOTE(review): not referenced anywhere else in the visible code; presumably
# consumed by a log filter defined elsewhere -- confirm.
block_endpoints = ["/songs/log"]
# Mattermost settings: load ".env" first, then read the channel used for the
# song notifications from the environment (None when the variable is unset).
load_dotenv(".env")
MM_CHANNEL = os.getenv("MM_CHANNEL_SONG_VOTES")
@router.get("/", response_model=List[schemas.SongOut])
def get_songs(
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Return every song whose status is not ``"vorschlag"``.

    Raises:
        HTTPException: 404 when the query yields no rows.
    """
    repertoire = (
        db.query(models.Song)
        .filter(models.Song.status != "vorschlag")
        .all()
    )
    if repertoire:
        return repertoire
    raise HTTPException(status_code=404, detail="No songs found")
[Doku]
@router.get("/candidates/", response_model=List[schemas.SongCandidateOut])
def get_song_candidates(
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Return all songs with status ``"vorschlag"`` together with their feedbacks.

    The ``feedbacks`` relationship is loaded eagerly in the same query.

    Raises:
        HTTPException: 404 when the query yields no rows.
    """
    candidate_query = (
        db.query(models.Song)
        .options(joinedload(models.Song.feedbacks))
        .filter(models.Song.status == "vorschlag")
    )
    candidates = candidate_query.all()
    if candidates:
        return candidates
    raise HTTPException(status_code=404, detail="No song candidates found")
[Doku]
@router.get("/info/{song_id}", response_model=schemas.SongInSetOut)
def get_songs(
    song_id: int,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Return one song converted with ``Song.to_setlist_element()``.

    Raises:
        HTTPException: 404 when no song with ``song_id`` exists.
    """
    # NOTE(review): this function reuses the name of the list handler for
    # GET "/". Both routes are registered by their decorators, but the module
    # attribute ``get_songs`` ends up bound to this function only.
    record = db.query(models.Song).get(song_id)
    if not record:
        raise HTTPException(status_code=404, detail="Song not found")
    return record.to_setlist_element()
[Doku]
@router.get("/{song_id}/rehearsal_history", response_model=List[schemas.SongRehearsalHistoryEntry])
def get_song_rehearsal_history(
    song_id: int,
    limit: int = Query(default=3, ge=1, le=10),
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Return the most recent rehearsals in which this song was rehearsed.

    Args:
        song_id: Primary key of the song.
        limit: Maximum number of rehearsal entries to return (1-10, default 3).

    Returns:
        One ``SongRehearsalHistoryEntry`` per ``RehSong`` row, newest
        rehearsal first, each carrying the ``RehTodo`` rows recorded for this
        song in that rehearsal.

    Raises:
        HTTPException: 404 when no song with ``song_id`` exists.
    """
    song = db.query(models.Song).get(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song not found")

    # All RehSong rows for this song, newest rehearsal first.
    reh_songs = (
        db.query(models.RehSong)
        .join(models.Rehearsal, models.RehSong.id_rehearsal == models.Rehearsal.id)
        .filter(models.RehSong.id_song == song_id)
        .order_by(models.Rehearsal.begin.desc())
        .limit(limit)
        .all()
    )
    if not reh_songs:
        return []

    # Fetch the todos of all selected rehearsals with one query instead of
    # one query per rehearsal, then group them by rehearsal id.
    rehearsal_ids = [rs.id_rehearsal for rs in reh_songs]
    todo_rows = (
        db.query(models.RehTodo)
        .filter(
            models.RehTodo.id_song == song_id,
            models.RehTodo.id_reh.in_(rehearsal_ids),
        )
        .all()
    )
    todos_by_rehearsal = {}
    for todo_row in todo_rows:
        todos_by_rehearsal.setdefault(todo_row.id_reh, []).append(todo_row)

    result = []
    for rs in reh_songs:
        result.append(schemas.SongRehearsalHistoryEntry(
            rehearsal_id=rs.id_rehearsal,
            rehearsal_date=rs.rehearsal.begin,
            comment=rs.comment,
            todo=rs.todo,
            # A missing "done" flag is reported as not done.
            done=rs.done or False,
            rehearsal_comment=rs.rehearsal.comment,
            todos=[
                schemas.SongRehearsalHistoryTodo(
                    id=t.id, id_user=t.id_user, todo=t.todo, done=t.done or False
                )
                for t in todos_by_rehearsal.get(rs.id_rehearsal, [])
            ],
        ))
    return result
[Doku]
@router.get("/{song_id}/feedback", response_model=schemas.SongFeedbackSummary)
def get_song_feedback_history(
    song_id: int,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Return anonymised vote totals for one song.

    Only the ``feedback`` column of ``SongCandidateFeedback`` is read, so no
    user information reaches the response. The code ``'a'`` is counted as a
    yes vote, ``'na'`` as a no vote, ``'o'`` as an abstention; any other value
    is counted as unknown.

    Raises:
        HTTPException: 404 when no song with ``song_id`` exists.
    """
    if not db.query(models.Song).get(song_id):
        raise HTTPException(status_code=404, detail="Song not found")

    vote_rows = (
        db.query(models.SongCandidateFeedback.feedback)
        .filter(models.SongCandidateFeedback.song_id == song_id)
        .all()
    )

    # One counter per recognised vote code; everything else is "unknown".
    tally = {'a': 0, 'na': 0, 'o': 0}
    unrecognised = 0
    for (code,) in vote_rows:
        if code in tally:
            tally[code] += 1
        else:
            unrecognised += 1

    return schemas.SongFeedbackSummary(
        song_id=song_id,
        total_votes=len(vote_rows),
        yes_votes=tally['a'],
        no_votes=tally['na'],
        abstain_votes=tally['o'],
        unknown_votes=unrecognised,
    )
[Doku]
@router.get("/{song_id}/statistics", response_model=schemas.SongStatistics)
def get_song_statistics(
    song_id: int,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Return rehearsal, gig, feedback and set-companion statistics for a song.

    Raises:
        HTTPException: 404 when no song with ``song_id`` exists.
    """
    if not db.query(models.Song).get(song_id):
        raise HTTPException(status_code=404, detail="Song not found")

    day_format = '%Y-%m-%d'

    # --- Rehearsal statistics ---
    # RehSong rows for this song, oldest rehearsal first.
    rehearsed = (
        db.query(models.RehSong)
        .join(models.Rehearsal, models.RehSong.id_rehearsal == models.Rehearsal.id)
        .filter(models.RehSong.id_song == song_id)
        .order_by(models.Rehearsal.begin.asc())
        .all()
    )
    if rehearsed:
        first_rehearsal = rehearsed[0].rehearsal.begin.strftime(day_format)
        last_rehearsal = rehearsed[-1].rehearsal.begin.strftime(day_format)
    else:
        first_rehearsal = None
        last_rehearsal = None

    # --- Gig statistics ---
    # Every SetSong row of this song; each is resolved via its set to a
    # GigSet and from there to the gig.
    set_songs = (
        db.query(models.SetSong)
        .filter(models.SetSong.id_song == song_id)
        .all()
    )

    played = []
    known_gig_ids = set()
    ratings = []
    skipped_total = 0
    inserted_total = 0
    for entry in set_songs:
        # First GigSet that references this set; rows whose set is not
        # attached to any gig do not contribute to the gig statistics.
        link = (
            db.query(models.GigSet)
            .filter(models.GigSet.id_set == entry.id_set)
            .first()
        )
        if not link:
            continue

        gig = link.gig
        if gig.id not in known_gig_ids:
            # Each gig is listed once, described by the first matching row.
            known_gig_ids.add(gig.id)
            played.append(schemas.GigPlayedEntry(
                gig_id=gig.id,
                gig_name=gig.name,
                gig_date=gig.datum.strftime(day_format) if gig.datum else '',
                feedback=entry.feedback,
                uebersprungen=entry.uebersprungen,
                eingeschoben=entry.eingeschoben,
            ))

        if entry.feedback is not None:
            ratings.append(entry.feedback)
        if entry.uebersprungen:
            skipped_total += 1
        if entry.eingeschoben:
            inserted_total += 1

    # Newest gig first (the date strings sort chronologically).
    played = sorted(played, key=lambda item: item.gig_date, reverse=True)

    # Feedback distribution and average.
    distribution = {}
    for rating in ratings:
        distribution[rating] = distribution.get(rating, 0) + 1
    if ratings:
        average = round(sum(ratings) / len(ratings), 2)
    else:
        average = None

    # --- Frequent set companions ---
    # Count how often every other song shares a set with this song.
    containing_sets = [entry.id_set for entry in set_songs]
    shared_counts = {}
    if containing_sets:
        neighbours = (
            db.query(models.SetSong)
            .filter(
                models.SetSong.id_set.in_(containing_sets),
                models.SetSong.id_song != song_id,
            )
            .all()
        )
        for neighbour in neighbours:
            shared_counts[neighbour.id_song] = shared_counts.get(neighbour.id_song, 0) + 1

    # Keep the ten most frequent companions.
    ranked = sorted(shared_counts.items(), key=lambda pair: pair[1], reverse=True)
    companion_songs = []
    for other_id, shared in ranked[:10]:
        other = db.query(models.Song).get(other_id)
        if not other:
            continue
        companion_songs.append(schemas.CompanionSong(
            song_id=other.id,
            title=other.title,
            interpret=other.interpret or '',
            count=shared,
        ))

    return schemas.SongStatistics(
        rehearsal_count=len(rehearsed),
        first_rehearsal=first_rehearsal,
        last_rehearsal=last_rehearsal,
        gig_count=len(played),
        gigs_played=played,
        feedback_count=len(ratings),
        feedback_avg=average,
        feedback_distribution=distribution,
        skipped_count=skipped_total,
        inserted_count=inserted_total,
        companion_songs=companion_songs,
    )
[Doku]
@router.put("/{song_id}", response_model=schemas.SongOut)
def update_song(
    song_id: int,
    song: schemas.SongIn,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Apply the explicitly provided fields of ``song`` to an existing song.

    Only fields that were set in the request are written. ``brass`` is
    coerced to ``int``, a string ``duration`` is parsed as ``H:M:S`` and the
    ``ytlink`` placeholders ``"None"`` and ``""`` are stored as ``None``.

    Raises:
        HTTPException: 404 when no song with ``song_id`` exists; 422 when a
            string ``duration`` is not a valid ``H:M:S`` time.
    """
    # NOTE(review): unlike delete_song, this handler performs no role check
    # (check_editor) -- confirm whether every authenticated user may edit songs.
    db_song = db.query(models.Song).get(song_id)
    if not db_song:
        raise HTTPException(status_code=404, detail="Song not found")

    # Dump once and reuse the result for both the log line and the update.
    changes = song.model_dump(exclude_unset=True)
    logger.info(f"Updating song ID {song_id} with data: {changes} by user {current['user_name']}")

    for field, value in changes.items():
        if field == "brass" and value is not None:
            value = int(value)
        if field == "duration" and isinstance(value, str):
            # Fallback conversion for durations that arrive as plain strings.
            # A malformed value is reported as a client error instead of
            # surfacing as an unhandled ValueError.
            try:  # pragma: no cover
                hours, minutes, seconds = map(int, value.split(":"))
                value = time(hour=hours, minute=minutes, second=seconds)
            except ValueError as exc:  # pragma: no cover
                raise HTTPException(
                    status_code=422,
                    detail="duration must be formatted as HH:MM:SS",
                ) from exc
        if field == "ytlink" and value in ("None", "", None):
            value = None
        setattr(db_song, field, value)

    db.commit()
    db.refresh(db_song)
    return db_song
[Doku]
@router.delete("/{song_id}", response_model=schemas.SongOut)
def delete_song(
    song_id: int,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Soft-delete a song by setting its status to ``'retired'``.

    The row itself is kept; only ``status`` changes.

    Raises:
        HTTPException: 403 when ``check_editor`` rejects the current user;
            404 when no song with ``song_id`` exists.
    """
    # Authorize before touching the database, as accept_song_candidate does.
    # The 403 already took precedence over the 404, so responses are unchanged.
    if not check_editor(current):
        raise HTTPException(status_code=403, detail="Not authorized to delete songs")

    db_song = db.query(models.Song).get(song_id)
    if not db_song:
        raise HTTPException(status_code=404, detail="Song not found")

    logger.info(f"Deleting song ID {song_id} by user {current['user_name']}")
    # Set the status to 'retired' instead of deleting the row.
    db_song.status = 'retired'
    db.commit()
    return db_song
[Doku]
@router.post("/", response_model=schemas.SongOut)
def create_song(
    song: schemas.SongIn,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Create a song and announce it in the Mattermost channel.

    ``brass`` is stored as ``int`` (0 when not provided) and the ``ytlink``
    placeholders ``"None"`` and ``""`` are stored as ``None``, matching
    ``update_song``. A failing Mattermost notification is logged and does not
    affect the response.

    Returns:
        The newly created song as refreshed from the database.
    """
    # NOTE(review): no role check is performed here and the announcement is
    # sent for every status, not only for "vorschlag" -- confirm both are
    # intended.
    ytlink = None if song.ytlink in ("None", "", None) else song.ytlink
    new_song = models.Song(
        title=song.title,
        interpret=song.interpret,
        genre=song.genre,
        singer_background=song.singer_background,
        singer_lead=song.singer_lead,
        composer=song.composer,
        texter=song.texter,
        publisher=song.publisher,
        arrangement=song.arrangement,
        tone_key=song.tone_key,
        status=song.status,
        comment=song.comment,
        ytlink=ytlink,
        brass=int(song.brass) if song.brass is not None else 0,
        text=song.text,
        duration=song.duration,
    )
    logger.info(f"Creating new song '{song.title}' by user {current['user_name']}")
    db.add(new_song)
    db.commit()
    db.refresh(new_song)

    # Notify Mattermost about the new song. The YouTube line is only included
    # when a link exists, so the message never contains a dead "(None)" link.
    message_lines = [
        " :mega: Neuer Songvorschlag:",
        f"\t**{new_song.title}** von **{new_song.interpret}** wurde hinzugefügt.",
    ]
    if new_song.ytlink:
        message_lines.append(f"\tYoutube: [klick]({new_song.ytlink})")
    message_lines.append("Bitte gib im internen Bereich deine Stimme ab.")
    try:
        mattermost.send_mm_message("\n".join(message_lines), channel=MM_CHANNEL)
    except Exception as e:
        # Deliberately best-effort: the song is already committed.
        logger.error(f"Fehler beim Senden der Benachrichtigung an Mattermost: {e}")
    return new_song
[Doku]
@router.put("/candidates/feedback/{song_id}", response_model=List[schemas.SongFeedbackBase])
def update_song_feedback(
    song_id: int,
    feedback: List[schemas.SongFeedbackIn],
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Replace the feedback of a song candidate with the submitted list.

    Entries are matched by ``user_id``: changed feedback is updated and
    re-dated, unknown users get a new row, and stored rows whose user is
    missing from the payload are deleted. When the payload contains the same
    ``user_id`` more than once, the last entry wins.

    Returns:
        The song's feedbacks after the commit.

    Raises:
        HTTPException: 404 when no song with ``song_id`` exists; 400 when the
            song's status is not ``"vorschlag"``.
    """
    # NOTE(review): user_id is taken from the payload and never compared with
    # the current user, so a caller can write or remove other users' feedback
    # -- confirm this is intended.
    song = db.query(models.Song).get(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song candidate not found")
    if song.status != "vorschlag":
        raise HTTPException(status_code=400, detail="Feedback can only be added to song candidates")

    # Existing feedback rows for this song, keyed by user id.
    feedback_by_user = {
        fb.user_id: fb
        for fb in db.query(models.SongCandidateFeedback)
        .filter(models.SongCandidateFeedback.song_id == song_id)
        .all()
    }

    # Process the submitted feedback.
    submitted_user_ids = set()
    for feedback_data in feedback:
        submitted_user_ids.add(feedback_data.user_id)
        current_fb = feedback_by_user.get(feedback_data.user_id)
        if current_fb:
            # Row exists -- only touch it when the vote actually changed.
            if current_fb.feedback != feedback_data.feedback:
                current_fb.feedback = feedback_data.feedback
                current_fb.date = datetime.now()
        else:
            new_feedback = models.SongCandidateFeedback(
                song_id=song_id,
                user_id=feedback_data.user_id,
                feedback=feedback_data.feedback,
                date=datetime.now(),
            )
            db.add(new_feedback)
            # Remember the new row so that a repeated user_id later in the
            # payload updates it instead of inserting a duplicate.
            feedback_by_user[feedback_data.user_id] = new_feedback

    # Delete rows whose user no longer appears in the payload. Rows created
    # above are always in submitted_user_ids and are therefore kept.
    for user_id, stale_fb in feedback_by_user.items():
        if user_id not in submitted_user_ids:
            db.delete(stale_fb)

    db.commit()
    # Reload the song so that its feedbacks reflect the committed state.
    db.refresh(song)
    return song.feedbacks
[Doku]
@router.put("/candidates/accept/{song_id}", response_model=List[schemas.SongOut])
def accept_song_candidate(
    song_id: int,
    db: Session = Depends(auth.get_db),
    current=Depends(auth.get_current_user),
):
    """Set a candidate's status to ``"angenommen"`` and return the non-candidate songs.

    A Mattermost announcement is attempted after the commit; a failure there
    is logged and otherwise ignored.

    Raises:
        HTTPException: 403 when ``check_editor`` rejects the current user;
            404 when no song with ``song_id`` exists; 400 when the song's
            status is not ``"vorschlag"``.
    """
    allowed = check_editor(current)
    if not allowed:
        raise HTTPException(status_code=403, detail="Not authorized to accept song candidates")

    candidate = db.query(models.Song).get(song_id)
    if not candidate:
        raise HTTPException(status_code=404, detail="Song candidate not found")
    if candidate.status != "vorschlag":
        raise HTTPException(status_code=400, detail="Only song candidates can be accepted")

    candidate.status = "angenommen"
    db.commit()
    db.refresh(candidate)

    # Mattermost notification (best effort).
    try:
        announcement = f" :tada: Der Songvorschlag **{candidate.title}** von **{candidate.interpret}** wurde angenommen! :tada:"
        mattermost.send_mm_message(announcement, channel=MM_CHANNEL)
    except Exception as e:
        logger.error(f"Fehler beim Senden der Benachrichtigung an Mattermost: {e}")

    non_candidates = db.query(models.Song).filter(models.Song.status != "vorschlag")
    return non_candidates.all()
[Doku]
@router.get("/singers", response_model=List[str])
def get_singers(
    db: Session = Depends(auth.get_db),
):
    """Return the ``clear_name`` of every user whose ``is_singer`` flag equals 1."""
    singer_query = db.query(models.User).filter(models.User.is_singer == 1)
    return [singer.clear_name for singer in singer_query.all()]
[Doku]
@router.get("/crawler/metadata", response_model=schemas.SongScrawlOut)
def get_song_scrawls(
    interpret: str = Query(..., min_length=1),
    title: str = Query(..., min_length=1),
    current=Depends(auth.get_current_user),
):
    """Look up song metadata via ``audioscrawler.search_track_musicbrainz``.

    Composer and lyricist names are de-duplicated and sorted; they are
    returned both as lists and as comma-separated strings (``None`` when the
    respective list is empty).

    Raises:
        HTTPException: 404 when the lookup returns nothing.
    """
    hit = audioscrawler.search_track_musicbrainz(interpret=interpret, title=title)
    if not hit:
        raise HTTPException(status_code=404, detail="No metadata found")

    def unique_sorted(key):
        # A missing or empty entry yields an empty list.
        return sorted(set(hit.get(key) or []))

    composers = unique_sorted("composers")
    lyricists = unique_sorted("lyricists")

    if composers:
        composer_text = ", ".join(composers)
    else:
        composer_text = None
    if lyricists:
        lyricist_text = ", ".join(lyricists)
    else:
        lyricist_text = None

    return schemas.SongScrawlOut(
        recording_id=hit.get("recording_id"),
        work_id=hit.get("work_id"),
        duration=hit.get("duration"),
        ytlink=hit.get("ytlink"),
        composers=composers,
        lyricists=lyricists,
        composer=composer_text,
        texter=lyricist_text,
    )