import logging
import re
from typing import Any, Dict, Optional
from urllib.parse import quote, urlparse

from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError

logger = logging.getLogger(__name__)

# Matches the stream-ID path segment of a vecloud-style player URL (".../stream/<id>").
_STREAM_ID_RE = re.compile(r"/stream/([a-zA-Z0-9-]+)")

# Player iframe that carries the ``allowfullscreen`` attribute after its ``src``.
_IFRAME_FULLSCREEN_RE = re.compile(
    r'<iframe[^>]*src=["\']([^"\']+)["\'][^>]*allowfullscreen', re.IGNORECASE
)

# Fallback: any iframe whose ``src`` mentions one of the known player hosts.
_IFRAME_KNOWN_HOST_RE = re.compile(
    r'<iframe[^>]*src=["\']([^"\']+(?:premiumtv|daddylivehd|vecloud)[^"\']*)["\']',
    re.IGNORECASE,
)

# Result key -> pattern capturing the matching ``var <name> = "<value>"`` script assignment.
_AUTH_VAR_PATTERNS = {
    result_key: re.compile(rf'var\s+{js_name}\s*=\s*["\']([^"\']+)["\']')
    for result_key, js_name in (
        ("channel_key", "channelKey"),
        ("auth_ts", "authTs"),
        ("auth_rnd", "authRnd"),
        ("auth_sig", "authSig"),
    )
}

# Locations of the auth endpoint inside the player page script.
_AUTH_FETCH_RE = re.compile(r'fetchWithRetry\([\'"]([^\'"]*/auth\.php)')
_AUTH_DOMAIN_RE = re.compile(r'[\'"]https://([^/\'\"]+)(?:/[^\'\"]*)?/auth\.php')


