Started (incomplete) implementation of stream verification scheduler and endpoints
All checks were successful
AWS Deploy on Push / build (push) Successful in 5m18s

This commit is contained in:
2025-06-17 17:12:39 -05:00
parent abb467749b
commit a42d4c30a6
14 changed files with 1066 additions and 33 deletions

View File

@@ -32,7 +32,9 @@ def require_roles(*required_roles: str) -> Callable:
def decorator(endpoint: Callable) -> Callable:
@wraps(endpoint)
def wrapper(*args, user: CognitoUser = Depends(get_current_user), **kwargs):
async def wrapper(
*args, user: CognitoUser = Depends(get_current_user), **kwargs
):
user_roles = set(user.roles or [])
needed_roles = set(required_roles)
if not needed_roles.issubset(user_roles):

110
app/iptv/scheduler.py Normal file
View File

@@ -0,0 +1,110 @@
import logging
import os
from typing import Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI
from sqlalchemy.orm import Session
from app.iptv.stream_manager import StreamManager
from app.models.db import ChannelDB
from app.utils.database import get_db_session
logger = logging.getLogger(__name__)
class StreamScheduler:
"""Scheduler service for periodic stream validation tasks."""
def __init__(self, app: Optional[FastAPI] = None):
"""
Initialize the scheduler with optional FastAPI app integration.
Args:
app: Optional FastAPI app instance for lifecycle integration
"""
self.scheduler = BackgroundScheduler()
self.app = app
self.batch_size = int(os.getenv("STREAM_VALIDATION_BATCH_SIZE", "10"))
self.schedule_time = os.getenv(
"STREAM_VALIDATION_SCHEDULE", "0 3 * * *"
) # Default 3 AM daily
logger.info(f"Scheduler initialized with app: {app is not None}")
def validate_streams_batch(self, db_session: Optional[Session] = None) -> None:
"""
Validate streams and update their status.
When batch_size=0, validates all channels.
Args:
db_session: Optional SQLAlchemy session
"""
db = db_session if db_session else get_db_session()
try:
manager = StreamManager(db)
# Get channels to validate
query = db.query(ChannelDB)
if self.batch_size > 0:
query = query.limit(self.batch_size)
channels = query.all()
for channel in channels:
try:
logger.info(f"Validating streams for channel {channel.id}")
manager.validate_and_select_stream(str(channel.id))
except Exception as e:
logger.error(f"Error validating channel {channel.id}: {str(e)}")
continue
logger.info(f"Completed stream validation of {len(channels)} channels")
finally:
if db_session is None:
db.close()
def start(self) -> None:
"""Start the scheduler and add jobs."""
if not self.scheduler.running:
# Add the scheduled job
self.scheduler.add_job(
self.validate_streams_batch,
trigger=CronTrigger.from_crontab(self.schedule_time),
id="daily_stream_validation",
)
# Start the scheduler
self.scheduler.start()
logger.info(
f"Stream scheduler started with daily validation job. "
f"Running: {self.scheduler.running}"
)
# Register shutdown handler if FastAPI app is provided
if self.app:
logger.info(
f"Registering scheduler with FastAPI "
f"app: {hasattr(self.app, 'state')}"
)
@self.app.on_event("shutdown")
def shutdown_scheduler():
self.shutdown()
def shutdown(self) -> None:
"""Shutdown the scheduler gracefully."""
if self.scheduler.running:
self.scheduler.shutdown()
logger.info("Stream scheduler stopped")
def trigger_manual_validation(self) -> None:
"""Trigger manual validation of streams."""
logger.info("Manually triggering stream validation")
self.validate_streams_batch()
def init_scheduler(app: FastAPI) -> StreamScheduler:
"""Initialize and start the scheduler with FastAPI integration."""
scheduler = StreamScheduler(app)
scheduler.start()
return scheduler

151
app/iptv/stream_manager.py Normal file
View File

