Added generate_url and stream endpoints.

This commit is contained in:
2025-05-04 00:58:28 -05:00
parent 55f89eb5a8
commit 5b18c3306d
5 changed files with 173 additions and 8 deletions

10
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,10 @@
{
"cSpell.words": [
"dotenv",
"fout",
"mpegts",
"Referer",
"ringbuffer",
"streamlink"
]
}

10
src/schema.py Normal file
View File

@@ -0,0 +1,10 @@
from typing import Literal, Dict, Any, Optional
from pydantic import BaseModel, Field, IPvAnyAddress, ConfigDict
class GenerateUrlRequest(BaseModel):
endpoint: Optional[str] = Field(None, description="The specific endpoint to be appended to the base URL.")
stream_url: str = Field(None, description="The URL of the stream.")
agent: Optional[str] = Field(None, description="User-Agent string to be used in the request.")
proxy_url: Optional[str] = Field(None, description="Proxy URL to be used.")
request_headers: Optional[dict] = Field(default_factory=dict, description="Headers to be included in the request.")

View File

@@ -1,14 +1,16 @@
import asyncio
from asyncio import create_subprocess_exec, subprocess
from asyncio.subprocess import Process
from fastapi import Depends, FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from dotenv import load_dotenv
import uvicorn
import subprocess
import json
import os
from asyncio import create_subprocess_exec, subprocess
from asyncio.subprocess import Process
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from dotenv import load_dotenv
from typing import Dict, List, Optional
from schema import GenerateUrlRequest
from utils.http_utils import encode_streamlink_server_url
# Load environment variables
load_dotenv()
@@ -26,7 +28,7 @@ def verify_credentials(username: Optional[str] = None, password: Optional[str] =
if username != AUTH_USERNAME or password != AUTH_PASSWORD:
raise HTTPException(status_code=403, detail="Invalid credentials")
return True
return username, password
# Load channels configuration
def load_channels():
@@ -101,8 +103,34 @@ async def stream_generator(process: Process, url: str, headers=None, proxy=None)
except:
pass
@app.post(
"/generate_url",
description="Generate a single encoded URL",
response_description="Returns a single encoded URL",
)
async def generate_url(
request: GenerateUrlRequest,
http_request: Request,
credentials: tuple = Depends(verify_credentials)):
"""Generate a single encoded URL based on the provided request."""
username, password = credentials
encoded_url = encode_streamlink_server_url(
username=username,
password=password,
base_url=str(http_request.base_url),
stream_url=request.stream_url,
endpoint=request.endpoint,
agent=request.agent,
proxy_url=request.proxy_url,
request_headers=request.request_headers,
)
return {"url": encoded_url}
@app.get("/channels", response_model=List[Dict])
async def list_channels(auth: bool = Depends(verify_credentials)):
async def list_channels(credentials: tuple = Depends(verify_credentials)):
"""List all available channels"""
channels = load_channels()
return [{
@@ -110,8 +138,39 @@ async def list_channels(auth: bool = Depends(verify_credentials)):
'name': c['name']
} for c in channels.values()]
@app.get("/stream")
async def stream_custom(
url: str,
origin: Optional[str] = None,
referer: Optional[str] = None,
agent: Optional[str] = None,
proxy: Optional[str] = None,
credentials: tuple = Depends(verify_credentials)
):
"""Stream directly from query parameters"""
headers = {}
if origin:
headers['Origin'] = origin
if referer:
headers['Referer'] = referer
if agent:
headers['User-Agent'] = agent
try:
process = await generate_streamlink_process(
url,
headers if headers else None,
proxy if proxy else None
)
return StreamingResponse(
stream_generator(process, url, headers, proxy),
media_type='video/mp2t'
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error starting Streamlink: {str(e)}")
@app.get("/{channel_id}")
async def stream_channel(channel_id: str, auth: bool = Depends(verify_credentials)):
async def stream_channel(channel_id: str, credentials: tuple = Depends(verify_credentials)):
"""Stream a channel by ID"""
channels = load_channels()
if channel_id not in channels:

0
src/utils/__init__.py Normal file
View File

86
src/utils/http_utils.py Normal file
View File

@@ -0,0 +1,86 @@
import typing
from urllib import parse
from urllib.parse import urlencode
def encode_streamlink_server_url(
username: str,
password: str,
base_url: str,
stream_url: str,
endpoint: typing.Optional[str] = None,
agent: typing.Optional[str] = None,
proxy_url: typing.Optional[str] = None,
request_headers: typing.Optional[dict] = None,
) -> str:
"""
Constructs a Streamlink server URL with authentication and query parameters.
Creates a properly encoded URL for accessing Streamlink server endpoints with
embedded credentials and streaming parameters. All parameters are URL-encoded
and appended as query string parameters.
Args:
username: Authentication username for Streamlink server
password: Authentication password for Streamlink server
base_url: Base URL of the Streamlink server (e.g., 'http://localhost:6090')
stream_url: Target stream URL to be proxied through Streamlink
endpoint: Optional endpoint path to append to base URL (e.g., '/stream')
agent: User-Agent header value for the streaming request
proxy_url: Proxy server URL to use for the stream connection
request_headers: Dictionary of HTTP headers to forward as query parameters
Returns:
str: Fully constructed URL with encoded query parameters in the format:
{base_url}[/{endpoint}]?username=X&password=X&url=X&[params...]
Example:
>>> encode_streamlink_server_url(
... "user", "pass", "http://localhost:6090", "https://example.com/stream.m3u8",
... endpoint="stream", agent="MyApp/1.0", proxy_url="http://proxy:3128",
... request_headers={"Referer": "https://example.com"}
... )
'http://localhost:6090/stream?username=user&password=pass&url=https%3A%2F%2Fexample.com%2Fstream.m3u8&agent=MyApp%2F1.0&proxy=http%3A%2F%2Fproxy%3A3128&Referer=https%3A%2F%2Fexample.com'
"""
# Construct the base URL
if endpoint is None:
url = base_url
else:
url = parse.urljoin(base_url, endpoint)
# Ensure url doesn't end with a slash for consistent handling
if url.endswith("/"):
url = url[:-1]
# Prepare query parameters
query_params = {}
# Username
if username is not None:
query_params["username"] = username
# Password
if password is not None:
query_params["password"] = password
# Stream URL
if stream_url is not None:
query_params["url"] = stream_url
# Agent
if agent is not None:
query_params["agent"] = agent
# Proxy URL
if proxy_url is not None:
query_params["proxy"] = proxy_url
# Add headers if provided
if request_headers:
query_params.update(
{key: value for key, value in request_headers.items()}
)
if query_params:
return f"{url}?{urlencode(query_params)}"
return url