class DLHDExtractor(BaseExtractor):
    """DLHD (DaddyLive) URL extractor for M3U8 streams."""

    def __init__(self, request_headers: dict):
        super().__init__(request_headers)
        # Default to HLS proxy endpoint
        self.mediaflow_endpoint = "hls_manifest_proxy"

    async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
        """Extract DLHD stream URL and required headers.

        The player URL is taken from ``player_url`` or scraped from the channel
        page. The vecloud API is tried first (directly, then via the playnow
        page); if that yields nothing, the ``auth.php`` / ``server_lookup.php``
        flow of the player page is used.

        Args:
            url: The DaddyLive channel URL (required)

        Keyword Args:
            player_url: Direct player URL (optional)
            stream_url: The stream URL (optional)
            auth_url_base: Base URL for auth requests (optional)

        Returns:
            Dict with ``destination_url``, ``request_headers`` and
            ``mediaflow_endpoint``.

        Raises:
            ExtractorError: If any step fails; the message is prefixed with
                "Extraction failed: " and the original exception is chained.
        """
        try:
            # Channel URL is required and serves as the referer
            channel_url = url
            channel_origin = self._get_origin(channel_url)

            # Check for direct parameters
            player_url_from_arg = kwargs.get("player_url")
            stream_url_from_arg = kwargs.get("stream_url")
            auth_url_base_from_arg = kwargs.get("auth_url_base")

            if player_url_from_arg:
                player_url = player_url_from_arg
            else:
                # Fetch the channel page and pull the player iframe URL out of it
                channel_response = await self._make_request(
                    channel_url, headers=self._origin_headers(channel_origin)
                )
                player_url = self._extract_player_url(channel_response.text)
                if not player_url:
                    raise ExtractorError("Could not extract player URL from channel page")

            # Preferred path: vecloud API (direct, then via playnow)
            vecloud_result = await self._try_vecloud(player_url, channel_origin)
            if vecloud_result is not None:
                return vecloud_result

            # All vecloud attempts failed: proceed with standard authentication
            player_origin = self._get_origin(player_url)
            player_response = await self._make_request(
                player_url, headers=self._origin_headers(player_origin)
            )
            player_content = player_response.text

            # Extract authentication details from the player page script
            auth_data = self._extract_auth_data(player_content)
            if not auth_data:
                raise ExtractorError("Failed to extract authentication data from player")

            # Auth URL base: argument > player page > stream URL origin > derived from player origin
            auth_url_base = auth_url_base_from_arg or self._extract_auth_url_base(player_content)
            if not auth_url_base:
                if stream_url_from_arg:
                    auth_url_base = self._get_origin(stream_url_from_arg)
                else:
                    auth_url_base = self._derive_auth_url_base(player_origin)
            if not auth_url_base:
                raise ExtractorError("Could not determine auth URL base")

            # Construct auth URL
            auth_url = (
                f"{auth_url_base}/auth.php?channel_id={auth_data['channel_key']}"
                f"&ts={auth_data['auth_ts']}&rnd={auth_data['auth_rnd']}"
                f"&sig={quote(auth_data['auth_sig'])}"
            )

            # Make auth request
            auth_req_headers = self._origin_headers(player_origin)
            auth_response = await self._make_request(auth_url, headers=auth_req_headers)

            # Check if authentication succeeded
            if auth_response.json().get("status") != "ok":
                raise ExtractorError("Authentication failed")

            # If no stream URL provided, look up the server and generate the stream URL
            final_stream_url = stream_url_from_arg
            if not final_stream_url:
                final_stream_url = await self._lookup_server(
                    lookup_url_base=player_origin,
                    auth_url_base=auth_url_base,
                    auth_data=auth_data,
                    headers=auth_req_headers,
                )

            # The stream request uses the full player URL (not just its origin) as referer
            stream_headers = {
                "referer": player_url,
                "origin": player_origin,
                "user-agent": self.base_headers["user-agent"],
            }

            return {
                "destination_url": final_stream_url,
                "request_headers": stream_headers,
                "mediaflow_endpoint": self.mediaflow_endpoint,
            }
        except Exception as e:
            raise ExtractorError(f"Extraction failed: {str(e)}") from e

    def _origin_headers(self, origin: str) -> Dict[str, str]:
        """Build referer/origin/user-agent headers for a request made from ``origin``."""
        return {
            "referer": origin + "/",
            "origin": origin,
            "user-agent": self.base_headers["user-agent"],
        }

    async def _try_vecloud(self, player_url: str, channel_origin: str) -> Optional[Dict[str, Any]]:
        """Try the vecloud API, directly and then through the playnow page.

        Failures are best-effort: they are logged at debug level and lead to the
        next attempt instead of propagating.

        Args:
            player_url: Player URL taken from the caller or the channel page
            channel_origin: Origin (scheme://host, no trailing slash) of the channel page

        Returns:
            The ``_handle_vecloud`` result, or None if every attempt failed.
        """
        is_stream_url = _STREAM_ID_RE.search(player_url) is not None

        # Attempt 1: the referer is the player's own origin for /stream/ URLs,
        # otherwise the channel page origin.
        try:
            referer_origin = self._get_origin(player_url) if is_stream_url else channel_origin
            return await self._handle_vecloud(player_url, referer_origin + "/")
        except Exception as exc:
            logger.debug("Direct vecloud extraction failed for %s: %s", player_url, exc)

        if is_stream_url:
            return None

        # Attempt 2: resolve the real player URL through the playnow page, then
        # retry vecloud if that URL is a /stream/ URL.
        try:
            playnow_player_url = await self._handle_playnow(player_url, channel_origin)
            if _STREAM_ID_RE.search(playnow_player_url):
                referer = self._get_origin(playnow_player_url) + "/"
                return await self._handle_vecloud(playnow_player_url, referer)
        except Exception as exc:
            logger.debug("Vecloud extraction via playnow failed for %s: %s", player_url, exc)

        return None

    async def _handle_vecloud(self, player_url: str, channel_referer: str) -> Dict[str, Any]:
        """Handle vecloud URLs with their specific API.

        Args:
            player_url: The vecloud player URL
            channel_referer: The referer of the channel page

        Returns:
            Dict containing stream URL and required headers

        Raises:
            ExtractorError: If the URL has no stream ID, a request fails, or the
                API response lacks a stream URL.
        """
        try:
            # Extract stream ID from vecloud URL
            stream_id_match = _STREAM_ID_RE.search(player_url)
            if not stream_id_match:
                raise ExtractorError("Could not extract stream ID from vecloud URL")
            stream_id = stream_id_match.group(1)

            response = await self._make_request(
                player_url,
                headers={"referer": channel_referer, "user-agent": self.base_headers["user-agent"]},
            )
            # Continue with the URL reported by the response rather than the requested one
            player_url = str(response.url)

            # Construct API URL
            player_parsed = urlparse(player_url)
            player_domain = player_parsed.netloc
            player_origin = f"{player_parsed.scheme}://{player_parsed.netloc}"
            api_url = f"{player_origin}/api/source/{stream_id}?type=live"

            # Set up headers and payload for API request
            api_headers = {
                "referer": player_url,
                "origin": player_origin,
                "user-agent": self.base_headers["user-agent"],
                "content-type": "application/json",
            }
            api_payload = {"r": channel_referer, "d": player_domain}

            # Make API request
            api_response = await self._make_request(
                api_url, method="POST", headers=api_headers, json=api_payload
            )
            api_result = api_response.json()

            # Check if request was successful
            if not api_result.get("success"):
                raise ExtractorError("Vecloud API request failed")

            # "player" may be absent or null; both mean there is no stream URL
            stream_url = (api_result.get("player") or {}).get("source_file")
            if not stream_url:
                raise ExtractorError("Could not find stream URL in vecloud response")

            return {
                "destination_url": stream_url,
                "request_headers": {
                    "referer": player_origin + "/",
                    "origin": player_origin,
                    "user-agent": self.base_headers["user-agent"],
                },
                "mediaflow_endpoint": self.mediaflow_endpoint,
            }
        except Exception as e:
            raise ExtractorError(f"Vecloud extraction failed: {str(e)}") from e

    async def _handle_playnow(self, player_iframe: str, channel_origin: str) -> str:
        """Handle playnow URLs.

        Args:
            player_iframe: URL of the playnow page to fetch
            channel_origin: Origin of the channel page, with or without a trailing slash

        Returns:
            The player URL found in the playnow page.

        Raises:
            ExtractorError: If the page contains no player URL.
        """
        # Normalise to exactly one trailing slash so an origin that already ends
        # in "/" does not yield a "//" referer.
        playnow_headers = {
            "referer": channel_origin.rstrip("/") + "/",
            "user-agent": self.base_headers["user-agent"],
        }

        playnow_response = await self._make_request(player_iframe, headers=playnow_headers)
        player_url = self._extract_player_url(playnow_response.text)
        if not player_url:
            raise ExtractorError("Could not extract player URL from playnow response")
        return player_url

    def _extract_player_url(self, html_content: str) -> Optional[str]:
        """Extract player iframe URL from channel page HTML.

        Returns:
            The stripped ``src`` of the first matching iframe, or None if there
            is no match or ``html_content`` is not a string.
        """
        if not isinstance(html_content, str):
            return None

        # Prefer an iframe with allowfullscreen; otherwise accept any iframe
        # whose src names a known player host.
        iframe_match = _IFRAME_FULLSCREEN_RE.search(html_content) or _IFRAME_KNOWN_HOST_RE.search(
            html_content
        )
        return iframe_match.group(1).strip() if iframe_match else None

    async def _lookup_server(
        self, lookup_url_base: str, auth_url_base: str, auth_data: Dict[str, str], headers: Dict[str, str]
    ) -> str:
        """Lookup server information and generate stream URL.

        Args:
            lookup_url_base: Base URL hosting ``server_lookup.php``
            auth_url_base: Auth base URL; its host (minus the first label) is the stream domain suffix
            auth_data: Auth values; only ``channel_key`` is read here
            headers: Headers to send with the lookup request

        Returns:
            The ``mono.m3u8`` stream URL.

        Raises:
            ExtractorError: If the lookup fails or returns no ``server_key``.
        """
        try:
            channel_key = auth_data["channel_key"]

            # Construct and perform server lookup request
            server_lookup_url = f"{lookup_url_base}/server_lookup.php?channel_id={quote(channel_key)}"
            server_response = await self._make_request(server_lookup_url, headers=headers)
            server_key = server_response.json().get("server_key")
            if not server_key:
                raise ExtractorError("Failed to get server key")

            # Drop the first host label of the auth URL to get the stream domain suffix
            auth_domain_parts = urlparse(auth_url_base).netloc.split(".")
            if len(auth_domain_parts) > 1:
                domain_suffix = ".".join(auth_domain_parts[1:])
            else:
                domain_suffix = auth_domain_parts[0]

            if "/" in server_key:
                # Special case like "top1/cdn": the host uses only the first segment
                host_label = server_key.split("/")[0]
                return f"https://{host_label}.{domain_suffix}/{server_key}/{channel_key}/mono.m3u8"

            # Normal case: the host is "<server_key>new"
            return f"https://{server_key}new.{domain_suffix}/{server_key}/{channel_key}/mono.m3u8"
        except Exception as e:
            raise ExtractorError(f"Server lookup failed: {str(e)}") from e

    def _extract_auth_data(self, html_content: str) -> Dict[str, str]:
        """Extract authentication data from player page.

        Returns:
            Dict with ``channel_key``, ``auth_ts``, ``auth_rnd`` and ``auth_sig``,
            or an empty dict if any of them is missing or ``html_content`` is
            not a string.
        """
        if not isinstance(html_content, str):
            return {}

        auth_data: Dict[str, str] = {}
        for result_key, pattern in _AUTH_VAR_PATTERNS.items():
            match = pattern.search(html_content)
            if not match:
                # All four values are required; a partial result is useless
                return {}
            auth_data[result_key] = match.group(1)
        return auth_data

    def _extract_auth_url_base(self, html_content: str) -> Optional[str]:
        """Extract auth URL base from player page script content.

        Returns:
            The URL preceding ``/auth.php``, or None if no auth URL is found or
            ``html_content`` is not a string.
        """
        if not isinstance(html_content, str):
            return None

        # Look for the auth URL in a fetchWithRetry call
        auth_url_match = _AUTH_FETCH_RE.search(html_content)
        if auth_url_match:
            # Keep everything up to the auth.php part
            return auth_url_match.group(1).split("/auth.php")[0]

        # Otherwise look for any quoted https URL ending in /auth.php
        domain_match = _AUTH_DOMAIN_RE.search(html_content)
        if domain_match:
            return f"https://{domain_match.group(1)}"

        return None

    def _get_origin(self, url: str) -> str:
        """Extract origin (``scheme://netloc``) from URL."""
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"

    def _derive_auth_url_base(self, player_domain: str) -> Optional[str]:
        """Attempt to derive auth URL base from player domain.

        Args:
            player_domain: Player origin URL

        Returns:
            ``https://auth.<last two host labels>``, or None if the host has
            fewer than two labels or cannot be parsed.
        """
        try:
            domain_parts = urlparse(player_domain).netloc.split(".")
        except (AttributeError, TypeError, ValueError):
            return None

        if len(domain_parts) < 2:
            return None

        # Only the "auth" subdomain is ever produced; no other prefix is probed.
        base_domain = ".".join(domain_parts[-2:])
        return f"https://auth.{base_domain}"