@@ -0,0 +1,151 @@
import logging
import random
from typing import Optional
from sqlalchemy.orm import Session
from app.models.db import ChannelURL
from app.utils.check_streams import StreamValidator
from app.utils.database import get_db_session
logger = logging.getLogger(__name__)
class StreamManager:
"""Service for managing and validating channel streams."""
def __init__(self, db_session: Optional[Session] = None):
"""
Initialize StreamManager with optional database session.
Args:
db_session: Optional SQLAlchemy session. If None, will create a new one.
"""
self.db = db_session if db_session else get_db_session()
self.validator = StreamValidator()
def get_streams_for_channel(self, channel_id: str) -> list[ChannelURL]:
"""
Get all streams for a channel ordered by priority (lowest first),
with same-priority streams randomized.
Args:
channel_id: UUID of the channel to get streams for
Returns:
List of ChannelURL objects ordered by priority
"""
try:
# Get all streams for channel ordered by priority
streams = (
self.db.query(ChannelURL)
.filter(ChannelURL.channel_id == channel_id)
.order_by(ChannelURL.priority_id)
.all()
)
# Group streams by priority and randomize same-priority streams
grouped = {}
for stream in streams:
if stream.priority_id not in grouped:
grouped[stream.priority_id] = []
grouped[stream.priority_id].append(stream)
# Randomize same-priority streams and flatten
randomized_streams = []
for priority in sorted(grouped.keys()):
random.shuffle(grouped[priority])
randomized_streams.extend(grouped[priority])
return randomized_streams
except Exception as e:
logger.error(f"Error getting streams for channel {channel_id}: {str(e)}")
raise
def validate_and_select_stream(self, channel_id: str) -> Optional[str]:
"""
Find and validate a working stream for the given channel.
Args:
channel_id: UUID of the channel to find a stream for
Returns:
URL of the first working stream found, or None if none found
"""
try:
streams = self.get_streams_for_channel(channel_id)
if not streams:
logger.warning(f"No streams found for channel {channel_id}")
return None
working_stream = None
for stream in streams:
logger.info(f"Validating stream {stream.url} for channel {channel_id}")
is_valid, _ = self.validator.validate_stream(stream.url)
if is_valid:
working_stream = stream
break
if working_stream:
self._update_stream_status(working_stream, streams)
return working_stream.url
else:
logger.warning(f"No valid streams found for channel {channel_id}")
return None
except Exception as e:
logger.error(f"Error validating streams for channel {channel_id}: {str(e)}")
raise
def _update_stream_status(
self, working_stream: ChannelURL, all_streams: list[ChannelURL]
) -> None:
"""
Update in_use status for streams (True for working stream, False for others).
Args:
working_stream: The stream that was validated as working
all_streams: All streams for the channel
"""
try:
for stream in all_streams:
stream.in_use = stream.id == working_stream.id
self.db.commit()
logger.info(
f"Updated stream status - set in_use=True for {working_stream.url}"
)
except Exception as e:
self.db.rollback()
logger.error(f"Error updating stream status: {str(e)}")
raise
def __del__(self):
"""Close database session when StreamManager is destroyed."""
if hasattr(self, "db"):
self.db.close()
def get_working_stream(
channel_id: str, db_session: Optional[Session] = None
) -> Optional[str]:
"""
Convenience function to get a working stream for a channel.
Args:
channel_id: UUID of the channel to get a stream for
db_session: Optional SQLAlchemy session
Returns:
URL of the first working stream found, or None if none found
"""
manager = StreamManager(db_session)
try:
return manager.validate_and_select_stream(channel_id)
finally:
if db_session is None: # Only close if we created the session
manager.__del__()

View File

