Files
streamlink-server/src/routes/utils.py

66 lines
2.0 KiB
Python

from fastapi import Depends, APIRouter, Request, HTTPException
from typing import Dict, List
from schema import GenerateUrlRequest
from utils.file_utils import load_channels, verify_credentials
from utils.http_utils import encode_streamlink_server_url
from utils.stream_utils import generate_streamlink_process, stream_generator
utils_router = APIRouter()
def get_data_dir(request: Request) -> str:
try:
return request.app.state.DATA_DIR
except AttributeError:
raise HTTPException(
status_code=500,
detail="Server configuration error: DATA_DIR not initialized"
)
def get_channels_file_name(request: Request) -> str:
try:
return request.app.state.CHANNELS_FILE_NAME
except AttributeError:
raise HTTPException(
status_code=500,
detail="Server configuration error: CHANNELS_FILE_NAME not initialized"
)
@utils_router.get("/channels", response_model=List[Dict])
async def list_channels(
data_dir: str = Depends(get_data_dir),
channels_file_name: str = Depends(get_channels_file_name)):
"""List all available channels"""
channels = load_channels(data_dir, channels_file_name)
return [{
'id': c['id'],
'name': c['name']
} for c in channels.values()]
@utils_router.post(
"/generate_url",
description="Generate a single encoded URL",
response_description="Returns a single encoded URL",
)
async def generate_url(
request: GenerateUrlRequest,
http_request: Request,
credentials: tuple = Depends(verify_credentials)):
"""Generate a single encoded URL based on the provided request."""
username, password = credentials
encoded_url = encode_streamlink_server_url(
username=username,
password=password,
base_url=str(http_request.base_url),
stream_url=request.stream_url,
endpoint=request.endpoint,
agent=request.agent,
proxy_url=request.proxy_url,
request_headers=request.request_headers,
)
return {"url": encoded_url}