diff --git a/mediaflow_proxy/configs.py b/mediaflow_proxy/configs.py
index 99410b5..049105a 100644
--- a/mediaflow_proxy/configs.py
+++ b/mediaflow_proxy/configs.py
@@ -40,6 +40,11 @@ class TransportConfig(BaseSettings):
                 verify=route.verify_ssl, proxy=route.proxy_url or self.proxy_url if route.proxy else None
             )
 
+        # Hardcoded configuration for jxoplay.xyz domain - SSL verification disabled
+        mounts["all://jxoplay.xyz"] = transport_cls(
+            verify=False, proxy=self.proxy_url if self.all_proxy else None
+        )
+
         # Set default proxy for all routes if enabled
         if self.all_proxy:
             mounts["all://"] = transport_cls(proxy=self.proxy_url)
@@ -63,6 +68,16 @@ class Settings(BaseSettings):
     m3u8_content_routing: Literal["mediaflow", "stremio", "direct"] = (
         "mediaflow"  # Routing strategy for M3U8 content URLs: "mediaflow", "stremio", or "direct"
     )
+    enable_hls_prebuffer: bool = False  # Whether to enable HLS pre-buffering for improved streaming performance.
+    hls_prebuffer_segments: int = 5  # Number of segments to pre-buffer ahead.
+    hls_prebuffer_cache_size: int = 50  # Maximum number of segments to cache in memory.
+    hls_prebuffer_max_memory_percent: int = 80  # Maximum percentage of system memory to use for HLS pre-buffer cache.
+    hls_prebuffer_emergency_threshold: int = 90  # Emergency threshold percentage to trigger aggressive cache cleanup.
+    enable_dash_prebuffer: bool = False  # Whether to enable DASH pre-buffering for improved streaming performance.
+    dash_prebuffer_segments: int = 5  # Number of segments to pre-buffer ahead.
+    dash_prebuffer_cache_size: int = 50  # Maximum number of segments to cache in memory.
+    dash_prebuffer_max_memory_percent: int = 80  # Maximum percentage of system memory to use for DASH pre-buffer cache.
+    dash_prebuffer_emergency_threshold: int = 90  # Emergency threshold percentage to trigger aggressive cache cleanup.
     user_agent: str = (
         "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"  # The user agent to use for HTTP requests.
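Review note: the jxoplay.xyz mount above relies on httpx's pattern-based transport routing, where the most specific mount pattern wins over the catch-all "all://" entry. A minimal standalone sketch of that behaviour (hostname reuse and the plain transports are illustrative assumptions, not part of this patch):

    # Sketch: httpx picks the transport whose mount pattern best matches the request URL.
    import httpx

    mounts = {
        # Most specific pattern: TLS verification disabled for this one host,
        # mirroring the hardcoded jxoplay.xyz mount added above.
        "all://jxoplay.xyz": httpx.AsyncHTTPTransport(verify=False),
        # Catch-all default transport for every other route.
        "all://": httpx.AsyncHTTPTransport(),
    }

    async def fetch(url: str) -> int:
        async with httpx.AsyncClient(mounts=mounts) as client:
            resp = await client.get(url)
            return resp.status_code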
diff --git a/mediaflow_proxy/extractors/base.py b/mediaflow_proxy/extractors/base.py index bf8a15b..359fc5d 100644 --- a/mediaflow_proxy/extractors/base.py +++ b/mediaflow_proxy/extractors/base.py @@ -29,7 +29,7 @@ class BaseExtractor(ABC): """Make HTTP request with error handling.""" try: async with create_httpx_client() as client: - request_headers = self.base_headers + request_headers = self.base_headers.copy() request_headers.update(headers or {}) response = await client.request( method, @@ -40,9 +40,9 @@ class BaseExtractor(ABC): response.raise_for_status() return response except httpx.HTTPError as e: - raise ExtractorError(f"HTTP request failed: {str(e)}") + raise ExtractorError(f"HTTP request failed for URL {url}: {str(e)}") except Exception as e: - raise ExtractorError(f"Request failed: {str(e)}") + raise ExtractorError(f"Request failed for URL {url}: {str(e)}") @abstractmethod async def extract(self, url: str, **kwargs) -> Dict[str, Any]: diff --git a/mediaflow_proxy/extractors/dlhd.py b/mediaflow_proxy/extractors/dlhd.py index 5f9bbab..d3556b4 100644 --- a/mediaflow_proxy/extractors/dlhd.py +++ b/mediaflow_proxy/extractors/dlhd.py @@ -1,373 +1,501 @@ -import re -from typing import Dict, Any, Optional -from urllib.parse import urlparse, quote - -from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError - - -class DLHDExtractor(BaseExtractor): - """DLHD (DaddyLive) URL extractor for M3U8 streams.""" - - def __init__(self, request_headers: dict): - super().__init__(request_headers) - # Default to HLS proxy endpoint - self.mediaflow_endpoint = "hls_manifest_proxy" - - async def extract(self, url: str, **kwargs) -> Dict[str, Any]: - """Extract DLHD stream URL and required headers. - - Args: - url: The DaddyLive channel URL (required) - - Keyword Args: - player_url: Direct player URL (optional) - stream_url: The stream URL (optional) - auth_url_base: Base URL for auth requests (optional) - - Returns: - Dict containing stream URL and required headers - """ - try: - # Channel URL is required and serves as the referer - channel_url = url - channel_origin = self._get_origin(channel_url) # Channel page origin - - # Check for direct parameters - player_url_from_arg = kwargs.get("player_url") - stream_url_from_arg = kwargs.get("stream_url") - auth_url_base_from_arg = kwargs.get("auth_url_base") - - current_player_url_for_processing: str - - # If player URL not provided, extract it from channel page - if not player_url_from_arg: - # Get the channel page to extract the player iframe URL - channel_headers = { - "referer": channel_origin + "/", - "origin": channel_origin, - "user-agent": self.base_headers["user-agent"], - } - - channel_response = await self._make_request(channel_url, headers=channel_headers) - extracted_iframe_url = self._extract_player_url(channel_response.text) - - if not extracted_iframe_url: - raise ExtractorError("Could not extract player URL from channel page") - current_player_url_for_processing = extracted_iframe_url - else: - current_player_url_for_processing = player_url_from_arg - - # Attempt 1: _handle_vecloud with current_player_url_for_processing - # The referer for _handle_vecloud is the origin of the channel page (channel_origin) - # or the origin of the player itself if it is a /stream/ URL. 
- try: - referer_for_vecloud = channel_origin + "/" - if re.search(r"/stream/([a-zA-Z0-9-]+)", current_player_url_for_processing): - referer_for_vecloud = self._get_origin(current_player_url_for_processing) + "/" - return await self._handle_vecloud(current_player_url_for_processing, referer_for_vecloud) - except Exception: - pass # Fail, Continue - - # Attempt 2: If _handle_vecloud fail and the URL is not /stream/, try _handle_playnow - # and then _handle_vecloud again with the URL resulting from playnow. - if not re.search(r"/stream/([a-zA-Z0-9-]+)", current_player_url_for_processing): - try: - playnow_derived_player_url = await self._handle_playnow(current_player_url_for_processing, channel_origin + "/") - if re.search(r"/stream/([a-zA-Z0-9-]+)", playnow_derived_player_url): - try: - referer_for_vecloud_after_playnow = self._get_origin(playnow_derived_player_url) + "/" - return await self._handle_vecloud(playnow_derived_player_url, referer_for_vecloud_after_playnow) - except Exception: - pass - except Exception: - pass - - # If all previous attempts have failed, proceed with standard authentication. - player_url_for_auth = current_player_url_for_processing - player_origin_for_auth = self._get_origin(player_url_for_auth) - - # Get player page to extract authentication information - player_headers = { - "referer": player_origin_for_auth + "/", - "origin": player_origin_for_auth, - "user-agent": self.base_headers["user-agent"], - } - - player_response = await self._make_request(player_url_for_auth, headers=player_headers) - player_content = player_response.text - - # Extract authentication details from script tag - auth_data = self._extract_auth_data(player_content) - if not auth_data: - raise ExtractorError("Failed to extract authentication data from player") - - # Extract auth URL base if not provided - final_auth_url_base = auth_url_base_from_arg - if not final_auth_url_base: - final_auth_url_base = self._extract_auth_url_base(player_content) - - # If still no auth URL base, try to derive from stream URL or player URL - if not final_auth_url_base: - if stream_url_from_arg: - final_auth_url_base = self._get_origin(stream_url_from_arg) - else: - # Try to extract from player URL structure - player_domain_for_auth_derive = self._get_origin(player_url_for_auth) - # Attempt to construct a standard auth domain - final_auth_url_base = self._derive_auth_url_base(player_domain_for_auth_derive) - - if not final_auth_url_base: - raise ExtractorError("Could not determine auth URL base") - - # Construct auth URL - auth_url = ( - f"{final_auth_url_base}/auth.php?channel_id={auth_data['channel_key']}" - f"&ts={auth_data['auth_ts']}&rnd={auth_data['auth_rnd']}" - f"&sig={quote(auth_data['auth_sig'])}" - ) - - # Make auth request - auth_req_headers = { - "referer": player_origin_for_auth + "/", - "origin": player_origin_for_auth, - "user-agent": self.base_headers["user-agent"], - } - - auth_response = await self._make_request(auth_url, headers=auth_req_headers) - - # Check if authentication succeeded - if auth_response.json().get("status") != "ok": - raise ExtractorError("Authentication failed") - - # If no stream URL provided, look up the server and generate the stream URL - final_stream_url = stream_url_from_arg - if not final_stream_url: - final_stream_url = await self._lookup_server( - lookup_url_base=player_origin_for_auth, - auth_url_base=final_auth_url_base, - auth_data=auth_data, - headers=auth_req_headers, - ) - - # Set up the final stream headers - stream_headers = { - "referer": 
player_url_for_auth, - "origin": player_origin_for_auth, - "user-agent": self.base_headers["user-agent"], - } - - # Return the stream URL with headers - return { - "destination_url": final_stream_url, - "request_headers": stream_headers, - "mediaflow_endpoint": self.mediaflow_endpoint, - } - - except Exception as e: - raise ExtractorError(f"Extraction failed: {str(e)}") - - async def _handle_vecloud(self, player_url: str, channel_referer: str) -> Dict[str, Any]: - """Handle vecloud URLs with their specific API. - - Args: - player_url: The vecloud player URL - channel_referer: The referer of the channel page - Returns: - Dict containing stream URL and required headers - """ - try: - # Extract stream ID from vecloud URL - stream_id_match = re.search(r"/stream/([a-zA-Z0-9-]+)", player_url) - if not stream_id_match: - raise ExtractorError("Could not extract stream ID from vecloud URL") - - stream_id = stream_id_match.group(1) - - response = await self._make_request( - player_url, headers={"referer": channel_referer, "user-agent": self.base_headers["user-agent"]} - ) - player_url = str(response.url) - - # Construct API URL - player_parsed = urlparse(player_url) - player_domain = player_parsed.netloc - player_origin = f"{player_parsed.scheme}://{player_parsed.netloc}" - api_url = f"{player_origin}/api/source/{stream_id}?type=live" - - # Set up headers for API request - api_headers = { - "referer": player_url, - "origin": player_origin, - "user-agent": self.base_headers["user-agent"], - "content-type": "application/json", - } - - api_data = {"r": channel_referer, "d": player_domain} - - # Make API request - api_response = await self._make_request(api_url, method="POST", headers=api_headers, json=api_data) - api_data = api_response.json() - - # Check if request was successful - if not api_data.get("success"): - raise ExtractorError("Vecloud API request failed") - - # Extract stream URL from response - stream_url = api_data.get("player", {}).get("source_file") - - if not stream_url: - raise ExtractorError("Could not find stream URL in vecloud response") - - # Set up stream headers - stream_headers = { - "referer": player_origin + "/", - "origin": player_origin, - "user-agent": self.base_headers["user-agent"], - } - - # Return the stream URL with headers - return { - "destination_url": stream_url, - "request_headers": stream_headers, - "mediaflow_endpoint": self.mediaflow_endpoint, - } - - except Exception as e: - raise ExtractorError(f"Vecloud extraction failed: {str(e)}") - - async def _handle_playnow(self, player_iframe: str, channel_origin: str) -> str: - """Handle playnow URLs.""" - # Set up headers for the playnow request - playnow_headers = {"referer": channel_origin + "/", "user-agent": self.base_headers["user-agent"]} - - # Make the playnow request - playnow_response = await self._make_request(player_iframe, headers=playnow_headers) - player_url = self._extract_player_url(playnow_response.text) - if not player_url: - raise ExtractorError("Could not extract player URL from playnow response") - return player_url - - def _extract_player_url(self, html_content: str) -> Optional[str]: - """Extract player iframe URL from channel page HTML.""" - try: - # Look for iframe with allowfullscreen attribute - iframe_match = re.search( - r']*src=["\']([^"\']+)["\'][^>]*allowfullscreen', html_content, re.IGNORECASE - ) - - if not iframe_match: - # Try alternative pattern without requiring allowfullscreen - iframe_match = re.search( - r']*src=["\']([^"\']+(?:premiumtv|daddylivehd|vecloud)[^"\']*)["\']', - 
html_content, - re.IGNORECASE, - ) - - if iframe_match: - return iframe_match.group(1).strip() - - return None - except Exception: - return None - - async def _lookup_server( - self, lookup_url_base: str, auth_url_base: str, auth_data: Dict[str, str], headers: Dict[str, str] - ) -> str: - """Lookup server information and generate stream URL.""" - try: - # Construct server lookup URL - server_lookup_url = f"{lookup_url_base}/server_lookup.php?channel_id={quote(auth_data['channel_key'])}" - - # Make server lookup request - server_response = await self._make_request(server_lookup_url, headers=headers) - - server_data = server_response.json() - server_key = server_data.get("server_key") - - if not server_key: - raise ExtractorError("Failed to get server key") - - # Extract domain parts from auth URL for constructing stream URL - auth_domain_parts = urlparse(auth_url_base).netloc.split(".") - domain_suffix = ".".join(auth_domain_parts[1:]) if len(auth_domain_parts) > 1 else auth_domain_parts[0] - - # Generate the m3u8 URL based on server response pattern - if "/" in server_key: - # Handle special case like "top1/cdn" - parts = server_key.split("/") - return f"https://{parts[0]}.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8" - else: - # Handle normal case - return f"https://{server_key}new.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8" - - except Exception as e: - raise ExtractorError(f"Server lookup failed: {str(e)}") - - def _extract_auth_data(self, html_content: str) -> Dict[str, str]: - """Extract authentication data from player page.""" - try: - # Extract channel key - channel_key_match = re.search(r'var\s+channelKey\s*=\s*["\']([^"\']+)["\']', html_content) - # Extract auth timestamp - auth_ts_match = re.search(r'var\s+authTs\s*=\s*["\']([^"\']+)["\']', html_content) - # Extract auth random value - auth_rnd_match = re.search(r'var\s+authRnd\s*=\s*["\']([^"\']+)["\']', html_content) - # Extract auth signature - auth_sig_match = re.search(r'var\s+authSig\s*=\s*["\']([^"\']+)["\']', html_content) - - if not all([channel_key_match, auth_ts_match, auth_rnd_match, auth_sig_match]): - return {} - - return { - "channel_key": channel_key_match.group(1), - "auth_ts": auth_ts_match.group(1), - "auth_rnd": auth_rnd_match.group(1), - "auth_sig": auth_sig_match.group(1), - } - except Exception: - return {} - - def _extract_auth_url_base(self, html_content: str) -> Optional[str]: - """Extract auth URL base from player page script content.""" - try: - # Look for auth URL or domain in fetchWithRetry call or similar patterns - auth_url_match = re.search(r'fetchWithRetry\([\'"]([^\'"]*/auth\.php)', html_content) - - if auth_url_match: - auth_url = auth_url_match.group(1) - # Extract base URL up to the auth.php part - return auth_url.split("/auth.php")[0] - - # Try finding domain directly - domain_match = re.search(r'[\'"]https://([^/\'\"]+)(?:/[^\'\"]*)?/auth\.php', html_content) - - if domain_match: - return f"https://{domain_match.group(1)}" - - return None - except Exception: - return None - - def _get_origin(self, url: str) -> str: - """Extract origin from URL.""" - parsed = urlparse(url) - return f"{parsed.scheme}://{parsed.netloc}" - - def _derive_auth_url_base(self, player_domain: str) -> Optional[str]: - """Attempt to derive auth URL base from player domain.""" - try: - # Typical pattern is to use a subdomain for auth domain - parsed = urlparse(player_domain) - domain_parts = parsed.netloc.split(".") - - # Get the top-level domain and second-level domain - if 
len(domain_parts) >= 2: - base_domain = ".".join(domain_parts[-2:]) - # Try common subdomains for auth - for prefix in ["auth", "api", "cdn"]: - potential_auth_domain = f"https://{prefix}.{base_domain}" - return potential_auth_domain - - return None - except Exception: - return None +import re +import base64 +import logging +from typing import Any, Dict, Optional +from urllib.parse import urlparse, quote, urlunparse + +from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError + +logger = logging.getLogger(__name__) + + +class DLHDExtractor(BaseExtractor): + """DLHD (DaddyLive) URL extractor for M3U8 streams.""" + + def __init__(self, request_headers: dict): + super().__init__(request_headers) + # Default to HLS proxy endpoint + self.mediaflow_endpoint = "hls_manifest_proxy" + # Cache for the resolved base URL to avoid repeated network calls + self._cached_base_url = None + # Store iframe context for newkso.ru requests + self._iframe_context = None + + def _get_headers_for_url(self, url: str, base_headers: dict) -> dict: + """Get appropriate headers for the given URL, applying newkso.ru specific headers if needed.""" + headers = base_headers.copy() + + # Check if URL contains newkso.ru domain + parsed_url = urlparse(url) + if "newkso.ru" in parsed_url.netloc: + # Use iframe URL as referer if available, otherwise use the newkso domain itself + if self._iframe_context: + iframe_origin = f"https://{urlparse(self._iframe_context).netloc}" + newkso_headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36', + 'Referer': self._iframe_context, + 'Origin': iframe_origin + } + logger.info(f"Applied newkso.ru specific headers with iframe context for URL: {url}") + logger.debug(f"Headers applied: {newkso_headers}") + else: + # Fallback to newkso domain itself + newkso_origin = f"{parsed_url.scheme}://{parsed_url.netloc}" + newkso_headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36', + 'Referer': newkso_origin, + 'Origin': newkso_origin + } + logger.info(f"Applied newkso.ru specific headers (fallback) for URL: {url}") + logger.debug(f"Headers applied: {newkso_headers}") + + headers.update(newkso_headers) + + return headers + + async def _make_request(self, url: str, method: str = "GET", headers: dict = None, **kwargs): + """Override _make_request to apply newkso.ru specific headers when needed.""" + request_headers = headers or {} + + # Apply newkso.ru specific headers if the URL contains newkso.ru + final_headers = self._get_headers_for_url(url, request_headers) + + return await super()._make_request(url, method, final_headers, **kwargs) + + async def extract(self, url: str, **kwargs) -> Dict[str, Any]: + """Extract DLHD stream URL and required headers (logica tvproxy adattata async, con fallback su endpoint alternativi).""" + from urllib.parse import urlparse, quote_plus + + async def get_daddylive_base_url(): + if self._cached_base_url: + return self._cached_base_url + try: + resp = await self._make_request("https://daddylive.sx/") + # resp.url is the final URL after redirects + base_url = str(resp.url) + if not base_url.endswith('/'): + base_url += '/' + self._cached_base_url = base_url + return base_url + except Exception: + # Fallback to default if request fails + return "https://daddylive.sx/" + + def extract_channel_id(url): + match_premium = re.search(r'/premium(\d+)/mono\.m3u8$', url) + if 
match_premium:
+                return match_premium.group(1)
+            # Handle both normal and URL-encoded patterns
+            match_player = re.search(r'/(?:watch|stream|cast|player)/stream-(\d+)\.php', url)
+            if match_player:
+                return match_player.group(1)
+            # Handle URL-encoded patterns like %2Fstream%2Fstream-123.php or just stream-123.php
+            match_encoded = re.search(r'(?:%2F|/)stream-(\d+)\.php', url, re.IGNORECASE)
+            if match_encoded:
+                return match_encoded.group(1)
+            # Handle direct stream- pattern without path
+            match_direct = re.search(r'stream-(\d+)\.php', url)
+            if match_direct:
+                return match_direct.group(1)
+            return None
+
+        async def try_endpoint(baseurl, endpoint, channel_id):
+            stream_url = f"{baseurl}{endpoint}stream-{channel_id}.php"
+            daddy_origin = urlparse(baseurl).scheme + "://" + urlparse(baseurl).netloc
+            daddylive_headers = {
+                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
+                'Referer': baseurl,
+                'Origin': daddy_origin
+            }
+            # 1. Request the stream/cast/player/watch page
+            resp1 = await self._make_request(stream_url, headers=daddylive_headers)
+            # 2. Extract the Player 2 link
+            iframes = re.findall(r'<a[^>]*href="([^"]+)"[^>]*>\s*<button[^>]*>\s*Player\s*2\s*</button>', resp1.text)
+            if not iframes:
+                raise ExtractorError("No Player 2 link found")
+            url2 = iframes[0]
+            url2 = baseurl + url2
+            url2 = url2.replace('//cast', '/cast')
+            daddylive_headers['Referer'] = url2
+            daddylive_headers['Origin'] = url2
+            # 3. Request the Player 2 page
+            resp2 = await self._make_request(url2, headers=daddylive_headers)
+            # 4. Extract the iframe
+            iframes2 = re.findall(r'iframe src="([^"]*)', resp2.text)
+            if not iframes2:
+                raise ExtractorError("No iframe found in Player 2 page")
+            iframe_url = iframes2[0]
+            # Store iframe context for newkso.ru requests
+            self._iframe_context = iframe_url
+            resp3 = await self._make_request(iframe_url, headers=daddylive_headers)
+            iframe_content = resp3.text
+            # 5. Extract auth parameters (robust) - handle both old and new formats
+            def extract_var_old_format(js, name):
+                # Try multiple patterns for variable extraction (old format)
+                patterns = [
+                    rf'var (?:__)?{name}\s*=\s*atob\("([^"]+)"\)',
+                    rf'var (?:__)?{name}\s*=\s*atob\(\'([^\']+)\'\)',
+                    rf'(?:var\s+)?(?:__)?{name}\s*=\s*atob\s*\(\s*["\']([^"\']+)["\']\s*\)',
+                    rf'(?:let|const)\s+(?:__)?{name}\s*=\s*atob\s*\(\s*["\']([^"\']+)["\']\s*\)'
+                ]
+                for pattern in patterns:
+                    m = re.search(pattern, js)
+                    if m:
+                        try:
+                            return base64.b64decode(m.group(1)).decode('utf-8')
+                        except Exception as decode_error:
+                            logger.warning(f"Failed to decode base64 for variable {name}: {decode_error}")
+                            continue
+                return None
+
+            def extract_bundle_format(js):
+                """Extract parameters from new BUNDLE format"""
+                try:
+                    # Look for BUNDLE variable
+                    bundle_patterns = [
+                        r'const\s+BUNDLE\s*=\s*["\']([^"\']+)["\']',
+                        r'var\s+BUNDLE\s*=\s*["\']([^"\']+)["\']',
+                        r'let\s+BUNDLE\s*=\s*["\']([^"\']+)["\']'
+                    ]
+
+                    bundle_data = None
+                    for pattern in bundle_patterns:
+                        match = re.search(pattern, js)
+                        if match:
+                            bundle_data = match.group(1)
+                            break
+
+                    if not bundle_data:
+                        return None
+
+                    # Decode the bundle (base64 -> JSON -> decode each field)
+                    import json
+                    bundle_json = base64.b64decode(bundle_data).decode('utf-8')
+                    bundle_obj = json.loads(bundle_json)
+
+                    # Decode each base64 field
+                    decoded_bundle = {}
+                    for key, value in bundle_obj.items():
+                        try:
+                            decoded_bundle[key] = base64.b64decode(value).decode('utf-8')
+                        except Exception as e:
+                            logger.warning(f"Failed to decode bundle field {key}: {e}")
+                            decoded_bundle[key] = value
+
+                    return decoded_bundle
+
+                except Exception as e:
+                    logger.warning(f"Failed to extract bundle format: {e}")
+                    return None
+
+            # Try multiple patterns for channel key extraction
+            channel_key = None
+            channel_key_patterns = [
+                r'const\s+CHANNEL_KEY\s*=\s*["\']([^"\']+)["\']',
+                r'var\s+CHANNEL_KEY\s*=\s*["\']([^"\']+)["\']',
+                r'let\s+CHANNEL_KEY\s*=\s*["\']([^"\']+)["\']',
+                r'channelKey\s*=\s*["\']([^"\']+)["\']',
+                r'var\s+channelKey\s*=\s*["\']([^"\']+)["\']',
+                r'(?:let|const)\s+channelKey\s*=\s*["\']([^"\']+)["\']'
+            ]
+            for pattern in channel_key_patterns:
+                match = re.search(pattern, iframe_content)
+                if match:
+                    channel_key = match.group(1)
+                    break
+
+            # Try new bundle format first
+            bundle_data = extract_bundle_format(iframe_content)
+            if bundle_data:
+                logger.info("Using new BUNDLE format for parameter extraction")
+                auth_host = bundle_data.get('b_host')
+                auth_php = bundle_data.get('b_script')
+                auth_ts = bundle_data.get('b_ts')
+                auth_rnd = bundle_data.get('b_rnd')
+                auth_sig = bundle_data.get('b_sig')
+                logger.debug(f"Bundle data extracted: {bundle_data}")
+            else:
+                logger.info("Falling back to old format for parameter extraction")
+                # Fall back to old format
+                auth_ts = extract_var_old_format(iframe_content, 'c')
+                auth_rnd = extract_var_old_format(iframe_content, 'd')
+                auth_sig = extract_var_old_format(iframe_content, 'e')
+                auth_host = extract_var_old_format(iframe_content, 'a')
+                auth_php = extract_var_old_format(iframe_content, 'b')
+
+            # Log what we found for debugging
+            logger.debug(f"Extracted parameters: channel_key={channel_key}, auth_ts={auth_ts}, auth_rnd={auth_rnd}, auth_sig={auth_sig}, auth_host={auth_host}, auth_php={auth_php}")
+
+            # Check which parameters are missing
+            missing_params = []
+            if not channel_key:
+                missing_params.append('channel_key/CHANNEL_KEY')
+            if not auth_ts:
+                missing_params.append('auth_ts (var c / b_ts)')
+            if not auth_rnd:
+                missing_params.append('auth_rnd (var d / b_rnd)')
+            if not auth_sig:
+                missing_params.append('auth_sig (var e / b_sig)')
+            if not auth_host:
+                missing_params.append('auth_host (var a / b_host)')
+            if not auth_php:
+                missing_params.append('auth_php (var b / b_script)')
+
+            if missing_params:
+                logger.error(f"Missing parameters: {', '.join(missing_params)}")
+                # Log a portion of the iframe content for debugging (first 2000 chars)
+                logger.debug(f"Iframe content sample: {iframe_content[:2000]}")
+                raise ExtractorError(f"Error extracting parameters: missing {', '.join(missing_params)}")
+            auth_sig = quote_plus(auth_sig)
+            # 6. Auth request
+            auth_url = f'{auth_host}{auth_php}?channel_id={channel_key}&ts={auth_ts}&rnd={auth_rnd}&sig={auth_sig}'
+            auth_resp = await self._make_request(auth_url, headers=daddylive_headers)
+            # 7. Lookup server - Extract host parameter
+            host = None
+            host_patterns = [
+                r'(?s)m3u8 =.*?:.*?:.*?".*?".*?"([^"]*)',  # Original pattern
+                r'm3u8\s*=.*?"([^"]*)"',  # Simplified m3u8 pattern
+                r'host["\']?\s*[:=]\s*["\']([^"\']*)',  # host: or host= pattern
+                r'["\']([^"\']*\.newkso\.ru[^"\']*)',  # Direct newkso.ru pattern
+                r'["\']([^"\']*\/premium\d+[^"\']*)',  # premium path pattern
+                r'url.*?["\']([^"\']*newkso[^"\']*)',  # URL with newkso
+            ]
+
+            for pattern in host_patterns:
+                matches = re.findall(pattern, iframe_content)
+                if matches:
+                    host = matches[0]
+                    logger.debug(f"Found host with pattern '{pattern}': {host}")
+                    break
+
+            if not host:
+                logger.error("Failed to extract host from iframe content")
+                logger.debug(f"Iframe content for host extraction: {iframe_content[:2000]}")
+                # Try to find any newkso.ru related URLs
+                potential_hosts = re.findall(r'["\']([^"\']*newkso[^"\']*)', iframe_content)
+                if potential_hosts:
+                    logger.debug(f"Potential host URLs found: {potential_hosts}")
+                raise ExtractorError("Failed to extract host parameter")
+
+            # Extract server lookup URL from fetchWithRetry call (dynamic extraction)
+            server_lookup = None
+
+            # Look for the server_lookup.php pattern in JavaScript
+            if "fetchWithRetry('/server_lookup.php?channel_id='" in iframe_content:
+                server_lookup = '/server_lookup.php?channel_id='
+                logger.debug('Found server lookup URL: /server_lookup.php?channel_id=')
+            elif '/server_lookup.php' in iframe_content:
+                # Try to extract the full path
+                js_lines = iframe_content.split('\n')
+                for js_line in js_lines:
+                    if 'server_lookup.php' in js_line and 'fetchWithRetry' in js_line:
+                        # Extract the URL from the fetchWithRetry call
+                        start = js_line.find("'")
+                        if start != -1:
+                            end = js_line.find("'", start + 1)
+                            if end != -1:
+                                potential_url = js_line[start+1:end]
+                                if 'server_lookup' in potential_url:
+                                    server_lookup = potential_url
+                                    logger.debug(f'Extracted server lookup URL: {server_lookup}')
+                                    break
+
+            if not server_lookup:
+                logger.error('Failed to extract server lookup URL from iframe content')
+                logger.debug(f'Iframe content sample: {iframe_content[:2000]}')
+                raise ExtractorError('Failed to extract server lookup URL')
+
+            server_lookup_url = f"https://{urlparse(iframe_url).netloc}{server_lookup}{channel_key}"
+            logger.debug(f"Server lookup URL: {server_lookup_url}")
+
+            try:
+                lookup_resp = await self._make_request(server_lookup_url, headers=daddylive_headers)
+                server_data = lookup_resp.json()
+                server_key = server_data.get('server_key')
+                if not server_key:
+                    logger.error(f"No server_key in response: {server_data}")
+                    raise ExtractorError("Failed to get server key from lookup response")
+
+                logger.info(f"Server lookup successful -
Server key: {server_key}") + except Exception as lookup_error: + logger.error(f"Server lookup request failed: {lookup_error}") + raise ExtractorError(f"Server lookup failed: {str(lookup_error)}") + + referer_raw = f'https://{urlparse(iframe_url).netloc}' + + # Extract URL construction logic dynamically from JavaScript + # Simple approach: look for newkso.ru URLs and construct based on server_key + + # Check if we have the special case server_key + if server_key == 'top1/cdn': + clean_m3u8_url = f'https://top1.newkso.ru/top1/cdn/{channel_key}/mono.m3u8' + logger.info(f'Using special case URL for server_key \'top1/cdn\': {clean_m3u8_url}') + else: + clean_m3u8_url = f'https://{server_key}new.newkso.ru/{server_key}/{channel_key}/mono.m3u8' + logger.info(f'Using general case URL for server_key \'{server_key}\': {clean_m3u8_url}') + + logger.info(f'Generated stream URL: {clean_m3u8_url}') + logger.debug(f'Server key: {server_key}, Channel key: {channel_key}') + + # Check if the final stream URL is on newkso.ru domain + if "newkso.ru" in clean_m3u8_url: + # For newkso.ru streams, use iframe URL as referer + stream_headers = { + 'User-Agent': daddylive_headers['User-Agent'], + 'Referer': iframe_url, + 'Origin': referer_raw + } + logger.info(f"Applied iframe-specific headers for newkso.ru stream URL: {clean_m3u8_url}") + logger.debug(f"Stream headers for newkso.ru: {stream_headers}") + else: + # For other domains, use the original logic + stream_headers = { + 'User-Agent': daddylive_headers['User-Agent'], + 'Referer': referer_raw, + 'Origin': referer_raw + } + return { + "destination_url": clean_m3u8_url, + "request_headers": stream_headers, + "mediaflow_endpoint": self.mediaflow_endpoint, + } + + try: + clean_url = url + channel_id = extract_channel_id(clean_url) + if not channel_id: + raise ExtractorError(f"Unable to extract channel ID from {clean_url}") + + baseurl = await get_daddylive_base_url() + endpoints = ["stream/", "cast/", "player/", "watch/"] + last_exc = None + for endpoint in endpoints: + try: + return await try_endpoint(baseurl, endpoint, channel_id) + except Exception as exc: + last_exc = exc + continue + raise ExtractorError(f"Extraction failed: {str(last_exc)}") + except Exception as e: + raise ExtractorError(f"Extraction failed: {str(e)}") + + async def _lookup_server( + self, lookup_url_base: str, auth_url_base: str, auth_data: Dict[str, str], headers: Dict[str, str] + ) -> str: + """Lookup server information and generate stream URL.""" + try: + # Construct server lookup URL + server_lookup_url = f"{lookup_url_base}/server_lookup.php?channel_id={quote(auth_data['channel_key'])}" + + # Make server lookup request + server_response = await self._make_request(server_lookup_url, headers=headers) + + server_data = server_response.json() + server_key = server_data.get("server_key") + + if not server_key: + raise ExtractorError("Failed to get server key") + + # Extract domain parts from auth URL for constructing stream URL + auth_domain_parts = urlparse(auth_url_base).netloc.split(".") + domain_suffix = ".".join(auth_domain_parts[1:]) if len(auth_domain_parts) > 1 else auth_domain_parts[0] + + # Generate the m3u8 URL based on server response pattern + if "/" in server_key: + # Handle special case like "top1/cdn" + parts = server_key.split("/") + return f"https://{parts[0]}.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8" + else: + # Handle normal case + return f"https://{server_key}new.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8" + + except 
Exception as e: + raise ExtractorError(f"Server lookup failed: {str(e)}") + + def _extract_auth_data(self, html_content: str) -> Dict[str, str]: + """Extract authentication data from player page.""" + try: + channel_key_match = re.search(r'var\s+channelKey\s*=\s*["\']([^"\']+)["\']', html_content) + if not channel_key_match: + return {} + channel_key = channel_key_match.group(1) + + # New pattern with atob + auth_ts_match = re.search(r'var\s+__c\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content) + auth_rnd_match = re.search(r'var\s+__d\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content) + auth_sig_match = re.search(r'var\s+__e\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content) + + if auth_ts_match and auth_rnd_match and auth_sig_match: + return { + "channel_key": channel_key, + "auth_ts": base64.b64decode(auth_ts_match.group(1)).decode("utf-8"), + "auth_rnd": base64.b64decode(auth_rnd_match.group(1)).decode("utf-8"), + "auth_sig": base64.b64decode(auth_sig_match.group(1)).decode("utf-8"), + } + + # Original pattern + auth_ts_match = re.search(r'var\s+authTs\s*=\s*["\']([^"\']+)["\']', html_content) + auth_rnd_match = re.search(r'var\s+authRnd\s*=\s*["\']([^"\']+)["\']', html_content) + auth_sig_match = re.search(r'var\s+authSig\s*=\s*["\']([^"\']+)["\']', html_content) + + if auth_ts_match and auth_rnd_match and auth_sig_match: + return { + "channel_key": channel_key, + "auth_ts": auth_ts_match.group(1), + "auth_rnd": auth_rnd_match.group(1), + "auth_sig": auth_sig_match.group(1), + } + return {} + except Exception: + return {} + + def _extract_auth_url_base(self, html_content: str) -> Optional[str]: + """Extract auth URL base from player page script content.""" + try: + # New atob pattern for auth base URL + auth_url_base_match = re.search(r'var\s+__a\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content) + if auth_url_base_match: + decoded_url = base64.b64decode(auth_url_base_match.group(1)).decode("utf-8") + return decoded_url.strip().rstrip("/") + + # Look for auth URL or domain in fetchWithRetry call or similar patterns + auth_url_match = re.search(r'fetchWithRetry\([\'"]([^\'"]*/auth\.php)', html_content) + + if auth_url_match: + auth_url = auth_url_match.group(1) + # Extract base URL up to the auth.php part + return auth_url.split("/auth.php")[0] + + # Try finding domain directly + domain_match = re.search(r'[\'"]https://([^/\'\"]+)(?:/[^\'\"]*)?/auth\.php', html_content) + + if domain_match: + return f"https://{domain_match.group(1)}" + + return None + except Exception: + return None + + def _get_origin(self, url: str) -> str: + """Extract origin from URL.""" + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + def _derive_auth_url_base(self, player_domain: str) -> Optional[str]: + """Attempt to derive auth URL base from player domain.""" + try: + # Typical pattern is to use a subdomain for auth domain + parsed = urlparse(player_domain) + domain_parts = parsed.netloc.split(".") + + # Get the top-level domain and second-level domain + if len(domain_parts) >= 2: + base_domain = ".".join(domain_parts[-2:]) + # Try common subdomains for auth + for prefix in ["auth", "api", "cdn"]: + potential_auth_domain = f"https://{prefix}.{base_domain}" + return potential_auth_domain + + return None + except Exception: + return None diff --git a/mediaflow_proxy/extractors/factory.py b/mediaflow_proxy/extractors/factory.py index b6213da..233f4e2 100644 --- a/mediaflow_proxy/extractors/factory.py +++ b/mediaflow_proxy/extractors/factory.py @@ -10,6 +10,7 @@ from 
mediaflow_proxy.extractors.okru import OkruExtractor from mediaflow_proxy.extractors.streamtape import StreamtapeExtractor from mediaflow_proxy.extractors.supervideo import SupervideoExtractor from mediaflow_proxy.extractors.uqload import UqloadExtractor +from mediaflow_proxy.extractors.vavoo import VavooExtractor from mediaflow_proxy.extractors.vixcloud import VixCloudExtractor from mediaflow_proxy.extractors.fastream import FastreamExtractor @@ -27,6 +28,7 @@ class ExtractorFactory: "Maxstream": MaxstreamExtractor, "LiveTV": LiveTVExtractor, "DLHD": DLHDExtractor, + "Vavoo": VavooExtractor, "Fastream": FastreamExtractor } diff --git a/mediaflow_proxy/extractors/fastream.py b/mediaflow_proxy/extractors/fastream.py index 2f8d664..532137e 100644 --- a/mediaflow_proxy/extractors/fastream.py +++ b/mediaflow_proxy/extractors/fastream.py @@ -13,10 +13,10 @@ class FastreamExtractor(BaseExtractor): self.mediaflow_endpoint = "hls_manifest_proxy" async def extract(self, url: str, **kwargs) -> Dict[str, Any]: - #Init headers needed for the request. headers = {'Accept': '*/*', 'Connection': 'keep-alive','Accept-Language': 'en-US,en;q=0.5','Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0'} - """Extract Fastream URL.""" - final_url = await eval_solver(self,url,headers, r'file:"(.*?)"') + pattern = r'file:"(.*?)"' + + final_url = await eval_solver(self, url, headers, pattern) self.base_headers["referer"] = f'https://{url.replace("https://","").split("/")[0]}/' self.base_headers["origin"] = f'https://{url.replace("https://","").split("/")[0]}' diff --git a/mediaflow_proxy/extractors/mixdrop.py b/mediaflow_proxy/extractors/mixdrop.py index 21a1790..5b3df08 100644 --- a/mediaflow_proxy/extractors/mixdrop.py +++ b/mediaflow_proxy/extractors/mixdrop.py @@ -13,8 +13,9 @@ class MixdropExtractor(BaseExtractor): url = url.replace("club", "ps").split("/2")[0] headers = {"accept-language": "en-US,en;q=0.5"} + pattern = r'MDCore.wurl ?= ?"(.*?)"' - final_url = f"https:{await eval_solver(self, url, headers, r'MDCore.wurl ?= ?"(.*?)"')}" + final_url = f"https:{await eval_solver(self, url, headers, pattern)}" self.base_headers["referer"] = url return { diff --git a/mediaflow_proxy/extractors/supervideo.py b/mediaflow_proxy/extractors/supervideo.py index adb3c42..3161331 100644 --- a/mediaflow_proxy/extractors/supervideo.py +++ b/mediaflow_proxy/extractors/supervideo.py @@ -14,12 +14,10 @@ class SupervideoExtractor(BaseExtractor): self.mediaflow_endpoint = "hls_manifest_proxy" async def extract(self, url: str, **kwargs) -> Dict[str, Any]: - #Init headers needed for the request. 
headers = {'Accept': '*/*', 'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.71 Mobile Safari/537.36', 'user-agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.71 Mobile Safari/537.36'} + pattern = r'file:"(.*?)"' - - """Extract Supervideo URL.""" - final_url = await eval_solver(self,url,headers, r'file:"(.*?)"') + final_url = await eval_solver(self, url, headers, pattern) self.base_headers["referer"] = url return { diff --git a/mediaflow_proxy/extractors/vavoo.py b/mediaflow_proxy/extractors/vavoo.py new file mode 100644 index 0000000..366c54a --- /dev/null +++ b/mediaflow_proxy/extractors/vavoo.py @@ -0,0 +1,169 @@ +import logging +from typing import Any, Dict, Optional +from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError + +logger = logging.getLogger(__name__) + +class VavooExtractor(BaseExtractor): + """Vavoo URL extractor for resolving vavoo.to links (solo httpx, async).""" + + def __init__(self, request_headers: dict): + super().__init__(request_headers) + self.mediaflow_endpoint = "proxy_stream_endpoint" + + async def get_auth_signature(self) -> Optional[str]: + """Get authentication signature for Vavoo API (async, httpx, pulito).""" + headers = { + "user-agent": "okhttp/4.11.0", + "accept": "application/json", + "content-type": "application/json; charset=utf-8", + "accept-encoding": "gzip" + } + import time + current_time = int(time.time() * 1000) + + data = { + "token": "tosFwQCJMS8qrW_AjLoHPQ41646J5dRNha6ZWHnijoYQQQoADQoXYSo7ki7O5-CsgN4CH0uRk6EEoJ0728ar9scCRQW3ZkbfrPfeCXW2VgopSW2FWDqPOoVYIuVPAOnXCZ5g", + "reason": "app-blur", + "locale": "de", + "theme": "dark", + "metadata": { + "device": { + "type": "Handset", + "brand": "google", + "model": "Pixel", + "name": "sdk_gphone64_arm64", + "uniqueId": "d10e5d99ab665233" + }, + "os": { + "name": "android", + "version": "13", + "abis": ["arm64-v8a", "armeabi-v7a", "armeabi"], + "host": "android" + }, + "app": { + "platform": "android", + "version": "3.1.21", + "buildId": "289515000", + "engine": "hbc85", + "signatures": ["6e8a975e3cbf07d5de823a760d4c2547f86c1403105020adee5de67ac510999e"], + "installer": "app.revanced.manager.flutter" + }, + "version": { + "package": "tv.vavoo.app", + "binary": "3.1.21", + "js": "3.1.21" + } + }, + "appFocusTime": 0, + "playerActive": False, + "playDuration": 0, + "devMode": False, + "hasAddon": True, + "castConnected": False, + "package": "tv.vavoo.app", + "version": "3.1.21", + "process": "app", + "firstAppStart": current_time, + "lastAppStart": current_time, + "ipLocation": "", + "adblockEnabled": True, + "proxy": { + "supported": ["ss", "openvpn"], + "engine": "ss", + "ssVersion": 1, + "enabled": True, + "autoServer": True, + "id": "de-fra" + }, + "iap": { + "supported": False + } + } + + try: + resp = await self._make_request( + "https://www.vavoo.tv/api/app/ping", + method="POST", + json=data, + headers=headers + ) + result = resp.json() + addon_sig = result.get("addonSig") + if addon_sig: + logger.info("Successfully obtained Vavoo authentication signature") + return addon_sig + else: + logger.warning("No addonSig in Vavoo API response") + return None + except Exception as e: + logger.exception(f"Failed to get Vavoo authentication signature: {str(e)}") + return None + + async def extract(self, url: str, **kwargs) -> Dict[str, Any]: + """Extract Vavoo stream URL (async, httpx).""" + if "vavoo.to" not in url: + raise ExtractorError("Not a valid Vavoo 
URL") + + # Get authentication signature + signature = await self.get_auth_signature() + if not signature: + raise ExtractorError("Failed to get Vavoo authentication signature") + + # Resolve the URL + resolved_url = await self._resolve_vavoo_link(url, signature) + if not resolved_url: + raise ExtractorError("Failed to resolve Vavoo URL") + + # Set up headers for the resolved stream + stream_headers = { + "user-agent": self.base_headers["user-agent"], + "referer": "https://vavoo.to/", + } + + return { + "destination_url": resolved_url, + "request_headers": stream_headers, + "mediaflow_endpoint": self.mediaflow_endpoint, + } + + async def _resolve_vavoo_link(self, link: str, signature: str) -> Optional[str]: + """Resolve a Vavoo link using the MediaHubMX API (async, httpx).""" + headers = { + "user-agent": "MediaHubMX/2", + "accept": "application/json", + "content-type": "application/json; charset=utf-8", + "accept-encoding": "gzip", + "mediahubmx-signature": signature + } + data = { + "language": "de", + "region": "AT", + "url": link, + "clientVersion": "3.1.21" + } + try: + logger.info(f"Attempting to resolve Vavoo URL: {link}") + resp = await self._make_request( + "https://vavoo.to/mediahubmx-resolve.json", + method="POST", + json=data, + headers=headers + ) + result = resp.json() + logger.info(f"Vavoo API response: {result}") + + if isinstance(result, list) and result and result[0].get("url"): + resolved_url = result[0]["url"] + logger.info(f"Successfully resolved Vavoo URL to: {resolved_url}") + return resolved_url + elif isinstance(result, dict) and result.get("url"): + resolved_url = result["url"] + logger.info(f"Successfully resolved Vavoo URL to: {resolved_url}") + return resolved_url + else: + logger.warning(f"No URL found in Vavoo API response: {result}") + return None + except Exception as e: + logger.exception(f"Vavoo resolution failed for URL {link}: {str(e)}") + raise ExtractorError(f"Vavoo resolution failed: {str(e)}") from e diff --git a/mediaflow_proxy/handlers.py b/mediaflow_proxy/handlers.py index 193e1c3..3afbd79 100644 --- a/mediaflow_proxy/handlers.py +++ b/mediaflow_proxy/handlers.py @@ -85,6 +85,20 @@ async def handle_hls_stream_proxy( proxy_headers.request.update({"range": content_range}) try: + # Auto-detect and resolve Vavoo links + if "vavoo.to" in hls_params.destination: + try: + from mediaflow_proxy.extractors.vavoo import VavooExtractor + vavoo_extractor = VavooExtractor(proxy_headers.request) + resolved_data = await vavoo_extractor.extract(hls_params.destination) + resolved_url = resolved_data["destination_url"] + logger.info(f"Auto-resolved Vavoo URL: {hls_params.destination} -> {resolved_url}") + # Update destination with resolved URL + hls_params.destination = resolved_url + except Exception as e: + logger.warning(f"Failed to auto-resolve Vavoo URL: {e}") + # Continue with original URL if resolution fails + # If force_playlist_proxy is enabled, skip detection and directly process as m3u8 if hls_params.force_playlist_proxy: return await fetch_and_process_m3u8( @@ -141,6 +155,20 @@ async def handle_stream_request( client, streamer = await setup_client_and_streamer() try: + # Auto-detect and resolve Vavoo links + if "vavoo.to" in video_url: + try: + from mediaflow_proxy.extractors.vavoo import VavooExtractor + vavoo_extractor = VavooExtractor(proxy_headers.request) + resolved_data = await vavoo_extractor.extract(video_url) + resolved_url = resolved_data["destination_url"] + logger.info(f"Auto-resolved Vavoo URL: {video_url} -> {resolved_url}") + # Update 
video_url with resolved URL + video_url = resolved_url + except Exception as e: + logger.warning(f"Failed to auto-resolve Vavoo URL: {e}") + # Continue with original URL if resolution fails + await streamer.create_streaming_response(video_url, proxy_headers.request) response_headers = prepare_response_headers(streamer.response.headers, proxy_headers.response) diff --git a/mediaflow_proxy/main.py b/mediaflow_proxy/main.py index 8156601..f52dff2 100644 --- a/mediaflow_proxy/main.py +++ b/mediaflow_proxy/main.py @@ -10,10 +10,11 @@ from starlette.staticfiles import StaticFiles from mediaflow_proxy.configs import settings from mediaflow_proxy.middleware import UIAccessControlMiddleware -from mediaflow_proxy.routes import proxy_router, extractor_router, speedtest_router +from mediaflow_proxy.routes import proxy_router, extractor_router, speedtest_router, playlist_builder_router from mediaflow_proxy.schemas import GenerateUrlRequest, GenerateMultiUrlRequest, MultiUrlRequestItem from mediaflow_proxy.utils.crypto_utils import EncryptionHandler, EncryptionMiddleware from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url +from mediaflow_proxy.utils.base64_utils import encode_url_to_base64, decode_base64_url, is_base64_url logging.basicConfig(level=settings.log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") app = FastAPI() @@ -99,10 +100,15 @@ async def generate_url(request: GenerateUrlRequest): # Convert IP to string if provided ip_str = str(request.ip) if request.ip else None + # Handle base64 encoding of destination URL if requested + destination_url = request.destination_url + if request.base64_encode_destination and destination_url: + destination_url = encode_url_to_base64(destination_url) + encoded_url = encode_mediaflow_proxy_url( mediaflow_proxy_url=request.mediaflow_proxy_url, endpoint=request.endpoint, - destination_url=request.destination_url, + destination_url=destination_url, query_params=query_params, request_headers=request.request_headers, response_headers=request.response_headers, @@ -156,9 +162,83 @@ async def generate_urls(request: GenerateMultiUrlRequest): return {"urls": encoded_urls} +@app.post( + "/base64/encode", + description="Encode a URL to base64 format", + response_description="Returns the base64 encoded URL", + tags=["base64"], +) +async def encode_url_base64(url: str): + """ + Encode a URL to base64 format. + + Args: + url (str): The URL to encode. + + Returns: + dict: A dictionary containing the encoded URL. + """ + try: + encoded_url = encode_url_to_base64(url) + return {"encoded_url": encoded_url, "original_url": url} + except Exception as e: + raise HTTPException(status_code=400, detail=f"Failed to encode URL: {str(e)}") + + +@app.post( + "/base64/decode", + description="Decode a base64 encoded URL", + response_description="Returns the decoded URL", + tags=["base64"], +) +async def decode_url_base64(encoded_url: str): + """ + Decode a base64 encoded URL. + + Args: + encoded_url (str): The base64 encoded URL to decode. + + Returns: + dict: A dictionary containing the decoded URL. 
+ """ + decoded_url = decode_base64_url(encoded_url) + if decoded_url is None: + raise HTTPException(status_code=400, detail="Invalid base64 encoded URL") + + return {"decoded_url": decoded_url, "encoded_url": encoded_url} + + +@app.get( + "/base64/check", + description="Check if a string appears to be a base64 encoded URL", + response_description="Returns whether the string is likely base64 encoded", + tags=["base64"], +) +async def check_base64_url(url: str): + """ + Check if a string appears to be a base64 encoded URL. + + Args: + url (str): The string to check. + + Returns: + dict: A dictionary indicating if the string is likely base64 encoded. + """ + is_base64 = is_base64_url(url) + result = {"url": url, "is_base64": is_base64} + + if is_base64: + decoded_url = decode_base64_url(url) + if decoded_url: + result["decoded_url"] = decoded_url + + return result + + app.include_router(proxy_router, prefix="/proxy", tags=["proxy"], dependencies=[Depends(verify_api_key)]) app.include_router(extractor_router, prefix="/extractor", tags=["extractors"], dependencies=[Depends(verify_api_key)]) app.include_router(speedtest_router, prefix="/speedtest", tags=["speedtest"], dependencies=[Depends(verify_api_key)]) +app.include_router(playlist_builder_router, prefix="/playlist", tags=["playlist"]) static_path = resources.files("mediaflow_proxy").joinpath("static") app.mount("/", StaticFiles(directory=str(static_path), html=True), name="static") diff --git a/mediaflow_proxy/mpd_processor.py b/mediaflow_proxy/mpd_processor.py index 5540698..a6748dd 100644 --- a/mediaflow_proxy/mpd_processor.py +++ b/mediaflow_proxy/mpd_processor.py @@ -1,3 +1,4 @@ +import asyncio import logging import math import time @@ -7,6 +8,8 @@ from fastapi import Request, Response, HTTPException from mediaflow_proxy.drm.decrypter import decrypt_segment from mediaflow_proxy.utils.crypto_utils import encryption_handler from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, get_original_scheme, ProxyRequestHeaders +from mediaflow_proxy.utils.dash_prebuffer import dash_prebuffer +from mediaflow_proxy.configs import settings logger = logging.getLogger(__name__) @@ -28,6 +31,23 @@ async def process_manifest( Response: The HLS manifest as an HTTP response. 
""" hls_content = build_hls(mpd_dict, request, key_id, key) + + # Start DASH pre-buffering in background if enabled + if settings.enable_dash_prebuffer: + # Extract headers for pre-buffering + headers = {} + for key, value in request.query_params.items(): + if key.startswith("h_"): + headers[key[2:]] = value + + # Get the original MPD URL from the request + mpd_url = request.query_params.get("d", "") + if mpd_url: + # Start pre-buffering in background + asyncio.create_task( + dash_prebuffer.prebuffer_dash_manifest(mpd_url, headers) + ) + return Response(content=hls_content, media_type="application/vnd.apple.mpegurl", headers=proxy_headers.response) diff --git a/mediaflow_proxy/routes/__init__.py b/mediaflow_proxy/routes/__init__.py index 4a8a05c..047ad6f 100644 --- a/mediaflow_proxy/routes/__init__.py +++ b/mediaflow_proxy/routes/__init__.py @@ -1,5 +1,6 @@ from .proxy import proxy_router from .extractor import extractor_router from .speedtest import speedtest_router +from .playlist_builder import playlist_builder_router -__all__ = ["proxy_router", "extractor_router", "speedtest_router"] +__all__ = ["proxy_router", "extractor_router", "speedtest_router", "playlist_builder_router"] diff --git a/mediaflow_proxy/routes/extractor.py b/mediaflow_proxy/routes/extractor.py index 09981f6..8de5c99 100644 --- a/mediaflow_proxy/routes/extractor.py +++ b/mediaflow_proxy/routes/extractor.py @@ -14,6 +14,7 @@ from mediaflow_proxy.utils.http_utils import ( ProxyRequestHeaders, get_proxy_headers, ) +from mediaflow_proxy.utils.base64_utils import process_potential_base64_url extractor_router = APIRouter() logger = logging.getLogger(__name__) @@ -28,6 +29,10 @@ async def extract_url( ): """Extract clean links from various video hosting services.""" try: + # Process potential base64 encoded destination URL + processed_destination = process_potential_base64_url(extractor_params.destination) + extractor_params.destination = processed_destination + cache_key = f"{extractor_params.host}_{extractor_params.model_dump_json()}" response = await get_cached_extractor_result(cache_key) if not response: diff --git a/mediaflow_proxy/routes/playlist_builder.py b/mediaflow_proxy/routes/playlist_builder.py new file mode 100644 index 0000000..22f1e01 --- /dev/null +++ b/mediaflow_proxy/routes/playlist_builder.py @@ -0,0 +1,270 @@ +import json +import logging +import urllib.parse +from typing import Iterator, Dict, Optional +from fastapi import APIRouter, Request, HTTPException, Query +from fastapi.responses import StreamingResponse +from starlette.responses import RedirectResponse +import httpx +from mediaflow_proxy.configs import settings +from mediaflow_proxy.utils.http_utils import get_original_scheme +import asyncio + +logger = logging.getLogger(__name__) +playlist_builder_router = APIRouter() + + +def rewrite_m3u_links_streaming(m3u_lines_iterator: Iterator[str], base_url: str, api_password: Optional[str]) -> Iterator[str]: + """ + Riscrive i link da un iteratore di linee M3U secondo le regole specificate, + includendo gli headers da #EXTVLCOPT e #EXTHTTP. Yields rewritten lines. 
+ """ + current_ext_headers: Dict[str, str] = {} # Dizionario per conservare gli headers dalle direttive + + for line_with_newline in m3u_lines_iterator: + line_content = line_with_newline.rstrip('\n') + logical_line = line_content.strip() + + is_header_tag = False + if logical_line.startswith('#EXTVLCOPT:'): + is_header_tag = True + try: + option_str = logical_line.split(':', 1)[1] + if '=' in option_str: + key_vlc, value_vlc = option_str.split('=', 1) + key_vlc = key_vlc.strip() + value_vlc = value_vlc.strip() + + # Gestione speciale per http-header che contiene "Key: Value" + if key_vlc == 'http-header' and ':' in value_vlc: + header_key, header_value = value_vlc.split(':', 1) + header_key = header_key.strip() + header_value = header_value.strip() + current_ext_headers[header_key] = header_value + elif key_vlc.startswith('http-'): + # Gestisce http-user-agent, http-referer etc. + header_key = '-'.join(word.capitalize() for word in key_vlc[len('http-'):].split('-')) + current_ext_headers[header_key] = value_vlc + except Exception as e: + logger.error(f"⚠️ Error parsing #EXTVLCOPT '{logical_line}': {e}") + + elif logical_line.startswith('#EXTHTTP:'): + is_header_tag = True + try: + json_str = logical_line.split(':', 1)[1] + # Sostituisce tutti gli header correnti con quelli del JSON + current_ext_headers = json.loads(json_str) + except Exception as e: + logger.error(f"⚠️ Error parsing #EXTHTTP '{logical_line}': {e}") + current_ext_headers = {} # Resetta in caso di errore + + if is_header_tag: + yield line_with_newline + continue + + if logical_line and not logical_line.startswith('#') and \ + ('http://' in logical_line or 'https://' in logical_line): + + processed_url_content = logical_line + + # Non modificare link pluto.tv + if 'pluto.tv' in logical_line: + processed_url_content = logical_line + elif 'vavoo.to' in logical_line: + encoded_url = urllib.parse.quote(logical_line, safe='') + processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}" + elif 'vixsrc.to' in logical_line: + encoded_url = urllib.parse.quote(logical_line, safe='') + processed_url_content = f"{base_url}/extractor/video?host=VixCloud&redirect_stream=true&d={encoded_url}" + elif '.m3u8' in logical_line: + encoded_url = urllib.parse.quote(logical_line, safe='') + processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}" + elif '.mpd' in logical_line: + # Estrai parametri DRM dall'URL MPD se presenti + from urllib.parse import urlparse, parse_qs, urlencode, urlunparse + + # Parse dell'URL per estrarre parametri + parsed_url = urlparse(logical_line) + query_params = parse_qs(parsed_url.query) + + # Estrai key_id e key se presenti + key_id = query_params.get('key_id', [None])[0] + key = query_params.get('key', [None])[0] + + # Rimuovi key_id e key dai parametri originali + clean_params = {k: v for k, v in query_params.items() if k not in ['key_id', 'key']} + + # Ricostruisci l'URL senza i parametri DRM + clean_query = urlencode(clean_params, doseq=True) if clean_params else '' + clean_url = urlunparse(( + parsed_url.scheme, + parsed_url.netloc, + parsed_url.path, + parsed_url.params, + clean_query, + parsed_url.fragment + )) + + # Encode the MPD URL like other URL types + clean_url_for_param = urllib.parse.quote(clean_url, safe='') + + # Costruisci l'URL MediaFlow con parametri DRM separati + processed_url_content = f"{base_url}/proxy/mpd/manifest.m3u8?d={clean_url_for_param}" + + # Aggiungi parametri DRM se presenti + if key_id: + processed_url_content += f"&key_id={key_id}" + if 
+                if key:
+                    processed_url_content += f"&key={key}"
+            elif '.php' in logical_line:
+                encoded_url = urllib.parse.quote(logical_line, safe='')
+                processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}"
+            else:
+                # Treat all other links without a specific extension as .m3u8, with encoding
+                encoded_url = urllib.parse.quote(logical_line, safe='')
+                processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}"
+
+            # Apply the collected headers before api_password
+            if current_ext_headers:
+                header_params_str = "".join([f"&h_{urllib.parse.quote(key)}={urllib.parse.quote(value)}" for key, value in current_ext_headers.items()])
+                processed_url_content += header_params_str
+                current_ext_headers = {}
+
+            # Always append api_password last
+            if api_password:
+                processed_url_content += f"&api_password={api_password}"
+
+            yield processed_url_content + '\n'
+        else:
+            yield line_with_newline
+
+
+async def async_download_m3u_playlist(url: str) -> list[str]:
+    """Download an M3U playlist asynchronously and return its lines."""
+    headers = {
+        'User-Agent': settings.user_agent,
+        'Accept': '*/*',
+        'Accept-Language': 'en-US,en;q=0.9',
+        'Accept-Encoding': 'gzip, deflate',
+        'Connection': 'keep-alive'
+    }
+    lines = []
+    try:
+        async with httpx.AsyncClient(verify=True, timeout=30) as client:
+            async with client.stream('GET', url, headers=headers) as response:
+                response.raise_for_status()
+                async for line_bytes in response.aiter_lines():
+                    if isinstance(line_bytes, bytes):
+                        decoded_line = line_bytes.decode('utf-8', errors='replace')
+                    else:
+                        decoded_line = str(line_bytes)
+                    lines.append(decoded_line + '\n' if decoded_line else '')
+    except Exception as e:
+        logger.error(f"Error downloading playlist (async): {str(e)}")
+        raise
+    return lines
+
+
+async def async_generate_combined_playlist(playlist_definitions: list[str], base_url: str, api_password: Optional[str]):
+    """Generate a combined playlist from multiple definitions, downloading them in parallel."""
+    # Prepare the URLs
+    playlist_urls = []
+    for definition in playlist_definitions:
+        if '&' in definition:
+            parts = definition.split('&', 1)
+            playlist_url_str = parts[1] if len(parts) > 1 else parts[0]
+        else:
+            playlist_url_str = definition
+        playlist_urls.append(playlist_url_str)
+
+    # Download all playlists in parallel
+    results = await asyncio.gather(*[async_download_m3u_playlist(url) for url in playlist_urls], return_exceptions=True)
+
+    first_playlist_header_handled = False
+    for idx, lines in enumerate(results):
+        if isinstance(lines, Exception):
+            yield f"# ERROR processing playlist {playlist_urls[idx]}: {str(lines)}\n"
+            continue
+        playlist_lines: list[str] = lines  # type: ignore
+        current_playlist_had_lines = False
+        first_line_of_this_segment = True
+        lines_processed_for_current_playlist = 0
+        rewritten_lines_iter = rewrite_m3u_links_streaming(iter(playlist_lines), base_url, api_password)
+        for line in rewritten_lines_iter:
+            current_playlist_had_lines = True
+            is_extm3u_line = line.strip().startswith('#EXTM3U')
+            lines_processed_for_current_playlist += 1
+            if not first_playlist_header_handled:
+                yield line
+                if is_extm3u_line:
+                    first_playlist_header_handled = True
+            else:
+                if first_line_of_this_segment and is_extm3u_line:
+                    pass
+                else:
+                    yield line
+            first_line_of_this_segment = False
+        if current_playlist_had_lines and not first_playlist_header_handled:
+            first_playlist_header_handled = True
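# [Editor's note — illustrative sketch, not part of the diff. It assumes the module above is
# importable; the hostnames and the password are hypothetical.] Running a small in-memory
# playlist through rewrite_m3u_links_streaming() shows how an #EXTVLCOPT directive is folded
# into the rewritten link as h_-prefixed query parameters:
from mediaflow_proxy.routes.playlist_builder import rewrite_m3u_links_streaming

sample = [
    "#EXTM3U\n",
    "#EXTVLCOPT:http-referer=https://example-origin.tv/\n",
    "#EXTINF:-1,Channel One\n",
    "https://example-cdn.tv/ch1/index.m3u8\n",
]
for out in rewrite_m3u_links_streaming(iter(sample), "https://mfp.example.com", "secret"):
    print(out, end="")
# The stream line comes out roughly as:
#   https://mfp.example.com/proxy/hls/manifest.m3u8?d=https%3A%2F%2Fexample-cdn.tv%2Fch1%2Findex.m3u8
#   &h_Referer=https%3A//example-origin.tv/&api_password=secret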
+@playlist_builder_router.get("/playlist")
+async def proxy_handler(
+    request: Request,
+    d: str = Query(..., description="Query string with the playlist definitions", alias="d"),
+    api_password: Optional[str] = Query(None, description="API password for MFP"),
+):
+    """
+    Proxy endpoint for M3U playlists with MFP support.
+
+    Query string format: playlist1&url1;playlist2&url2
+    Example: https://mfp.com:pass123&http://provider.com/playlist.m3u
+    """
+    try:
+        if not d:
+            raise HTTPException(status_code=400, detail="Missing query string")
+
+        if not d.strip():
+            raise HTTPException(status_code=400, detail="Query string cannot be empty")
+
+        # Validate that we have at least one valid definition
+        playlist_definitions = [def_.strip() for def_ in d.split(';') if def_.strip()]
+        if not playlist_definitions:
+            raise HTTPException(status_code=400, detail="No valid playlist definitions found")
+
+        # Build base_url with the correct scheme
+        original_scheme = get_original_scheme(request)
+        base_url = f"{original_scheme}://{request.url.netloc}"
+
+        # Extract base_url from the first definition if present
+        if playlist_definitions and '&' in playlist_definitions[0]:
+            parts = playlist_definitions[0].split('&', 1)
+            if ':' in parts[0] and not parts[0].startswith('http'):
+                # Extract base_url from the first part if it contains a password
+                base_url_part = parts[0].rsplit(':', 1)[0]
+                if base_url_part.startswith('http'):
+                    base_url = base_url_part
+
+        async def generate_response():
+            async for line in async_generate_combined_playlist(playlist_definitions, base_url, api_password):
+                yield line
+
+        return StreamingResponse(
+            generate_response(),
+            media_type='application/vnd.apple.mpegurl',
+            headers={
+                'Content-Disposition': 'attachment; filename="playlist.m3u"',
+                'Access-Control-Allow-Origin': '*'
+            }
+        )
+
+    except HTTPException:
+        # Re-raise validation errors unchanged instead of wrapping them as 500s
+        raise
+    except Exception as e:
+        logger.error(f"General error in playlist handler: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e
+
+
+@playlist_builder_router.get("/builder")
+async def url_builder():
+    """
+    Page with an interface for generating the MFP proxy URL.
+    """
+    return RedirectResponse(url="/playlist_builder.html")
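# [Editor's note — illustrative request, not part of the diff; the hosts and the password
# are hypothetical.] Two upstream playlists merged into one proxied M3U:
#
#   GET /playlist?d=list1&http://provider-a.example/tv.m3u;list2&http://provider-b.example/tv.m3u
#       &api_password=secret
#
# Each definition is "name&url", definitions are separated by ";", and every media URL in
# the merged output is rewritten through /proxy/... on this MediaFlow instance.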
+ """ + logger = logging.getLogger(__name__) + original_url = url + + # First, try to process potential base64 encoded URLs + url = process_potential_base64_url(url) + + # Fix malformed URLs where https%22// should be https:// + url = re.sub(r'https%22//', 'https://', url) + url = re.sub(r'http%22//', 'http://', url) + + # Fix malformed URLs where https%3A%22// should be https:// + url = re.sub(r'https%3A%22//', 'https://', url) + url = re.sub(r'http%3A%22//', 'http://', url) + + # Fix malformed URLs where https:"// should be https:// (after partial decoding) + url = re.sub(r'https:"//', 'https://', url) + url = re.sub(r'http:"//', 'http://', url) + + # Fix URLs where key_id and key parameters are incorrectly appended to the base URL + # This happens when the URL contains &key_id= and &key= which should be handled as proxy parameters + if '&key_id=' in url and '&key=' in url: + # Split the URL at the first occurrence of &key_id= to separate the base URL from the incorrectly appended parameters + base_url = url.split('&key_id=')[0] + logger.info(f"Removed incorrectly appended key parameters from URL: '{url}' -> '{base_url}'") + url = base_url + + # Log if URL was changed + if url != original_url: + logger.info(f"URL sanitized: '{original_url}' -> '{url}'") + + # Also try URL decoding to see what we get + try: + decoded_url = unquote(url) + if decoded_url != url: + logger.info(f"URL after decoding: '{decoded_url}'") + # If after decoding we still have malformed protocol, fix it + if ':"/' in decoded_url: + # Fix https:"// or http:"// patterns + fixed_decoded = re.sub(r'([a-z]+):"//', r'\1://', decoded_url) + logger.info(f"Fixed decoded URL: '{fixed_decoded}'") + return fixed_decoded + except Exception as e: + logger.warning(f"Error decoding URL '{url}': {e}") + + return url + + +def extract_drm_params_from_url(url: str) -> tuple[str, str, str]: + """ + Extract DRM parameters (key_id and key) from a URL if they are incorrectly appended. + + Args: + url (str): The URL that may contain appended DRM parameters. + + Returns: + tuple: (clean_url, key_id, key) where clean_url has the parameters removed, + and key_id/key are the extracted values (or None if not found). + """ + logger = logging.getLogger(__name__) + key_id = None + key = None + clean_url = url + + # Check if URL contains incorrectly appended key_id and key parameters + if '&key_id=' in url and '&key=' in url: + # Extract key_id + key_id_match = re.search(r'&key_id=([^&]+)', url) + if key_id_match: + key_id = key_id_match.group(1) + + # Extract key + key_match = re.search(r'&key=([^&]+)', url) + if key_match: + key = key_match.group(1) + + # Remove the parameters from the URL + clean_url = re.sub(r'&key_id=[^&]*', '', url) + clean_url = re.sub(r'&key=[^&]*', '', clean_url) + + logger.info(f"Extracted DRM parameters from URL: key_id={key_id}, key={key}") + logger.info(f"Cleaned URL: '{url}' -> '{clean_url}'") + + return clean_url, key_id, key + + +def _check_and_redirect_dlhd_stream(request: Request, destination: str) -> RedirectResponse | None: + """ + Check if destination contains stream-{numero} pattern and redirect to extractor if needed. + + Args: + request (Request): The incoming HTTP request. + destination (str): The destination URL to check. + + Returns: + RedirectResponse | None: RedirectResponse if redirect is needed, None otherwise. + """ + import re + + # Check for stream-{numero} pattern (e.g., stream-1, stream-123, etc.) 
+def _check_and_redirect_dlhd_stream(request: Request, destination: str) -> RedirectResponse | None:
+    """
+    Check if destination contains a stream-{number} pattern and redirect to the extractor if needed.
+
+    Args:
+        request (Request): The incoming HTTP request.
+        destination (str): The destination URL to check.
+
+    Returns:
+        RedirectResponse | None: RedirectResponse if a redirect is needed, None otherwise.
+    """
+    # Check for stream-{number} pattern (e.g., stream-1, stream-123, etc.)
+    # Note: re is already imported at module level.
+    if re.search(r'stream-\d+', destination):
+        from urllib.parse import urlencode
+
+        # Build redirect URL to extractor
+        redirect_params = {
+            "host": "DLHD",
+            "redirect_stream": "true",
+            "d": destination
+        }
+
+        # Preserve api_password if present
+        if "api_password" in request.query_params:
+            redirect_params["api_password"] = request.query_params["api_password"]
+
+        # Build the redirect URL
+        base_url = str(request.url_for("extract_url"))
+        redirect_url = f"{base_url}?{urlencode(redirect_params)}"
+
+        return RedirectResponse(url=redirect_url, status_code=302)
+
+    return None
+
+
 @proxy_router.head("/hls/manifest.m3u8")
 @proxy_router.get("/hls/manifest.m3u8")
 async def hls_manifest_proxy(
@@ -40,9 +178,111 @@ async def hls_manifest_proxy(
     Returns:
         Response: The HTTP response with the processed m3u8 playlist or streamed content.
     """
+    # Sanitize destination URL to fix common encoding issues
+    hls_params.destination = sanitize_url(hls_params.destination)
+
+    # Check if destination contains a stream-{number} pattern and redirect to the extractor
+    redirect_response = _check_and_redirect_dlhd_stream(request, hls_params.destination)
+    if redirect_response:
+        return redirect_response
+
     return await handle_hls_stream_proxy(request, hls_params, proxy_headers)


+@proxy_router.get("/hls/segment")
+async def hls_segment_proxy(
+    request: Request,
+    proxy_headers: Annotated[ProxyRequestHeaders, Depends(get_proxy_headers)],
+    segment_url: str = Query(..., description="URL of the HLS segment"),
+):
+    """
+    Proxy HLS segments with optional pre-buffering support.
+
+    Args:
+        request (Request): The incoming HTTP request.
+        segment_url (str): URL of the HLS segment to proxy.
+        proxy_headers (ProxyRequestHeaders): The headers to include in the request.
+
+    Returns:
+        Response: The HTTP response with the segment content.
+    """
+    from mediaflow_proxy.utils.hls_prebuffer import hls_prebuffer
+    from mediaflow_proxy.configs import settings
+
+    # Sanitize segment URL to fix common encoding issues
+    segment_url = sanitize_url(segment_url)
+
+    # Extract headers for pre-buffering
+    headers = {}
+    for key, value in request.query_params.items():
+        if key.startswith("h_"):
+            headers[key[2:]] = value
+
+    # Try to get the segment from the pre-buffer cache first
+    if settings.enable_hls_prebuffer:
+        cached_segment = await hls_prebuffer.get_segment(segment_url, headers)
+        if cached_segment:
+            return Response(
+                content=cached_segment,
+                media_type="video/mp2t",
+                headers={
+                    "Content-Type": "video/mp2t",
+                    "Cache-Control": "public, max-age=3600",
+                    "Access-Control-Allow-Origin": "*"
+                }
+            )
+
+    # Fall back to direct streaming if not in cache
+    return await handle_stream_request("GET", segment_url, proxy_headers)
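# [Editor's note — illustrative request, not part of the diff; the URLs are hypothetical.]
# Upstream headers for segment fetches travel as h_-prefixed query parameters, mirroring
# the manifest endpoints:
#
#   GET /proxy/hls/segment?segment_url=https://cdn.example.com/hls/seg0001.ts
#       &h_referer=https://example-origin.tv/&h_user-agent=Mozilla/5.0
#
# On a pre-buffer cache hit the segment is served as video/mp2t with
# "Cache-Control: public, max-age=3600"; otherwise it falls back to direct streaming.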
+ """ + from mediaflow_proxy.utils.dash_prebuffer import dash_prebuffer + from mediaflow_proxy.configs import settings + + # Sanitize segment URL to fix common encoding issues + segment_url = sanitize_url(segment_url) + + # Extract headers for pre-buffering + headers = {} + for key, value in request.query_params.items(): + if key.startswith("h_"): + headers[key[2:]] = value + + # Try to get segment from pre-buffer cache first + if settings.enable_dash_prebuffer: + cached_segment = await dash_prebuffer.get_segment(segment_url, headers) + if cached_segment: + return Response( + content=cached_segment, + media_type="video/mp4", + headers={ + "Content-Type": "video/mp4", + "Cache-Control": "public, max-age=3600", + "Access-Control-Allow-Origin": "*" + } + ) + + # Fallback to direct streaming if not in cache + return await handle_stream_request("GET", segment_url, proxy_headers) + + @proxy_router.head("/stream") @proxy_router.get("/stream") @proxy_router.head("/stream/{filename:path}") @@ -65,6 +305,14 @@ async def proxy_stream_endpoint( Returns: Response: The HTTP response with the streamed content. """ + # Sanitize destination URL to fix common encoding issues + destination = sanitize_url(destination) + + # Check if destination contains stream-{numero} pattern and redirect to extractor + redirect_response = _check_and_redirect_dlhd_stream(request, destination) + if redirect_response: + return redirect_response + content_range = proxy_headers.request.get("range", "bytes=0-") if "nan" in content_range.casefold(): # Handle invalid range requests "bytes=NaN-NaN" @@ -103,6 +351,21 @@ async def mpd_manifest_proxy( Returns: Response: The HTTP response with the HLS manifest. """ + # Extract DRM parameters from destination URL if they are incorrectly appended + clean_url, extracted_key_id, extracted_key = extract_drm_params_from_url(manifest_params.destination) + + # Update the destination with the cleaned URL + manifest_params.destination = clean_url + + # Use extracted parameters if they exist and the manifest params don't already have them + if extracted_key_id and not manifest_params.key_id: + manifest_params.key_id = extracted_key_id + if extracted_key and not manifest_params.key: + manifest_params.key = extracted_key + + # Sanitize destination URL to fix common encoding issues + manifest_params.destination = sanitize_url(manifest_params.destination) + return await get_manifest(request, manifest_params, proxy_headers) @@ -123,6 +386,21 @@ async def playlist_endpoint( Returns: Response: The HTTP response with the HLS playlist. """ + # Extract DRM parameters from destination URL if they are incorrectly appended + clean_url, extracted_key_id, extracted_key = extract_drm_params_from_url(playlist_params.destination) + + # Update the destination with the cleaned URL + playlist_params.destination = clean_url + + # Use extracted parameters if they exist and the playlist params don't already have them + if extracted_key_id and not playlist_params.key_id: + playlist_params.key_id = extracted_key_id + if extracted_key and not playlist_params.key: + playlist_params.key = extracted_key + + # Sanitize destination URL to fix common encoding issues + playlist_params.destination = sanitize_url(playlist_params.destination) + return await get_playlist(request, playlist_params, proxy_headers) @@ -152,4 +430,4 @@ async def get_mediaflow_proxy_public_ip(): Returns: Response: The HTTP response with the public IP address in the form of a JSON object. 
{"ip": "xxx.xxx.xxx.xxx"} """ - return await get_public_ip() + return await get_public_ip() \ No newline at end of file diff --git a/mediaflow_proxy/schemas.py b/mediaflow_proxy/schemas.py index 18da1c6..2684d98 100644 --- a/mediaflow_proxy/schemas.py +++ b/mediaflow_proxy/schemas.py @@ -25,6 +25,9 @@ class GenerateUrlRequest(BaseModel): ) ip: Optional[IPvAnyAddress] = Field(None, description="The IP address to restrict the URL to.") filename: Optional[str] = Field(None, description="Filename to be preserved for media players like Infuse.") + base64_encode_destination: Optional[bool] = Field( + False, description="Whether to encode the destination URL in base64 format before processing." + ) class MultiUrlRequestItem(BaseModel): diff --git a/mediaflow_proxy/static/playlist_builder.html b/mediaflow_proxy/static/playlist_builder.html new file mode 100644 index 0000000..75b6f3c --- /dev/null +++ b/mediaflow_proxy/static/playlist_builder.html @@ -0,0 +1,132 @@ + + + + + + MFP Playlist Builder + + + +
diff --git a/mediaflow_proxy/static/playlist_builder.html b/mediaflow_proxy/static/playlist_builder.html
new file mode 100644
index 0000000..75b6f3c
--- /dev/null
+++ b/mediaflow_proxy/static/playlist_builder.html
@@ -0,0 +1,132 @@
[The markup of this 132-line static page is not recoverable from this extract; only its visible text survives: a page titled "🔗 MFP Playlist Builder" with form fields, a "Playlists to Merge" section for name/URL pairs, and an output box reading "The URL will appear here...".]
diff --git a/mediaflow_proxy/utils/base64_utils.py b/mediaflow_proxy/utils/base64_utils.py
new file mode 100644
index 0000000..67063b2
--- /dev/null
+++ b/mediaflow_proxy/utils/base64_utils.py
@@ -0,0 +1,123 @@
+import base64
+import logging
+from typing import Optional
+from urllib.parse import urlparse
+
+logger = logging.getLogger(__name__)
+
+
+def is_base64_url(url: str) -> bool:
+    """
+    Check if a URL appears to be base64 encoded.
+
+    Args:
+        url (str): The URL to check.
+
+    Returns:
+        bool: True if the URL appears to be base64 encoded, False otherwise.
+    """
+    # Check if the URL doesn't start with http/https and contains base64-like characters
+    if url.startswith(('http://', 'https://', 'ftp://', 'ftps://')):
+        return False
+
+    # Base64 URLs typically contain only alphanumeric characters, +, /, and =
+    # and don't contain typical URL characters like ://
+    base64_chars = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')
+    url_chars = set(url)
+
+    # If the URL contains characters not in the base64 charset, it's likely not base64
+    if not url_chars.issubset(base64_chars):
+        return False
+
+    # Additional heuristic: base64 strings are typically longer and don't contain common URL patterns
+    if len(url) < 10:  # Too short to be a meaningful base64 encoded URL
+        return False
+
+    return True
+
+
+def decode_base64_url(encoded_url: str) -> Optional[str]:
+    """
+    Decode a base64 encoded URL.
+
+    Args:
+        encoded_url (str): The base64 encoded URL string.
+
+    Returns:
+        Optional[str]: The decoded URL if successful, None if decoding fails.
+    """
+    try:
+        # Handle URL-safe base64 encoding (replace - with + and _ with /)
+        url_safe_encoded = encoded_url.replace('-', '+').replace('_', '/')
+
+        # Add padding if necessary
+        missing_padding = len(url_safe_encoded) % 4
+        if missing_padding:
+            url_safe_encoded += '=' * (4 - missing_padding)
+
+        # Decode the base64 string
+        decoded_bytes = base64.b64decode(url_safe_encoded)
+        decoded_url = decoded_bytes.decode('utf-8')
+
+        # Validate that the decoded string is a valid URL
+        parsed = urlparse(decoded_url)
+        if parsed.scheme and parsed.netloc:
+            logger.info(f"Successfully decoded base64 URL: {encoded_url[:50]}... -> {decoded_url}")
+            return decoded_url
+        else:
+            logger.warning(f"Decoded string is not a valid URL: {decoded_url}")
+            return None
+
+    except (base64.binascii.Error, UnicodeDecodeError, ValueError) as e:
+        logger.debug(f"Failed to decode base64 URL '{encoded_url[:50]}...': {e}")
+        return None
+
+
+def encode_url_to_base64(url: str, url_safe: bool = True) -> str:
+    """
+    Encode a URL to base64.
+
+    Args:
+        url (str): The URL to encode.
+        url_safe (bool): Whether to use URL-safe base64 encoding (default: True).
+
+    Returns:
+        str: The base64 encoded URL.
+    """
+    try:
+        url_bytes = url.encode('utf-8')
+        if url_safe:
+            # Use URL-safe base64 encoding (replace + with - and / with _)
+            encoded = base64.urlsafe_b64encode(url_bytes).decode('utf-8')
+            # Remove padding for cleaner URLs
+            encoded = encoded.rstrip('=')
+        else:
+            encoded = base64.b64encode(url_bytes).decode('utf-8')
+
+        logger.debug(f"Encoded URL to base64: {url} -> {encoded}")
+        return encoded
+
+    except Exception as e:
+        logger.error(f"Failed to encode URL to base64: {e}")
+        raise
+
+
+def process_potential_base64_url(url: str) -> str:
+    """
+    Process a URL that might be base64 encoded. If it's base64 encoded, decode it.
+    Otherwise, return the original URL.
+
+    Args:
+        url (str): The URL to process.
+ + Returns: + str: The processed URL (decoded if it was base64, original otherwise). + """ + if is_base64_url(url): + decoded_url = decode_base64_url(url) + if decoded_url: + return decoded_url + else: + logger.warning(f"URL appears to be base64 but failed to decode: {url[:50]}...") + + return url \ No newline at end of file diff --git a/mediaflow_proxy/utils/dash_prebuffer.py b/mediaflow_proxy/utils/dash_prebuffer.py new file mode 100644 index 0000000..c312ebd --- /dev/null +++ b/mediaflow_proxy/utils/dash_prebuffer.py @@ -0,0 +1,373 @@ +import logging +import psutil +from typing import Dict, Optional, List +from urllib.parse import urljoin +import xmltodict +from mediaflow_proxy.utils.http_utils import create_httpx_client +from mediaflow_proxy.configs import settings + +logger = logging.getLogger(__name__) + + +class DASHPreBuffer: + """ + Pre-buffer system for DASH streams to reduce latency and improve streaming performance. + """ + + def __init__(self, max_cache_size: Optional[int] = None, prebuffer_segments: Optional[int] = None): + """ + Initialize the DASH pre-buffer system. + + Args: + max_cache_size (int): Maximum number of segments to cache (uses config if None) + prebuffer_segments (int): Number of segments to pre-buffer ahead (uses config if None) + """ + self.max_cache_size = max_cache_size or settings.dash_prebuffer_cache_size + self.prebuffer_segments = prebuffer_segments or settings.dash_prebuffer_segments + self.max_memory_percent = settings.dash_prebuffer_max_memory_percent + self.emergency_threshold = settings.dash_prebuffer_emergency_threshold + + # Cache for different types of DASH content + self.segment_cache: Dict[str, bytes] = {} + self.init_segment_cache: Dict[str, bytes] = {} + self.manifest_cache: Dict[str, dict] = {} + + # Track segment URLs for each adaptation set + self.adaptation_segments: Dict[str, List[str]] = {} + self.client = create_httpx_client() + + def _get_memory_usage_percent(self) -> float: + """ + Get current memory usage percentage. + + Returns: + float: Memory usage percentage + """ + try: + memory = psutil.virtual_memory() + return memory.percent + except Exception as e: + logger.warning(f"Failed to get memory usage: {e}") + return 0.0 + + def _check_memory_threshold(self) -> bool: + """ + Check if memory usage exceeds the emergency threshold. + + Returns: + bool: True if emergency cleanup is needed + """ + memory_percent = self._get_memory_usage_percent() + return memory_percent > self.emergency_threshold + + def _emergency_cache_cleanup(self) -> None: + """ + Perform emergency cache cleanup when memory usage is high. + """ + if self._check_memory_threshold(): + logger.warning("Emergency DASH cache cleanup triggered due to high memory usage") + + # Clear 50% of segment cache + segment_cache_size = len(self.segment_cache) + segment_keys_to_remove = list(self.segment_cache.keys())[:segment_cache_size // 2] + for key in segment_keys_to_remove: + del self.segment_cache[key] + + # Clear 50% of init segment cache + init_cache_size = len(self.init_segment_cache) + init_keys_to_remove = list(self.init_segment_cache.keys())[:init_cache_size // 2] + for key in init_keys_to_remove: + del self.init_segment_cache[key] + + logger.info(f"Emergency cleanup removed {len(segment_keys_to_remove)} segments and {len(init_keys_to_remove)} init segments from cache") + + async def prebuffer_dash_manifest(self, mpd_url: str, headers: Dict[str, str]) -> None: + """ + Pre-buffer segments from a DASH manifest. 
+ + Args: + mpd_url (str): URL of the DASH manifest + headers (Dict[str, str]): Headers to use for requests + """ + try: + # Download and parse MPD manifest + response = await self.client.get(mpd_url, headers=headers) + response.raise_for_status() + mpd_content = response.text + + # Parse MPD XML + mpd_dict = xmltodict.parse(mpd_content) + + # Store manifest in cache + self.manifest_cache[mpd_url] = mpd_dict + + # Extract initialization segments and first few segments + await self._extract_and_prebuffer_segments(mpd_dict, mpd_url, headers) + + logger.info(f"Pre-buffered DASH manifest: {mpd_url}") + + except Exception as e: + logger.warning(f"Failed to pre-buffer DASH manifest {mpd_url}: {e}") + + async def _extract_and_prebuffer_segments(self, mpd_dict: dict, base_url: str, headers: Dict[str, str]) -> None: + """ + Extract and pre-buffer segments from MPD manifest. + + Args: + mpd_dict (dict): Parsed MPD manifest + base_url (str): Base URL for resolving relative URLs + headers (Dict[str, str]): Headers to use for requests + """ + try: + # Extract Period and AdaptationSet information + mpd = mpd_dict.get('MPD', {}) + periods = mpd.get('Period', []) + if not isinstance(periods, list): + periods = [periods] + + for period in periods: + adaptation_sets = period.get('AdaptationSet', []) + if not isinstance(adaptation_sets, list): + adaptation_sets = [adaptation_sets] + + for adaptation_set in adaptation_sets: + # Extract initialization segment + init_segment = adaptation_set.get('SegmentTemplate', {}).get('@initialization') + if init_segment: + init_url = urljoin(base_url, init_segment) + await self._download_init_segment(init_url, headers) + + # Extract segment template + segment_template = adaptation_set.get('SegmentTemplate', {}) + if segment_template: + await self._prebuffer_template_segments(segment_template, base_url, headers) + + # Extract segment list + segment_list = adaptation_set.get('SegmentList', {}) + if segment_list: + await self._prebuffer_list_segments(segment_list, base_url, headers) + + except Exception as e: + logger.warning(f"Failed to extract segments from MPD: {e}") + + async def _download_init_segment(self, init_url: str, headers: Dict[str, str]) -> None: + """ + Download and cache initialization segment. + + Args: + init_url (str): URL of the initialization segment + headers (Dict[str, str]): Headers to use for request + """ + try: + # Check memory usage before downloading + memory_percent = self._get_memory_usage_percent() + if memory_percent > self.max_memory_percent: + logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping init segment download") + return + + response = await self.client.get(init_url, headers=headers) + response.raise_for_status() + + # Cache the init segment + self.init_segment_cache[init_url] = response.content + + # Check for emergency cleanup + if self._check_memory_threshold(): + self._emergency_cache_cleanup() + + logger.debug(f"Cached init segment: {init_url}") + + except Exception as e: + logger.warning(f"Failed to download init segment {init_url}: {e}") + + async def _prebuffer_template_segments(self, segment_template: dict, base_url: str, headers: Dict[str, str]) -> None: + """ + Pre-buffer segments using segment template. 
+ + Args: + segment_template (dict): Segment template from MPD + base_url (str): Base URL for resolving relative URLs + headers (Dict[str, str]): Headers to use for requests + """ + try: + media_template = segment_template.get('@media') + if not media_template: + return + + # Extract template parameters + start_number = int(segment_template.get('@startNumber', 1)) + duration = float(segment_template.get('@duration', 0)) + timescale = float(segment_template.get('@timescale', 1)) + + # Pre-buffer first few segments + for i in range(self.prebuffer_segments): + segment_number = start_number + i + segment_url = media_template.replace('$Number$', str(segment_number)) + full_url = urljoin(base_url, segment_url) + + await self._download_segment(full_url, headers) + + except Exception as e: + logger.warning(f"Failed to pre-buffer template segments: {e}") + + async def _prebuffer_list_segments(self, segment_list: dict, base_url: str, headers: Dict[str, str]) -> None: + """ + Pre-buffer segments from segment list. + + Args: + segment_list (dict): Segment list from MPD + base_url (str): Base URL for resolving relative URLs + headers (Dict[str, str]): Headers to use for requests + """ + try: + segments = segment_list.get('SegmentURL', []) + if not isinstance(segments, list): + segments = [segments] + + # Pre-buffer first few segments + for segment in segments[:self.prebuffer_segments]: + segment_url = segment.get('@src') + if segment_url: + full_url = urljoin(base_url, segment_url) + await self._download_segment(full_url, headers) + + except Exception as e: + logger.warning(f"Failed to pre-buffer list segments: {e}") + + async def _download_segment(self, segment_url: str, headers: Dict[str, str]) -> None: + """ + Download a single segment and cache it. + + Args: + segment_url (str): URL of the segment to download + headers (Dict[str, str]): Headers to use for request + """ + try: + # Check memory usage before downloading + memory_percent = self._get_memory_usage_percent() + if memory_percent > self.max_memory_percent: + logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping segment download") + return + + response = await self.client.get(segment_url, headers=headers) + response.raise_for_status() + + # Cache the segment + self.segment_cache[segment_url] = response.content + + # Check for emergency cleanup + if self._check_memory_threshold(): + self._emergency_cache_cleanup() + # Maintain cache size + elif len(self.segment_cache) > self.max_cache_size: + # Remove oldest entries (simple FIFO) + oldest_key = next(iter(self.segment_cache)) + del self.segment_cache[oldest_key] + + logger.debug(f"Cached DASH segment: {segment_url}") + + except Exception as e: + logger.warning(f"Failed to download DASH segment {segment_url}: {e}") + + async def get_segment(self, segment_url: str, headers: Dict[str, str]) -> Optional[bytes]: + """ + Get a segment from cache or download it. 
+ + Args: + segment_url (str): URL of the segment + headers (Dict[str, str]): Headers to use for request + + Returns: + Optional[bytes]: Cached segment data or None if not available + """ + # Check segment cache first + if segment_url in self.segment_cache: + logger.debug(f"DASH cache hit for segment: {segment_url}") + return self.segment_cache[segment_url] + + # Check init segment cache + if segment_url in self.init_segment_cache: + logger.debug(f"DASH cache hit for init segment: {segment_url}") + return self.init_segment_cache[segment_url] + + # Check memory usage before downloading + memory_percent = self._get_memory_usage_percent() + if memory_percent > self.max_memory_percent: + logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping download") + return None + + # Download if not in cache + try: + response = await self.client.get(segment_url, headers=headers) + response.raise_for_status() + segment_data = response.content + + # Determine if it's an init segment or regular segment + if 'init' in segment_url.lower() or segment_url.endswith('.mp4'): + self.init_segment_cache[segment_url] = segment_data + else: + self.segment_cache[segment_url] = segment_data + + # Check for emergency cleanup + if self._check_memory_threshold(): + self._emergency_cache_cleanup() + # Maintain cache size + elif len(self.segment_cache) > self.max_cache_size: + oldest_key = next(iter(self.segment_cache)) + del self.segment_cache[oldest_key] + + logger.debug(f"Downloaded and cached DASH segment: {segment_url}") + return segment_data + + except Exception as e: + logger.warning(f"Failed to get DASH segment {segment_url}: {e}") + return None + + async def get_manifest(self, mpd_url: str, headers: Dict[str, str]) -> Optional[dict]: + """ + Get MPD manifest from cache or download it. 
+ + Args: + mpd_url (str): URL of the MPD manifest + headers (Dict[str, str]): Headers to use for request + + Returns: + Optional[dict]: Cached manifest data or None if not available + """ + # Check cache first + if mpd_url in self.manifest_cache: + logger.debug(f"DASH cache hit for manifest: {mpd_url}") + return self.manifest_cache[mpd_url] + + # Download if not in cache + try: + response = await self.client.get(mpd_url, headers=headers) + response.raise_for_status() + mpd_content = response.text + mpd_dict = xmltodict.parse(mpd_content) + + # Cache the manifest + self.manifest_cache[mpd_url] = mpd_dict + + logger.debug(f"Downloaded and cached DASH manifest: {mpd_url}") + return mpd_dict + + except Exception as e: + logger.warning(f"Failed to get DASH manifest {mpd_url}: {e}") + return None + + def clear_cache(self) -> None: + """Clear the DASH cache.""" + self.segment_cache.clear() + self.init_segment_cache.clear() + self.manifest_cache.clear() + self.adaptation_segments.clear() + logger.info("DASH pre-buffer cache cleared") + + async def close(self) -> None: + """Close the pre-buffer system.""" + await self.client.aclose() + + +# Global DASH pre-buffer instance +dash_prebuffer = DASHPreBuffer() diff --git a/mediaflow_proxy/utils/hls_prebuffer.py b/mediaflow_proxy/utils/hls_prebuffer.py new file mode 100644 index 0000000..9748406 --- /dev/null +++ b/mediaflow_proxy/utils/hls_prebuffer.py @@ -0,0 +1,321 @@ +import asyncio +import logging +import psutil +from typing import Dict, Optional, List +from urllib.parse import urlparse +import httpx +from mediaflow_proxy.utils.http_utils import create_httpx_client +from mediaflow_proxy.configs import settings + +logger = logging.getLogger(__name__) + + +class HLSPreBuffer: + """ + Pre-buffer system for HLS streams to reduce latency and improve streaming performance. + """ + + def __init__(self, max_cache_size: Optional[int] = None, prebuffer_segments: Optional[int] = None): + """ + Initialize the HLS pre-buffer system. + + Args: + max_cache_size (int): Maximum number of segments to cache (uses config if None) + prebuffer_segments (int): Number of segments to pre-buffer ahead (uses config if None) + """ + self.max_cache_size = max_cache_size or settings.hls_prebuffer_cache_size + self.prebuffer_segments = prebuffer_segments or settings.hls_prebuffer_segments + self.max_memory_percent = settings.hls_prebuffer_max_memory_percent + self.emergency_threshold = settings.hls_prebuffer_emergency_threshold + self.segment_cache: Dict[str, bytes] = {} + self.segment_urls: Dict[str, List[str]] = {} + self.client = create_httpx_client() + + async def prebuffer_playlist(self, playlist_url: str, headers: Dict[str, str]) -> None: + """ + Pre-buffer segments from an HLS playlist. 
+ + Args: + playlist_url (str): URL of the HLS playlist + headers (Dict[str, str]): Headers to use for requests + """ + try: + logger.debug(f"Starting pre-buffer for playlist: {playlist_url}") + + # Download and parse playlist + response = await self.client.get(playlist_url, headers=headers) + response.raise_for_status() + playlist_content = response.text + + # Check if this is a master playlist (contains variants) + if "#EXT-X-STREAM-INF" in playlist_content: + logger.debug(f"Master playlist detected, finding first variant") + # Extract variant URLs + variant_urls = self._extract_variant_urls(playlist_content, playlist_url) + if variant_urls: + # Pre-buffer the first variant + first_variant_url = variant_urls[0] + logger.debug(f"Pre-buffering first variant: {first_variant_url}") + await self.prebuffer_playlist(first_variant_url, headers) + else: + logger.warning("No variants found in master playlist") + return + + # Extract segment URLs + segment_urls = self._extract_segment_urls(playlist_content, playlist_url) + + # Store segment URLs for this playlist + self.segment_urls[playlist_url] = segment_urls + + # Pre-buffer first few segments + await self._prebuffer_segments(segment_urls[:self.prebuffer_segments], headers) + + logger.info(f"Pre-buffered {min(self.prebuffer_segments, len(segment_urls))} segments for {playlist_url}") + + except Exception as e: + logger.warning(f"Failed to pre-buffer playlist {playlist_url}: {e}") + + def _extract_segment_urls(self, playlist_content: str, base_url: str) -> List[str]: + """ + Extract segment URLs from HLS playlist content. + + Args: + playlist_content (str): Content of the HLS playlist + base_url (str): Base URL for resolving relative URLs + + Returns: + List[str]: List of segment URLs + """ + segment_urls = [] + lines = playlist_content.split('\n') + + logger.debug(f"Analyzing playlist with {len(lines)} lines") + + for line in lines: + line = line.strip() + if line and not line.startswith('#'): + # Check if line contains a URL (http/https) or is a relative path + if 'http://' in line or 'https://' in line: + segment_urls.append(line) + logger.debug(f"Found absolute URL: {line}") + elif line and not line.startswith('#'): + # This might be a relative path to a segment + parsed_base = urlparse(base_url) + # Ensure proper path joining + if line.startswith('/'): + segment_url = f"{parsed_base.scheme}://{parsed_base.netloc}{line}" + else: + # Get the directory path from base_url + base_path = parsed_base.path.rsplit('/', 1)[0] if '/' in parsed_base.path else '' + segment_url = f"{parsed_base.scheme}://{parsed_base.netloc}{base_path}/{line}" + segment_urls.append(segment_url) + logger.debug(f"Found relative path: {line} -> {segment_url}") + + logger.debug(f"Extracted {len(segment_urls)} segment URLs from playlist") + if segment_urls: + logger.debug(f"First segment URL: {segment_urls[0]}") + else: + logger.debug("No segment URLs found in playlist") + # Log first few lines for debugging + for i, line in enumerate(lines[:10]): + logger.debug(f"Line {i}: {line}") + + return segment_urls + + def _extract_variant_urls(self, playlist_content: str, base_url: str) -> List[str]: + """ + Extract variant URLs from master playlist content. 
+ + Args: + playlist_content (str): Content of the master playlist + base_url (str): Base URL for resolving relative URLs + + Returns: + List[str]: List of variant URLs + """ + variant_urls = [] + lines = playlist_content.split('\n') + + for line in lines: + line = line.strip() + if line and not line.startswith('#') and ('http://' in line or 'https://' in line): + # Resolve relative URLs + if line.startswith('http'): + variant_urls.append(line) + else: + # Join with base URL for relative paths + parsed_base = urlparse(base_url) + variant_url = f"{parsed_base.scheme}://{parsed_base.netloc}{line}" + variant_urls.append(variant_url) + + logger.debug(f"Extracted {len(variant_urls)} variant URLs from master playlist") + if variant_urls: + logger.debug(f"First variant URL: {variant_urls[0]}") + + return variant_urls + + async def _prebuffer_segments(self, segment_urls: List[str], headers: Dict[str, str]) -> None: + """ + Pre-buffer specific segments. + + Args: + segment_urls (List[str]): List of segment URLs to pre-buffer + headers (Dict[str, str]): Headers to use for requests + """ + tasks = [] + for url in segment_urls: + if url not in self.segment_cache: + tasks.append(self._download_segment(url, headers)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + def _get_memory_usage_percent(self) -> float: + """ + Get current memory usage percentage. + + Returns: + float: Memory usage percentage + """ + try: + memory = psutil.virtual_memory() + return memory.percent + except Exception as e: + logger.warning(f"Failed to get memory usage: {e}") + return 0.0 + + def _check_memory_threshold(self) -> bool: + """ + Check if memory usage exceeds the emergency threshold. + + Returns: + bool: True if emergency cleanup is needed + """ + memory_percent = self._get_memory_usage_percent() + return memory_percent > self.emergency_threshold + + def _emergency_cache_cleanup(self) -> None: + """ + Perform emergency cache cleanup when memory usage is high. + """ + if self._check_memory_threshold(): + logger.warning("Emergency cache cleanup triggered due to high memory usage") + # Clear 50% of cache + cache_size = len(self.segment_cache) + keys_to_remove = list(self.segment_cache.keys())[:cache_size // 2] + for key in keys_to_remove: + del self.segment_cache[key] + logger.info(f"Emergency cleanup removed {len(keys_to_remove)} segments from cache") + + async def _download_segment(self, segment_url: str, headers: Dict[str, str]) -> None: + """ + Download a single segment and cache it. 
+ + Args: + segment_url (str): URL of the segment to download + headers (Dict[str, str]): Headers to use for request + """ + try: + # Check memory usage before downloading + memory_percent = self._get_memory_usage_percent() + if memory_percent > self.max_memory_percent: + logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping download") + return + + response = await self.client.get(segment_url, headers=headers) + response.raise_for_status() + + # Cache the segment + self.segment_cache[segment_url] = response.content + + # Check for emergency cleanup + if self._check_memory_threshold(): + self._emergency_cache_cleanup() + # Maintain cache size + elif len(self.segment_cache) > self.max_cache_size: + # Remove oldest entries (simple FIFO) + oldest_key = next(iter(self.segment_cache)) + del self.segment_cache[oldest_key] + + logger.debug(f"Cached segment: {segment_url}") + + except Exception as e: + logger.warning(f"Failed to download segment {segment_url}: {e}") + + async def get_segment(self, segment_url: str, headers: Dict[str, str]) -> Optional[bytes]: + """ + Get a segment from cache or download it. + + Args: + segment_url (str): URL of the segment + headers (Dict[str, str]): Headers to use for request + + Returns: + Optional[bytes]: Cached segment data or None if not available + """ + # Check cache first + if segment_url in self.segment_cache: + logger.debug(f"Cache hit for segment: {segment_url}") + return self.segment_cache[segment_url] + + # Check memory usage before downloading + memory_percent = self._get_memory_usage_percent() + if memory_percent > self.max_memory_percent: + logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping download") + return None + + # Download if not in cache + try: + response = await self.client.get(segment_url, headers=headers) + response.raise_for_status() + segment_data = response.content + + # Cache the segment + self.segment_cache[segment_url] = segment_data + + # Check for emergency cleanup + if self._check_memory_threshold(): + self._emergency_cache_cleanup() + # Maintain cache size + elif len(self.segment_cache) > self.max_cache_size: + oldest_key = next(iter(self.segment_cache)) + del self.segment_cache[oldest_key] + + logger.debug(f"Downloaded and cached segment: {segment_url}") + return segment_data + + except Exception as e: + logger.warning(f"Failed to get segment {segment_url}: {e}") + return None + + async def prebuffer_next_segments(self, playlist_url: str, current_segment_index: int, headers: Dict[str, str]) -> None: + """ + Pre-buffer next segments based on current playback position. 
+ + Args: + playlist_url (str): URL of the playlist + current_segment_index (int): Index of current segment + headers (Dict[str, str]): Headers to use for requests + """ + if playlist_url not in self.segment_urls: + return + + segment_urls = self.segment_urls[playlist_url] + next_segments = segment_urls[current_segment_index + 1:current_segment_index + 1 + self.prebuffer_segments] + + if next_segments: + await self._prebuffer_segments(next_segments, headers) + + def clear_cache(self) -> None: + """Clear the segment cache.""" + self.segment_cache.clear() + self.segment_urls.clear() + logger.info("HLS pre-buffer cache cleared") + + async def close(self) -> None: + """Close the pre-buffer system.""" + await self.client.aclose() + + +# Global pre-buffer instance +hls_prebuffer = HLSPreBuffer() \ No newline at end of file diff --git a/mediaflow_proxy/utils/m3u8_processor.py b/mediaflow_proxy/utils/m3u8_processor.py index a8c3f73..08f8151 100644 --- a/mediaflow_proxy/utils/m3u8_processor.py +++ b/mediaflow_proxy/utils/m3u8_processor.py @@ -1,3 +1,4 @@ +import asyncio import codecs import re from typing import AsyncGenerator @@ -6,6 +7,7 @@ from urllib import parse from mediaflow_proxy.configs import settings from mediaflow_proxy.utils.crypto_utils import encryption_handler from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, encode_stremio_proxy_url, get_original_scheme +from mediaflow_proxy.utils.hls_prebuffer import hls_prebuffer class M3U8Processor: @@ -24,6 +26,7 @@ class M3U8Processor: self.mediaflow_proxy_url = str( request.url_for("hls_manifest_proxy").replace(scheme=get_original_scheme(request)) ) + self.playlist_url = None # Will be set when processing starts async def process_m3u8(self, content: str, base_url: str) -> str: """ @@ -36,6 +39,9 @@ class M3U8Processor: Returns: str: The processed m3u8 content. """ + # Store the playlist URL for prebuffering + self.playlist_url = base_url + lines = content.splitlines() processed_lines = [] for line in lines: @@ -45,6 +51,23 @@ class M3U8Processor: processed_lines.append(await self.proxy_content_url(line, base_url)) else: processed_lines.append(line) + + # Pre-buffer segments if enabled and this is a playlist + if (settings.enable_hls_prebuffer and + "#EXTM3U" in content and + self.playlist_url): + + # Extract headers from request for pre-buffering + headers = {} + for key, value in self.request.query_params.items(): + if key.startswith("h_"): + headers[key[2:]] = value + + # Start pre-buffering in background using the actual playlist URL + asyncio.create_task( + hls_prebuffer.prebuffer_playlist(self.playlist_url, headers) + ) + return "\n".join(processed_lines) async def process_m3u8_streaming( @@ -52,6 +75,7 @@ class M3U8Processor: ) -> AsyncGenerator[str, None]: """ Processes the m3u8 content on-the-fly, yielding processed lines as they are read. + Optimized to avoid accumulating the entire playlist content in memory. Args: content_iterator: An async iterator that yields chunks of the m3u8 content. @@ -60,8 +84,13 @@ class M3U8Processor: Yields: str: Processed lines of the m3u8 content. 
""" + # Store the playlist URL for prebuffering + self.playlist_url = base_url + buffer = "" # String buffer for decoded content decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") + is_playlist_detected = False + is_prebuffer_started = False # Process the content chunk by chunk async for chunk in content_iterator: @@ -72,6 +101,10 @@ class M3U8Processor: decoded_chunk = decoder.decode(chunk) buffer += decoded_chunk + # Check for playlist marker early to avoid accumulating content + if not is_playlist_detected and "#EXTM3U" in buffer: + is_playlist_detected = True + # Process complete lines lines = buffer.split("\n") if len(lines) > 1: @@ -84,6 +117,25 @@ class M3U8Processor: # Keep the last line in the buffer (it might be incomplete) buffer = lines[-1] + # Start pre-buffering early once we detect this is a playlist + # This avoids waiting until the entire playlist is processed + if (settings.enable_hls_prebuffer and + is_playlist_detected and + not is_prebuffer_started and + self.playlist_url): + + # Extract headers from request for pre-buffering + headers = {} + for key, value in self.request.query_params.items(): + if key.startswith("h_"): + headers[key[2:]] = value + + # Start pre-buffering in background using the actual playlist URL + asyncio.create_task( + hls_prebuffer.prebuffer_playlist(self.playlist_url, headers) + ) + is_prebuffer_started = True + # Process any remaining data in the buffer plus final bytes final_chunk = decoder.decode(b"", final=True) if final_chunk: @@ -209,4 +261,4 @@ class M3U8Processor: full_url, query_params=query_params, encryption_handler=encryption_handler if has_encrypted else None, - ) + ) \ No newline at end of file