From 323ca2d1b6cec746b42bd1c0d8760b4580567096 Mon Sep 17 00:00:00 2001 From: UrloMythus Date: Tue, 29 Apr 2025 18:52:23 +0200 Subject: [PATCH] Updated to newest version --- mediaflow_proxy/configs.py | 1 + mediaflow_proxy/handlers.py | 65 ++++++---- mediaflow_proxy/main.py | 103 ++++++++++++--- mediaflow_proxy/routes.py | 164 ------------------------ mediaflow_proxy/routes/proxy.py | 27 +++- mediaflow_proxy/schemas.py | 38 +++++- mediaflow_proxy/utils/cache_utils.py | 2 +- mediaflow_proxy/utils/crypto_utils.py | 55 +++++++- mediaflow_proxy/utils/http_utils.py | 72 +++++++++-- mediaflow_proxy/utils/m3u8_processor.py | 66 ++++++++++ mediaflow_proxy/utils/mpd_utils.py | 2 +- 11 files changed, 358 insertions(+), 237 deletions(-) delete mode 100644 mediaflow_proxy/routes.py diff --git a/mediaflow_proxy/configs.py b/mediaflow_proxy/configs.py index 7d24dba..391ad6f 100644 --- a/mediaflow_proxy/configs.py +++ b/mediaflow_proxy/configs.py @@ -23,6 +23,7 @@ class TransportConfig(BaseSettings): transport_routes: Dict[str, RouteConfig] = Field( default_factory=dict, description="Pattern-based route configuration" ) + timeout: int = Field(30, description="Timeout for HTTP requests in seconds") def get_mounts( self, async_http: bool = True diff --git a/mediaflow_proxy/handlers.py b/mediaflow_proxy/handlers.py index 957eb51..882e975 100644 --- a/mediaflow_proxy/handlers.py +++ b/mediaflow_proxy/handlers.py @@ -1,14 +1,15 @@ import base64 import logging -from urllib.parse import urlparse +from urllib.parse import urlparse, parse_qs import httpx +import tenacity from fastapi import Request, Response, HTTPException from starlette.background import BackgroundTask from .const import SUPPORTED_RESPONSE_HEADERS from .mpd_processor import process_manifest, process_playlist, process_segment -from .schemas import HLSManifestParams, ProxyStreamParams, MPDManifestParams, MPDPlaylistParams, MPDSegmentParams +from .schemas import HLSManifestParams, MPDManifestParams, MPDPlaylistParams, MPDSegmentParams from .utils.cache_utils import get_cached_mpd, get_cached_init_segment from .utils.http_utils import ( Streamer, @@ -52,6 +53,8 @@ def handle_exceptions(exception: Exception) -> Response: elif isinstance(exception, DownloadError): logger.error(f"Error downloading content: {exception}") return Response(status_code=exception.status_code, content=str(exception)) + elif isinstance(exception, tenacity.RetryError): + return Response(status_code=502, content="Max retries exceeded while downloading content") else: logger.exception(f"Internal server error while handling request: {exception}") return Response(status_code=502, content=f"Internal server error: {exception}") @@ -74,32 +77,32 @@ async def handle_hls_stream_proxy( Union[Response, EnhancedStreamingResponse]: Either a processed m3u8 playlist or a streaming response. """ client, streamer = await setup_client_and_streamer() + # Handle range requests + content_range = proxy_headers.request.get("range", "bytes=0-") + if "NaN" in content_range: + # Handle invalid range requests "bytes=NaN-NaN" + raise HTTPException(status_code=416, detail="Invalid Range Header") + proxy_headers.request.update({"range": content_range}) try: - if urlparse(hls_params.destination).path.endswith((".m3u", ".m3u8")): + parsed_url = urlparse(hls_params.destination) + # Check if the URL is a valid m3u8 playlist or m3u file + if parsed_url.path.endswith((".m3u", ".m3u8", ".m3u_plus")) or parse_qs(parsed_url.query).get("type", [""])[ + 0 + ] in ["m3u", "m3u8", "m3u_plus"]: return await fetch_and_process_m3u8( streamer, hls_params.destination, proxy_headers, request, hls_params.key_url ) # Create initial streaming response to check content type await streamer.create_streaming_response(hls_params.destination, proxy_headers.request) + response_headers = prepare_response_headers(streamer.response.headers, proxy_headers.response) - if "mpegurl" in streamer.response.headers.get("content-type", "").lower(): + if "mpegurl" in response_headers.get("content-type", "").lower(): return await fetch_and_process_m3u8( streamer, hls_params.destination, proxy_headers, request, hls_params.key_url ) - # Handle range requests - content_range = proxy_headers.request.get("range", "bytes=0-") - if "NaN" in content_range: - # Handle invalid range requests "bytes=NaN-NaN" - raise HTTPException(status_code=416, detail="Invalid Range Header") - proxy_headers.request.update({"range": content_range}) - - # Create new streaming response with updated headers - await streamer.create_streaming_response(hls_params.destination, proxy_headers.request) - response_headers = prepare_response_headers(streamer.response.headers, proxy_headers.response) - return EnhancedStreamingResponse( streamer.stream_content(), status_code=streamer.response.status_code, @@ -171,26 +174,26 @@ def prepare_response_headers(original_headers, proxy_response_headers) -> dict: return response_headers -async def proxy_stream(method: str, stream_params: ProxyStreamParams, proxy_headers: ProxyRequestHeaders): +async def proxy_stream(method: str, destination: str, proxy_headers: ProxyRequestHeaders): """ Proxies the stream request to the given video URL. Args: method (str): The HTTP method (e.g., GET, HEAD). - stream_params (ProxyStreamParams): The parameters for the stream request. + destination (str): The URL of the stream to be proxied. proxy_headers (ProxyRequestHeaders): The headers to include in the request. Returns: Response: The HTTP response with the streamed content. """ - return await handle_stream_request(method, stream_params.destination, proxy_headers) + return await handle_stream_request(method, destination, proxy_headers) async def fetch_and_process_m3u8( streamer: Streamer, url: str, proxy_headers: ProxyRequestHeaders, request: Request, key_url: str = None ): """ - Fetches and processes the m3u8 playlist, converting it to an HLS playlist. + Fetches and processes the m3u8 playlist on-the-fly, converting it to an HLS playlist. Args: streamer (Streamer): The HTTP client to use for streaming. @@ -203,20 +206,28 @@ async def fetch_and_process_m3u8( Response: The HTTP response with the processed m3u8 playlist. """ try: - content = await streamer.get_text(url, proxy_headers.request) + # Create streaming response if not already created + if not streamer.response: + await streamer.create_streaming_response(url, proxy_headers.request) + + # Initialize processor and response headers processor = M3U8Processor(request, key_url) - processed_content = await processor.process_m3u8(content, str(streamer.response.url)) - response_headers = {"Content-Disposition": "inline", "Accept-Ranges": "none"} + response_headers = { + "Content-Disposition": "inline", + "Accept-Ranges": "none", + "Content-Type": "application/vnd.apple.mpegurl", + } response_headers.update(proxy_headers.response) - return Response( - content=processed_content, - media_type="application/vnd.apple.mpegurl", + + # Create streaming response with on-the-fly processing + return EnhancedStreamingResponse( + processor.process_m3u8_streaming(streamer.stream_content(), str(streamer.response.url)), headers=response_headers, + background=BackgroundTask(streamer.close), ) except Exception as e: - return handle_exceptions(e) - finally: await streamer.close() + return handle_exceptions(e) async def handle_drm_key_data(key_id, key, drm_info): diff --git a/mediaflow_proxy/main.py b/mediaflow_proxy/main.py index 9b45b81..4fe19c2 100644 --- a/mediaflow_proxy/main.py +++ b/mediaflow_proxy/main.py @@ -1,3 +1,4 @@ +import asyncio import logging from importlib import resources @@ -9,7 +10,7 @@ from starlette.staticfiles import StaticFiles from mediaflow_proxy.configs import settings from mediaflow_proxy.routes import proxy_router, extractor_router, speedtest_router -from mediaflow_proxy.schemas import GenerateUrlRequest +from mediaflow_proxy.schemas import GenerateUrlRequest, GenerateMultiUrlRequest, MultiUrlRequestItem from mediaflow_proxy.utils.crypto_utils import EncryptionHandler, EncryptionMiddleware from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url @@ -62,23 +63,95 @@ async def show_speedtest_page(): return RedirectResponse(url="/speedtest.html") -@app.post("/generate_encrypted_or_encoded_url") -async def generate_encrypted_or_encoded_url(request: GenerateUrlRequest): - if "api_password" not in request.query_params: - request.query_params["api_password"] = request.api_password +@app.post( + "/generate_encrypted_or_encoded_url", + description="Generate a single encoded URL", + response_description="Returns a single encoded URL", + deprecated=True, + tags=["url"], +) +async def generate_encrypted_or_encoded_url( + request: GenerateUrlRequest, +): + """ + Generate a single encoded URL based on the provided request. + """ + return {"encoded_url": (await generate_url(request))["url"]} + + +@app.post( + "/generate_url", + description="Generate a single encoded URL", + response_description="Returns a single encoded URL", + tags=["url"], +) +async def generate_url(request: GenerateUrlRequest): + """Generate a single encoded URL based on the provided request.""" + encryption_handler = EncryptionHandler(request.api_password) if request.api_password else None + + # Ensure api_password is in query_params if provided + query_params = request.query_params.copy() + if "api_password" not in query_params and request.api_password: + query_params["api_password"] = request.api_password + + # Convert IP to string if provided + ip_str = str(request.ip) if request.ip else None encoded_url = encode_mediaflow_proxy_url( - request.mediaflow_proxy_url, - request.endpoint, - request.destination_url, - request.query_params, - request.request_headers, - request.response_headers, - EncryptionHandler(request.api_password) if request.api_password else None, - request.expiration, - str(request.ip) if request.ip else None, + mediaflow_proxy_url=request.mediaflow_proxy_url, + endpoint=request.endpoint, + destination_url=request.destination_url, + query_params=query_params, + request_headers=request.request_headers, + response_headers=request.response_headers, + encryption_handler=encryption_handler, + expiration=request.expiration, + ip=ip_str, + filename=request.filename, ) - return {"encoded_url": encoded_url} + + return {"url": encoded_url} + + +@app.post( + "/generate_urls", + description="Generate multiple encoded URLs with shared common parameters", + response_description="Returns a list of encoded URLs", + tags=["url"], +) +async def generate_urls(request: GenerateMultiUrlRequest): + """Generate multiple encoded URLs with shared common parameters.""" + # Set up encryption handler if password is provided + encryption_handler = EncryptionHandler(request.api_password) if request.api_password else None + + # Convert IP to string if provided + ip_str = str(request.ip) if request.ip else None + + async def _process_url_item( + url_item: MultiUrlRequestItem, + ) -> str: + """Process a single URL item with common parameters and return the encoded URL.""" + query_params = url_item.query_params.copy() + if "api_password" not in query_params and request.api_password: + query_params["api_password"] = request.api_password + + # Generate the encoded URL + return encode_mediaflow_proxy_url( + mediaflow_proxy_url=request.mediaflow_proxy_url, + endpoint=url_item.endpoint, + destination_url=url_item.destination_url, + query_params=query_params, + request_headers=url_item.request_headers, + response_headers=url_item.response_headers, + encryption_handler=encryption_handler, + expiration=request.expiration, + ip=ip_str, + filename=url_item.filename, + ) + + tasks = [_process_url_item(url_item) for url_item in request.urls] + encoded_urls = await asyncio.gather(*tasks) + return {"urls": encoded_urls} app.include_router(proxy_router, prefix="/proxy", tags=["proxy"], dependencies=[Depends(verify_api_key)]) diff --git a/mediaflow_proxy/routes.py b/mediaflow_proxy/routes.py deleted file mode 100644 index 078c243..0000000 --- a/mediaflow_proxy/routes.py +++ /dev/null @@ -1,164 +0,0 @@ -from fastapi import Request, Depends, APIRouter -from pydantic import HttpUrl - -from .handlers import handle_hls_stream_proxy, proxy_stream, get_manifest, get_playlist, get_segment, get_public_ip -from .utils.http_utils import get_proxy_headers, ProxyRequestHeaders - -proxy_router = APIRouter() - - -@proxy_router.head("/hls") -@proxy_router.get("/hls") -async def hls_stream_proxy( - request: Request, - d: HttpUrl, - proxy_headers: ProxyRequestHeaders = Depends(get_proxy_headers), - key_url: HttpUrl | None = None, - verify_ssl: bool = False, - use_request_proxy: bool = True, -): - """ - Proxify HLS stream requests, fetching and processing the m3u8 playlist or streaming the content. - - Args: - request (Request): The incoming HTTP request. - d (HttpUrl): The destination URL to fetch the content from. - key_url (HttpUrl, optional): The HLS Key URL to replace the original key URL. Defaults to None. (Useful for bypassing some sneaky protection) - proxy_headers (ProxyRequestHeaders): The headers to include in the request. - verify_ssl (bool, optional): Whether to verify the SSL certificate of the destination. Defaults to False. - use_request_proxy (bool, optional): Whether to use the MediaFlow proxy configuration. Defaults to True. - - Returns: - Response: The HTTP response with the processed m3u8 playlist or streamed content. - """ - destination = str(d) - return await handle_hls_stream_proxy(request, destination, proxy_headers, key_url, verify_ssl, use_request_proxy) - - -@proxy_router.head("/stream") -@proxy_router.get("/stream") -async def proxy_stream_endpoint( - request: Request, - d: HttpUrl, - proxy_headers: ProxyRequestHeaders = Depends(get_proxy_headers), - verify_ssl: bool = False, - use_request_proxy: bool = True, -): - """ - Proxies stream requests to the given video URL. - - Args: - request (Request): The incoming HTTP request. - d (HttpUrl): The URL of the video to stream. - proxy_headers (ProxyRequestHeaders): The headers to include in the request. - verify_ssl (bool, optional): Whether to verify the SSL certificate of the destination. Defaults to False. - use_request_proxy (bool, optional): Whether to use the MediaFlow proxy configuration. Defaults to True. - - Returns: - Response: The HTTP response with the streamed content. - """ - proxy_headers.request.update({"range": proxy_headers.request.get("range", "bytes=0-")}) - return await proxy_stream(request.method, str(d), proxy_headers, verify_ssl, use_request_proxy) - - -@proxy_router.get("/mpd/manifest") -async def manifest_endpoint( - request: Request, - d: HttpUrl, - proxy_headers: ProxyRequestHeaders = Depends(get_proxy_headers), - key_id: str = None, - key: str = None, - verify_ssl: bool = False, - use_request_proxy: bool = True, -): - """ - Retrieves and processes the MPD manifest, converting it to an HLS manifest. - - Args: - request (Request): The incoming HTTP request. - d (HttpUrl): The URL of the MPD manifest. - proxy_headers (ProxyRequestHeaders): The headers to include in the request. - key_id (str, optional): The DRM key ID. Defaults to None. - key (str, optional): The DRM key. Defaults to None. - verify_ssl (bool, optional): Whether to verify the SSL certificate of the destination. Defaults to False. - use_request_proxy (bool, optional): Whether to use the MediaFlow proxy configuration. Defaults to True. - - Returns: - Response: The HTTP response with the HLS manifest. - """ - return await get_manifest(request, str(d), proxy_headers, key_id, key, verify_ssl, use_request_proxy) - - -@proxy_router.get("/mpd/playlist") -async def playlist_endpoint( - request: Request, - d: HttpUrl, - profile_id: str, - proxy_headers: ProxyRequestHeaders = Depends(get_proxy_headers), - key_id: str = None, - key: str = None, - verify_ssl: bool = False, - use_request_proxy: bool = True, -): - """ - Retrieves and processes the MPD manifest, converting it to an HLS playlist for a specific profile. - - Args: - request (Request): The incoming HTTP request. - d (HttpUrl): The URL of the MPD manifest. - profile_id (str): The profile ID to generate the playlist for. - proxy_headers (ProxyRequestHeaders): The headers to include in the request. - key_id (str, optional): The DRM key ID. Defaults to None. - key (str, optional): The DRM key. Defaults to None. - verify_ssl (bool, optional): Whether to verify the SSL certificate of the destination. Defaults to False. - use_request_proxy (bool, optional): Whether to use the MediaFlow proxy configuration. Defaults to True. - - Returns: - Response: The HTTP response with the HLS playlist. - """ - return await get_playlist(request, str(d), profile_id, proxy_headers, key_id, key, verify_ssl, use_request_proxy) - - -@proxy_router.get("/mpd/segment") -async def segment_endpoint( - init_url: HttpUrl, - segment_url: HttpUrl, - mime_type: str, - proxy_headers: ProxyRequestHeaders = Depends(get_proxy_headers), - key_id: str = None, - key: str = None, - verify_ssl: bool = False, - use_request_proxy: bool = True, -): - """ - Retrieves and processes a media segment, decrypting it if necessary. - - Args: - init_url (HttpUrl): The URL of the initialization segment. - segment_url (HttpUrl): The URL of the media segment. - mime_type (str): The MIME type of the segment. - proxy_headers (ProxyRequestHeaders): The headers to include in the request. - key_id (str, optional): The DRM key ID. Defaults to None. - key (str, optional): The DRM key. Defaults to None. - verify_ssl (bool, optional): Whether to verify the SSL certificate of the destination. Defaults to False. - use_request_proxy (bool, optional): Whether to use the MediaFlow proxy configuration. Defaults to True. - - Returns: - Response: The HTTP response with the processed segment. - """ - return await get_segment( - str(init_url), str(segment_url), mime_type, proxy_headers, key_id, key, verify_ssl, use_request_proxy - ) - - -@proxy_router.get("/ip") -async def get_mediaflow_proxy_public_ip( - use_request_proxy: bool = True, -): - """ - Retrieves the public IP address of the MediaFlow proxy server. - - Returns: - Response: The HTTP response with the public IP address in the form of a JSON object. {"ip": "xxx.xxx.xxx.xxx"} - """ - return await get_public_ip(use_request_proxy) diff --git a/mediaflow_proxy/routes/proxy.py b/mediaflow_proxy/routes/proxy.py index fb936fa..46b8a40 100644 --- a/mediaflow_proxy/routes/proxy.py +++ b/mediaflow_proxy/routes/proxy.py @@ -1,4 +1,5 @@ from typing import Annotated +from urllib.parse import quote from fastapi import Request, Depends, APIRouter, Query, HTTPException @@ -14,7 +15,6 @@ from mediaflow_proxy.schemas import ( MPDSegmentParams, MPDPlaylistParams, HLSManifestParams, - ProxyStreamParams, MPDManifestParams, ) from mediaflow_proxy.utils.http_utils import get_proxy_headers, ProxyRequestHeaders @@ -45,18 +45,22 @@ async def hls_manifest_proxy( @proxy_router.head("/stream") @proxy_router.get("/stream") +@proxy_router.head("/stream/{filename:path}") +@proxy_router.get("/stream/{filename:path}") async def proxy_stream_endpoint( request: Request, - stream_params: Annotated[ProxyStreamParams, Query()], proxy_headers: Annotated[ProxyRequestHeaders, Depends(get_proxy_headers)], + destination: str = Query(..., description="The URL of the stream.", alias="d"), + filename: str | None = None, ): """ - Proxies stream requests to the given video URL. + Proxify stream requests to the given video URL. Args: request (Request): The incoming HTTP request. - stream_params (ProxyStreamParams): The parameters for the stream request. proxy_headers (ProxyRequestHeaders): The headers to include in the request. + destination (str): The URL of the stream to be proxied. + filename (str | None): The filename to be used in the response headers. Returns: Response: The HTTP response with the streamed content. @@ -66,7 +70,20 @@ async def proxy_stream_endpoint( # Handle invalid range requests "bytes=NaN-NaN" raise HTTPException(status_code=416, detail="Invalid Range Header") proxy_headers.request.update({"range": content_range}) - return await proxy_stream(request.method, stream_params, proxy_headers) + if filename: + # If a filename is provided, set it in the headers using RFC 6266 format + try: + # Try to encode with latin-1 first (simple case) + filename.encode("latin-1") + content_disposition = f'attachment; filename="{filename}"' + except UnicodeEncodeError: + # For filenames with non-latin-1 characters, use RFC 6266 format with UTF-8 + encoded_filename = quote(filename.encode("utf-8")) + content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}" + + proxy_headers.response.update({"content-disposition": content_disposition}) + + return await proxy_stream(request.method, destination, proxy_headers) @proxy_router.get("/mpd/manifest.m3u8") diff --git a/mediaflow_proxy/schemas.py b/mediaflow_proxy/schemas.py index ba0ee46..1ffee30 100644 --- a/mediaflow_proxy/schemas.py +++ b/mediaflow_proxy/schemas.py @@ -23,6 +23,34 @@ class GenerateUrlRequest(BaseModel): None, description="API password for encryption. If not provided, the URL will only be encoded." ) ip: Optional[IPvAnyAddress] = Field(None, description="The IP address to restrict the URL to.") + filename: Optional[str] = Field(None, description="Filename to be preserved for media players like Infuse.") + + +class MultiUrlRequestItem(BaseModel): + endpoint: Optional[str] = Field(None, description="The specific endpoint to be appended to the base URL.") + destination_url: Optional[str] = Field( + None, description="The destination URL to which the request will be proxied." + ) + query_params: Optional[dict] = Field( + default_factory=dict, description="Query parameters to be included in the request." + ) + request_headers: Optional[dict] = Field(default_factory=dict, description="Headers to be included in the request.") + response_headers: Optional[dict] = Field( + default_factory=dict, description="Headers to be included in the response." + ) + filename: Optional[str] = Field(None, description="Filename to be preserved for media players like Infuse.") + + +class GenerateMultiUrlRequest(BaseModel): + mediaflow_proxy_url: str = Field(..., description="The base URL for the mediaflow proxy.") + api_password: Optional[str] = Field( + None, description="API password for encryption. If not provided, the URL will only be encoded." + ) + expiration: Optional[int] = Field( + None, description="Expiration time for the URL in seconds. If not provided, the URL will not expire." + ) + ip: Optional[IPvAnyAddress] = Field(None, description="The IP address to restrict the URL to.") + urls: list[MultiUrlRequestItem] = Field(..., description="List of URL configurations to generate.") class GenericParams(BaseModel): @@ -37,10 +65,6 @@ class HLSManifestParams(GenericParams): ) -class ProxyStreamParams(GenericParams): - destination: str = Field(..., description="The URL of the stream.", alias="d") - - class MPDManifestParams(GenericParams): destination: str = Field(..., description="The URL of the MPD manifest.", alias="d") key_id: Optional[str] = Field(None, description="The DRM key ID (optional).") @@ -63,9 +87,9 @@ class MPDSegmentParams(GenericParams): class ExtractorURLParams(GenericParams): - host: Literal["Doodstream", "Mixdrop", "Uqload", "Streamtape", "Supervideo", "VixCloud", "Okru", "Maxstream", "LiveTV"] = Field( - ..., description="The host to extract the URL from." - ) + host: Literal[ + "Doodstream", "Mixdrop", "Uqload", "Streamtape", "Supervideo", "VixCloud", "Okru", "Maxstream", "LiveTV" + ] = Field(..., description="The host to extract the URL from.") destination: str = Field(..., description="The URL of the stream.", alias="d") redirect_stream: bool = Field(False, description="Whether to redirect to the stream endpoint automatically.") extra_params: Dict[str, Any] = Field( diff --git a/mediaflow_proxy/utils/cache_utils.py b/mediaflow_proxy/utils/cache_utils.py index d300a2b..a94cf1b 100644 --- a/mediaflow_proxy/utils/cache_utils.py +++ b/mediaflow_proxy/utils/cache_utils.py @@ -325,7 +325,7 @@ async def get_cached_mpd( parsed_dict = parse_mpd_dict(mpd_dict, mpd_url, parse_drm, parse_segment_profile_id) # Cache the original MPD dict - await MPD_CACHE.set(mpd_url, json.dumps(mpd_dict).encode(), ttl=parsed_dict["minimumUpdatePeriod"]) + await MPD_CACHE.set(mpd_url, json.dumps(mpd_dict).encode(), ttl=parsed_dict.get("minimumUpdatePeriod")) return parsed_dict except DownloadError as error: logger.error(f"Error downloading MPD: {error}") diff --git a/mediaflow_proxy/utils/crypto_utils.py b/mediaflow_proxy/utils/crypto_utils.py index 056df3b..d57edc8 100644 --- a/mediaflow_proxy/utils/crypto_utils.py +++ b/mediaflow_proxy/utils/crypto_utils.py @@ -29,11 +29,13 @@ class EncryptionHandler: iv = get_random_bytes(16) cipher = AES.new(self.secret_key, AES.MODE_CBC, iv) encrypted_data = cipher.encrypt(pad(json_data, AES.block_size)) - return base64.urlsafe_b64encode(iv + encrypted_data).decode("utf-8") + return base64.urlsafe_b64encode(iv + encrypted_data).decode("utf-8").rstrip("=") def decrypt_data(self, token: str, client_ip: str) -> dict: try: - encrypted_data = base64.urlsafe_b64decode(token.encode("utf-8")) + padding_needed = (4 - len(token) % 4) % 4 + encrypted_token_b64_padded = token + ("=" * padding_needed) + encrypted_data = base64.urlsafe_b64decode(encrypted_token_b64_padded.encode("utf-8")) iv = encrypted_data[:16] cipher = AES.new(self.secret_key, AES.MODE_CBC, iv) decrypted_data = unpad(cipher.decrypt(encrypted_data[16:]), AES.block_size) @@ -60,14 +62,55 @@ class EncryptionMiddleware(BaseHTTPMiddleware): self.encryption_handler = encryption_handler async def dispatch(self, request: Request, call_next): - encrypted_token = request.query_params.get("token") + path = request.url.path + token_marker = "/_token_" + encrypted_token = None + + # Check for token in path + if token_marker in path and self.encryption_handler: + try: + # Extract token from path + token_start = path.find(token_marker) + len(token_marker) + token_end = path.find("/", token_start) + + if token_end == -1: # No trailing slash (no filename after token) + token_end = len(path) + filename_part = "" + else: + # There's something after the token (likely a filename) + filename_part = path[token_end:] + + # Get the encrypted token + encrypted_token = path[token_start:token_end] + + # Modify the path to remove the token part but preserve the filename + original_path = path[: path.find(token_marker)] + original_path += filename_part # Add back the filename part + + request.scope["path"] = original_path + + # Update the raw path as well + request.scope["raw_path"] = original_path.encode() + + except Exception as e: + logging.error(f"Error processing token in path: {str(e)}") + return JSONResponse(content={"error": f"Invalid token in path: {str(e)}"}, status_code=400) + + # Check for token in query parameters (original method) + if not encrypted_token: # Only check if we didn't already find a token in the path + encrypted_token = request.query_params.get("token") + + # Process the token if found (from either source) if encrypted_token and self.encryption_handler: try: client_ip = self.get_client_ip(request) decrypted_data = self.encryption_handler.decrypt_data(encrypted_token, client_ip) + # Modify request query parameters with decrypted data query_params = dict(request.query_params) - query_params.pop("token") # Remove the encrypted token from query params + if "token" in query_params: + query_params.pop("token") # Remove the encrypted token from query params + query_params.update(decrypted_data) # Add decrypted data to query params query_params["has_encrypted"] = True @@ -75,8 +118,12 @@ class EncryptionMiddleware(BaseHTTPMiddleware): new_query_string = urlencode(query_params) request.scope["query_string"] = new_query_string.encode() request._query_params = query_params + except HTTPException as e: return JSONResponse(content={"error": str(e.detail)}, status_code=e.status_code) + except Exception as e: + logging.error(f"Error decrypting token: {str(e)}") + return JSONResponse(content={"error": f"Invalid token: {str(e)}"}, status_code=400) try: response = await call_next(request) diff --git a/mediaflow_proxy/utils/http_utils.py b/mediaflow_proxy/utils/http_utils.py index 1a594e2..9554907 100644 --- a/mediaflow_proxy/utils/http_utils.py +++ b/mediaflow_proxy/utils/http_utils.py @@ -30,10 +30,11 @@ class DownloadError(Exception): super().__init__(message) -def create_httpx_client(follow_redirects: bool = True, timeout: float = 30.0, **kwargs) -> httpx.AsyncClient: +def create_httpx_client(follow_redirects: bool = True, **kwargs) -> httpx.AsyncClient: """Creates an HTTPX client with configured proxy routing""" mounts = settings.transport_config.get_mounts() - client = httpx.AsyncClient(mounts=mounts, follow_redirects=follow_redirects, timeout=timeout, **kwargs) + kwargs.setdefault("timeout", settings.transport_config.timeout) + client = httpx.AsyncClient(mounts=mounts, follow_redirects=follow_redirects, **kwargs) return client @@ -94,6 +95,11 @@ class Streamer: self.end_byte = 0 self.total_size = 0 + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_if_exception_type(DownloadError), + ) async def create_streaming_response(self, url: str, headers: dict): """ Creates and sends a streaming request. @@ -103,9 +109,27 @@ class Streamer: headers (dict): The headers to include in the request. """ - request = self.client.build_request("GET", url, headers=headers) - self.response = await self.client.send(request, stream=True, follow_redirects=True) - self.response.raise_for_status() + try: + request = self.client.build_request("GET", url, headers=headers) + self.response = await self.client.send(request, stream=True, follow_redirects=True) + self.response.raise_for_status() + except httpx.TimeoutException: + logger.warning("Timeout while creating streaming response") + raise DownloadError(409, "Timeout while creating streaming response") + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error {e.response.status_code} while creating streaming response") + if e.response.status_code == 404: + logger.error(f"Segment Resource not found: {url}") + raise e + raise DownloadError( + e.response.status_code, f"HTTP error {e.response.status_code} while creating streaming response" + ) + except httpx.RequestError as e: + logger.error(f"Error creating streaming response: {e}") + raise DownloadError(502, f"Error creating streaming response: {e}") + except Exception as e: + logger.error(f"Error creating streaming response: {e}") + raise RuntimeError(f"Error creating streaming response: {e}") async def stream_content(self) -> typing.AsyncGenerator[bytes, None]: """ @@ -258,6 +282,7 @@ def encode_mediaflow_proxy_url( encryption_handler: EncryptionHandler = None, expiration: int = None, ip: str = None, + filename: typing.Optional[str] = None, ) -> str: """ Encodes & Encrypt (Optional) a MediaFlow proxy URL with query parameters and headers. @@ -272,10 +297,12 @@ def encode_mediaflow_proxy_url( encryption_handler (EncryptionHandler, optional): The encryption handler to use. Defaults to None. expiration (int, optional): The expiration time for the encrypted token. Defaults to None. ip (str, optional): The public IP address to include in the query parameters. Defaults to None. + filename (str, optional): Filename to be preserved for media players like Infuse. Defaults to None. Returns: str: The encoded MediaFlow proxy URL. """ + # Prepare query parameters query_params = query_params or {} if destination_url is not None: query_params["d"] = destination_url @@ -290,18 +317,37 @@ def encode_mediaflow_proxy_url( {key if key.startswith("r_") else f"r_{key}": value for key, value in response_headers.items()} ) + # Construct the base URL + if endpoint is None: + base_url = mediaflow_proxy_url + else: + base_url = parse.urljoin(mediaflow_proxy_url, endpoint) + + # Ensure base_url doesn't end with a slash for consistent handling + if base_url.endswith("/"): + base_url = base_url[:-1] + + # Handle encryption if needed if encryption_handler: encrypted_token = encryption_handler.encrypt_data(query_params, expiration, ip) - encoded_params = urlencode({"token": encrypted_token}) + # Build the URL with token in path + path_parts = [base_url, f"_token_{encrypted_token}"] + + # Add filename at the end if provided + if filename: + path_parts.append(parse.quote(filename)) + + return "/".join(path_parts) + else: - encoded_params = urlencode(query_params) + # No encryption, use regular query parameters + url = base_url + if filename: + url = f"{url}/{parse.quote(filename)}" - # Construct the full URL - if endpoint is None: - return f"{mediaflow_proxy_url}?{encoded_params}" - - base_url = parse.urljoin(mediaflow_proxy_url, endpoint) - return f"{base_url}?{encoded_params}" + if query_params: + return f"{url}?{urlencode(query_params)}" + return url def get_original_scheme(request: Request) -> str: diff --git a/mediaflow_proxy/utils/m3u8_processor.py b/mediaflow_proxy/utils/m3u8_processor.py index 83f8958..87e0f6a 100644 --- a/mediaflow_proxy/utils/m3u8_processor.py +++ b/mediaflow_proxy/utils/m3u8_processor.py @@ -1,4 +1,6 @@ +import codecs import re +from typing import AsyncGenerator from urllib import parse from mediaflow_proxy.utils.crypto_utils import encryption_handler @@ -42,6 +44,70 @@ class M3U8Processor: processed_lines.append(line) return "\n".join(processed_lines) + async def process_m3u8_streaming( + self, content_iterator: AsyncGenerator[bytes, None], base_url: str + ) -> AsyncGenerator[str, None]: + """ + Processes the m3u8 content on-the-fly, yielding processed lines as they are read. + + Args: + content_iterator: An async iterator that yields chunks of the m3u8 content. + base_url (str): The base URL to resolve relative URLs. + + Yields: + str: Processed lines of the m3u8 content. + """ + buffer = "" # String buffer for decoded content + decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") + + # Process the content chunk by chunk + async for chunk in content_iterator: + if isinstance(chunk, str): + chunk = chunk.encode("utf-8") + + # Incrementally decode the chunk + decoded_chunk = decoder.decode(chunk) + buffer += decoded_chunk + + # Process complete lines + lines = buffer.split("\n") + if len(lines) > 1: + # Process all complete lines except the last one + for line in lines[:-1]: + if line: # Skip empty lines + processed_line = await self.process_line(line, base_url) + yield processed_line + "\n" + + # Keep the last line in the buffer (it might be incomplete) + buffer = lines[-1] + + # Process any remaining data in the buffer plus final bytes + final_chunk = decoder.decode(b"", final=True) + if final_chunk: + buffer += final_chunk + + if buffer: # Process the last line if it's not empty + processed_line = await self.process_line(buffer, base_url) + yield processed_line + + async def process_line(self, line: str, base_url: str) -> str: + """ + Process a single line from the m3u8 content. + + Args: + line (str): The line to process. + base_url (str): The base URL to resolve relative URLs. + + Returns: + str: The processed line. + """ + if "URI=" in line: + return await self.process_key_line(line, base_url) + elif not line.startswith("#") and line.strip(): + return await self.proxy_url(line, base_url) + else: + return line + async def process_key_line(self, line: str, base_url: str) -> str: """ Processes a key line in the m3u8 content, proxying the URI. diff --git a/mediaflow_proxy/utils/mpd_utils.py b/mediaflow_proxy/utils/mpd_utils.py index 5603694..19827bf 100644 --- a/mediaflow_proxy/utils/mpd_utils.py +++ b/mediaflow_proxy/utils/mpd_utils.py @@ -317,7 +317,7 @@ def parse_segment_timeline(parsed_dict: dict, item: dict, profile: dict, source: """ timelines = item["SegmentTimeline"]["S"] timelines = timelines if isinstance(timelines, list) else [timelines] - period_start = parsed_dict["availabilityStartTime"] + timedelta(seconds=parsed_dict.get("PeriodStart", 0)) + period_start = parsed_dict.get("availabilityStartTime", datetime.fromtimestamp(0, tz=timezone.utc)) + timedelta(seconds=parsed_dict.get("PeriodStart", 0)) presentation_time_offset = int(item.get("@presentationTimeOffset", 0)) start_number = int(item.get("@startNumber", 1))