@@ -2,7 +2,8 @@ from fastapi import FastAPI
from fastapi.concurrency import asynccontextmanager
from fastapi.openapi.utils import get_openapi
from app.routers import auth, channels, groups, playlist, priorities
from app.iptv.scheduler import StreamScheduler
from app.routers import auth, channels, groups, playlist, priorities, scheduler
from app.utils.database import init_db
@@ -10,8 +11,17 @@ from app.utils.database import init_db
async def lifespan(app: FastAPI):
# Initialize database tables on startup
init_db()
# Initialize and start the stream scheduler
scheduler = StreamScheduler(app)
app.state.scheduler = scheduler # Store scheduler in app state
scheduler.start()
yield
# Shutdown scheduler on app shutdown
scheduler.shutdown()
app = FastAPI(
lifespan=lifespan,
@@ -69,3 +79,4 @@ app.include_router(channels.router)
app.include_router(playlist.router)
app.include_router(priorities.router)
app.include_router(groups.router)
app.include_router(scheduler.router)

View File

@@ -1,15 +1,156 @@
from fastapi import APIRouter, Depends
import logging
from enum import Enum
from typing import Optional
from uuid import uuid4
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.auth.dependencies import get_current_user
from app.iptv.stream_manager import StreamManager
from app.models.auth import CognitoUser
from app.utils.database import get_db_session
router = APIRouter(prefix="/playlist", tags=["playlist"])
logger = logging.getLogger(__name__)
# In-memory store for validation processes
validation_processes: dict[str, dict] = {}
@router.get("/protected", summary="Protected endpoint for authenticated users")
async def protected_route(user: CognitoUser = Depends(get_current_user)):
class ProcessStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class StreamValidationRequest(BaseModel):
"""Request model for stream validation endpoint"""
channel_id: Optional[str] = None
class ValidatedStream(BaseModel):
"""Model for a validated working stream"""
channel_id: str
stream_url: str
class ValidationProcessResponse(BaseModel):
"""Response model for validation process initiation"""
process_id: str
status: ProcessStatus
message: str
class ValidationResultResponse(BaseModel):
"""Response model for validation results"""
process_id: str
status: ProcessStatus
working_streams: Optional[list[ValidatedStream]] = None
error: Optional[str] = None
def run_stream_validation(process_id: str, channel_id: Optional[str], db: Session):
"""Background task to validate streams"""
try:
validation_processes[process_id]["status"] = ProcessStatus.IN_PROGRESS
manager = StreamManager(db)
if channel_id:
stream_url = manager.validate_and_select_stream(channel_id)
if stream_url:
validation_processes[process_id]["result"] = {
"working_streams": [
ValidatedStream(channel_id=channel_id, stream_url=stream_url)
]
}
else:
validation_processes[process_id]["error"] = (
f"No working streams found for channel {channel_id}"
)
else:
# TODO: Implement validation for all channels
validation_processes[process_id]["error"] = (
"Validation of all channels not yet implemented"
)
validation_processes[process_id]["status"] = ProcessStatus.COMPLETED
except Exception as e:
logger.error(f"Error validating streams: {str(e)}")
validation_processes[process_id]["status"] = ProcessStatus.FAILED
validation_processes[process_id]["error"] = str(e)
@router.post(
"/validate-streams",
summary="Start stream validation process",
response_model=ValidationProcessResponse,
status_code=status.HTTP_202_ACCEPTED,
responses={202: {"description": "Validation process started successfully"}},
)
async def start_stream_validation(
request: StreamValidationRequest,
background_tasks: BackgroundTasks,
user: CognitoUser = Depends(get_current_user),
db: Session = Depends(get_db_session),
):
"""
Protected endpoint that requires authentication for all users.
If the user is authenticated, returns success message.
Start asynchronous validation of streams.
- Returns immediately with a process ID
- Use GET /validate-streams/{process_id} to check status
"""
return {"message": f"Hello {user.username}, you have access to support resources!"}
process_id = str(uuid4())
validation_processes[process_id] = {
"status": ProcessStatus.PENDING,
"channel_id": request.channel_id,
}
background_tasks.add_task(run_stream_validation, process_id, request.channel_id, db)
return {
"process_id": process_id,
"status": ProcessStatus.PENDING,
"message": "Validation process started",
}
@router.get(
"/validate-streams/{process_id}",
summary="Check validation process status",
response_model=ValidationResultResponse,
responses={
200: {"description": "Process status and results"},
404: {"description": "Process not found"},
},
)
async def get_validation_status(
process_id: str, user: CognitoUser = Depends(get_current_user)
):
"""
Check status of a stream validation process.
Returns current status and results if completed.
"""
if process_id not in validation_processes:
raise HTTPException(status_code=404, detail="Process not found")
process = validation_processes[process_id]
response = {"process_id": process_id, "status": process["status"]}
if process["status"] == ProcessStatus.COMPLETED:
if "error" in process:
response["error"] = process["error"]
else:
response["working_streams"] = process["result"]["working_streams"]
elif process["status"] == ProcessStatus.FAILED:
response["error"] = process["error"]
return response

57
app/routers/scheduler.py Normal file
View File

@@ -0,0 +1,57 @@
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from app.auth.dependencies import get_current_user, require_roles
from app.iptv.scheduler import StreamScheduler
from app.models.auth import CognitoUser
from app.utils.database import get_db
router = APIRouter(
prefix="/scheduler",
tags=["scheduler"],
responses={404: {"description": "Not found"}},
)
async def get_scheduler(request: Request) -> StreamScheduler:
"""Get the scheduler instance from the app state."""
if not hasattr(request.app.state.scheduler, "scheduler"):
raise HTTPException(status_code=500, detail="Scheduler not initialized")
return request.app.state.scheduler
@router.get("/health")
@require_roles("admin")
def scheduler_health(
scheduler: StreamScheduler = Depends(get_scheduler),
user: CognitoUser = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Check scheduler health status (admin only)."""
try:
job = scheduler.scheduler.get_job("daily_stream_validation")
next_run = str(job.next_run_time) if job and job.next_run_time else None
return {
"status": "running" if scheduler.scheduler.running else "stopped",
"next_run": next_run,
}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to check scheduler health: {str(e)}"
)
@router.post("/trigger")
@require_roles("admin")
def trigger_validation(
scheduler: StreamScheduler = Depends(get_scheduler),
user: CognitoUser = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Manually trigger stream validation (admin only)."""
scheduler.trigger_manual_validation()
return JSONResponse(
status_code=202, content={"message": "Stream validation triggered"}
)

View File

@@ -1,6 +1,7 @@
import os
import boto3
from requests import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
@@ -53,3 +54,8 @@ def get_db():
yield db
finally:
db.close()
def get_db_session() -> Session:
"""Get a direct database session (non-generator version)"""
return SessionLocal()