# libre-stage - Band rehearsal and gig management software
# Copyright (C) 2026 libre-stage contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Admin router – user and system management.
All endpoints are protected by :func:`user_is_admin` which verifies
that the authenticated user holds the ``admin`` role.
Prefix: ``/admin`` | Tag: ``admin``
"""
from fastapi import APIRouter, Depends, HTTPException, Request
import logging
from sqlalchemy.orm import Session
from typing import List
from backend import models, schemas, auth
from backend.utils.mailer import send_email
import os
from dotenv import load_dotenv
from backend.utils.check_permissions import check_admin
from backend.app_config import (
SOFT_CONFIG_KEYS,
ConfigValidationError,
get_soft_config,
get_soft_config_updated_at,
update_soft_config,
)
logger = logging.getLogger("uvicorn.error")
# suppress progress polls to reduce log clutter
block_endpoints = ["/admin/log"]
load_dotenv(".env")
[Doku]
async def user_is_admin(request: Request, db: Session = Depends(auth.get_db)):
"""
Router-level dependency: allow access only for admin users.
Reads the token from the request (header or cookie), validates it
and checks the ``admin`` role.
Args:
request (Request): Incoming FastAPI request.
db (Session): Active database session.
Returns:
dict: Current user payload (``user_name``, ``user_group``).
Raises:
HTTPException 403: If the authenticated user is not an admin.
HTTPException 401: If authentication fails.
"""
logger.info(f"[Admin-Check] Request Path: {request.url.path}")
logger.info(f"[Admin-Check] Cookies: {list(request.cookies.keys())}")
logger.info(f"[Admin-Check] Auth Header: {request.headers.get('Authorization', 'None')}")
try:
# Get current user using the cookie/header token
user = auth.get_current_user(request, db)
logger.info(f"[OK] User authenticated: {user.get('user_name')}, Group: {user.get('user_group')}")
if check_admin(user):
logger.info(f"[OK] User is admin - access granted")
return user
else:
logger.warning(f"[DENIED] User {user.get('user_name')} is NOT admin: {user.get('user_group')}")
raise HTTPException(status_code=403, detail="User is not Admin!")
except HTTPException as e:
logger.error(f"[ERROR] Auth failed with HTTPException: {e.detail}")
raise
except Exception as e:
logger.error(f"[ERROR] Auth failed with Exception: {type(e).__name__}: {e}")
raise HTTPException(status_code=401, detail="Authentication failed")
router = APIRouter(
prefix="/admin", tags=["admin"], dependencies=[Depends(user_is_admin)]
)
[Doku]
@router.get("/config/soft", response_model=schemas.SoftConfigAdminResponse)
def admin_get_soft_config():
return {
"data": get_soft_config(),
"meta": {
"editableKeys": list(SOFT_CONFIG_KEYS),
"updatedAt": get_soft_config_updated_at(),
},
}
[Doku]
@router.put("/config/soft", response_model=schemas.SoftConfigUpdateResponse)
def admin_update_soft_config(
data: schemas.SoftConfigUpdateIn,
current=Depends(auth.get_current_user),
):
try:
updated = update_soft_config(data.model_dump())
except ConfigValidationError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
logger.info(
"Admin '%s' updated soft config keys: %s",
current.get("user_name", "unknown"),
", ".join(SOFT_CONFIG_KEYS),
)
return {
"message": "Soft config updated",
"updatedKeys": list(SOFT_CONFIG_KEYS),
"data": updated,
}
[Doku]
class LogFilter(logging.Filter): # pragma: no cover
[Doku]
def filter(self, record):
if record.args and len(record.args) >= 3:
if record.args[2] in block_endpoints: # type: ignore
return False
return True
uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.addFilter(LogFilter())
[Doku]
@router.get("/users", response_model=List[schemas.UserOut])
def get_all_users(
db: Session = Depends(auth.get_db),
):
users = db.query(models.User).all()
for user in users:
if user.musician == None:
user.musician = False
return users
[Doku]
@router.put("/users/{user_id}")#, response_model=schemas.UserOut)
def update_user(
user_id: int,
data: schemas.UserOut,
request: Request,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
logger.info(f"Admin updating user {user_id}")
user_db = db.query(models.User).filter(models.User.id == user_id).first()
if not user_db:
raise HTTPException(status_code=404, detail="User not found")
# check if user_name is changing and if the new one is already taken
changes = []
if user_db.user_name != data.user_name:
existing_user = db.query(models.User).filter(models.User.user_name == data.user_name).first()
if existing_user:
raise HTTPException(status_code=400, detail="Username already taken")
changes.append(f"Username: {user_db.user_name} → {data.user_name}")
if user_db.user_group != data.user_group:
changes.append(f"Rolle: {user_db.user_group} → {data.user_group}")
if user_db.email != data.email:
changes.append(f"E-Mail: {user_db.email} → {data.email}")
if user_db.clear_name != data.clear_name:
changes.append(f"Klarname: {user_db.clear_name} → {data.clear_name}")
if user_db.mm_username != data.mm_username:
from backend.utils.mattermost import send_mm_message
test_status = False
if data.mm_username == None or data.mm_username.strip() == "":
changes.append(f"Mattermost Nutzername: {user_db.mm_username} → {data.mm_username}")
else:
try:
test_status = send_mm_message(f"Dein Mattermost Nutzername wurde von '{user_db.mm_username}' zu '{data.mm_username}' geändert.", channel=f"@{data.mm_username}")
except Exception as e:
logger.error(f"Fehler beim Senden der Testnachricht an neuen Mattermost Nutzernamen '{data.mm_username}': {e}. Wird nicht geändet")
raise HTTPException(status_code=400, detail=f"Error sending test message to new Mattermost @{data.mm_username}'. Change not applied.")
if test_status:
changes.append(f"Mattermost Nutzername: {user_db.mm_username} → {data.mm_username}")
# Status-Änderung prüfen
new_status = data.status if data.status else "active"
if user_db.status != new_status:
# Admins dürfen sich nicht selbst deaktivieren
if new_status == "deactivated" and user_db.user_name == current["user_name"]:
raise HTTPException(status_code=403, detail="You cannot deactivate your own account")
changes.append(f"Status: {user_db.status} → {new_status}")
user_db.user_name = data.user_name
user_db.clear_name = data.clear_name
user_db.email = data.email
user_db.user_group = data.user_group
user_db.musician = data.musician
user_db.is_singer = data.is_singer
user_db.mm_username = data.mm_username
# Bei Deaktivierung alle Refresh-Tokens widerrufen (vor dem Status-Update prüfen)
if new_status == "deactivated" and user_db.status != "deactivated":
db.query(models.RefreshToken).filter(
models.RefreshToken.user_id == user_id,
models.RefreshToken.revoked == False
).update({"revoked": True})
user_db.status = new_status
db.commit()
db.refresh(user_db)
# E-Mail senden falls Änderungen vorgenommen wurden
if changes and user_db.email:
subject = "Dein Benutzerkonto wurde aktualisiert"
body = f"Hallo {user_db.clear_name or user_db.user_name},\n\n"
body += "Dein Benutzerkonto wurde von einem Administrator geändert:\n\n"
body += "\n".join(changes)
body += "\n\nBei Fragen wende dich an einen Admin."
send_email(user_db.email, subject, body)
return user_db
[Doku]
@router.delete("/users/{user_id}")
def deactivate_user(
user_id: int,
request: Request,
db: Session = Depends(auth.get_db),
current=Depends(auth.get_current_user),
):
user_db = db.query(models.User).filter(models.User.id == user_id).first()
if not user_db:
raise HTTPException(status_code=404, detail="User not found")
# Admins dürfen sich nicht selbst deaktivieren
if user_db.user_name == current["user_name"]:
raise HTTPException(status_code=403, detail="You cannot deactivate your own account")
if user_db.status == "deactivated":
raise HTTPException(status_code=400, detail="User is already deactivated")
user_db.status = "deactivated"
# Alle Refresh-Tokens des Users widerrufen → sofortiger Logout aller Geräte
db.query(models.RefreshToken).filter(
models.RefreshToken.user_id == user_id,
models.RefreshToken.revoked == False
).update({"revoked": True})
db.commit()
logger.info(f"Admin deactivated user {user_id} ({user_db.user_name})")
# E-Mail-Benachrichtigung
if user_db.email:
try:
send_email(
user_db.email,
"Dein Konto wurde deaktiviert",
f"Hallo {user_db.clear_name or user_db.user_name},\n\n"
f"dein Benutzerkonto wurde von einem Administrator deaktiviert.\n"
f"Bei Fragen wende dich an einen Admin."
)
except Exception as e:
logger.error(f"Failed to send deactivation email to {user_db.email}: {e}")
return {"message": f"User {user_id} deactivated"}
[Doku]
@router.put("/users/{user_id}/activate")
def activate_user(
user_id: int,
db: Session = Depends(auth.get_db),
):
user_db = db.query(models.User).filter(models.User.id == user_id).first()
if not user_db:
raise HTTPException(status_code=404, detail="User not found")
if user_db.status == "active":
raise HTTPException(status_code=400, detail="User is already active")
user_db.status = "active"
db.commit()
logger.info(f"Admin activated user {user_id} ({user_db.user_name})")
# E-Mail-Benachrichtigung
if user_db.email:
try:
send_email(
user_db.email,
"Dein Konto wurde reaktiviert",
f"Hallo {user_db.clear_name or user_db.user_name},\n\n"
f"dein Benutzerkonto wurde von einem Administrator reaktiviert.\n"
f"Du kannst dich nun wieder einloggen."
)
except Exception as e:
logger.error(f"Failed to send activation email to {user_db.email}: {e}")
return {"message": f"User {user_id} activated"}
[Doku]
@router.post("/users", response_model=schemas.UserOut)
def create_user(
user: schemas.UserCreate,
db: Session = Depends(auth.get_db),
):
db_user = models.User(
user_name=user.user_name,
user_pw=auth.hash_pw(user.user_pw),
user_group=user.user_group,
musician=user.musician,
clear_name=user.clear_name,
email=user.email,
is_singer=user.is_singer,
status="active",
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
[Doku]
@router.put("/trigger_password_reset/{user_id}")
def trigger_password_reset(
user_id: int,
db: Session = Depends(auth.get_db),
):
user_db = db.query(models.User).filter(models.User.id == user_id).first()
if not user_db:
raise HTTPException(status_code=404, detail="User not found")
if not user_db.email:
raise HTTPException(status_code=400, detail="User has no email address, cannot send reset link.")
# Generate a password reset token (for simplicity, using a timestamp here)
reset_token = auth.create_password_reset_token(user_db.user_name)
reset_link = f"{os.getenv('FRONTEND_URL')}/password_reset?token={reset_token}"
# Send the reset link via Mattermost if the user has a mm_username, otherwise send via email
if user_db.mm_username:
from backend.utils.mattermost import send_mm_message
try:
send_mm_message(f"Ein Passwort-Reset wurde für dein Konto angefordert. Klicke hier, um dein Passwort zurückzusetzen: {reset_link}\n\n**Dieser Link ist für 15 Minuten gültig!**", channel=f"@{user_db.mm_username}")
logger.info(f"Password reset link sent to Mattermost user @{user_db.mm_username}")
except Exception as e:
logger.error(f"Error sending password reset link to Mattermost user @{user_db.mm_username}: {e}. Falling back to email.")
send_email(user_db.email, "Password Reset Request", f"Ein Passwort-Reset wurde für dein Konto angefordert. Klicke hier, um dein Passwort zurückzusetzen: {reset_link}\n\nDieser Link ist für 15 Minuten gültig!")
else:
send_email(user_db.email, "Password Reset Request", f"Ein Passwort-Reset wurde für dein Konto angefordert. Klicke hier, um dein Passwort zurückzusetzen: {reset_link}\n\nDieser Link ist für 15 Minuten gültig!")
logger.info(f"Password reset link sent to email {user_db.email}")
return {"message": "Password reset link sent to user's email."}