diff --git a/.vscode/settings.json b/.vscode/settings.json index 803fd13..796ad9f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,9 +2,11 @@ "cSpell.words": [ "dotenv", "fout", + "levelname", "mpegts", "Referer", "ringbuffer", - "streamlink" + "streamlink", + "uvicorn" ] } \ No newline at end of file diff --git a/Dockerfile_HF b/Dockerfile_HF index 50ecba4..0d3a6b2 100644 --- a/Dockerfile_HF +++ b/Dockerfile_HF @@ -17,4 +17,4 @@ RUN git clone https://git.fiorinis.com/Home/streamlink-server.git . RUN pip install --no-cache-dir -r ./src/requirements.txt EXPOSE 7860 -CMD ["uvicorn", "run:main_app", "--host", "0.0.0.0", "--reload", "--port", "7860", "--workers", "4"] \ No newline at end of file +CMD ["uvicorn", "src.stream_link_server:app", "--host", "0.0.0.0", "--reload", "--port", "7860", "--workers", "4"] \ No newline at end of file diff --git a/run.py b/run.py index 690d9ee..0f78832 100644 --- a/run.py +++ b/run.py @@ -1,21 +1,19 @@ +import logging import os import sys # Add the src directory to Python path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) -from fastapi import FastAPI -from src.stream_link_server import app as streamlink_server_app +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) -# Initialize the main FastAPI application -main_app = FastAPI() - -# Manually add only non-static routes from streamlink_server_app -for route in streamlink_server_app.routes: - if route.path != "/": # Exclude the static file path - main_app.router.routes.append(route) - -# Run the main app if __name__ == "__main__": import uvicorn - uvicorn.run(main_app, host="0.0.0.0", port=8080) \ No newline at end of file + uvicorn.run( + "src.stream_link_server:app", + host="0.0.0.0", + port=8080, + reload=True, + workers=2 + ) \ No newline at end of file diff --git a/src/routes/__init__.py b/src/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/routes/stream.py b/src/routes/stream.py new file mode 100644 index 0000000..4c3ecc8 --- /dev/null +++ b/src/routes/stream.py @@ -0,0 +1,93 @@ + + +from typing import Optional +from fastapi import Depends, HTTPException, APIRouter, Request +from fastapi.responses import StreamingResponse +from utils.file_utils import load_channels +from utils.stream_utils import generate_streamlink_process, stream_generator + +stream_router = APIRouter() + +def get_data_dir(request: Request) -> str: + try: + return request.app.state.DATA_DIR + except AttributeError: + raise HTTPException( + status_code=500, + detail="Server configuration error: DATA_DIR not initialized" + ) + +def get_channels_file_name(request: Request) -> str: + try: + return request.app.state.CHANNELS_FILE_NAME + except AttributeError: + raise HTTPException( + status_code=500, + detail="Server configuration error: CHANNELS_FILE_NAME not initialized" + ) + +@stream_router.get("/url") +async def stream_custom( + url: str, + origin: Optional[str] = None, + referer: Optional[str] = None, + agent: Optional[str] = None, + proxy: Optional[str] = None +): + """Stream directly from query parameters""" + headers = {} + if origin: + headers['Origin'] = origin + if referer: + headers['Referer'] = referer + if agent: + headers['User-Agent'] = agent + + try: + process = await generate_streamlink_process( + url, + headers if headers else None, + proxy if proxy else None + ) + return StreamingResponse( + stream_generator(process, url, headers, proxy), + media_type='video/mp2t' + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error starting Streamlink: {str(e)}") + +@stream_router.get("/{channel_id}") +async def stream_channel( + channel_id: str, + data_dir: str = Depends(get_data_dir), + channels_file_name: str = Depends(get_channels_file_name) + ): + + """Stream a channel by ID""" + channels = load_channels(data_dir, channels_file_name) + if channel_id not in channels: + raise HTTPException(status_code=404, detail="Channel not found") + + channel = channels[channel_id] + url = channel['url'] + + # Build headers dict only including specified headers + headers = {} + if 'origin' in channel: + headers['Origin'] = channel['origin'] + if 'referer' in channel: + headers['Referer'] = channel['referer'] + if 'agent' in channel: + headers['User-Agent'] = channel['agent'] + + # Get proxy if specified + proxy = channel.get('proxy') + + try: + process = await generate_streamlink_process(url, headers if headers else None, proxy if proxy else None) + return StreamingResponse( + stream_generator(process, url, headers if headers else None, proxy if proxy else None), + media_type='video/mp2t' + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error starting Streamlink: {str(e)}") \ No newline at end of file diff --git a/src/routes/utils.py b/src/routes/utils.py new file mode 100644 index 0000000..9276321 --- /dev/null +++ b/src/routes/utils.py @@ -0,0 +1,66 @@ + + +from fastapi import Depends, APIRouter, Request, HTTPException +from typing import Dict, List +from schema import GenerateUrlRequest +from utils.file_utils import load_channels, verify_credentials +from utils.http_utils import encode_streamlink_server_url +from utils.stream_utils import generate_streamlink_process, stream_generator + +utils_router = APIRouter() + +def get_data_dir(request: Request) -> str: + try: + return request.app.state.DATA_DIR + except AttributeError: + raise HTTPException( + status_code=500, + detail="Server configuration error: DATA_DIR not initialized" + ) + +def get_channels_file_name(request: Request) -> str: + try: + return request.app.state.CHANNELS_FILE_NAME + except AttributeError: + raise HTTPException( + status_code=500, + detail="Server configuration error: CHANNELS_FILE_NAME not initialized" + ) + +@utils_router.get("/channels", response_model=List[Dict]) +async def list_channels( + data_dir: str = Depends(get_data_dir), + channels_file_name: str = Depends(get_channels_file_name)): + + """List all available channels""" + channels = load_channels(data_dir, channels_file_name) + return [{ + 'id': c['id'], + 'name': c['name'] + } for c in channels.values()] + +@utils_router.post( + "/generate_url", + description="Generate a single encoded URL", + response_description="Returns a single encoded URL", +) +async def generate_url( + request: GenerateUrlRequest, + http_request: Request, + credentials: tuple = Depends(verify_credentials)): + """Generate a single encoded URL based on the provided request.""" + + username, password = credentials + + encoded_url = encode_streamlink_server_url( + username=username, + password=password, + base_url=str(http_request.base_url), + stream_url=request.stream_url, + endpoint=request.endpoint, + agent=request.agent, + proxy_url=request.proxy_url, + request_headers=request.request_headers, + ) + + return {"url": encoded_url} \ No newline at end of file diff --git a/src/static/index.html b/src/static/index.html new file mode 100644 index 0000000..ea185a0 --- /dev/null +++ b/src/static/index.html @@ -0,0 +1,76 @@ + + + + + + MediaFlow Proxy + + + + +
+ MediaFlow Proxy Logo +

