"""
Gig management router.
Handles CRUD operations for gigs, set and setlist management, PDF
export of printed setlists and the gig schedule calculation.
Requires authentication. Create/update/delete operations additionally
require the ``editor`` or ``admin`` role.
Prefix: ``/gigs`` | Tag: ``gigs``
"""
# libre-stage - Band rehearsal and gig management software
# Copyright (C) 2026 libre-stage contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from fastapi import APIRouter, Depends, HTTPException, Response, Query
from fastapi.responses import StreamingResponse
from datetime import date, datetime, time, timedelta
import logging
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from pathlib import Path
from typing import List, Literal
from backend.pdf.generator import SetlistPDF
from backend.services.setlist import SetlistService
from backend.utils.check_permissions import check_admin, check_editor
from backend.utils.pdf_palette import find_logo_path, resolve_schedule_palette
from backend.utils.setlist_timing import calculate_setlist_timing, serialize_timing_for_api
from backend import models, schemas, auth, app_config
import openpyxl
from openpyxl.styles import Font, Alignment
from io import BytesIO
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.utils import ImageReader
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfgen import canvas
import os
from dotenv import load_dotenv
router = APIRouter(
prefix="/gigs", tags=["gigs"], dependencies=[Depends(auth.get_current_user_dep)]
)
logger = logging.getLogger("uvicorn.error")
# suppress progress polls to reduce log clutter
block_endpoints = ["/gigs/log"]
load_dotenv(".env")
MM_CHANNEL = os.getenv("MM_CHANNEL_GIGS")
[Doku]
class LogFilter(logging.Filter): # pragma: no cover
[Doku]
def filter(self, record):
if record.args and len(record.args) >= 3:
if record.args[2] in block_endpoints: # type: ignore
return False
return True
uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.addFilter(LogFilter())
DEFAULT_SCHEDULE_PALETTE = {
"bg": colors.HexColor("#0B1220"),
"card": colors.HexColor("#111827"),
"primary": colors.HexColor("#60A5FA"),
"text": colors.HexColor("#E2E8F0"),
"muted": colors.HexColor("#94A3B8"),
"header_bg": colors.HexColor("#1E293B"),
"row_alt": colors.HexColor("#0F172A"),
"line": colors.HexColor("#334155"),
"fixed": colors.HexColor("#123129"),
}
GENRE_PALETTE = [
'#3b82f6', '#10b981', '#f59e0b', '#ef4444', '#8b5cf6',
'#06b6d4', '#14b8a6', '#f97316', '#ec4899', '#84cc16',
'#6366f1', '#22c55e', '#f43f5e', '#eab308', '#0ea5e9',
'#a855f7', '#ef6c00', '#00acc1', '#7cb342', '#d81b60',
'#3949ab', '#00897b', '#c0ca33', '#5e35b1', '#039be5'
]
def _build_rollover_datetimes(base_date: date, values: list[tuple[str, time]]) -> list[tuple[str, datetime]]:
"""Build a monotonic timeline and roll over to next day after midnight."""
result: list[tuple[str, datetime]] = []
day_offset = 0
last_time: time | None = None
for label, t in values:
if last_time is not None and t < last_time:
day_offset += 1
dt = datetime.combine(base_date + timedelta(days=day_offset), t)
result.append((label, dt))
last_time = t
return result
def _fixed_schedule_points(gig: models.Gig) -> list[tuple[str, datetime]]:
"""Build fixed schedule timestamps from gig base data (naive UTC)."""
fixed_points: list[tuple[str, datetime]] = []
if gig.datum is None:
return fixed_points
base_date = gig.datum if isinstance(gig.datum, date) else date.fromisoformat(str(gig.datum))
ordered_times = [
("Einlass", gig.doors),
("Beginn", gig.begin),
("Ende", gig.end),
]
present_times = [(label, t) for label, t in ordered_times if t is not None]
return _build_rollover_datetimes(base_date, present_times)
def _build_schedule_response(gig: models.Gig) -> schemas.GigScheduleOut:
fixed_items = [
schemas.GigScheduleItemOut(
id=None,
gig_id=gig.id,
item_datetime=item_dt,
was=label,
wer="",
wo="",
is_fixed=True,
)
for label, item_dt in _fixed_schedule_points(gig)
]
dynamic_items = [
schemas.GigScheduleItemOut.model_validate(item, from_attributes=True)
for item in gig.schedule_items
]
all_items = fixed_items + dynamic_items
all_items.sort(key=lambda item: (item.item_datetime, 0 if item.is_fixed else 1))
return schemas.GigScheduleOut(items=all_items)
def _raise_if_schedule_conflict(
db: Session,
gig: models.Gig,
item_datetime: datetime,
exclude_item_id: int | None = None,
) -> None:
existing_query = db.query(models.GigScheduleItem).filter(
models.GigScheduleItem.gig_id == gig.id,
models.GigScheduleItem.item_datetime == item_datetime,
)
if exclude_item_id is not None:
existing_query = existing_query.filter(models.GigScheduleItem.id != exclude_item_id)
if existing_query.first() is not None:
raise HTTPException(
status_code=409,
detail="Zeitpunkt kollidiert mit einem vorhandenen Ablaufplan-Eintrag.",
)
for label, fixed_dt in _fixed_schedule_points(gig):
if fixed_dt == item_datetime:
raise HTTPException(
status_code=409,
detail=f"Zeitpunkt kollidiert mit festem Eintrag: {label}.",
)
def _build_schedule_pdf(gig: models.Gig) -> bytes:
"""Render schedule items (fixed + dynamic) to a printable portrait PDF."""
schedule = _build_schedule_response(gig)
buffer = BytesIO()
pdf = canvas.Canvas(buffer, pagesize=A4, pageCompression=0)
page_width, page_height = A4
left_margin = 36
right_margin = page_width - 36
bottom_margin = 52
table_header_height = 20
row_padding = 4
row_line_height = 12
table_width = right_margin - left_margin
time_col_width = 96
was_col_width = 180
wer_col_width = 110
wo_col_width = max(90, table_width - time_col_width - was_col_width - wer_col_width)
was_col_width = max(140, table_width - time_col_width - wer_col_width - wo_col_width)
columns = [
("Zeit", left_margin, time_col_width),
("Was", left_margin + time_col_width, was_col_width),
("Wer", left_margin + time_col_width + was_col_width, wer_col_width),
(
"Wo",
left_margin + time_col_width + was_col_width + wer_col_width,
wo_col_width,
),
]
footer_prefix = "Generiert mit libreStage | "
footer_domain = "pakleds-patentoffice.de"
footer_url = "https://pakleds-patentoffice.de"
footer_text = f"{footer_prefix}{footer_domain}"
root_dir = Path(__file__).resolve().parents[2]
logo_path = find_logo_path({"root_dir": root_dir})
logo_reader: ImageReader | None = None
logo_size: tuple[float, float] | None = None
palette = resolve_schedule_palette(
{
"default_palette": DEFAULT_SCHEDULE_PALETTE,
"logo_path": logo_path,
}
)
def _get_logo_reader() -> ImageReader | None:
nonlocal logo_reader, logo_size
if logo_reader is not None:
return logo_reader
if logo_path is None:
return None
try:
logo_reader = ImageReader(str(logo_path))
logo_size = logo_reader.getSize()
return logo_reader
except Exception: # pragma: no cover - logo rendering should not break export
logger.warning("Could not load logo for schedule PDF", exc_info=True)
return None
def _format_time(value: time | None) -> str:
return value.strftime("%H:%M") if value else "-"
def _wrap_text(text: str, max_width: float, font_name: str, font_size: int) -> list[str]:
content = (text or "-").strip() or "-"
words = content.split()
if not words:
return ["-"]
lines: list[str] = []
current = ""
for word in words:
trial = f"{current} {word}" if current else word
if pdfmetrics.stringWidth(trial, font_name, font_size) <= max_width:
current = trial
continue
if current:
lines.append(current)
current = ""
# Break very long tokens that do not fit into one column line.
if pdfmetrics.stringWidth(word, font_name, font_size) <= max_width:
current = word
continue
chunk = ""
for char in word:
trial_chunk = f"{chunk}{char}"
if pdfmetrics.stringWidth(trial_chunk, font_name, font_size) <= max_width:
chunk = trial_chunk
else:
if chunk:
lines.append(chunk)
chunk = char
current = chunk
if current:
lines.append(current)
return lines
def draw_table_header(y: float) -> float:
pdf.setFillColor(palette["header_bg"])
pdf.roundRect(left_margin, y - table_header_height + 4, right_margin - left_margin, table_header_height, 4, stroke=0, fill=1)
pdf.setFont("Helvetica-Bold", 10)
pdf.setFillColor(palette["text"])
for title, x, _ in columns:
pdf.drawString(x + 4, y - 10, title)
pdf.setStrokeColor(palette["line"])
pdf.setLineWidth(0.8)
pdf.line(left_margin, y - table_header_height, right_margin, y - table_header_height)
return y - table_header_height - 6
def draw_watermark() -> None:
img = _get_logo_reader()
if img is None or not logo_size:
return
img_width, img_height = logo_size
max_width = page_width * 0.55
max_height = page_height * 0.55
ratio = min(max_width / img_width, max_height / img_height)
render_w = img_width * ratio
render_h = img_height * ratio
x = (page_width - render_w) / 2
y = (page_height - render_h) / 2 - 28
pdf.saveState()
# Keep watermark subtle so table data stays readable.
if hasattr(pdf, "setFillAlpha"):
pdf.setFillAlpha(0.05)
pdf.setStrokeAlpha(0.05)
pdf.drawImage(
img,
x,
y,
width=render_w,
height=render_h,
preserveAspectRatio=True,
mask="auto",
)
pdf.restoreState()
def draw_footer() -> None:
pdf.setFillColor(palette["muted"])
pdf.setFont("Helvetica", 8)
footer_y = 20
footer_font = "Helvetica"
footer_size = 8
full_width = pdfmetrics.stringWidth(footer_text, footer_font, footer_size)
prefix_width = pdfmetrics.stringWidth(footer_prefix, footer_font, footer_size)
start_x = (page_width - full_width) / 2
pdf.drawString(start_x, footer_y, footer_prefix)
pdf.setFillColor(palette["primary"])
pdf.drawString(start_x + prefix_width, footer_y, footer_domain)
pdf.linkURL(
footer_url,
(start_x + prefix_width, footer_y - 2, start_x + full_width, footer_y + footer_size + 1),
relative=0,
thickness=0,
)
def draw_header() -> float:
pdf.setFillColor(palette["bg"])
pdf.rect(0, 0, page_width, page_height, stroke=0, fill=1)
draw_watermark()
draw_footer()
card_height = 134
card_y = page_height - 26 - card_height
pdf.setFillColor(palette["card"])
pdf.roundRect(left_margin, card_y, right_margin - left_margin, card_height, 8, stroke=0, fill=1)
pdf.setStrokeColor(palette["line"])
pdf.setLineWidth(0.8)
pdf.roundRect(left_margin, card_y, right_margin - left_margin, card_height, 8, stroke=1, fill=0)
pdf.setFillColor(palette["primary"])
pdf.rect(left_margin, card_y + card_height - 8, right_margin - left_margin, 8, stroke=0, fill=1)
img = _get_logo_reader()
if img is not None and logo_size:
try:
img_width, img_height = logo_size
max_logo_width = 150
max_logo_height = 50
ratio = min(max_logo_width / img_width, max_logo_height / img_height)
render_w = img_width * ratio
render_h = img_height * ratio
pdf.drawImage(
img,
right_margin - 14 - render_w,
card_y + card_height - 20 - render_h,
width=render_w,
height=render_h,
preserveAspectRatio=True,
mask="auto",
)
except Exception: # pragma: no cover - logo rendering should not break export
logger.warning("Could not render logo in schedule PDF", exc_info=True)
title_x = left_margin + 14
title_y = card_y + card_height - 26
pdf.setFillColor(palette["text"])
pdf.setFont("Helvetica-Bold", 18)
pdf.drawString(title_x, title_y, "Ablaufplan")
pdf.setFont("Helvetica", 16)
pdf.setFillColor(palette["muted"])
pdf.drawString(title_x, title_y - 20, f"Gig: {gig.name}")
pdf.setFont("Helvetica", 11)
date_str = gig.datum.strftime("%d.%m.%Y") if gig.datum else "-"
base_fields = [
f"Datum: {date_str}",
f"Art: {gig.kind_of_gig or '-'}",
f"Veranstalter: {gig.organizer or '-'}",
f"Ort: {gig.venue or '-'}",
f"Einlass/Beginn/Ende: {_format_time(gig.doors)} / {_format_time(gig.begin)} / {_format_time(gig.end)}",
f"Export: {datetime.now().strftime('%d.%m.%Y %H:%M')}",
]
info_y = title_y - 36
pdf.setFillColor(palette["text"])
pdf.setFont("Helvetica", 11)
for field in base_fields:
pdf.drawString(title_x, info_y, field)
info_y -= 13
return draw_table_header(card_y - 11)
schedule_font_name = "Helvetica"
schedule_font_size = 9
font_ascent = pdfmetrics.getAscent(schedule_font_name)
font_descent = pdfmetrics.getDescent(schedule_font_name)
baseline_center_offset = ((font_ascent + font_descent) / 2000.0) * schedule_font_size
y = draw_header()
pdf.setFont(schedule_font_name, schedule_font_size)
row_index = 0
for item in schedule.items:
dt_str = item.item_datetime.strftime("%d.%m.%Y %H:%M")
was_str = item.was + (" [fix]" if item.is_fixed else "")
wrapped = {
"Zeit": _wrap_text(dt_str, columns[0][2] - 4, "Helvetica", 9),
"Was": _wrap_text(was_str, columns[1][2] - 4, "Helvetica", 9),
"Wer": _wrap_text(item.wer or "-", columns[2][2] - 4, "Helvetica", 9),
"Wo": _wrap_text(item.wo or "-", columns[3][2] - 4, "Helvetica", 9),
}
row_lines = max(len(lines) for lines in wrapped.values())
row_height = row_lines * row_line_height + row_padding * 2
if y - row_height < bottom_margin:
pdf.showPage()
y = draw_header()
pdf.setFont(schedule_font_name, schedule_font_size)
row_top = y + 4
row_bottom = row_top - row_height
if item.is_fixed:
fill = palette["fixed"]
elif row_index % 2:
fill = palette["row_alt"]
else:
fill = None
if fill is not None:
pdf.setFillColor(fill)
pdf.rect(left_margin, row_bottom, right_margin - left_margin, row_height, stroke=0, fill=1)
for idx in range(row_lines):
line_center = row_top - row_padding - (idx * row_line_height) - (row_line_height / 2)
line_y = line_center - baseline_center_offset
for title, x, _ in columns:
lines = wrapped[title]
if idx < len(lines):
if title == "Zeit":
pdf.setFillColor(palette["muted"])
else:
pdf.setFillColor(palette["text"])
pdf.drawString(x + 4, line_y, lines[idx])
y -= row_height
pdf.setStrokeColor(palette["line"])
pdf.setLineWidth(0.5)
pdf.line(left_margin, y + 2, right_margin, y + 2)
row_index += 1
pdf.save()
buffer.seek(0)
return buffer.getvalue()
[Doku]
@router.get("/", response_model=List[schemas.GigOut])
def list_gigs(
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user),
jahr: int = Query(None, description="Das Jahr der Gigs"),
):
logger.info("Fetching gigs from database")
query = db.query(models.Gig).order_by(models.Gig.datum.desc())
if jahr is not None:
# Wir gehen davon aus, dass 'datum' als 'YYYY-MM-DD' (Text) gespeichert ist
query = query.filter(models.Gig.datum.startswith(str(jahr)))
return query.all()
[Doku]
@router.get("/statistics", response_model=schemas.SeasonStatistics)
def get_season_statistics(
jahr: int = Query(None, description="Jahr für die Saisonstatistik"),
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user),
):
"""Gibt aggregierte Statistiken für alle Gigs eines Jahres zurück."""
from datetime import date
query = db.query(models.Gig).order_by(models.Gig.datum.asc())
if jahr is not None:
query = query.filter(models.Gig.datum.startswith(str(jahr)))
gigs = query.all()
gig_count = len(gigs)
played_gig_count = sum(1 for gig in gigs if gig.datum and gig.datum < date.today())
total_songs = 0
unique_song_ids = set()
skipped_count = 0
inserted_count = 0
feedback_values = []
song_play_counts = {} # song_id -> count
gigs_overview = []
genre_counts = {} # genre -> count
genre_timeline = []
for gig in gigs:
gig_songs = 0
gig_skipped = 0
gig_inserted = 0
gig_feedbacks = []
gig_genre_counts = {}
for gigset in gig.sets:
set_obj = gigset.set
if not set_obj:
continue
for setsong in set_obj.songs:
total_songs += 1
gig_songs += 1
if setsong.id_song:
unique_song_ids.add(setsong.id_song)
song_play_counts[setsong.id_song] = song_play_counts.get(setsong.id_song, 0) + 1
if setsong.uebersprungen:
skipped_count += 1
gig_skipped += 1
if setsong.eingeschoben:
inserted_count += 1
gig_inserted += 1
if setsong.feedback is not None:
feedback_values.append(setsong.feedback)
gig_feedbacks.append(setsong.feedback)
# Genre zählen
if setsong.song and setsong.song.genre:
genre = setsong.song.genre.strip()
if genre:
genre_counts[genre] = genre_counts.get(genre, 0) + 1
gig_genre_counts[genre] = gig_genre_counts.get(genre, 0) + 1
if gig_genre_counts:
genre_timeline.append(schemas.GenreTimelinePoint(
label=gig.name,
date=gig.datum.strftime('%Y-%m-%d') if gig.datum else None,
kind_of_gig=gig.kind_of_gig,
genre_counts=gig_genre_counts,
total=sum(gig_genre_counts.values()),
))
gig_feedback_avg = round(sum(gig_feedbacks) / len(gig_feedbacks), 2) if gig_feedbacks else None
gigs_overview.append(schemas.GigOverviewEntry(
gig_id=gig.id,
gig_name=gig.name,
gig_date=gig.datum.strftime('%Y-%m-%d') if gig.datum else '',
song_count=gig_songs,
skipped_count=gig_skipped,
inserted_count=gig_inserted,
feedback_avg=gig_feedback_avg,
))
feedback_distribution = {}
for fv in feedback_values:
feedback_distribution[fv] = feedback_distribution.get(fv, 0) + 1
feedback_avg = round(sum(feedback_values) / len(feedback_values), 2) if feedback_values else None
# Top 10 Songs
top_song_entries = sorted(song_play_counts.items(), key=lambda x: x[1], reverse=True)[:10]
top_songs = []
for song_id, count in top_song_entries:
song = db.query(models.Song).get(song_id)
if song:
top_songs.append(schemas.TopSongEntry(
song_id=song.id,
title=song.title,
interpret=song.interpret or '',
count=count,
))
return schemas.SeasonStatistics(
jahr=jahr,
gig_count=gig_count,
played_gig_count=played_gig_count,
total_songs=total_songs,
unique_songs=len(unique_song_ids),
skipped_count=skipped_count,
inserted_count=inserted_count,
feedback_count=len(feedback_values),
feedback_avg=feedback_avg,
feedback_distribution=feedback_distribution,
genre_distribution=genre_counts,
genre_timeline=genre_timeline,
top_songs=top_songs,
gigs_overview=gigs_overview,
)
[Doku]
@router.get("/genre_palette", response_model=schemas.GenrePaletteOut)
def get_genre_palette(
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user),
):
"""Return a deterministic global genre->color map for all available genres."""
genres_by_key: dict[str, str] = {}
def _register_genre(value: str | None) -> None:
if not isinstance(value, str):
return
text = value.strip()
if not text:
return
key = text.casefold()
genres_by_key.setdefault(key, text)
# Prefer global configured genres when available.
soft_config = app_config.get_soft_config()
for entry in soft_config.get("genres", []):
if isinstance(entry, str):
_register_genre(entry)
elif isinstance(entry, dict):
_register_genre(entry.get("label") or entry.get("key"))
# Add any legacy/ad-hoc genres present in DB songs.
db_genres = db.query(models.Song.genre).filter(models.Song.genre.isnot(None)).distinct().all()
for (genre,) in db_genres:
_register_genre(genre)
sorted_keys = sorted(genres_by_key.keys(), key=lambda k: k.casefold())
palette = {
genres_by_key[key]: GENRE_PALETTE[idx % len(GENRE_PALETTE)]
for idx, key in enumerate(sorted_keys)
}
return schemas.GenrePaletteOut(palette=palette)
[Doku]
@router.post("/livemode_available_batch")
def get_livemode_available_batch(
gig_ids: List[int],
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user)
):
"""
Batch-Endpoint: Gibt Live-Mode-Status für mehrere Gigs auf einmal zurück.
Body: { "gig_ids": [1, 2, 3, ...] }
Returns: { "1": {...}, "2": {...}, ... }
"""
from datetime import date
# Normale User haben KEINEN Zugriff
is_admin = check_admin(current_user)
if not is_admin:
# Gebe für alle Gigs "not available" zurück
return {
str(gig_id): {
"available": False,
"reason": "insufficient_permissions",
"can_force": False
}
for gig_id in gig_ids
}
# Hole alle Gigs in einem Query
gigs = db.query(models.Gig).filter(models.Gig.id.in_(gig_ids)).all()
gigs_by_id = {gig.id: gig for gig in gigs}
today = date.today()
result = {}
for gig_id in gig_ids:
gig = gigs_by_id.get(gig_id)
if not gig:
result[str(gig_id)] = {
"available": False,
"reason": "gig_not_found",
"can_force": False
}
continue
# gig.datum ist bereits ein date-Objekt
gig_date = gig.datum if isinstance(gig.datum, date) else date.fromisoformat(str(gig.datum))
is_gig_day = today == gig_date
result[str(gig_id)] = {
"available": is_gig_day,
"reason": "gig_day" if is_gig_day else "not_gig_day",
"can_force": True,
"gig_date": gig_date.isoformat()
}
return result
[Doku]
@router.get("/{gig_id}/livemode_available")
def is_livemode_available(
gig_id: int,
force: bool = Query(False, description="Editor/Admin override"),
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user)
):
from datetime import date
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
# Normale User haben KEINEN Zugriff
is_editor_or_admin = check_editor(current_user)
if not is_editor_or_admin:
return {
"available": False,
"reason": "insufficient_permissions",
"can_force": False
}
# Editor/Admin mit Force-Flag
if force:
return {
"available": True,
"forced": True,
"reason": "manually_unlocked"
}
# Automatische Verfügbarkeit: Gig-Tag
today = date.today()
# gig.datum ist bereits ein date-Objekt
gig_date = gig.datum if isinstance(gig.datum, date) else date.fromisoformat(str(gig.datum))
is_gig_day = today == gig_date
return {
"available": is_gig_day,
"reason": "gig_day" if is_gig_day else "not_gig_day",
"can_force": True,
"gig_date": gig_date.isoformat()
}
[Doku]
@router.get("/{gig_id}/statistics", response_model=schemas.GigStatistics)
def get_gig_statistics(
gig_id: int,
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user),
):
"""Gibt detaillierte Statistiken für einen einzelnen Gig zurück."""
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig nicht gefunden")
song_count = 0
skipped_count = 0
inserted_count = 0
feedback_values = []
set_entries = []
genre_counts = {} # genre -> count
genre_timeline = []
for gigset in gig.sets:
set_obj = gigset.set
if not set_obj:
continue
set_songs = []
set_feedbacks = []
set_genre_counts = {}
for setsong in sorted(set_obj.songs, key=lambda s: s.position):
song = setsong.song
title = song.title if song else "⚠️ Song gelöscht"
interpret = (song.interpret or '') if song else ''
song_id = setsong.id_song or 0
song_count += 1
if setsong.uebersprungen:
skipped_count += 1
if setsong.eingeschoben:
inserted_count += 1
if setsong.feedback is not None:
feedback_values.append(setsong.feedback)
set_feedbacks.append(setsong.feedback)
# Genre zählen
if song and song.genre:
genre = song.genre.strip()
if genre:
genre_counts[genre] = genre_counts.get(genre, 0) + 1
set_genre_counts[genre] = set_genre_counts.get(genre, 0) + 1
set_songs.append(schemas.GigStatsSongEntry(
song_id=song_id,
title=title,
interpret=interpret,
position=setsong.position,
feedback=setsong.feedback,
uebersprungen=setsong.uebersprungen,
eingeschoben=setsong.eingeschoben,
))
set_feedback_avg = round(sum(set_feedbacks) / len(set_feedbacks), 2) if set_feedbacks else None
if set_genre_counts:
genre_timeline.append(schemas.GenreTimelinePoint(
label=set_obj.setlist_name or set_obj.name or f"Set {len(set_entries) + 1}",
date=gig.datum.strftime('%Y-%m-%d') if gig.datum else None,
kind_of_gig=gig.kind_of_gig,
genre_counts=set_genre_counts,
total=sum(set_genre_counts.values()),
))
set_entries.append(schemas.GigStatsSetEntry(
set_name=set_obj.setlist_name or set_obj.name or f"Set {len(set_entries) + 1}",
feedback_avg=set_feedback_avg,
songs=set_songs,
))
feedback_distribution = {}
for fv in feedback_values:
feedback_distribution[fv] = feedback_distribution.get(fv, 0) + 1
feedback_avg = round(sum(feedback_values) / len(feedback_values), 2) if feedback_values else None
return schemas.GigStatistics(
gig_id=gig.id,
gig_name=gig.name,
gig_date=gig.datum.strftime('%Y-%m-%d') if gig.datum else '',
song_count=song_count,
skipped_count=skipped_count,
inserted_count=inserted_count,
feedback_count=len(feedback_values),
feedback_avg=feedback_avg,
feedback_distribution=feedback_distribution,
genre_distribution=genre_counts,
genre_timeline=genre_timeline,
sets=set_entries,
)
[Doku]
@router.get("/{gig_id}/schedule/", response_model=schemas.GigScheduleOut)
def get_gig_schedule(
gig_id: int,
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user),
):
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
return _build_schedule_response(gig)
[Doku]
@router.get("/{gig_id}/schedule.pdf", response_class=Response)
def get_gig_schedule_pdf(
gig_id: int,
db: Session = Depends(auth.get_db),
current_user=Depends(auth.get_current_user),
):
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
pdf_bytes = _build_schedule_pdf(gig)
safe_name = "".join(ch if ch.isalnum() else "_" for ch in (gig.name or "gig"))
headers = {"Content-Disposition": f"attachment; filename=Ablaufplan_{safe_name}_{gig_id}.pdf"}
return Response(pdf_bytes, media_type="application/pdf", headers=headers)
[Doku]
@router.post("/{gig_id}/schedule/", response_model=schemas.GigScheduleOut)
def create_gig_schedule_item(
gig_id: int,
payload: schemas.GigScheduleItemIn,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
if not check_editor(current):
raise HTTPException(status_code=403, detail="Not enough permissions")
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
_raise_if_schedule_conflict(db, gig, payload.item_datetime)
item = models.GigScheduleItem(gig_id=gig_id, **payload.model_dump())
db.add(item)
try:
db.commit()
except IntegrityError:
db.rollback()
raise HTTPException(status_code=409, detail="Zeitpunkt ist bereits belegt.")
db.refresh(gig)
return _build_schedule_response(gig)
[Doku]
@router.put("/{gig_id}/schedule/", response_model=schemas.GigScheduleOut)
def update_gig_schedule_bulk(
gig_id: int,
payload: schemas.GigScheduleBulkUpdateIn,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
if not check_editor(current):
raise HTTPException(status_code=403, detail="Not enough permissions")
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
fixed_datetimes = {fixed_dt for _, fixed_dt in _fixed_schedule_points(gig)}
payload_datetimes: set[datetime] = set()
payload_ids: set[int] = set()
for item in payload.items:
if item.item_datetime in payload_datetimes:
raise HTTPException(status_code=409, detail="Zeitpunkt doppelt im Ablaufplan-Payload.")
payload_datetimes.add(item.item_datetime)
if item.item_datetime in fixed_datetimes:
raise HTTPException(status_code=409, detail="Zeitpunkt kollidiert mit einem festen Eintrag.")
if item.id is not None:
if item.id in payload_ids:
raise HTTPException(status_code=409, detail="Eintrag-ID doppelt im Ablaufplan-Payload.")
payload_ids.add(item.id)
existing_items = db.query(models.GigScheduleItem).filter_by(gig_id=gig_id).all()
existing_by_id = {item.id: item for item in existing_items}
unknown_ids = [item_id for item_id in payload_ids if item_id not in existing_by_id]
if unknown_ids:
raise HTTPException(status_code=404, detail="Schedule item not found")
try:
# Replace-all behavior: remove dynamic items not present in payload.
for item in existing_items:
if item.id not in payload_ids:
db.delete(item)
db.flush()
updates = []
for item_data in payload.items:
if item_data.id is not None:
updates.append((existing_by_id[item_data.id], item_data))
# Move updated rows to guaranteed-free temporary datetimes first so swaps are possible.
if updates:
used_datetimes = {item.item_datetime for item in existing_items}
used_datetimes.update(fixed_datetimes)
used_datetimes.update(payload_datetimes)
temp_anchor = datetime(9999, 12, 31, 23, 59, 59)
temp_offset_seconds = 0
for item, _ in updates:
temp_dt = temp_anchor - timedelta(seconds=temp_offset_seconds)
while temp_dt in used_datetimes:
temp_offset_seconds += 1
temp_dt = temp_anchor - timedelta(seconds=temp_offset_seconds)
temp_offset_seconds += 1
used_datetimes.add(temp_dt)
item.item_datetime = temp_dt
db.flush()
for item_data in payload.items:
if item_data.id is not None:
item = existing_by_id[item_data.id]
item.item_datetime = item_data.item_datetime
item.was = item_data.was
item.wer = item_data.wer
item.wo = item_data.wo
else:
db.add(models.GigScheduleItem(gig_id=gig_id, **item_data.model_dump(exclude={"id"})))
db.commit()
except IntegrityError:
db.rollback()
raise HTTPException(status_code=409, detail="Zeitpunkt ist bereits belegt.")
db.refresh(gig)
return _build_schedule_response(gig)
[Doku]
@router.put("/{gig_id}/schedule/{item_id}", response_model=schemas.GigScheduleOut)
def update_gig_schedule_item(
gig_id: int,
item_id: int,
payload: schemas.GigScheduleItemIn,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
if not check_editor(current):
raise HTTPException(status_code=403, detail="Not enough permissions")
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
item = db.query(models.GigScheduleItem).filter_by(id=item_id, gig_id=gig_id).first()
if not item:
raise HTTPException(status_code=404, detail="Schedule item not found")
_raise_if_schedule_conflict(db, gig, payload.item_datetime, exclude_item_id=item_id)
for key, value in payload.model_dump().items():
setattr(item, key, value)
try:
db.commit()
except IntegrityError:
db.rollback()
raise HTTPException(status_code=409, detail="Zeitpunkt ist bereits belegt.")
db.refresh(gig)
return _build_schedule_response(gig)
[Doku]
@router.delete("/{gig_id}/schedule/{item_id}", response_model=schemas.GigScheduleOut)
def delete_gig_schedule_item(
gig_id: int,
item_id: int,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
if not check_editor(current):
raise HTTPException(status_code=403, detail="Not enough permissions")
gig = db.query(models.Gig).get(gig_id)
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
item = db.query(models.GigScheduleItem).filter_by(id=item_id, gig_id=gig_id).first()
if not item:
raise HTTPException(status_code=404, detail="Schedule item not found")
db.delete(item)
db.commit()
db.refresh(gig)
return _build_schedule_response(gig)
[Doku]
@router.put("/{gig_id}", response_model=schemas.GigOut)
def update_gig(
gig_id: int,
gig: schemas.GigIn,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
if not check_editor(current):
logger.error(f"Permission denied: User {current['user_name']} is not editor")
raise HTTPException(status_code=401, detail="User role does not allow to update a gig!")
logger.info(f"Updating gig in database with gig_id={gig_id}")
db_gig = db.query(models.Gig).get(gig_id)
if not db_gig:
raise HTTPException(status_code=404, detail="Gig not found")
for k, v in gig.model_dump(exclude_unset=True).items():
setattr(db_gig, k, v)
db.commit()
db.refresh(db_gig)
return db_gig
[Doku]
def parse_name(full_name: str) -> dict:
"""
Teilt einen vollständigen Namen in Vor- und Nachname auf.
Falls nur ein Name vorhanden ist, wird dieser als Nachname verwendet.
"""
if not full_name or not full_name.strip():
return {'vorname': '', 'nachname': ''}
# Mehrere Namen durch Komma/Semikolon getrennt -> nur ersten nehmen
if ',' in full_name:
full_name = full_name.split(',')[0].strip()
elif ';' in full_name:
full_name = full_name.split(';')[0].strip()
elif '/' in full_name:
full_name = full_name.split('/')[0].strip()
parts = full_name.strip().split()
if len(parts) == 0:
return {'vorname': '', 'nachname': ''}
elif len(parts) == 1:
return {'vorname': '', 'nachname': parts[0]}
else:
# Letzter Teil ist Nachname, Rest ist Vorname
return {
'vorname': ' '.join(parts[:-1]),
'nachname': parts[-1]
}
[Doku]
@router.get("/{gig_id}/gemalist")
def download_gemalist(
gig_id: int,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
logger.info(f"Generating GEMA list for gig_id={gig_id}")
# Gig laden mit allen Sets und Songs
gig = db.query(models.Gig).filter(models.Gig.id == gig_id).first()
if not gig:
raise HTTPException(status_code=404, detail="Gig not found")
# Excel-Workbook erstellen
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "GEMA-Meldung"
# Header-Informationen (Zeilen 1-18 wie in Vorlage)
ws['A1'] = 'Excel-Vorlage zum Hochladen von Titeln'
ws['C1'] = 'Version: 1.2'
ws['K1'] = 'Live'
ws['L1'] = 'Ja'
ws['M1'] = 'Ja'
ws['N1'] = 'F'
# Gig-Informationen in Kopfbereich einfügen
ws['A18'] = 'Setlist'
# Header-Zeile (Zeile 19)
headers = [
'WERKNUMMER / WERKFASSUNGSNUMMER',
'TITEL*',
'SATZANGABE / SONSTIGE(R) Titel',
'ANZAHL MUSIKER / SÄNGER*',
'SPIELDAUER* (MM:SS)',
'INTERPRET / KOMPONIST* (Nachname)',
'INTERPRET / KOMPONIST (Vorname)',
'TEXTDICHTER (Nachname)',
'TEXTDICHTER (Vorname)',
'BEARBEITER (Nachname)',
'BEARBEITER (Vorname)',
'VERLAG',
'LIVE/TONTRÄGER',
'VERÖFFENTLICHTES WERK',
'POTPOURRI/FRAGMENT'
]
for col_idx, header in enumerate(headers, start=1):
cell = ws.cell(row=19, column=col_idx, value=header)
cell.font = Font(bold=True)
cell.alignment = Alignment(wrap_text=True, vertical='top')
# Datenzeilen ab Zeile 20
current_row = 20
# Alle Sets durchgehen (sortiert nach Position)
for gigset in sorted(gig.sets, key=lambda x: x.position):
set_obj = gigset.set
# Alle Songs im Set durchgehen (sortiert nach Position)
for setsong in sorted(set_obj.songs, key=lambda ss: ss.position):
if setsong.uebersprungen:
continue # Übersprungene Songs nicht melden
song = setsong.song
# Spalte A: Werknummer (optional, leer lassen)
ws.cell(row=current_row, column=1, value='')
# Spalte B: TITEL* (Pflichtfeld)
ws.cell(row=current_row, column=2, value=song.title or '')
# Spalte C: SATZANGABE (optional)
ws.cell(row=current_row, column=3, value='')
# Spalte D: ANZAHL MUSIKER (optional, kann leer bleiben bei Pop/Rock)
ws.cell(row=current_row, column=4, value='')
# Spalte E: SPIELDAUER* (MM:SS) (Pflichtfeld)
if song.duration:
if isinstance(song.duration, time):
duration_str = song.duration.strftime("%M:%S")
else:
duration_str = "00:00"
else:
duration_str = "00:00"
ws.cell(row=current_row, column=5, value=duration_str)
# Spalte F: INTERPRET / KOMPONIST* Nachname (Pflichtfeld)
# Composer aufteilen in Vor- und Nachname
composer_parts = parse_name(song.composer)
ws.cell(row=current_row, column=6, value=composer_parts['nachname'])
# Spalte G: INTERPRET / KOMPONIST Vorname
ws.cell(row=current_row, column=7, value=composer_parts['vorname'])
# Spalte H: TEXTDICHTER Nachname
texter_parts = parse_name(song.texter)
ws.cell(row=current_row, column=8, value=texter_parts['nachname'])
# Spalte I: TEXTDICHTER Vorname
ws.cell(row=current_row, column=9, value=texter_parts['vorname'])
# Spalte J: BEARBEITER Nachname
arrangement_parts = parse_name(song.arrangement)
ws.cell(row=current_row, column=10, value=arrangement_parts['nachname'])
# Spalte K: BEARBEITER Vorname
ws.cell(row=current_row, column=11, value=arrangement_parts['vorname'])
# Spalte L: VERLAG
ws.cell(row=current_row, column=12, value=song.publisher or '')
# Spalte M: LIVE/TONTRÄGER
ws.cell(row=current_row, column=13, value='Live')
# Spalte N: VERÖFFENTLICHTES WERK (J/N)
ws.cell(row=current_row, column=14, value='J')
# Spalte O: POTPOURRI/FRAGMENT (J/N)
ws.cell(row=current_row, column=15, value='N')
current_row += 1
# Spaltenbreiten anpassen
ws.column_dimensions['A'].width = 15
ws.column_dimensions['B'].width = 40
ws.column_dimensions['C'].width = 30
ws.column_dimensions['D'].width = 12
ws.column_dimensions['E'].width = 15
ws.column_dimensions['F'].width = 20
ws.column_dimensions['G'].width = 20
ws.column_dimensions['H'].width = 20
ws.column_dimensions['I'].width = 20
ws.column_dimensions['J'].width = 20
ws.column_dimensions['K'].width = 20
ws.column_dimensions['L'].width = 25
ws.column_dimensions['M'].width = 15
ws.column_dimensions['N'].width = 18
ws.column_dimensions['O'].width = 18
# Excel in Memory speichern
output = BytesIO()
wb.save(output)
output.seek(0)
# Dateiname mit Gig-Informationen
filename = f"GEMA_Meldung_{gig.name}_{gig.datum.strftime('%Y-%m-%d')}.xlsx"
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
[Doku]
@router.get("/{gig_id}/setlist.pdf", response_class=Response)
def download_setlist(
gig_id: int,
design: Literal["dark", "print"] = Query(
"dark",
description="Design-Variante fuer die Setlisten-PDF (dark oder print).",
),
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
logger.info(f"Generating setlist PDF for gig_id={gig_id}")
service = SetlistService(db) # <- sync Session
gig = service.load_gig(gig_id)
#gig.debug_dump()
if not gig:
logger.error(f"Gig with id={gig_id} not found for PDF generation")
raise HTTPException(status_code=404, detail="Gig not found")
schedule = service.calc_schedule(gig)
service.dump_gig_struct(gig)
# Dynamische Sänger-Farben: Farben werden aus einer Palette zugewiesen
SINGER_COLOR_PALETTE = [
"#29B619", "#227FFF", "#E644C3", "#FF8C00", "#8B5CF6",
"#059669", "#DC2626", "#CA8A04", "#0891B2", "#6366F1",
]
singers = sorted({
(setsong.song.singer_lead or "").replace("+"," ").replace(",", " ").split(" ")[0]
for gs in gig.sets
for setsong in gs.set.songs
if setsong.song.singer_lead
})
singer_colors = {
singer: SINGER_COLOR_PALETTE[i % len(SINGER_COLOR_PALETTE)]
for i, singer in enumerate(singers)
}
pdf_bytes = SetlistPDF(gig, schedule, singer_colors, style_mode=design).build().getvalue()
mode_suffix = "_druckfreundlich" if design == "print" else ""
headers = {"Content-Disposition": f"inline; filename=Setliste_{gig.name}{mode_suffix}.pdf"}
return Response(
pdf_bytes,
media_type="application/pdf",
headers=headers
)
[Doku]
@router.get("/{gig_id}/setlist", response_model=schemas.GigSetlistOut)
def get_gig_setlist(
gig_id: int,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
logger.info(f"Fetching gig setlist from database with gig_id={gig_id}")
gig = db.query(models.Gig).get(gig_id)
if not gig:
logger.error(f"Gig with id={gig_id} not found")
raise HTTPException(status_code=404, detail="Gig not found")
# Prüfe auf korrupte SetSongs (ohne Song-Referenz) und entferne sie
for gigset in gig.sets:
set_obj = gigset.set
corrupted_setsongs = [ss for ss in set_obj.songs if not ss.song]
if corrupted_setsongs:
logger.warning(f"Found {len(corrupted_setsongs)} corrupted SetSongs in Set {set_obj.id}, removing them")
for ss in corrupted_setsongs:
db.delete(ss)
db.commit()
payload = gig.to_dict() # siehe vorige Antwort
payload["timing"] = serialize_timing_for_api(calculate_setlist_timing(gig))
return payload
[Doku]
@router.get("/{gig_id}/forscore-setlist", response_class=Response)
def export_forscore_setlist(
gig_id: int,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
import plistlib
logger.info(f"Generating forScore setlist for gig_id={gig_id}")
gig = db.query(models.Gig).get(gig_id)
if not gig:
logger.error(f"Gig with id={gig_id} not found")
raise HTTPException(status_code=404, detail="Gig not found")
setlist_name = (gig.name or "libreStage Setlist").strip() or "libreStage Setlist"
setlist_items = []
for gigset in sorted(gig.sets, key=lambda x: x.position):
set_obj = gigset.set
setsonglist = sorted(set_obj.songs, key=lambda ss: ss.position)
for setsong in setsonglist:
if setsong.song:
setlist_items.append({
"title": setsong.song.title,
"setlist": setlist_name,
})
try:
xml_data = plistlib.dumps(setlist_items, fmt=plistlib.FMT_XML)
except Exception as e:
logger.error(f"Failed to generate PLIST for forScore: {e}")
raise HTTPException(status_code=500, detail="Fehler bei der PLIST-Generierung")
formatted_date = gig.datum.strftime('%Y-%m-%d') if gig.datum else "gig"
safe_name = "".join(c for c in gig.name if c.isalnum() or c in ("-", "_", " ")).strip().replace(" ", "_")
filename = f"Setlist-{formatted_date}-{safe_name}.4ss"
headers = {
"Content-Disposition": f'attachment; filename="{filename}"'
}
return Response(
content=xml_data,
media_type="application/x-forscore-setlist",
headers=headers
)
# @app.put("/append_song_to_set", response_model=schemas.GigSetlistOut)
# def append_song_to_set(
# data: schemas.SongToSetIn,
# db: Session = Depends(auth.get_db),
# current=Depends(auth.get_current_user)
# ):
# db_gig = db.query(models.Gig).filter_by(id=data.gigId).first()
# print(f"Appending {data.sondId} to {data.setId} on position {data.position}!")
# return db_gig
[Doku]
@router.put("/{gig_id}/update_setlist/", response_model=schemas.GigSetlistOut)
def update_gig_setlist(
gig_id: int,
gig: schemas.GetSetlistIn,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
logger.info(f"Updating gig setlist in database with gig_id={gig_id}")
db_gig = db.query(models.Gig).filter_by(id=gig_id).first()
if not db_gig:
logger.error(f"Gig with id={gig_id} not found for setlist update")
raise HTTPException(status_code=404, detail="Gig not found")
if not check_editor(current):
logger.error(f"Permission denied: User {current['user_name']} is not editor")
raise HTTPException(status_code=403, detail="Not enough permissions")
# Aktuelle Sets des Gigs
old_gigsets = db.query(models.GigSet).filter_by(id_gig=gig_id).all()
existing_sets = {gs.set.id: gs.set for gs in old_gigsets}
old_gigset_ids = [gs.id for gs in old_gigsets]
# Input-Set-IDs und -Songs extrahieren
input_set_ids = set(sd.set_id for sd in gig.sets if getattr(sd, 'set_id', None))
# 1. Entferne nur GigSet-Verknüpfungen, die im neuen Input fehlen
for gigset in old_gigsets:
if gigset.set.id not in input_set_ids:
db.delete(gigset)
db.commit()
# 2. Sets löschen, die nicht mehr genutzt werden
for set_id in existing_sets:
if set_id not in input_set_ids:
db.query(models.SetSong).filter_by(id_set=set_id).delete()
db.query(models.Set).filter_by(id=set_id).delete()
db.commit()
# 3. Update/Erstelle Sets und Songs
new_gigsets = []
for set_pos, set_data in enumerate(gig.sets, start=1):
# Existierendes Set, falls vorhanden
if getattr(set_data, 'set_id', None) and set_data.set_id in existing_sets:
set_obj = existing_sets[set_data.set_id]
# Update Eigenschaften des Sets
set_obj.name = set_data.set_name
set_obj.pause = time.fromisoformat(set_data.pause) if set_data.pause else None
set_obj.setlist_name = set_data.setlist_name
db.flush()
# Speichere Live-Mode-Daten der bestehenden SetSongs
old_setsongs = db.query(models.SetSong).filter_by(id_set=set_obj.id).all()
livemode_data = {} # Key: song_id, Value: {eingeschoben, uebersprungen, feedback}
for old_ss in old_setsongs:
livemode_data[old_ss.id_song] = {
'eingeschoben': old_ss.eingeschoben,
'uebersprungen': old_ss.uebersprungen,
'feedback': old_ss.feedback
}
# Update Songs im Set: Entferne alle alten, füge neue hinzu
db.query(models.SetSong).filter_by(id_set=set_obj.id).delete()
else:
# Neues Set anlegen
set_obj = models.Set(
name=set_data.set_name,
pause=time.fromisoformat(set_data.pause) if set_data.pause else None,
setlist_name=set_data.setlist_name,
)
db.add(set_obj)
db.flush()
livemode_data = {} # Keine alten Daten bei neuem Set
# Jetzt Songs der Reihe nach neu anlegen
for song_pos, song_data in enumerate(set_data.songs, start=1):
song = db.query(models.Song).filter_by(id=song_data.song_id).first()
if not song:
raise HTTPException(status_code=400, detail=f"Song not found")
# Hole alte Live-Mode-Daten wenn vorhanden
old_data = livemode_data.get(song.id, {})
setsong = models.SetSong(
id_set=set_obj.id,
id_song=song.id,
position=song_pos,
eingeschoben=old_data.get('eingeschoben'),
uebersprungen=old_data.get('uebersprungen'),
feedback=old_data.get('feedback')
)
db.add(setsong)
# GigSet-Verknüpfung erneuern
gigset = db.query(models.GigSet).filter_by(id_gig=db_gig.id, id_set=set_obj.id).first()
if not gigset:
gigset = models.GigSet(
id_gig=db_gig.id,
id_set=set_obj.id,
position=set_pos
)
db.add(gigset)
else:
gigset.position = set_pos
new_gigsets.append(gigset)
db.commit()
db.refresh(db_gig)
payload = db_gig.to_dict()
payload["timing"] = serialize_timing_for_api(calculate_setlist_timing(db_gig))
return payload
[Doku]
@router.post("/")
def create_new_gig(
gig: schemas.NewGig,
jahr = Query(None, description="Das Jahr der Gigs"),
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
logger.info(f"Creating new gig in database with name={gig.name}")
if not check_editor(current):
logger.error(f"Permission denied: User {current['user_name']} is not admin or editor")
raise HTTPException(status_code=401, detail="User role does not allow to create a new gig!")
gig_to_add = models.Gig(
name=gig.name,
datum=gig.datum,
kind_of_gig=gig.kind_of_gig,
organizer=gig.organizer,
venue=gig.venue,
doors=gig.doors,
begin=gig.begin,
end=gig.end,
status=gig.status,
publish=False,
)
db.add(gig_to_add)
db.commit()
query = db.query(models.Gig).order_by(models.Gig.datum.desc())
if jahr is not None:
query = query.filter(models.Gig.datum.startswith(str(jahr)))
# Mattermost notification
try:
from backend.utils import mattermost
message = f":mega: Es wurde soeben ein neuer Gig erstellt:\n\tName: **{gig.name}**\n\tDatum: **{gig.datum.strftime('%d.%m.%Y')}**"
if gig.begin:
message += f"\n\tBeginn: **{gig.begin.strftime('%H:%M Uhr')}**"
if gig.end:
message += f"\n\tEnde: **{gig.end.strftime('%H:%M Uhr')}**"
if gig.venue:
message += f"\n\tOrt: **{gig.venue}**"
if gig.organizer:
message += f"\n\tVeranstalter: **{gig.organizer}**"
mattermost.send_mm_message(channel=MM_CHANNEL, text=message)
except Exception as e:
logger.error(f"Failed to send Mattermost message: {e}")
return query.all()
[Doku]
@router.delete("/{gig_id}", response_model=List[schemas.GigOut])
def delete_gig(
gig_id: int,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
logger.info(f"Deleting gig in database with gig_id={gig_id}")
if not check_editor(current):
logger.error(f"Permission denied: User {current['user_name']} is not admin or editor")
raise HTTPException(status_code=401, detail="User role does not allow to delete a gig!")
gig_to_del = db.query(models.Gig).get(gig_id)
if not gig_to_del:
logger.error(f"Gig with id={gig_id} not found for deletion")
raise HTTPException(status_code=404, detail="Gig not found")
for gs in list(gig_to_del.sets):
logger.info("Deleting Gigset")
# DELETE ALL SETS
db.delete(gs)
# DELETE GIG
db.delete(gig_to_del)
db.commit()
query = db.query(models.Gig).order_by(models.Gig.datum.desc())
jahr = None
if jahr is not None:
query = query.filter(models.Gig.datum.startswith(str(jahr))) # pragma: no cover
# Mattermost notification
try:
from backend.utils import mattermost
message = f":mega: Der Gig **{gig_to_del.name}** am **{gig_to_del.datum.strftime('%d.%m.%Y')}** wurde abgesagt."
mattermost.send_mm_message(channel=MM_CHANNEL, text=message)
except Exception as e:
logger.error(f"Failed to send Mattermost message: {e}")
return query.all()
[Doku]
@router.get("/{gig_id}/setlist_available")
def is_setlist_available(
gig_id: int,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user)
):
logger.info(f"Checking if setlist is available for gig_id={gig_id}")
gig = db.query(models.Gig).get(gig_id)
if not gig:
logger.error(f"Gig with id={gig_id} not found for setlist availability check")
raise HTTPException(status_code=404, detail="Gig not found")
set_available = len(gig.sets) > 0
setsong_available = any(len(gs.set.songs) > 0 for gs in gig.sets)
is_available = set_available and setsong_available
logger.info(f"Setlist availability for gig_id={gig_id}: {is_available}")
return {"setlist_available": is_available}