124 lines
3.7 KiB
Python
124 lines
3.7 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from uuid import UUID
|
|
from sqlalchemy import and_
|
|
|
|
from app.models import ChannelDB, ChannelCreate, ChannelUpdate, ChannelResponse
|
|
from app.utils.database import get_db
|
|
from app.auth.dependencies import get_current_user, require_roles
|
|
from app.models.auth import CognitoUser
|
|
|
|
# Router for the channel endpoints: every route registered on it is served
# under the "/channels" prefix and carries the "channels" tag.
router = APIRouter(prefix="/channels", tags=["channels"])
|
|
|
|
@router.post("/", response_model=ChannelResponse, status_code=status.HTTP_201_CREATED)
@require_roles("admin")
def create_channel(
    channel: ChannelCreate,
    db: Session = Depends(get_db),
    user: CognitoUser = Depends(get_current_user)
):
    """Create a new channel.

    The request is rejected with 409 when a stored channel already has the
    same ``group_title`` and ``name``; otherwise the payload is inserted,
    committed, and the refreshed row is returned.

    Args:
        channel: Payload describing the channel to create.
        db: Database session supplied by ``get_db``.
        user: Caller resolved by ``get_current_user``. Not read in this
            body; presumably consumed by ``require_roles`` -- confirm.

    Returns:
        The newly stored ``ChannelDB`` row.

    Raises:
        HTTPException: 409 if a channel with the same group_title and name
            already exists.
    """
    # A channel is identified by the (group_title, name) pair.
    same_identity = and_(
        ChannelDB.group_title == channel.group_title,
        ChannelDB.name == channel.name,
    )
    duplicate = db.query(ChannelDB).filter(same_identity).first()
    if duplicate is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Channel with same group_title and name already exists",
        )

    new_row = ChannelDB(**channel.model_dump())
    db.add(new_row)
    db.commit()
    # Reload so the returned object carries whatever the database filled in.
    db.refresh(new_row)
    return new_row
|
|
|
|
@router.get("/{channel_id}", response_model=ChannelResponse)
def get_channel(
    channel_id: UUID,
    db: Session = Depends(get_db)
):
    """Get a channel by id.

    Args:
        channel_id: Identifier of the channel to look up.
        db: Database session supplied by ``get_db``.

    Returns:
        The matching ``ChannelDB`` row.

    Raises:
        HTTPException: 404 if no channel has the given id.
    """
    lookup = db.query(ChannelDB).filter(ChannelDB.id == channel_id)
    found = lookup.first()
    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found",
        )
    return found
|
|
|
|
@router.put("/{channel_id}", response_model=ChannelResponse)
@require_roles("admin")
def update_channel(
    channel_id: UUID,
    channel: ChannelUpdate,
    db: Session = Depends(get_db),
    user: CognitoUser = Depends(get_current_user)
):
    """Update a channel.

    Looks up the channel by id, verifies that no *other* channel already
    uses the requested ``group_title`` + ``name`` pair, then copies every
    field of the payload onto the stored row and commits.

    Args:
        channel_id: Identifier of the channel to modify.
        channel: Payload holding the new field values.
        db: Database session supplied by ``get_db``.
        user: Caller resolved by ``get_current_user``. Not read in this
            body; presumably consumed by ``require_roles`` -- confirm.

    Returns:
        The updated ``ChannelDB`` row, refreshed after commit.

    Raises:
        HTTPException: 404 if the channel does not exist; 409 if another
            channel already has the same group_title and name.
    """
    target = db.query(ChannelDB).filter(ChannelDB.id == channel_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found",
        )

    # Same (group_title, name) pair on a different row counts as a clash;
    # the row being edited is excluded so it may keep its own values.
    clash_criteria = and_(
        ChannelDB.group_title == channel.group_title,
        ChannelDB.name == channel.name,
        ChannelDB.id != channel_id,
    )
    clash = db.query(ChannelDB).filter(clash_criteria).first()
    if clash is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Channel with same group_title and name already exists",
        )

    # NOTE(review): model_dump() includes every field of the payload, so any
    # field the client omitted is written with its model default -- confirm
    # that full replacement is the intended PUT behaviour for ChannelUpdate.
    new_values = channel.model_dump()
    for field_name, field_value in new_values.items():
        setattr(target, field_name, field_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@router.delete("/{channel_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_roles("admin")
def delete_channel(
    channel_id: UUID,
    db: Session = Depends(get_db),
    user: CognitoUser = Depends(get_current_user)
):
    """Delete a channel.

    Args:
        channel_id: Identifier of the channel to remove.
        db: Database session supplied by ``get_db``.
        user: Caller resolved by ``get_current_user``. Not read in this
            body; presumably consumed by ``require_roles`` -- confirm.

    Returns:
        None; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 if no channel has the given id.
    """
    doomed = db.query(ChannelDB).filter(ChannelDB.id == channel_id).first()
    if doomed is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found",
        )

    db.delete(doomed)
    db.commit()
    return None
|
|
|
|
@router.get("/", response_model=List[ChannelResponse])
@require_roles("admin")
def list_channels(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    user: CognitoUser = Depends(get_current_user)
):
    """List all channels with pagination.

    Rows are returned in ascending ``ChannelDB.id`` order so that successive
    pages are stable: without an ORDER BY the database is free to return
    rows in any order, which lets pages overlap or skip rows.

    Args:
        skip: Number of rows to skip from the start of the ordered result.
            Must be non-negative.
        limit: Maximum number of rows to return. Must be non-negative.
        db: Database session supplied by ``get_db``.
        user: Caller resolved by ``get_current_user``. Not read in this
            body; presumably consumed by ``require_roles`` -- confirm.

    Returns:
        A list of ``ChannelDB`` rows for the requested page.

    Raises:
        HTTPException: 400 if ``skip`` or ``limit`` is negative.
    """
    # Reject negative paging values here instead of handing them to the
    # database, where they would surface as a query error.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative",
        )

    return (
        db.query(ChannelDB)
        .order_by(ChannelDB.id)
        .offset(skip)
        .limit(limit)
        .all()
    )