MediaFlow Proxy

+
+

A high-performance proxy server for streaming media, supporting HTTP(S), HLS, and MPEG-DASH with real-time DRM decryption.

+ +

Key Features

+
Convert MPEG-DASH streams (DRM-protected and non-protected) to HLS
+
Support for Clear Key DRM-protected MPD DASH streams
+
Handle both live and video-on-demand (VOD) DASH streams
+
Proxy HTTP/HTTPS links with custom headers
+
Proxy and modify HLS (M3U8) streams in real-time with custom headers and key URL modifications for bypassing some sneaky restrictions.
+
Protect against unauthorized access and network bandwidth abuses
+ +

Getting Started

+

Visit the GitHub repository for installation instructions and documentation.

+ +

Premium Hosted Service

+

For a hassle-free experience, check out premium hosted service on ElfHosted.

+ +

API Documentation

+

Explore the Swagger UI for comprehensive details about the API endpoints and their usage.

+ + + \ No newline at end of file diff --git a/src/static/logo.png b/src/static/logo.png new file mode 100644 index 0000000..193475c Binary files /dev/null and b/src/static/logo.png differ diff --git a/src/stream_link_server.py b/src/stream_link_server.py index 46f470b..e6a5031 100644 --- a/src/stream_link_server.py +++ b/src/stream_link_server.py @@ -1,210 +1,55 @@ -import asyncio +from contextlib import asynccontextmanager import uvicorn -import subprocess -import json import os -from asyncio import create_subprocess_exec, subprocess -from asyncio.subprocess import Process -from fastapi import Depends, FastAPI, HTTPException, Request -from fastapi.responses import StreamingResponse +import logging +from fastapi import Depends, FastAPI from dotenv import load_dotenv -from typing import Dict, List, Optional -from schema import GenerateUrlRequest -from utils.http_utils import encode_streamlink_server_url +from routes.stream import stream_router +from routes.utils import utils_router +from utils.file_utils import load_channels, verify_credentials +from starlette.responses import RedirectResponse +from starlette.staticfiles import StaticFiles +from importlib import resources -# Load environment variables -load_dotenv() +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) -app = FastAPI(title="Streamlink Server") - -AUTH_USERNAME = os.getenv("AUTH_USERNAME") -AUTH_PASSWORD = os.getenv("AUTH_PASSWORD") DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data') +CHANNELS_FILE_NAME = 'channels.json' -def verify_credentials(username: Optional[str] = None, password: Optional[str] = None): - if not username or not password: - raise HTTPException(status_code=401, detail="Authentication required") +@asynccontextmanager +async def lifespan(app: FastAPI): + if not os.path.exists(DATA_DIR): + os.makedirs(DATA_DIR) - if username != AUTH_USERNAME or password != AUTH_PASSWORD: - raise HTTPException(status_code=403, detail="Invalid credentials") + app.state.DATA_DIR = DATA_DIR + app.state.CHANNELS_FILE_NAME = CHANNELS_FILE_NAME - return username, password + logger.info(f"Initialized app state with DATA_DIR: {DATA_DIR}") + yield + logger.info("Application shutting down...") -# Load channels configuration -def load_channels(): - """Load channels configuration from JSON file""" - channels_path = os.path.join(DATA_DIR, 'channels.json') - with open(channels_path, 'r') as f: - return {str(c['id']): c for c in json.load(f)['channels']} +app = FastAPI(title="Streamlink Server", lifespan=lifespan) -async def generate_streamlink_process(url, headers=None, proxy=None) -> Process: - """ - Run Streamlink as an async subprocess and pipe its output to the response. - Args: - url: Stream URL - headers: Optional dict of HTTP headers - """ - cmd = [ - 'streamlink', - '--ffmpeg-fout', 'mpegts', - '--hls-live-restart', - '--retry-streams', '3', - '--stream-timeout', '60', - '--hls-playlist-reload-attempts', '3', - '--stream-segment-threads', '3', - '--ringbuffer-size', '32M', - ] +@app.get("/health") +async def health_check(): + return {"status": "healthy"} - # Add proxy if specified - if proxy: - cmd.extend(['--http-proxy', proxy]) +@app.get("/favicon.ico") +async def get_favicon(): + return RedirectResponse(url="/logo.png") - # Add headers if specified - if headers: - for key, value in headers.items(): - cmd.extend(['--http-header', f'{key}={value}']) - - cmd.extend(['--stdout', url, 'best']) +app.include_router(stream_router, prefix="/stream", tags=["stream"], dependencies=[Depends(verify_credentials)]) +app.include_router(utils_router, prefix="/utils", tags=["utils"], dependencies=[Depends(verify_credentials)]) - process = await create_subprocess_exec( - *cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - return process - -async def stream_generator(process: Process, url: str, headers=None, proxy=None): - """Generate streaming content asynchronously""" - CHUNK_SIZE = 32768 - try: - while True: - if process.returncode is not None: - # Process has terminated, restart it - process = await generate_streamlink_process(url, headers, proxy) - continue - - try: - output = await process.stdout.read(CHUNK_SIZE) - if output: - yield output - else: - # No output but process still running, wait briefly - await asyncio.sleep(0.1) - except Exception as e: - print(f"Error reading stream: {e}") - try: - process.terminate() - except: - pass - process = await generate_streamlink_process(url, headers, proxy) - finally: - try: - process.terminate() - except: - pass - -@app.post( - "/generate_url", - description="Generate a single encoded URL", - response_description="Returns a single encoded URL", -) -async def generate_url( - request: GenerateUrlRequest, - http_request: Request, - credentials: tuple = Depends(verify_credentials)): - """Generate a single encoded URL based on the provided request.""" - - username, password = credentials - - encoded_url = encode_streamlink_server_url( - username=username, - password=password, - base_url=str(http_request.base_url), - stream_url=request.stream_url, - endpoint=request.endpoint, - agent=request.agent, - proxy_url=request.proxy_url, - request_headers=request.request_headers, - ) - - return {"url": encoded_url} - -@app.get("/channels", response_model=List[Dict]) -async def list_channels(credentials: tuple = Depends(verify_credentials)): - """List all available channels""" - channels = load_channels() - return [{ - 'id': c['id'], - 'name': c['name'] - } for c in channels.values()] - -@app.get("/stream") -async def stream_custom( - url: str, - origin: Optional[str] = None, - referer: Optional[str] = None, - agent: Optional[str] = None, - proxy: Optional[str] = None, - credentials: tuple = Depends(verify_credentials) -): - """Stream directly from query parameters""" - headers = {} - if origin: - headers['Origin'] = origin - if referer: - headers['Referer'] = referer - if agent: - headers['User-Agent'] = agent - - try: - process = await generate_streamlink_process( - url, - headers if headers else None, - proxy if proxy else None - ) - return StreamingResponse( - stream_generator(process, url, headers, proxy), - media_type='video/mp2t' - ) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error starting Streamlink: {str(e)}") - -@app.get("/{channel_id}") -async def stream_channel(channel_id: str, credentials: tuple = Depends(verify_credentials)): - """Stream a channel by ID""" - channels = load_channels() - if channel_id not in channels: - raise HTTPException(status_code=404, detail="Channel not found") - - channel = channels[channel_id] - url = channel['url'] - - # Build headers dict only including specified headers - headers = {} - if 'origin' in channel: - headers['Origin'] = channel['origin'] - if 'referer' in channel: - headers['Referer'] = channel['referer'] - if 'agent' in channel: - headers['User-Agent'] = channel['agent'] - - # Get proxy if specified - proxy = channel.get('proxy') - - try: - process = await generate_streamlink_process(url, headers if headers else None, proxy if proxy else None) - return StreamingResponse( - stream_generator(process, url, headers if headers else None, proxy if proxy else None), - media_type='video/mp2t' - ) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error starting Streamlink: {str(e)}") +static_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") +app.mount("/", StaticFiles(directory=static_path, html=True), name="static") if __name__ == "__main__": uvicorn.run( "stream_link_server:app", host="0.0.0.0", - port=6090, + port=8080, reload=True, workers=2 ) diff --git a/src/utils/file_utils.py b/src/utils/file_utils.py new file mode 100644 index 0000000..54f565d --- /dev/null +++ b/src/utils/file_utils.py @@ -0,0 +1,28 @@ +import json +import os +from typing import Optional +from dotenv import load_dotenv +from fastapi import Depends, HTTPException + +# Load environment variables +load_dotenv() + +AUTH_USERNAME = os.getenv("AUTH_USERNAME") +AUTH_PASSWORD = os.getenv("AUTH_PASSWORD") + +# Credentials verification +def verify_credentials(username: Optional[str] = None, password: Optional[str] = None): + if not username or not password: + raise HTTPException(status_code=401, detail="Authentication required") + + if username != AUTH_USERNAME or password != AUTH_PASSWORD: + raise HTTPException(status_code=403, detail="Invalid credentials") + + return username, password + +# Load channels configuration +def load_channels(data_dir: str = 'data', filename: str = 'channels.json') -> dict: + """Load channels configuration from JSON file""" + channels_path = os.path.join(data_dir, filename) + with open(channels_path, 'r') as f: + return {str(c['id']): c for c in json.load(f)['channels']} \ No newline at end of file diff --git a/src/utils/stream_utils.py b/src/utils/stream_utils.py new file mode 100644 index 0000000..45c4de7 --- /dev/null +++ b/src/utils/stream_utils.py @@ -0,0 +1,70 @@ + +import asyncio +from asyncio import create_subprocess_exec, subprocess +from asyncio.subprocess import Process + +async def stream_generator(process: Process, url: str, headers=None, proxy=None): + """Generate streaming content asynchronously""" + CHUNK_SIZE = 32768 + try: + while True: + if process.returncode is not None: + # Process has terminated, restart it + process = await generate_streamlink_process(url, headers, proxy) + continue + + try: + output = await process.stdout.read(CHUNK_SIZE) + if output: + yield output + else: + # No output but process still running, wait briefly + await asyncio.sleep(0.1) + except Exception as e: + print(f"Error reading stream: {e}") + try: + process.terminate() + except: + pass + process = await generate_streamlink_process(url, headers, proxy) + finally: + try: + process.terminate() + except: + pass + +async def generate_streamlink_process(url, headers=None, proxy=None) -> Process: + """ + Run Streamlink as an async subprocess and pipe its output to the response. + Args: + url: Stream URL + headers: Optional dict of HTTP headers + """ + cmd = [ + 'streamlink', + '--ffmpeg-fout', 'mpegts', + '--hls-live-restart', + '--retry-streams', '3', + '--stream-timeout', '60', + '--hls-playlist-reload-attempts', '3', + '--stream-segment-threads', '3', + '--ringbuffer-size', '32M', + ] + + # Add proxy if specified + if proxy: + cmd.extend(['--http-proxy', proxy]) + + # Add headers if specified + if headers: + for key, value in headers.items(): + cmd.extend(['--http-header', f'{key}={value}']) + + cmd.extend(['--stdout', url, 'best']) + + process = await create_subprocess_exec( + *cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + return process \ No newline at end of file