updated to latest version

UrloMythus
2025-09-01 18:41:27 +02:00
parent bc41be6194
commit 8f8c3b195e
21 changed files with 2389 additions and 390 deletions

View File

@@ -40,6 +40,11 @@ class TransportConfig(BaseSettings):
verify=route.verify_ssl, proxy=route.proxy_url or self.proxy_url if route.proxy else None
)
# Hardcoded configuration for jxoplay.xyz domain - SSL verification disabled
mounts["all://jxoplay.xyz"] = transport_cls(
verify=False, proxy=self.proxy_url if self.all_proxy else None
)
# Set default proxy for all routes if enabled
if self.all_proxy:
mounts["all://"] = transport_cls(proxy=self.proxy_url)
@@ -63,6 +68,16 @@ class Settings(BaseSettings):
m3u8_content_routing: Literal["mediaflow", "stremio", "direct"] = (
"mediaflow" # Routing strategy for M3U8 content URLs: "mediaflow", "stremio", or "direct"
)
enable_hls_prebuffer: bool = False # Whether to enable HLS pre-buffering for improved streaming performance.
hls_prebuffer_segments: int = 5 # Number of segments to pre-buffer ahead.
hls_prebuffer_cache_size: int = 50 # Maximum number of segments to cache in memory.
hls_prebuffer_max_memory_percent: int = 80 # Maximum percentage of system memory to use for HLS pre-buffer cache.
hls_prebuffer_emergency_threshold: int = 90 # Emergency threshold percentage to trigger aggressive cache cleanup.
enable_dash_prebuffer: bool = False # Whether to enable DASH pre-buffering for improved streaming performance.
dash_prebuffer_segments: int = 5 # Number of segments to pre-buffer ahead.
dash_prebuffer_cache_size: int = 50 # Maximum number of segments to cache in memory.
dash_prebuffer_max_memory_percent: int = 80 # Maximum percentage of system memory to use for DASH pre-buffer cache.
dash_prebuffer_emergency_threshold: int = 90 # Emergency threshold percentage to trigger aggressive cache cleanup.
user_agent: str = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36" # The user agent to use for HTTP requests.

View File

@@ -29,7 +29,7 @@ class BaseExtractor(ABC):
"""Make HTTP request with error handling."""
try:
async with create_httpx_client() as client:
request_headers = self.base_headers
request_headers = self.base_headers.copy()
request_headers.update(headers or {})
response = await client.request(
method,
@@ -40,9 +40,9 @@ class BaseExtractor(ABC):
response.raise_for_status()
return response
except httpx.HTTPError as e:
raise ExtractorError(f"HTTP request failed: {str(e)}")
raise ExtractorError(f"HTTP request failed for URL {url}: {str(e)}")
except Exception as e:
raise ExtractorError(f"Request failed: {str(e)}")
raise ExtractorError(f"Request failed for URL {url}: {str(e)}")
@abstractmethod
async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
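The .copy() change above fixes a subtle shared-state bug: calling update() on the un-copied dict writes per-request headers into base_headers itself, leaking them into every later request. A standalone sketch of the difference (names illustrative, not from the codebase):

base_headers = {"user-agent": "test-agent"}

def build_headers_buggy(extra: dict) -> dict:
    headers = base_headers          # aliases the shared dict
    headers.update(extra)
    return headers

def build_headers_fixed(extra: dict) -> dict:
    headers = base_headers.copy()   # isolated per-request copy
    headers.update(extra)
    return headers

build_headers_buggy({"referer": "https://example.com/"})
assert "referer" in base_headers      # the shared dict was polluted
base_headers.pop("referer")

build_headers_fixed({"referer": "https://example.com/"})
assert "referer" not in base_headers  # the shared dict stays clean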

View File

@@ -1,373 +1,501 @@
import re
from typing import Dict, Any, Optional
from urllib.parse import urlparse, quote
from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError
class DLHDExtractor(BaseExtractor):
"""DLHD (DaddyLive) URL extractor for M3U8 streams."""
def __init__(self, request_headers: dict):
super().__init__(request_headers)
# Default to HLS proxy endpoint
self.mediaflow_endpoint = "hls_manifest_proxy"
async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
"""Extract DLHD stream URL and required headers.
Args:
url: The DaddyLive channel URL (required)
Keyword Args:
player_url: Direct player URL (optional)
stream_url: The stream URL (optional)
auth_url_base: Base URL for auth requests (optional)
Returns:
Dict containing stream URL and required headers
"""
try:
# Channel URL is required and serves as the referer
channel_url = url
channel_origin = self._get_origin(channel_url) # Channel page origin
# Check for direct parameters
player_url_from_arg = kwargs.get("player_url")
stream_url_from_arg = kwargs.get("stream_url")
auth_url_base_from_arg = kwargs.get("auth_url_base")
current_player_url_for_processing: str
# If player URL not provided, extract it from channel page
if not player_url_from_arg:
# Get the channel page to extract the player iframe URL
channel_headers = {
"referer": channel_origin + "/",
"origin": channel_origin,
"user-agent": self.base_headers["user-agent"],
}
channel_response = await self._make_request(channel_url, headers=channel_headers)
extracted_iframe_url = self._extract_player_url(channel_response.text)
if not extracted_iframe_url:
raise ExtractorError("Could not extract player URL from channel page")
current_player_url_for_processing = extracted_iframe_url
else:
current_player_url_for_processing = player_url_from_arg
# Attempt 1: _handle_vecloud with current_player_url_for_processing
# The referer for _handle_vecloud is the origin of the channel page (channel_origin)
# or the origin of the player itself if it is a /stream/ URL.
try:
referer_for_vecloud = channel_origin + "/"
if re.search(r"/stream/([a-zA-Z0-9-]+)", current_player_url_for_processing):
referer_for_vecloud = self._get_origin(current_player_url_for_processing) + "/"
return await self._handle_vecloud(current_player_url_for_processing, referer_for_vecloud)
except Exception:
pass # Failed; continue with the next strategy
# Attempt 2: If _handle_vecloud fails and the URL is not /stream/, try _handle_playnow
# and then _handle_vecloud again with the URL resulting from playnow.
if not re.search(r"/stream/([a-zA-Z0-9-]+)", current_player_url_for_processing):
try:
playnow_derived_player_url = await self._handle_playnow(current_player_url_for_processing, channel_origin + "/")
if re.search(r"/stream/([a-zA-Z0-9-]+)", playnow_derived_player_url):
try:
referer_for_vecloud_after_playnow = self._get_origin(playnow_derived_player_url) + "/"
return await self._handle_vecloud(playnow_derived_player_url, referer_for_vecloud_after_playnow)
except Exception:
pass
except Exception:
pass
# If all previous attempts have failed, proceed with standard authentication.
player_url_for_auth = current_player_url_for_processing
player_origin_for_auth = self._get_origin(player_url_for_auth)
# Get player page to extract authentication information
player_headers = {
"referer": player_origin_for_auth + "/",
"origin": player_origin_for_auth,
"user-agent": self.base_headers["user-agent"],
}
player_response = await self._make_request(player_url_for_auth, headers=player_headers)
player_content = player_response.text
# Extract authentication details from script tag
auth_data = self._extract_auth_data(player_content)
if not auth_data:
raise ExtractorError("Failed to extract authentication data from player")
# Extract auth URL base if not provided
final_auth_url_base = auth_url_base_from_arg
if not final_auth_url_base:
final_auth_url_base = self._extract_auth_url_base(player_content)
# If still no auth URL base, try to derive from stream URL or player URL
if not final_auth_url_base:
if stream_url_from_arg:
final_auth_url_base = self._get_origin(stream_url_from_arg)
else:
# Try to extract from player URL structure
player_domain_for_auth_derive = self._get_origin(player_url_for_auth)
# Attempt to construct a standard auth domain
final_auth_url_base = self._derive_auth_url_base(player_domain_for_auth_derive)
if not final_auth_url_base:
raise ExtractorError("Could not determine auth URL base")
# Construct auth URL
auth_url = (
f"{final_auth_url_base}/auth.php?channel_id={auth_data['channel_key']}"
f"&ts={auth_data['auth_ts']}&rnd={auth_data['auth_rnd']}"
f"&sig={quote(auth_data['auth_sig'])}"
)
# Make auth request
auth_req_headers = {
"referer": player_origin_for_auth + "/",
"origin": player_origin_for_auth,
"user-agent": self.base_headers["user-agent"],
}
auth_response = await self._make_request(auth_url, headers=auth_req_headers)
# Check if authentication succeeded
if auth_response.json().get("status") != "ok":
raise ExtractorError("Authentication failed")
# If no stream URL provided, look up the server and generate the stream URL
final_stream_url = stream_url_from_arg
if not final_stream_url:
final_stream_url = await self._lookup_server(
lookup_url_base=player_origin_for_auth,
auth_url_base=final_auth_url_base,
auth_data=auth_data,
headers=auth_req_headers,
)
# Set up the final stream headers
stream_headers = {
"referer": player_url_for_auth,
"origin": player_origin_for_auth,
"user-agent": self.base_headers["user-agent"],
}
# Return the stream URL with headers
return {
"destination_url": final_stream_url,
"request_headers": stream_headers,
"mediaflow_endpoint": self.mediaflow_endpoint,
}
except Exception as e:
raise ExtractorError(f"Extraction failed: {str(e)}")
async def _handle_vecloud(self, player_url: str, channel_referer: str) -> Dict[str, Any]:
"""Handle vecloud URLs with their specific API.
Args:
player_url: The vecloud player URL
channel_referer: The referer of the channel page
Returns:
Dict containing stream URL and required headers
"""
try:
# Extract stream ID from vecloud URL
stream_id_match = re.search(r"/stream/([a-zA-Z0-9-]+)", player_url)
if not stream_id_match:
raise ExtractorError("Could not extract stream ID from vecloud URL")
stream_id = stream_id_match.group(1)
response = await self._make_request(
player_url, headers={"referer": channel_referer, "user-agent": self.base_headers["user-agent"]}
)
player_url = str(response.url)
# Construct API URL
player_parsed = urlparse(player_url)
player_domain = player_parsed.netloc
player_origin = f"{player_parsed.scheme}://{player_parsed.netloc}"
api_url = f"{player_origin}/api/source/{stream_id}?type=live"
# Set up headers for API request
api_headers = {
"referer": player_url,
"origin": player_origin,
"user-agent": self.base_headers["user-agent"],
"content-type": "application/json",
}
api_data = {"r": channel_referer, "d": player_domain}
# Make API request
api_response = await self._make_request(api_url, method="POST", headers=api_headers, json=api_data)
api_data = api_response.json()
# Check if request was successful
if not api_data.get("success"):
raise ExtractorError("Vecloud API request failed")
# Extract stream URL from response
stream_url = api_data.get("player", {}).get("source_file")
if not stream_url:
raise ExtractorError("Could not find stream URL in vecloud response")
# Set up stream headers
stream_headers = {
"referer": player_origin + "/",
"origin": player_origin,
"user-agent": self.base_headers["user-agent"],
}
# Return the stream URL with headers
return {
"destination_url": stream_url,
"request_headers": stream_headers,
"mediaflow_endpoint": self.mediaflow_endpoint,
}
except Exception as e:
raise ExtractorError(f"Vecloud extraction failed: {str(e)}")
async def _handle_playnow(self, player_iframe: str, channel_origin: str) -> str:
"""Handle playnow URLs."""
# Set up headers for the playnow request
playnow_headers = {"referer": channel_origin + "/", "user-agent": self.base_headers["user-agent"]}
# Make the playnow request
playnow_response = await self._make_request(player_iframe, headers=playnow_headers)
player_url = self._extract_player_url(playnow_response.text)
if not player_url:
raise ExtractorError("Could not extract player URL from playnow response")
return player_url
def _extract_player_url(self, html_content: str) -> Optional[str]:
"""Extract player iframe URL from channel page HTML."""
try:
# Look for iframe with allowfullscreen attribute
iframe_match = re.search(
r'<iframe[^>]*src=["\']([^"\']+)["\'][^>]*allowfullscreen', html_content, re.IGNORECASE
)
if not iframe_match:
# Try alternative pattern without requiring allowfullscreen
iframe_match = re.search(
r'<iframe[^>]*src=["\']([^"\']+(?:premiumtv|daddylivehd|vecloud)[^"\']*)["\']',
html_content,
re.IGNORECASE,
)
if iframe_match:
return iframe_match.group(1).strip()
return None
except Exception:
return None
async def _lookup_server(
self, lookup_url_base: str, auth_url_base: str, auth_data: Dict[str, str], headers: Dict[str, str]
) -> str:
"""Lookup server information and generate stream URL."""
try:
# Construct server lookup URL
server_lookup_url = f"{lookup_url_base}/server_lookup.php?channel_id={quote(auth_data['channel_key'])}"
# Make server lookup request
server_response = await self._make_request(server_lookup_url, headers=headers)
server_data = server_response.json()
server_key = server_data.get("server_key")
if not server_key:
raise ExtractorError("Failed to get server key")
# Extract domain parts from auth URL for constructing stream URL
auth_domain_parts = urlparse(auth_url_base).netloc.split(".")
domain_suffix = ".".join(auth_domain_parts[1:]) if len(auth_domain_parts) > 1 else auth_domain_parts[0]
# Generate the m3u8 URL based on server response pattern
if "/" in server_key:
# Handle special case like "top1/cdn"
parts = server_key.split("/")
return f"https://{parts[0]}.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8"
else:
# Handle normal case
return f"https://{server_key}new.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8"
except Exception as e:
raise ExtractorError(f"Server lookup failed: {str(e)}")
def _extract_auth_data(self, html_content: str) -> Dict[str, str]:
"""Extract authentication data from player page."""
try:
# Extract channel key
channel_key_match = re.search(r'var\s+channelKey\s*=\s*["\']([^"\']+)["\']', html_content)
# Extract auth timestamp
auth_ts_match = re.search(r'var\s+authTs\s*=\s*["\']([^"\']+)["\']', html_content)
# Extract auth random value
auth_rnd_match = re.search(r'var\s+authRnd\s*=\s*["\']([^"\']+)["\']', html_content)
# Extract auth signature
auth_sig_match = re.search(r'var\s+authSig\s*=\s*["\']([^"\']+)["\']', html_content)
if not all([channel_key_match, auth_ts_match, auth_rnd_match, auth_sig_match]):
return {}
return {
"channel_key": channel_key_match.group(1),
"auth_ts": auth_ts_match.group(1),
"auth_rnd": auth_rnd_match.group(1),
"auth_sig": auth_sig_match.group(1),
}
except Exception:
return {}
def _extract_auth_url_base(self, html_content: str) -> Optional[str]:
"""Extract auth URL base from player page script content."""
try:
# Look for auth URL or domain in fetchWithRetry call or similar patterns
auth_url_match = re.search(r'fetchWithRetry\([\'"]([^\'"]*/auth\.php)', html_content)
if auth_url_match:
auth_url = auth_url_match.group(1)
# Extract base URL up to the auth.php part
return auth_url.split("/auth.php")[0]
# Try finding domain directly
domain_match = re.search(r'[\'"]https://([^/\'\"]+)(?:/[^\'\"]*)?/auth\.php', html_content)
if domain_match:
return f"https://{domain_match.group(1)}"
return None
except Exception:
return None
def _get_origin(self, url: str) -> str:
"""Extract origin from URL."""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
def _derive_auth_url_base(self, player_domain: str) -> Optional[str]:
"""Attempt to derive auth URL base from player domain."""
try:
# Typical pattern is to use a subdomain for auth domain
parsed = urlparse(player_domain)
domain_parts = parsed.netloc.split(".")
# Get the top-level domain and second-level domain
if len(domain_parts) >= 2:
base_domain = ".".join(domain_parts[-2:])
# Try common subdomains for auth
for prefix in ["auth", "api", "cdn"]:
potential_auth_domain = f"https://{prefix}.{base_domain}"
return potential_auth_domain
return None
except Exception:
return None
import re
import base64
import json
import logging
from typing import Any, Dict, Optional
from urllib.parse import urlparse, quote, urlunparse
from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError
logger = logging.getLogger(__name__)
class DLHDExtractor(BaseExtractor):
"""DLHD (DaddyLive) URL extractor for M3U8 streams."""
def __init__(self, request_headers: dict):
super().__init__(request_headers)
# Default to HLS proxy endpoint
self.mediaflow_endpoint = "hls_manifest_proxy"
# Cache for the resolved base URL to avoid repeated network calls
self._cached_base_url = None
# Store iframe context for newkso.ru requests
self._iframe_context = None
def _get_headers_for_url(self, url: str, base_headers: dict) -> dict:
"""Get appropriate headers for the given URL, applying newkso.ru specific headers if needed."""
headers = base_headers.copy()
# Check if URL contains newkso.ru domain
parsed_url = urlparse(url)
if "newkso.ru" in parsed_url.netloc:
# Use iframe URL as referer if available, otherwise use the newkso domain itself
if self._iframe_context:
iframe_origin = f"https://{urlparse(self._iframe_context).netloc}"
newkso_headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
'Referer': self._iframe_context,
'Origin': iframe_origin
}
logger.info(f"Applied newkso.ru specific headers with iframe context for URL: {url}")
logger.debug(f"Headers applied: {newkso_headers}")
else:
# Fallback to newkso domain itself
newkso_origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
newkso_headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
'Referer': newkso_origin,
'Origin': newkso_origin
}
logger.info(f"Applied newkso.ru specific headers (fallback) for URL: {url}")
logger.debug(f"Headers applied: {newkso_headers}")
headers.update(newkso_headers)
return headers
async def _make_request(self, url: str, method: str = "GET", headers: dict = None, **kwargs):
"""Override _make_request to apply newkso.ru specific headers when needed."""
request_headers = headers or {}
# Apply newkso.ru specific headers if the URL contains newkso.ru
final_headers = self._get_headers_for_url(url, request_headers)
return await super()._make_request(url, method, final_headers, **kwargs)
async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
"""Extract DLHD stream URL and required headers (logica tvproxy adattata async, con fallback su endpoint alternativi)."""
from urllib.parse import urlparse, quote_plus
async def get_daddylive_base_url():
if self._cached_base_url:
return self._cached_base_url
try:
resp = await self._make_request("https://daddylive.sx/")
# resp.url is the final URL after redirects
base_url = str(resp.url)
if not base_url.endswith('/'):
base_url += '/'
self._cached_base_url = base_url
return base_url
except Exception:
# Fallback to default if request fails
return "https://daddylive.sx/"
def extract_channel_id(url):
match_premium = re.search(r'/premium(\d+)/mono\.m3u8$', url)
if match_premium:
return match_premium.group(1)
# Handle both normal and URL-encoded patterns
match_player = re.search(r'/(?:watch|stream|cast|player)/stream-(\d+)\.php', url)
if match_player:
return match_player.group(1)
# Handle URL-encoded patterns like %2Fstream%2Fstream-123.php or just stream-123.php
match_encoded = re.search(r'(?:%2F|/)stream-(\d+)\.php', url, re.IGNORECASE)
if match_encoded:
return match_encoded.group(1)
# Handle direct stream- pattern without path
match_direct = re.search(r'stream-(\d+)\.php', url)
if match_direct:
return match_direct.group(1)
return None
async def try_endpoint(baseurl, endpoint, channel_id):
stream_url = f"{baseurl}{endpoint}stream-{channel_id}.php"
daddy_origin = urlparse(baseurl).scheme + "://" + urlparse(baseurl).netloc
daddylive_headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
'Referer': baseurl,
'Origin': daddy_origin
}
# 1. Request the stream/cast/player/watch page
resp1 = await self._make_request(stream_url, headers=daddylive_headers)
# 2. Extract the Player 2 link
iframes = re.findall(r'<a[^>]*href="([^"]+)"[^>]*>\s*<button[^>]*>\s*Player\s*2\s*</button>', resp1.text)
if not iframes:
raise ExtractorError("No Player 2 link found")
url2 = iframes[0]
url2 = baseurl + url2
url2 = url2.replace('//cast', '/cast')
daddylive_headers['Referer'] = url2
daddylive_headers['Origin'] = url2
# 3. Request the Player 2 page
resp2 = await self._make_request(url2, headers=daddylive_headers)
# 4. Extract the iframe
iframes2 = re.findall(r'iframe src="([^"]*)', resp2.text)
if not iframes2:
raise ExtractorError("No iframe found in Player 2 page")
iframe_url = iframes2[0]
# Store iframe context for newkso.ru requests
self._iframe_context = iframe_url
resp3 = await self._make_request(iframe_url, headers=daddylive_headers)
iframe_content = resp3.text
# 5. Extract auth parameters (robust) - handle both old and new formats
def extract_var_old_format(js, name):
# Try multiple patterns for variable extraction (old format)
patterns = [
rf'var (?:__)?{name}\s*=\s*atob\("([^"]+)"\)',
rf'var (?:__)?{name}\s*=\s*atob\(\'([^\']+)\'\)',
rf'(?:var\s+)?(?:__)?{name}\s*=\s*atob\s*\(\s*["\']([^"\']+)["\']\s*\)',
rf'(?:let|const)\s+(?:__)?{name}\s*=\s*atob\s*\(\s*["\']([^"\']+)["\']\s*\)'
]
for pattern in patterns:
m = re.search(pattern, js)
if m:
try:
return base64.b64decode(m.group(1)).decode('utf-8')
except Exception as decode_error:
logger.warning(f"Failed to decode base64 for variable {name}: {decode_error}")
continue
return None
def extract_bundle_format(js):
"""Extract parameters from new BUNDLE format"""
try:
# Look for BUNDLE variable
bundle_patterns = [
r'const\s+BUNDLE\s*=\s*["\']([^"\']+)["\']',
r'var\s+BUNDLE\s*=\s*["\']([^"\']+)["\']',
r'let\s+BUNDLE\s*=\s*["\']([^"\']+)["\']'
]
bundle_data = None
for pattern in bundle_patterns:
match = re.search(pattern, js)
if match:
bundle_data = match.group(1)
break
if not bundle_data:
return None
# Decode the bundle (base64 -> JSON -> decode each field)
bundle_json = base64.b64decode(bundle_data).decode('utf-8')
bundle_obj = json.loads(bundle_json)
# Decode each base64 field
decoded_bundle = {}
for key, value in bundle_obj.items():
try:
decoded_bundle[key] = base64.b64decode(value).decode('utf-8')
except Exception as e:
logger.warning(f"Failed to decode bundle field {key}: {e}")
decoded_bundle[key] = value
return decoded_bundle
except Exception as e:
logger.warning(f"Failed to extract bundle format: {e}")
return None
# Try multiple patterns for channel key extraction
channel_key = None
channel_key_patterns = [
r'const\s+CHANNEL_KEY\s*=\s*["\']([^"\']+)["\']',
r'var\s+CHANNEL_KEY\s*=\s*["\']([^"\']+)["\']',
r'let\s+CHANNEL_KEY\s*=\s*["\']([^"\']+)["\']',
r'channelKey\s*=\s*["\']([^"\']+)["\']',
r'var\s+channelKey\s*=\s*["\']([^"\']+)["\']',
r'(?:let|const)\s+channelKey\s*=\s*["\']([^"\']+)["\']'
]
for pattern in channel_key_patterns:
match = re.search(pattern, iframe_content)
if match:
channel_key = match.group(1)
break
# Try new bundle format first
bundle_data = extract_bundle_format(iframe_content)
if bundle_data:
logger.info("Using new BUNDLE format for parameter extraction")
auth_host = bundle_data.get('b_host')
auth_php = bundle_data.get('b_script')
auth_ts = bundle_data.get('b_ts')
auth_rnd = bundle_data.get('b_rnd')
auth_sig = bundle_data.get('b_sig')
logger.debug(f"Bundle data extracted: {bundle_data}")
else:
logger.info("Falling back to old format for parameter extraction")
# Fall back to old format
auth_ts = extract_var_old_format(iframe_content, 'c')
auth_rnd = extract_var_old_format(iframe_content, 'd')
auth_sig = extract_var_old_format(iframe_content, 'e')
auth_host = extract_var_old_format(iframe_content, 'a')
auth_php = extract_var_old_format(iframe_content, 'b')
# Log what we found for debugging
logger.debug(f"Extracted parameters: channel_key={channel_key}, auth_ts={auth_ts}, auth_rnd={auth_rnd}, auth_sig={auth_sig}, auth_host={auth_host}, auth_php={auth_php}")
# Check which parameters are missing
missing_params = []
if not channel_key:
missing_params.append('channel_key/CHANNEL_KEY')
if not auth_ts:
missing_params.append('auth_ts (var c / b_ts)')
if not auth_rnd:
missing_params.append('auth_rnd (var d / b_rnd)')
if not auth_sig:
missing_params.append('auth_sig (var e / b_sig)')
if not auth_host:
missing_params.append('auth_host (var a / b_host)')
if not auth_php:
missing_params.append('auth_php (var b / b_script)')
if missing_params:
logger.error(f"Missing parameters: {', '.join(missing_params)}")
# Log a portion of the iframe content for debugging (first 2000 chars)
logger.debug(f"Iframe content sample: {iframe_content[:2000]}")
raise ExtractorError(f"Error extracting parameters: missing {', '.join(missing_params)}")
auth_sig = quote_plus(auth_sig)
# 6. Auth request
auth_url = f'{auth_host}{auth_php}?channel_id={channel_key}&ts={auth_ts}&rnd={auth_rnd}&sig={auth_sig}'
auth_resp = await self._make_request(auth_url, headers=daddylive_headers)
# 7. Lookup server - Extract host parameter
host = None
host_patterns = [
r'(?s)m3u8 =.*?:.*?:.*?".*?".*?"([^"]*)', # Original pattern
r'm3u8\s*=.*?"([^"]*)"', # Simplified m3u8 pattern
r'host["\']?\s*[:=]\s*["\']([^"\']*)', # host: or host= pattern
r'["\']([^"\']*\.newkso\.ru[^"\']*)', # Direct newkso.ru pattern
r'["\']([^"\']*\/premium\d+[^"\']*)', # premium path pattern
r'url.*?["\']([^"\']*newkso[^"\']*)', # URL with newkso
]
for pattern in host_patterns:
matches = re.findall(pattern, iframe_content)
if matches:
host = matches[0]
logger.debug(f"Found host with pattern '{pattern}': {host}")
break
if not host:
logger.error("Failed to extract host from iframe content")
logger.debug(f"Iframe content for host extraction: {iframe_content[:2000]}")
# Try to find any newkso.ru related URLs
potential_hosts = re.findall(r'["\']([^"\']*newkso[^"\']*)', iframe_content)
if potential_hosts:
logger.debug(f"Potential host URLs found: {potential_hosts}")
raise ExtractorError("Failed to extract host parameter")
# Extract server lookup URL from fetchWithRetry call (dynamic extraction)
server_lookup = None
# Look for the server_lookup.php pattern in JavaScript
if "fetchWithRetry('/server_lookup.php?channel_id='" in iframe_content:
server_lookup = '/server_lookup.php?channel_id='
logger.debug('Found server lookup URL: /server_lookup.php?channel_id=')
elif '/server_lookup.php' in iframe_content:
# Try to extract the full path
js_lines = iframe_content.split('\n')
for js_line in js_lines:
if 'server_lookup.php' in js_line and 'fetchWithRetry' in js_line:
# Extract the URL from the fetchWithRetry call
start = js_line.find("'")
if start != -1:
end = js_line.find("'", start + 1)
if end != -1:
potential_url = js_line[start+1:end]
if 'server_lookup' in potential_url:
server_lookup = potential_url
logger.debug(f'Extracted server lookup URL: {server_lookup}')
break
if not server_lookup:
logger.error('Failed to extract server lookup URL from iframe content')
logger.debug(f'Iframe content sample: {iframe_content[:2000]}')
raise ExtractorError('Failed to extract server lookup URL')
server_lookup_url = f"https://{urlparse(iframe_url).netloc}{server_lookup}{channel_key}"
logger.debug(f"Server lookup URL: {server_lookup_url}")
try:
lookup_resp = await self._make_request(server_lookup_url, headers=daddylive_headers)
server_data = lookup_resp.json()
server_key = server_data.get('server_key')
if not server_key:
logger.error(f"No server_key in response: {server_data}")
raise ExtractorError("Failed to get server key from lookup response")
logger.info(f"Server lookup successful - Server key: {server_key}")
except Exception as lookup_error:
logger.error(f"Server lookup request failed: {lookup_error}")
raise ExtractorError(f"Server lookup failed: {str(lookup_error)}")
referer_raw = f'https://{urlparse(iframe_url).netloc}'
# Extract URL construction logic dynamically from JavaScript
# Simple approach: look for newkso.ru URLs and construct based on server_key
# Check if we have the special case server_key
if server_key == 'top1/cdn':
clean_m3u8_url = f'https://top1.newkso.ru/top1/cdn/{channel_key}/mono.m3u8'
logger.info(f'Using special case URL for server_key \'top1/cdn\': {clean_m3u8_url}')
else:
clean_m3u8_url = f'https://{server_key}new.newkso.ru/{server_key}/{channel_key}/mono.m3u8'
logger.info(f'Using general case URL for server_key \'{server_key}\': {clean_m3u8_url}')
logger.info(f'Generated stream URL: {clean_m3u8_url}')
logger.debug(f'Server key: {server_key}, Channel key: {channel_key}')
# Check if the final stream URL is on newkso.ru domain
if "newkso.ru" in clean_m3u8_url:
# For newkso.ru streams, use iframe URL as referer
stream_headers = {
'User-Agent': daddylive_headers['User-Agent'],
'Referer': iframe_url,
'Origin': referer_raw
}
logger.info(f"Applied iframe-specific headers for newkso.ru stream URL: {clean_m3u8_url}")
logger.debug(f"Stream headers for newkso.ru: {stream_headers}")
else:
# For other domains, use the original logic
stream_headers = {
'User-Agent': daddylive_headers['User-Agent'],
'Referer': referer_raw,
'Origin': referer_raw
}
return {
"destination_url": clean_m3u8_url,
"request_headers": stream_headers,
"mediaflow_endpoint": self.mediaflow_endpoint,
}
try:
clean_url = url
channel_id = extract_channel_id(clean_url)
if not channel_id:
raise ExtractorError(f"Unable to extract channel ID from {clean_url}")
baseurl = await get_daddylive_base_url()
endpoints = ["stream/", "cast/", "player/", "watch/"]
last_exc = None
for endpoint in endpoints:
try:
return await try_endpoint(baseurl, endpoint, channel_id)
except Exception as exc:
last_exc = exc
continue
raise ExtractorError(f"Extraction failed: {str(last_exc)}")
except Exception as e:
raise ExtractorError(f"Extraction failed: {str(e)}")
async def _lookup_server(
self, lookup_url_base: str, auth_url_base: str, auth_data: Dict[str, str], headers: Dict[str, str]
) -> str:
"""Lookup server information and generate stream URL."""
try:
# Construct server lookup URL
server_lookup_url = f"{lookup_url_base}/server_lookup.php?channel_id={quote(auth_data['channel_key'])}"
# Make server lookup request
server_response = await self._make_request(server_lookup_url, headers=headers)
server_data = server_response.json()
server_key = server_data.get("server_key")
if not server_key:
raise ExtractorError("Failed to get server key")
# Extract domain parts from auth URL for constructing stream URL
auth_domain_parts = urlparse(auth_url_base).netloc.split(".")
domain_suffix = ".".join(auth_domain_parts[1:]) if len(auth_domain_parts) > 1 else auth_domain_parts[0]
# Generate the m3u8 URL based on server response pattern
if "/" in server_key:
# Handle special case like "top1/cdn"
parts = server_key.split("/")
return f"https://{parts[0]}.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8"
else:
# Handle normal case
return f"https://{server_key}new.{domain_suffix}/{server_key}/{auth_data['channel_key']}/mono.m3u8"
except Exception as e:
raise ExtractorError(f"Server lookup failed: {str(e)}")
def _extract_auth_data(self, html_content: str) -> Dict[str, str]:
"""Extract authentication data from player page."""
try:
channel_key_match = re.search(r'var\s+channelKey\s*=\s*["\']([^"\']+)["\']', html_content)
if not channel_key_match:
return {}
channel_key = channel_key_match.group(1)
# New pattern with atob
auth_ts_match = re.search(r'var\s+__c\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content)
auth_rnd_match = re.search(r'var\s+__d\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content)
auth_sig_match = re.search(r'var\s+__e\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content)
if auth_ts_match and auth_rnd_match and auth_sig_match:
return {
"channel_key": channel_key,
"auth_ts": base64.b64decode(auth_ts_match.group(1)).decode("utf-8"),
"auth_rnd": base64.b64decode(auth_rnd_match.group(1)).decode("utf-8"),
"auth_sig": base64.b64decode(auth_sig_match.group(1)).decode("utf-8"),
}
# Original pattern
auth_ts_match = re.search(r'var\s+authTs\s*=\s*["\']([^"\']+)["\']', html_content)
auth_rnd_match = re.search(r'var\s+authRnd\s*=\s*["\']([^"\']+)["\']', html_content)
auth_sig_match = re.search(r'var\s+authSig\s*=\s*["\']([^"\']+)["\']', html_content)
if auth_ts_match and auth_rnd_match and auth_sig_match:
return {
"channel_key": channel_key,
"auth_ts": auth_ts_match.group(1),
"auth_rnd": auth_rnd_match.group(1),
"auth_sig": auth_sig_match.group(1),
}
return {}
except Exception:
return {}
def _extract_auth_url_base(self, html_content: str) -> Optional[str]:
"""Extract auth URL base from player page script content."""
try:
# New atob pattern for auth base URL
auth_url_base_match = re.search(r'var\s+__a\s*=\s*atob\([\'"]([^\'"]+)[\'"]\)', html_content)
if auth_url_base_match:
decoded_url = base64.b64decode(auth_url_base_match.group(1)).decode("utf-8")
return decoded_url.strip().rstrip("/")
# Look for auth URL or domain in fetchWithRetry call or similar patterns
auth_url_match = re.search(r'fetchWithRetry\([\'"]([^\'"]*/auth\.php)', html_content)
if auth_url_match:
auth_url = auth_url_match.group(1)
# Extract base URL up to the auth.php part
return auth_url.split("/auth.php")[0]
# Try finding domain directly
domain_match = re.search(r'[\'"]https://([^/\'\"]+)(?:/[^\'\"]*)?/auth\.php', html_content)
if domain_match:
return f"https://{domain_match.group(1)}"
return None
except Exception:
return None
def _get_origin(self, url: str) -> str:
"""Extract origin from URL."""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
def _derive_auth_url_base(self, player_domain: str) -> Optional[str]:
"""Attempt to derive auth URL base from player domain."""
try:
# Typical pattern is to use a subdomain for auth domain
parsed = urlparse(player_domain)
domain_parts = parsed.netloc.split(".")
# Get the top-level domain and second-level domain
if len(domain_parts) >= 2:
base_domain = ".".join(domain_parts[-2:])
# Try common subdomains for auth
for prefix in ["auth", "api", "cdn"]:
potential_auth_domain = f"https://{prefix}.{base_domain}"
return potential_auth_domain
return None
except Exception:
return None
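The new BUNDLE format handled above is a base64-encoded JSON object whose values are themselves base64 strings; a self-contained round-trip sketch (field names from this diff, values invented for illustration):

import base64
import json

# Encode: JSON object of base64 fields, then base64 the whole document.
fields = {"b_host": "https://auth.example.com/", "b_script": "/auth.php",
          "b_ts": "1725200000", "b_rnd": "abc123", "b_sig": "c2lnbmF0dXJl"}
bundle = base64.b64encode(json.dumps(
    {k: base64.b64encode(v.encode()).decode() for k, v in fields.items()}
).encode()).decode()

# Decode, mirroring extract_bundle_format above: base64 -> JSON -> per-field base64.
bundle_obj = json.loads(base64.b64decode(bundle).decode("utf-8"))
decoded = {k: base64.b64decode(v).decode("utf-8") for k, v in bundle_obj.items()}
assert decoded == fields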

View File

@@ -10,6 +10,7 @@ from mediaflow_proxy.extractors.okru import OkruExtractor
from mediaflow_proxy.extractors.streamtape import StreamtapeExtractor
from mediaflow_proxy.extractors.supervideo import SupervideoExtractor
from mediaflow_proxy.extractors.uqload import UqloadExtractor
from mediaflow_proxy.extractors.vavoo import VavooExtractor
from mediaflow_proxy.extractors.vixcloud import VixCloudExtractor
from mediaflow_proxy.extractors.fastream import FastreamExtractor
@@ -27,6 +28,7 @@ class ExtractorFactory:
"Maxstream": MaxstreamExtractor,
"LiveTV": LiveTVExtractor,
"DLHD": DLHDExtractor,
"Vavoo": VavooExtractor,
"Fastream": FastreamExtractor
}

View File

@@ -13,10 +13,10 @@ class FastreamExtractor(BaseExtractor):
self.mediaflow_endpoint = "hls_manifest_proxy"
async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
# Init headers needed for the request.
headers = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Connection': 'keep-alive', 'Accept-Language': 'en-US,en;q=0.5', 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0'}
"""Extract Fastream URL."""
final_url = await eval_solver(self,url,headers, r'file:"(.*?)"')
pattern = r'file:"(.*?)"'
final_url = await eval_solver(self, url, headers, pattern)
self.base_headers["referer"] = f'https://{url.replace("https://","").split("/")[0]}/'
self.base_headers["origin"] = f'https://{url.replace("https://","").split("/")[0]}'

View File

@@ -13,8 +13,9 @@ class MixdropExtractor(BaseExtractor):
url = url.replace("club", "ps").split("/2")[0]
headers = {"accept-language": "en-US,en;q=0.5"}
pattern = r'MDCore.wurl ?= ?"(.*?)"'
final_url = f"https:{await eval_solver(self, url, headers, r'MDCore.wurl ?= ?"(.*?)"')}"
final_url = f"https:{await eval_solver(self, url, headers, pattern)}"
self.base_headers["referer"] = url
return {

View File

@@ -14,12 +14,10 @@ class SupervideoExtractor(BaseExtractor):
self.mediaflow_endpoint = "hls_manifest_proxy"
async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
# Init headers needed for the request.
headers = {'Accept': '*/*', 'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.71 Mobile Safari/537.36'}
pattern = r'file:"(.*?)"'
"""Extract Supervideo URL."""
final_url = await eval_solver(self,url,headers, r'file:"(.*?)"')
final_url = await eval_solver(self, url, headers, pattern)
self.base_headers["referer"] = url
return {

View File

@@ -0,0 +1,169 @@
import logging
import time
from typing import Any, Dict, Optional
from mediaflow_proxy.extractors.base import BaseExtractor, ExtractorError
logger = logging.getLogger(__name__)
class VavooExtractor(BaseExtractor):
"""Vavoo URL extractor for resolving vavoo.to links (solo httpx, async)."""
def __init__(self, request_headers: dict):
super().__init__(request_headers)
self.mediaflow_endpoint = "proxy_stream_endpoint"
async def get_auth_signature(self) -> Optional[str]:
"""Get authentication signature for Vavoo API (async, httpx, pulito)."""
headers = {
"user-agent": "okhttp/4.11.0",
"accept": "application/json",
"content-type": "application/json; charset=utf-8",
"accept-encoding": "gzip"
}
current_time = int(time.time() * 1000)
data = {
"token": "tosFwQCJMS8qrW_AjLoHPQ41646J5dRNha6ZWHnijoYQQQoADQoXYSo7ki7O5-CsgN4CH0uRk6EEoJ0728ar9scCRQW3ZkbfrPfeCXW2VgopSW2FWDqPOoVYIuVPAOnXCZ5g",
"reason": "app-blur",
"locale": "de",
"theme": "dark",
"metadata": {
"device": {
"type": "Handset",
"brand": "google",
"model": "Pixel",
"name": "sdk_gphone64_arm64",
"uniqueId": "d10e5d99ab665233"
},
"os": {
"name": "android",
"version": "13",
"abis": ["arm64-v8a", "armeabi-v7a", "armeabi"],
"host": "android"
},
"app": {
"platform": "android",
"version": "3.1.21",
"buildId": "289515000",
"engine": "hbc85",
"signatures": ["6e8a975e3cbf07d5de823a760d4c2547f86c1403105020adee5de67ac510999e"],
"installer": "app.revanced.manager.flutter"
},
"version": {
"package": "tv.vavoo.app",
"binary": "3.1.21",
"js": "3.1.21"
}
},
"appFocusTime": 0,
"playerActive": False,
"playDuration": 0,
"devMode": False,
"hasAddon": True,
"castConnected": False,
"package": "tv.vavoo.app",
"version": "3.1.21",
"process": "app",
"firstAppStart": current_time,
"lastAppStart": current_time,
"ipLocation": "",
"adblockEnabled": True,
"proxy": {
"supported": ["ss", "openvpn"],
"engine": "ss",
"ssVersion": 1,
"enabled": True,
"autoServer": True,
"id": "de-fra"
},
"iap": {
"supported": False
}
}
try:
resp = await self._make_request(
"https://www.vavoo.tv/api/app/ping",
method="POST",
json=data,
headers=headers
)
result = resp.json()
addon_sig = result.get("addonSig")
if addon_sig:
logger.info("Successfully obtained Vavoo authentication signature")
return addon_sig
else:
logger.warning("No addonSig in Vavoo API response")
return None
except Exception as e:
logger.exception(f"Failed to get Vavoo authentication signature: {str(e)}")
return None
async def extract(self, url: str, **kwargs) -> Dict[str, Any]:
"""Extract Vavoo stream URL (async, httpx)."""
if "vavoo.to" not in url:
raise ExtractorError("Not a valid Vavoo URL")
# Get authentication signature
signature = await self.get_auth_signature()
if not signature:
raise ExtractorError("Failed to get Vavoo authentication signature")
# Resolve the URL
resolved_url = await self._resolve_vavoo_link(url, signature)
if not resolved_url:
raise ExtractorError("Failed to resolve Vavoo URL")
# Set up headers for the resolved stream
stream_headers = {
"user-agent": self.base_headers["user-agent"],
"referer": "https://vavoo.to/",
}
return {
"destination_url": resolved_url,
"request_headers": stream_headers,
"mediaflow_endpoint": self.mediaflow_endpoint,
}
async def _resolve_vavoo_link(self, link: str, signature: str) -> Optional[str]:
"""Resolve a Vavoo link using the MediaHubMX API (async, httpx)."""
headers = {
"user-agent": "MediaHubMX/2",
"accept": "application/json",
"content-type": "application/json; charset=utf-8",
"accept-encoding": "gzip",
"mediahubmx-signature": signature
}
data = {
"language": "de",
"region": "AT",
"url": link,
"clientVersion": "3.1.21"
}
try:
logger.info(f"Attempting to resolve Vavoo URL: {link}")
resp = await self._make_request(
"https://vavoo.to/mediahubmx-resolve.json",
method="POST",
json=data,
headers=headers
)
result = resp.json()
logger.info(f"Vavoo API response: {result}")
if isinstance(result, list) and result and result[0].get("url"):
resolved_url = result[0]["url"]
logger.info(f"Successfully resolved Vavoo URL to: {resolved_url}")
return resolved_url
elif isinstance(result, dict) and result.get("url"):
resolved_url = result["url"]
logger.info(f"Successfully resolved Vavoo URL to: {resolved_url}")
return resolved_url
else:
logger.warning(f"No URL found in Vavoo API response: {result}")
return None
except Exception as e:
logger.exception(f"Vavoo resolution failed for URL {link}: {str(e)}")
raise ExtractorError(f"Vavoo resolution failed: {str(e)}") from e

View File

@@ -85,6 +85,20 @@ async def handle_hls_stream_proxy(
proxy_headers.request.update({"range": content_range})
try:
# Auto-detect and resolve Vavoo links
if "vavoo.to" in hls_params.destination:
try:
from mediaflow_proxy.extractors.vavoo import VavooExtractor
vavoo_extractor = VavooExtractor(proxy_headers.request)
resolved_data = await vavoo_extractor.extract(hls_params.destination)
resolved_url = resolved_data["destination_url"]
logger.info(f"Auto-resolved Vavoo URL: {hls_params.destination} -> {resolved_url}")
# Update destination with resolved URL
hls_params.destination = resolved_url
except Exception as e:
logger.warning(f"Failed to auto-resolve Vavoo URL: {e}")
# Continue with original URL if resolution fails
# If force_playlist_proxy is enabled, skip detection and directly process as m3u8
if hls_params.force_playlist_proxy:
return await fetch_and_process_m3u8(
@@ -141,6 +155,20 @@ async def handle_stream_request(
client, streamer = await setup_client_and_streamer()
try:
# Auto-detect and resolve Vavoo links
if "vavoo.to" in video_url:
try:
from mediaflow_proxy.extractors.vavoo import VavooExtractor
vavoo_extractor = VavooExtractor(proxy_headers.request)
resolved_data = await vavoo_extractor.extract(video_url)
resolved_url = resolved_data["destination_url"]
logger.info(f"Auto-resolved Vavoo URL: {video_url} -> {resolved_url}")
# Update video_url with resolved URL
video_url = resolved_url
except Exception as e:
logger.warning(f"Failed to auto-resolve Vavoo URL: {e}")
# Continue with original URL if resolution fails
await streamer.create_streaming_response(video_url, proxy_headers.request)
response_headers = prepare_response_headers(streamer.response.headers, proxy_headers.response)
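The same resolve-or-fall-back block now appears in both handlers; a hedged sketch of how it could be factored into a single helper (the helper name is invented):

import logging
from mediaflow_proxy.extractors.vavoo import VavooExtractor

logger = logging.getLogger(__name__)

async def maybe_resolve_vavoo(url: str, request_headers: dict) -> str:
    """Return the resolved stream URL for vavoo.to links, or the input URL unchanged."""
    if "vavoo.to" not in url:
        return url
    try:
        resolved = await VavooExtractor(request_headers).extract(url)
        logger.info(f"Auto-resolved Vavoo URL: {url} -> {resolved['destination_url']}")
        return resolved["destination_url"]
    except Exception as e:
        logger.warning(f"Failed to auto-resolve Vavoo URL: {e}")
        return url  # continue with the original URL if resolution fails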

View File

@@ -10,10 +10,11 @@ from starlette.staticfiles import StaticFiles
from mediaflow_proxy.configs import settings
from mediaflow_proxy.middleware import UIAccessControlMiddleware
from mediaflow_proxy.routes import proxy_router, extractor_router, speedtest_router
from mediaflow_proxy.routes import proxy_router, extractor_router, speedtest_router, playlist_builder_router
from mediaflow_proxy.schemas import GenerateUrlRequest, GenerateMultiUrlRequest, MultiUrlRequestItem
from mediaflow_proxy.utils.crypto_utils import EncryptionHandler, EncryptionMiddleware
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url
from mediaflow_proxy.utils.base64_utils import encode_url_to_base64, decode_base64_url, is_base64_url
logging.basicConfig(level=settings.log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
app = FastAPI()
@@ -99,10 +100,15 @@ async def generate_url(request: GenerateUrlRequest):
# Convert IP to string if provided
ip_str = str(request.ip) if request.ip else None
# Handle base64 encoding of destination URL if requested
destination_url = request.destination_url
if request.base64_encode_destination and destination_url:
destination_url = encode_url_to_base64(destination_url)
encoded_url = encode_mediaflow_proxy_url(
mediaflow_proxy_url=request.mediaflow_proxy_url,
endpoint=request.endpoint,
destination_url=request.destination_url,
destination_url=destination_url,
query_params=query_params,
request_headers=request.request_headers,
response_headers=request.response_headers,
@@ -156,9 +162,83 @@ async def generate_urls(request: GenerateMultiUrlRequest):
return {"urls": encoded_urls}
@app.post(
"/base64/encode",
description="Encode a URL to base64 format",
response_description="Returns the base64 encoded URL",
tags=["base64"],
)
async def encode_url_base64(url: str):
"""
Encode a URL to base64 format.
Args:
url (str): The URL to encode.
Returns:
dict: A dictionary containing the encoded URL.
"""
try:
encoded_url = encode_url_to_base64(url)
return {"encoded_url": encoded_url, "original_url": url}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to encode URL: {str(e)}")
@app.post(
"/base64/decode",
description="Decode a base64 encoded URL",
response_description="Returns the decoded URL",
tags=["base64"],
)
async def decode_url_base64(encoded_url: str):
"""
Decode a base64 encoded URL.
Args:
encoded_url (str): The base64 encoded URL to decode.
Returns:
dict: A dictionary containing the decoded URL.
"""
decoded_url = decode_base64_url(encoded_url)
if decoded_url is None:
raise HTTPException(status_code=400, detail="Invalid base64 encoded URL")
return {"decoded_url": decoded_url, "encoded_url": encoded_url}
@app.get(
"/base64/check",
description="Check if a string appears to be a base64 encoded URL",
response_description="Returns whether the string is likely base64 encoded",
tags=["base64"],
)
async def check_base64_url(url: str):
"""
Check if a string appears to be a base64 encoded URL.
Args:
url (str): The string to check.
Returns:
dict: A dictionary indicating if the string is likely base64 encoded.
"""
is_base64 = is_base64_url(url)
result = {"url": url, "is_base64": is_base64}
if is_base64:
decoded_url = decode_base64_url(url)
if decoded_url:
result["decoded_url"] = decoded_url
return result
app.include_router(proxy_router, prefix="/proxy", tags=["proxy"], dependencies=[Depends(verify_api_key)])
app.include_router(extractor_router, prefix="/extractor", tags=["extractors"], dependencies=[Depends(verify_api_key)])
app.include_router(speedtest_router, prefix="/speedtest", tags=["speedtest"], dependencies=[Depends(verify_api_key)])
app.include_router(playlist_builder_router, prefix="/playlist", tags=["playlist"])
static_path = resources.files("mediaflow_proxy").joinpath("static")
app.mount("/", StaticFiles(directory=str(static_path), html=True), name="static")
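A quick client-side sketch of the three new endpoints, assuming a local instance (host, port, and the sample URL are assumptions):

import httpx

base = "http://127.0.0.1:8888"  # assumed local instance
original = "https://example.com/stream.m3u8"

encoded = httpx.post(f"{base}/base64/encode", params={"url": original}).json()["encoded_url"]
decoded = httpx.post(f"{base}/base64/decode", params={"encoded_url": encoded}).json()["decoded_url"]
assert decoded == original
print(httpx.get(f"{base}/base64/check", params={"url": encoded}).json())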

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
import math
import time
@@ -7,6 +8,8 @@ from fastapi import Request, Response, HTTPException
from mediaflow_proxy.drm.decrypter import decrypt_segment
from mediaflow_proxy.utils.crypto_utils import encryption_handler
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, get_original_scheme, ProxyRequestHeaders
from mediaflow_proxy.utils.dash_prebuffer import dash_prebuffer
from mediaflow_proxy.configs import settings
logger = logging.getLogger(__name__)
@@ -28,6 +31,23 @@ async def process_manifest(
Response: The HLS manifest as an HTTP response.
"""
hls_content = build_hls(mpd_dict, request, key_id, key)
# Start DASH pre-buffering in background if enabled
if settings.enable_dash_prebuffer:
# Extract headers for pre-buffering
headers = {}
for param_key, param_value in request.query_params.items():
if param_key.startswith("h_"):
headers[param_key[2:]] = param_value
# Get the original MPD URL from the request
mpd_url = request.query_params.get("d", "")
if mpd_url:
# Start pre-buffering in background
asyncio.create_task(
dash_prebuffer.prebuffer_dash_manifest(mpd_url, headers)
)
return Response(content=hls_content, media_type="application/vnd.apple.mpegurl", headers=proxy_headers.response)
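The h_-prefixed query parameters above follow the existing MediaFlow convention for forwarding request headers; a standalone sketch of the extraction step in isolation:

# Standalone sketch of the h_-prefix header extraction used above.
query_params = {"d": "https://example.com/manifest.mpd", "h_referer": "https://example.com/"}
headers = {k[2:]: v for k, v in query_params.items() if k.startswith("h_")}
assert headers == {"referer": "https://example.com/"}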

View File

@@ -1,5 +1,6 @@
from .proxy import proxy_router
from .extractor import extractor_router
from .speedtest import speedtest_router
from .playlist_builder import playlist_builder_router
__all__ = ["proxy_router", "extractor_router", "speedtest_router"]
__all__ = ["proxy_router", "extractor_router", "speedtest_router", "playlist_builder_router"]

View File

@@ -14,6 +14,7 @@ from mediaflow_proxy.utils.http_utils import (
ProxyRequestHeaders,
get_proxy_headers,
)
from mediaflow_proxy.utils.base64_utils import process_potential_base64_url
extractor_router = APIRouter()
logger = logging.getLogger(__name__)
@@ -28,6 +29,10 @@ async def extract_url(
):
"""Extract clean links from various video hosting services."""
try:
# Process potential base64 encoded destination URL
processed_destination = process_potential_base64_url(extractor_params.destination)
extractor_params.destination = processed_destination
cache_key = f"{extractor_params.host}_{extractor_params.model_dump_json()}"
response = await get_cached_extractor_result(cache_key)
if not response:

View File

@@ -0,0 +1,270 @@
import json
import logging
import urllib.parse
from typing import Iterator, Dict, Optional
from fastapi import APIRouter, Request, HTTPException, Query
from fastapi.responses import StreamingResponse
from starlette.responses import RedirectResponse
import httpx
from mediaflow_proxy.configs import settings
from mediaflow_proxy.utils.http_utils import get_original_scheme
import asyncio
logger = logging.getLogger(__name__)
playlist_builder_router = APIRouter()
def rewrite_m3u_links_streaming(m3u_lines_iterator: Iterator[str], base_url: str, api_password: Optional[str]) -> Iterator[str]:
"""
Rewrite the links from an iterator of M3U lines according to the specified rules,
including the headers carried by #EXTVLCOPT and #EXTHTTP. Yields rewritten lines.
"""
current_ext_headers: Dict[str, str] = {} # Headers collected from the directives
for line_with_newline in m3u_lines_iterator:
line_content = line_with_newline.rstrip('\n')
logical_line = line_content.strip()
is_header_tag = False
if logical_line.startswith('#EXTVLCOPT:'):
is_header_tag = True
try:
option_str = logical_line.split(':', 1)[1]
if '=' in option_str:
key_vlc, value_vlc = option_str.split('=', 1)
key_vlc = key_vlc.strip()
value_vlc = value_vlc.strip()
# Special handling for http-header values of the form "Key: Value"
if key_vlc == 'http-header' and ':' in value_vlc:
header_key, header_value = value_vlc.split(':', 1)
header_key = header_key.strip()
header_value = header_value.strip()
current_ext_headers[header_key] = header_value
elif key_vlc.startswith('http-'):
# Handles http-user-agent, http-referer, etc.
header_key = '-'.join(word.capitalize() for word in key_vlc[len('http-'):].split('-'))
current_ext_headers[header_key] = value_vlc
except Exception as e:
logger.error(f"⚠️ Error parsing #EXTVLCOPT '{logical_line}': {e}")
elif logical_line.startswith('#EXTHTTP:'):
is_header_tag = True
try:
json_str = logical_line.split(':', 1)[1]
# Replace all current headers with the ones from the JSON
current_ext_headers = json.loads(json_str)
except Exception as e:
logger.error(f"⚠️ Error parsing #EXTHTTP '{logical_line}': {e}")
current_ext_headers = {} # Reset on error
if is_header_tag:
yield line_with_newline
continue
if logical_line and not logical_line.startswith('#') and \
('http://' in logical_line or 'https://' in logical_line):
processed_url_content = logical_line
# Do not modify pluto.tv links
if 'pluto.tv' in logical_line:
processed_url_content = logical_line
elif 'vavoo.to' in logical_line:
encoded_url = urllib.parse.quote(logical_line, safe='')
processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}"
elif 'vixsrc.to' in logical_line:
encoded_url = urllib.parse.quote(logical_line, safe='')
processed_url_content = f"{base_url}/extractor/video?host=VixCloud&redirect_stream=true&d={encoded_url}"
elif '.m3u8' in logical_line:
encoded_url = urllib.parse.quote(logical_line, safe='')
processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}"
elif '.mpd' in logical_line:
# Extract DRM parameters from the MPD URL, if present
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
# Parse the URL to extract its query parameters
parsed_url = urlparse(logical_line)
query_params = parse_qs(parsed_url.query)
# Extract key_id and key if present
key_id = query_params.get('key_id', [None])[0]
key = query_params.get('key', [None])[0]
# Remove key_id and key from the original parameters
clean_params = {k: v for k, v in query_params.items() if k not in ['key_id', 'key']}
# Rebuild the URL without the DRM parameters
clean_query = urlencode(clean_params, doseq=True) if clean_params else ''
clean_url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
clean_query,
parsed_url.fragment
))
# Encode the MPD URL like other URL types
clean_url_for_param = urllib.parse.quote(clean_url, safe='')
# Build the MediaFlow URL with the DRM parameters passed separately
processed_url_content = f"{base_url}/proxy/mpd/manifest.m3u8?d={clean_url_for_param}"
# Add the DRM parameters if present
if key_id:
processed_url_content += f"&key_id={key_id}"
if key:
processed_url_content += f"&key={key}"
elif '.php' in logical_line:
encoded_url = urllib.parse.quote(logical_line, safe='')
processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}"
else:
# For all other links without specific extensions, treat them as .m3u8 with encoding
encoded_url = urllib.parse.quote(logical_line, safe='')
processed_url_content = f"{base_url}/proxy/hls/manifest.m3u8?d={encoded_url}"
# Apply the collected headers before api_password
if current_ext_headers:
header_params_str = "".join([f"&h_{urllib.parse.quote(key)}={urllib.parse.quote(value)}" for key, value in current_ext_headers.items()])
processed_url_content += header_params_str
current_ext_headers = {}
# Always append api_password at the end
if api_password:
processed_url_content += f"&api_password={api_password}"
yield processed_url_content + '\n'
else:
yield line_with_newline
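A hedged sketch of the rewriter on a three-line playlist (base URL and password illustrative; the module path follows the routes import added in this commit):

from mediaflow_proxy.routes.playlist_builder import rewrite_m3u_links_streaming

lines = iter([
    "#EXTM3U\n",
    "#EXTVLCOPT:http-referer=https://example.com/\n",
    "https://example.com/live/stream.m3u8\n",
])
for out in rewrite_m3u_links_streaming(lines, "https://mfp.example.com", "pass123"):
    print(out, end="")
# The .m3u8 line becomes:
#   https://mfp.example.com/proxy/hls/manifest.m3u8?d=<encoded>&h_Referer=<encoded>&api_password=pass123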
async def async_download_m3u_playlist(url: str) -> list[str]:
"""Scarica una playlist M3U in modo asincrono e restituisce le righe."""
headers = {
'User-Agent': settings.user_agent,
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive'
}
lines = []
try:
async with httpx.AsyncClient(verify=True, timeout=30) as client:
async with client.stream('GET', url, headers=headers) as response:
response.raise_for_status()
async for line_bytes in response.aiter_lines():
if isinstance(line_bytes, bytes):
decoded_line = line_bytes.decode('utf-8', errors='replace')
else:
decoded_line = str(line_bytes)
lines.append(decoded_line + '\n' if decoded_line else '')
except Exception as e:
logger.error(f"Error downloading playlist (async): {str(e)}")
raise
return lines
async def async_generate_combined_playlist(playlist_definitions: list[str], base_url: str, api_password: Optional[str]):
"""Genera una playlist combinata da multiple definizioni, scaricando in parallelo."""
# Prepara gli URL
playlist_urls = []
for definition in playlist_definitions:
if '&' in definition:
parts = definition.split('&', 1)
playlist_url_str = parts[1] if len(parts) > 1 else parts[0]
else:
playlist_url_str = definition
playlist_urls.append(playlist_url_str)
# Download all playlists in parallel
results = await asyncio.gather(*[async_download_m3u_playlist(url) for url in playlist_urls], return_exceptions=True)
first_playlist_header_handled = False
for idx, lines in enumerate(results):
if isinstance(lines, Exception):
yield f"# ERROR processing playlist {playlist_urls[idx]}: {str(lines)}\n"
continue
playlist_lines: list[str] = lines # type: ignore
current_playlist_had_lines = False
first_line_of_this_segment = True
lines_processed_for_current_playlist = 0
rewritten_lines_iter = rewrite_m3u_links_streaming(iter(playlist_lines), base_url, api_password)
for line in rewritten_lines_iter:
current_playlist_had_lines = True
is_extm3u_line = line.strip().startswith('#EXTM3U')
lines_processed_for_current_playlist += 1
if not first_playlist_header_handled:
yield line
if is_extm3u_line:
first_playlist_header_handled = True
else:
if first_line_of_this_segment and is_extm3u_line:
pass
else:
yield line
first_line_of_this_segment = False
if current_playlist_had_lines and not first_playlist_header_handled:
first_playlist_header_handled = True
@playlist_builder_router.get("/playlist")
async def proxy_handler(
request: Request,
d: str = Query(..., description="Query string with the playlist definitions", alias="d"),
api_password: Optional[str] = Query(None, description="API password for MFP"),
):
"""
Proxy endpoint for M3U playlists with MFP support.
Query string format: playlist1&url1;playlist2&url2
Example: https://mfp.com:pass123&http://provider.com/playlist.m3u
"""
try:
if not d:
raise HTTPException(status_code=400, detail="Query string mancante")
if not d.strip():
raise HTTPException(status_code=400, detail="Query string cannot be empty")
# Validate that we have at least one valid definition
playlist_definitions = [def_.strip() for def_ in d.split(';') if def_.strip()]
if not playlist_definitions:
raise HTTPException(status_code=400, detail="No valid playlist definitions found")
# Build base_url with the correct scheme
original_scheme = get_original_scheme(request)
base_url = f"{original_scheme}://{request.url.netloc}"
# Extract base_url from the first definition, if present
if playlist_definitions and '&' in playlist_definitions[0]:
parts = playlist_definitions[0].split('&', 1)
if ':' in parts[0] and not parts[0].startswith('http'):
# Extract base_url from the first part if it contains a password
base_url_part = parts[0].rsplit(':', 1)[0]
if base_url_part.startswith('http'):
base_url = base_url_part
async def generate_response():
async for line in async_generate_combined_playlist(playlist_definitions, base_url, api_password):
yield line
return StreamingResponse(
generate_response(),
media_type='application/vnd.apple.mpegurl',
headers={
'Content-Disposition': 'attachment; filename="playlist.m3u"',
'Access-Control-Allow-Origin': '*'
}
)
except HTTPException:
# Preserve the intentional 4xx responses raised above
raise
except Exception as e:
logger.error(f"General error in playlist handler: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e
@playlist_builder_router.get("/builder")
async def url_builder():
"""
Pagina con un'interfaccia per generare l'URL del proxy MFP.
"""
return RedirectResponse(url="/playlist_builder.html")
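As a usage note, a combined request for two providers would look like the following (hypothetical hosts; the d parameter carries ;-separated definitions, matching what the /builder page generates):

GET /playlist/playlist?d=http://provider-a.example/list.m3u;http://provider-b.example/list.m3u&api_password=secret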

View File

@@ -1,10 +1,14 @@
from typing import Annotated
from urllib.parse import quote
from urllib.parse import quote, unquote
import re
import logging
from fastapi import Request, Depends, APIRouter, Query, HTTPException
from fastapi.responses import Response, RedirectResponse
from mediaflow_proxy.handlers import (
handle_hls_stream_proxy,
handle_stream_request,
proxy_stream,
get_manifest,
get_playlist,
@@ -18,10 +22,144 @@ from mediaflow_proxy.schemas import (
MPDManifestParams,
)
from mediaflow_proxy.utils.http_utils import get_proxy_headers, ProxyRequestHeaders
from mediaflow_proxy.utils.base64_utils import process_potential_base64_url
proxy_router = APIRouter()
def sanitize_url(url: str) -> str:
"""
Sanitize URL to fix common encoding issues and handle base64 encoded URLs.
Args:
url (str): The URL to sanitize.
Returns:
str: The sanitized URL.
"""
logger = logging.getLogger(__name__)
original_url = url
# First, try to process potential base64 encoded URLs
url = process_potential_base64_url(url)
# Fix malformed URLs where https%22// should be https://
url = re.sub(r'https%22//', 'https://', url)
url = re.sub(r'http%22//', 'http://', url)
# Fix malformed URLs where https%3A%22// should be https://
url = re.sub(r'https%3A%22//', 'https://', url)
url = re.sub(r'http%3A%22//', 'http://', url)
# Fix malformed URLs where https:"// should be https:// (after partial decoding)
url = re.sub(r'https:"//', 'https://', url)
url = re.sub(r'http:"//', 'http://', url)
# Fix URLs where key_id and key parameters are incorrectly appended to the base URL
# This happens when the URL contains &key_id= and &key= which should be handled as proxy parameters
if '&key_id=' in url and '&key=' in url:
# Split the URL at the first occurrence of &key_id= to separate the base URL from the incorrectly appended parameters
base_url = url.split('&key_id=')[0]
logger.info(f"Removed incorrectly appended key parameters from URL: '{url}' -> '{base_url}'")
url = base_url
# Log if URL was changed
if url != original_url:
logger.info(f"URL sanitized: '{original_url}' -> '{url}'")
# Also try URL decoding to see what we get
try:
decoded_url = unquote(url)
if decoded_url != url:
logger.info(f"URL after decoding: '{decoded_url}'")
# If after decoding we still have malformed protocol, fix it
if ':"/' in decoded_url:
# Fix https:"// or http:"// patterns
fixed_decoded = re.sub(r'([a-z]+):"//', r'\1://', decoded_url)
logger.info(f"Fixed decoded URL: '{fixed_decoded}'")
return fixed_decoded
except Exception as e:
logger.warning(f"Error decoding URL '{url}': {e}")
return url
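A minimal illustration of the repairs above, with a hypothetical input URL:

# 'https%22//' is rewritten to 'https://' and stray key parameters are stripped
fixed = sanitize_url("https%22//cdn.example.com/video.mpd&key_id=abc&key=def")
assert fixed == "https://cdn.example.com/video.mpd"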
def extract_drm_params_from_url(url: str) -> tuple[str, str, str]:
"""
Extract DRM parameters (key_id and key) from a URL if they are incorrectly appended.
Args:
url (str): The URL that may contain appended DRM parameters.
Returns:
tuple: (clean_url, key_id, key) where clean_url has the parameters removed,
and key_id/key are the extracted values (or None if not found).
"""
logger = logging.getLogger(__name__)
key_id = None
key = None
clean_url = url
# Check if URL contains incorrectly appended key_id and key parameters
if '&key_id=' in url and '&key=' in url:
# Extract key_id
key_id_match = re.search(r'&key_id=([^&]+)', url)
if key_id_match:
key_id = key_id_match.group(1)
# Extract key
key_match = re.search(r'&key=([^&]+)', url)
if key_match:
key = key_match.group(1)
# Remove the parameters from the URL
clean_url = re.sub(r'&key_id=[^&]*', '', url)
clean_url = re.sub(r'&key=[^&]*', '', clean_url)
logger.info(f"Extracted DRM parameters from URL: key_id={key_id}, key={key}")
logger.info(f"Cleaned URL: '{url}' -> '{clean_url}'")
return clean_url, key_id, key
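A quick sketch of the expected behaviour, again with hypothetical values:

clean, key_id, key = extract_drm_params_from_url("https://cdn.example.com/stream.mpd&key_id=0123&key=abcd")
assert clean == "https://cdn.example.com/stream.mpd"
assert key_id == "0123" and key == "abcd"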
def _check_and_redirect_dlhd_stream(request: Request, destination: str) -> RedirectResponse | None:
"""
Check if destination contains stream-{numero} pattern and redirect to extractor if needed.
Args:
request (Request): The incoming HTTP request.
destination (str): The destination URL to check.
Returns:
RedirectResponse | None: RedirectResponse if redirect is needed, None otherwise.
"""
# Check for stream-{numero} pattern (e.g., stream-1, stream-123, etc.)
if re.search(r'stream-\d+', destination):
from urllib.parse import urlencode
# Build redirect URL to extractor
redirect_params = {
"host": "DLHD",
"redirect_stream": "true",
"d": destination
}
# Preserve api_password if present
if "api_password" in request.query_params:
redirect_params["api_password"] = request.query_params["api_password"]
# Build the redirect URL
base_url = str(request.url_for("extract_url"))
redirect_url = f"{base_url}?{urlencode(redirect_params)}"
return RedirectResponse(url=redirect_url, status_code=302)
return None
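The trigger is purely the stream-{number} token anywhere in the destination; a tiny sketch with hypothetical URLs:

import re

assert re.search(r'stream-\d+', "https://daddylive.example/stream-123.php")
assert re.search(r'stream-\d+', "https://cdn.example.com/movie.mp4") is None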
@proxy_router.head("/hls/manifest.m3u8")
@proxy_router.head("/hls/manifest.m3u8")
@proxy_router.get("/hls/manifest.m3u8")
async def hls_manifest_proxy(
@@ -40,9 +178,111 @@ async def hls_manifest_proxy(
Returns:
Response: The HTTP response with the processed m3u8 playlist or streamed content.
"""
# Sanitize destination URL to fix common encoding issues
hls_params.destination = sanitize_url(hls_params.destination)
# Check if destination contains stream-{numero} pattern and redirect to extractor
redirect_response = _check_and_redirect_dlhd_stream(request, hls_params.destination)
if redirect_response:
return redirect_response
return await handle_hls_stream_proxy(request, hls_params, proxy_headers)
@proxy_router.get("/hls/segment")
async def hls_segment_proxy(
request: Request,
proxy_headers: Annotated[ProxyRequestHeaders, Depends(get_proxy_headers)],
segment_url: str = Query(..., description="URL of the HLS segment"),
):
"""
Proxy HLS segments with optional pre-buffering support.
Args:
request (Request): The incoming HTTP request.
segment_url (str): URL of the HLS segment to proxy.
proxy_headers (ProxyRequestHeaders): The headers to include in the request.
Returns:
Response: The HTTP response with the segment content.
"""
from mediaflow_proxy.utils.hls_prebuffer import hls_prebuffer
from mediaflow_proxy.configs import settings
# Sanitize segment URL to fix common encoding issues
segment_url = sanitize_url(segment_url)
# Extract headers for pre-buffering
headers = {}
for key, value in request.query_params.items():
if key.startswith("h_"):
headers[key[2:]] = value
# Try to get segment from pre-buffer cache first
if settings.enable_hls_prebuffer:
cached_segment = await hls_prebuffer.get_segment(segment_url, headers)
if cached_segment:
return Response(
content=cached_segment,
media_type="video/mp2t",
headers={
"Content-Type": "video/mp2t",
"Cache-Control": "public, max-age=3600",
"Access-Control-Allow-Origin": "*"
}
)
# Fallback to direct streaming if not in cache
return await handle_stream_request("GET", segment_url, proxy_headers)
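Following the h_ prefix convention above, upstream headers ride along as query parameters; a hedged example request, assuming the router is mounted under /proxy (hypothetical host and mount path):

GET /proxy/hls/segment?segment_url=https%3A%2F%2Fcdn.example.com%2Fseg_001.ts&h_referer=https%3A%2F%2Forigin.example%2F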
@proxy_router.get("/dash/segment")
async def dash_segment_proxy(
request: Request,
proxy_headers: Annotated[ProxyRequestHeaders, Depends(get_proxy_headers)],
segment_url: str = Query(..., description="URL of the DASH segment"),
):
"""
Proxy DASH segments with optional pre-buffering support.
Args:
request (Request): The incoming HTTP request.
segment_url (str): URL of the DASH segment to proxy.
proxy_headers (ProxyRequestHeaders): The headers to include in the request.
Returns:
Response: The HTTP response with the segment content.
"""
from mediaflow_proxy.utils.dash_prebuffer import dash_prebuffer
from mediaflow_proxy.configs import settings
# Sanitize segment URL to fix common encoding issues
segment_url = sanitize_url(segment_url)
# Extract headers for pre-buffering
headers = {}
for key, value in request.query_params.items():
if key.startswith("h_"):
headers[key[2:]] = value
# Try to get segment from pre-buffer cache first
if settings.enable_dash_prebuffer:
cached_segment = await dash_prebuffer.get_segment(segment_url, headers)
if cached_segment:
return Response(
content=cached_segment,
media_type="video/mp4",
headers={
"Content-Type": "video/mp4",
"Cache-Control": "public, max-age=3600",
"Access-Control-Allow-Origin": "*"
}
)
# Fallback to direct streaming if not in cache
return await handle_stream_request("GET", segment_url, proxy_headers)
@proxy_router.head("/stream")
@proxy_router.get("/stream")
@proxy_router.head("/stream/{filename:path}")
@@ -65,6 +305,14 @@ async def proxy_stream_endpoint(
Returns:
Response: The HTTP response with the streamed content.
"""
# Sanitize destination URL to fix common encoding issues
destination = sanitize_url(destination)
# Check if destination contains stream-{numero} pattern and redirect to extractor
redirect_response = _check_and_redirect_dlhd_stream(request, destination)
if redirect_response:
return redirect_response
content_range = proxy_headers.request.get("range", "bytes=0-")
if "nan" in content_range.casefold():
# Handle invalid range requests "bytes=NaN-NaN"
@@ -103,6 +351,21 @@ async def mpd_manifest_proxy(
Returns:
Response: The HTTP response with the HLS manifest.
"""
# Extract DRM parameters from destination URL if they are incorrectly appended
clean_url, extracted_key_id, extracted_key = extract_drm_params_from_url(manifest_params.destination)
# Update the destination with the cleaned URL
manifest_params.destination = clean_url
# Use extracted parameters if they exist and the manifest params don't already have them
if extracted_key_id and not manifest_params.key_id:
manifest_params.key_id = extracted_key_id
if extracted_key and not manifest_params.key:
manifest_params.key = extracted_key
# Sanitize destination URL to fix common encoding issues
manifest_params.destination = sanitize_url(manifest_params.destination)
return await get_manifest(request, manifest_params, proxy_headers)
@@ -123,6 +386,21 @@ async def playlist_endpoint(
Returns:
Response: The HTTP response with the HLS playlist.
"""
# Extract DRM parameters from destination URL if they are incorrectly appended
clean_url, extracted_key_id, extracted_key = extract_drm_params_from_url(playlist_params.destination)
# Update the destination with the cleaned URL
playlist_params.destination = clean_url
# Use extracted parameters if they exist and the playlist params don't already have them
if extracted_key_id and not playlist_params.key_id:
playlist_params.key_id = extracted_key_id
if extracted_key and not playlist_params.key:
playlist_params.key = extracted_key
# Sanitize destination URL to fix common encoding issues
playlist_params.destination = sanitize_url(playlist_params.destination)
return await get_playlist(request, playlist_params, proxy_headers)
@@ -152,4 +430,4 @@ async def get_mediaflow_proxy_public_ip():
Returns:
Response: The HTTP response with the public IP address in the form of a JSON object. {"ip": "xxx.xxx.xxx.xxx"}
"""
return await get_public_ip()
return await get_public_ip()

View File

@@ -25,6 +25,9 @@ class GenerateUrlRequest(BaseModel):
)
ip: Optional[IPvAnyAddress] = Field(None, description="The IP address to restrict the URL to.")
filename: Optional[str] = Field(None, description="Filename to be preserved for media players like Infuse.")
base64_encode_destination: Optional[bool] = Field(
False, description="Whether to encode the destination URL in base64 format before processing."
)
class MultiUrlRequestItem(BaseModel):

View File

@@ -0,0 +1,132 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MFP Playlist Builder</title>
<style>
body { font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; background-color: #f5f5f5; }
.container { background: white; padding: 30px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
h1 { color: #333; text-align: center; margin-bottom: 30px; }
h2 { color: #2c5aa0; border-bottom: 2px solid #2c5aa0; padding-bottom: 5px; text-align: left; margin-top: 30px; }
.form-group { margin-bottom: 15px; }
label { display: block; margin-bottom: 5px; font-weight: bold; color: #555; }
input[type="text"], input[type="url"] { width: 100%; padding: 10px; border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box; }
.btn { display: inline-block; padding: 10px 20px; background: #2c5aa0; color: white; text-decoration: none; border-radius: 5px; margin: 5px; cursor: pointer; border: none; font-size: 16px; }
.btn:hover { background: #1e3d6f; }
.btn-add { background-color: #28a745; }
.btn-remove { background-color: #dc3545; padding: 5px 10px; font-size: 12px; }
.playlist-entry { background: #f8f9fa; padding: 20px; border-radius: 5px; margin-bottom: 15px; border-left: 4px solid #17a2b8; position: relative; }
.output-area { margin-top: 20px; }
#generated-url { background: #e9ecef; padding: 10px; border: 1px solid #ced4da; border-radius: 4px; font-family: 'Courier New', monospace; word-break: break-all; min-height: 50px; white-space: pre-wrap; }
</style>
</head>
<body>
<div class="container">
<h1>🔗 MFP Playlist Builder</h1>
<div class="form-group">
<label for="server-address">MFP Server Address</label>
<input type="text" id="server-address" placeholder="Current server address" value="" readonly style="background-color: #e9ecef;">
</div>
<div class="form-group">
<label for="api-password">MFP API Password</label>
<input type="text" id="api-password" placeholder="API password for MFP">
</div>
<h2>Playlists to Merge</h2>
<div id="playlist-container">
<!-- Playlists will be added here dynamically -->
</div>
<button type="button" class="btn btn-add" onclick="addPlaylistEntry()">Add Playlist</button>
<hr style="margin: 20px 0;">
<button type="button" class="btn" onclick="generateUrl()">Generate URL</button>
<div class="output-area">
<label for="generated-url">Generated URL</label>
<div id="generated-url">The URL will appear here...</div>
<button type="button" class="btn" onclick="copyUrl()">Copy URL</button>
</div>
</div>
<!-- Template for a single playlist -->
<template id="playlist-template">
<div class="playlist-entry">
<button type="button" class="btn btn-remove" style="position: absolute; top: 10px; right: 10px;" onclick="this.parentElement.remove()">Remove</button>
<div class="form-group">
<label>M3U Playlist URL</label>
<input type="url" class="playlist-url" placeholder="Ex: http://provider.com/playlist.m3u">
</div>
</div>
</template>
<script>
document.addEventListener('DOMContentLoaded', function() {
// Set the default server address
document.getElementById('server-address').value = window.location.origin;
// Add a default playlist on startup
addPlaylistEntry();
});
function addPlaylistEntry() {
const template = document.getElementById('playlist-template').content.cloneNode(true);
document.getElementById('playlist-container').appendChild(template);
}
function generateUrl() {
let serverAddress = document.getElementById('server-address').value.trim();
if (serverAddress.endsWith('/')) {
serverAddress = serverAddress.slice(0, -1);
}
if (!serverAddress) {
alert('Server address not available. Please reload the page.');
return;
}
const apiPassword = document.getElementById('api-password').value.trim();
const entries = document.querySelectorAll('.playlist-entry');
const definitions = [];
// Single loop for URL collection and validation
for (const entry of entries) {
const playlistUrl = entry.querySelector('.playlist-url').value.trim();
if (playlistUrl) {
if (playlistUrl.startsWith('http://') || playlistUrl.startsWith('https://')) {
definitions.push(playlistUrl);
} else {
alert('Invalid URL: ' + playlistUrl + '. URLs must start with http:// or https://');
return;
}
}
}
if (definitions.length === 0) {
document.getElementById('generated-url').textContent = 'No valid playlist entered.';
return;
}
let finalUrl = serverAddress + '/playlist/playlist?d=' + definitions.join(';');
if (apiPassword) {
finalUrl += '&api_password=' + encodeURIComponent(apiPassword);
}
document.getElementById('generated-url').textContent = finalUrl;
}
function copyUrl() {
const urlText = document.getElementById('generated-url').textContent;
if (urlText && !urlText.startsWith('The URL') && !urlText.startsWith('No valid')) {
navigator.clipboard.writeText(urlText).then(() => {
alert('URL copied to clipboard!');
}).catch(err => {
alert('Error copying: ' + err);
});
} else {
alert('No URL to copy.');
}
}
</script>
</body>
</html>

View File

@@ -0,0 +1,123 @@
import base64
import binascii
import logging
from typing import Optional
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
def is_base64_url(url: str) -> bool:
"""
Check if a URL appears to be base64 encoded.
Args:
url (str): The URL to check.
Returns:
bool: True if the URL appears to be base64 encoded, False otherwise.
"""
# Check if the URL doesn't start with http/https and contains base64-like characters
if url.startswith(('http://', 'https://', 'ftp://', 'ftps://')):
return False
# Base64 URLs typically contain only alphanumeric characters, +, /, and =
# (plus - and _ for the URL-safe variant) and don't contain typical URL characters like ://
base64_chars = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=-_')
url_chars = set(url)
# If the URL contains characters not in base64 charset, it's likely not base64
if not url_chars.issubset(base64_chars):
return False
# Additional heuristic: base64 strings are typically longer and don't contain common URL patterns
if len(url) < 10: # Too short to be a meaningful base64 encoded URL
return False
return True
def decode_base64_url(encoded_url: str) -> Optional[str]:
"""
Decode a base64 encoded URL.
Args:
encoded_url (str): The base64 encoded URL string.
Returns:
Optional[str]: The decoded URL if successful, None if decoding fails.
"""
try:
# Handle URL-safe base64 encoding (replace - with + and _ with /)
url_safe_encoded = encoded_url.replace('-', '+').replace('_', '/')
# Add padding if necessary
missing_padding = len(url_safe_encoded) % 4
if missing_padding:
url_safe_encoded += '=' * (4 - missing_padding)
# Decode the base64 string
decoded_bytes = base64.b64decode(url_safe_encoded)
decoded_url = decoded_bytes.decode('utf-8')
# Validate that the decoded string is a valid URL
parsed = urlparse(decoded_url)
if parsed.scheme and parsed.netloc:
logger.info(f"Successfully decoded base64 URL: {encoded_url[:50]}... -> {decoded_url}")
return decoded_url
else:
logger.warning(f"Decoded string is not a valid URL: {decoded_url}")
return None
except (binascii.Error, UnicodeDecodeError, ValueError) as e:
logger.debug(f"Failed to decode base64 URL '{encoded_url[:50]}...': {e}")
return None
def encode_url_to_base64(url: str, url_safe: bool = True) -> str:
"""
Encode a URL to base64.
Args:
url (str): The URL to encode.
url_safe (bool): Whether to use URL-safe base64 encoding (default: True).
Returns:
str: The base64 encoded URL.
"""
try:
url_bytes = url.encode('utf-8')
if url_safe:
# Use URL-safe base64 encoding (replace + with - and / with _)
encoded = base64.urlsafe_b64encode(url_bytes).decode('utf-8')
# Remove padding for cleaner URLs
encoded = encoded.rstrip('=')
else:
encoded = base64.b64encode(url_bytes).decode('utf-8')
logger.debug(f"Encoded URL to base64: {url} -> {encoded}")
return encoded
except Exception as e:
logger.error(f"Failed to encode URL to base64: {e}")
raise
def process_potential_base64_url(url: str) -> str:
"""
Process a URL that might be base64 encoded. If it's base64 encoded, decode it.
Otherwise, return the original URL.
Args:
url (str): The URL to process.
Returns:
str: The processed URL (decoded if it was base64, original otherwise).
"""
if is_base64_url(url):
decoded_url = decode_base64_url(url)
if decoded_url:
return decoded_url
else:
logger.warning(f"URL appears to be base64 but failed to decode: {url[:50]}...")
return url

View File

@@ -0,0 +1,373 @@
import logging
import psutil
from typing import Dict, Optional, List
from urllib.parse import urljoin
import xmltodict
from mediaflow_proxy.utils.http_utils import create_httpx_client
from mediaflow_proxy.configs import settings
logger = logging.getLogger(__name__)
class DASHPreBuffer:
"""
Pre-buffer system for DASH streams to reduce latency and improve streaming performance.
"""
def __init__(self, max_cache_size: Optional[int] = None, prebuffer_segments: Optional[int] = None):
"""
Initialize the DASH pre-buffer system.
Args:
max_cache_size (int): Maximum number of segments to cache (uses config if None)
prebuffer_segments (int): Number of segments to pre-buffer ahead (uses config if None)
"""
self.max_cache_size = max_cache_size or settings.dash_prebuffer_cache_size
self.prebuffer_segments = prebuffer_segments or settings.dash_prebuffer_segments
self.max_memory_percent = settings.dash_prebuffer_max_memory_percent
self.emergency_threshold = settings.dash_prebuffer_emergency_threshold
# Cache for different types of DASH content
self.segment_cache: Dict[str, bytes] = {}
self.init_segment_cache: Dict[str, bytes] = {}
self.manifest_cache: Dict[str, dict] = {}
# Track segment URLs for each adaptation set
self.adaptation_segments: Dict[str, List[str]] = {}
self.client = create_httpx_client()
def _get_memory_usage_percent(self) -> float:
"""
Get current memory usage percentage.
Returns:
float: Memory usage percentage
"""
try:
memory = psutil.virtual_memory()
return memory.percent
except Exception as e:
logger.warning(f"Failed to get memory usage: {e}")
return 0.0
def _check_memory_threshold(self) -> bool:
"""
Check if memory usage exceeds the emergency threshold.
Returns:
bool: True if emergency cleanup is needed
"""
memory_percent = self._get_memory_usage_percent()
return memory_percent > self.emergency_threshold
def _emergency_cache_cleanup(self) -> None:
"""
Perform emergency cache cleanup when memory usage is high.
"""
if self._check_memory_threshold():
logger.warning("Emergency DASH cache cleanup triggered due to high memory usage")
# Clear 50% of segment cache
segment_cache_size = len(self.segment_cache)
segment_keys_to_remove = list(self.segment_cache.keys())[:segment_cache_size // 2]
for key in segment_keys_to_remove:
del self.segment_cache[key]
# Clear 50% of init segment cache
init_cache_size = len(self.init_segment_cache)
init_keys_to_remove = list(self.init_segment_cache.keys())[:init_cache_size // 2]
for key in init_keys_to_remove:
del self.init_segment_cache[key]
logger.info(f"Emergency cleanup removed {len(segment_keys_to_remove)} segments and {len(init_keys_to_remove)} init segments from cache")
async def prebuffer_dash_manifest(self, mpd_url: str, headers: Dict[str, str]) -> None:
"""
Pre-buffer segments from a DASH manifest.
Args:
mpd_url (str): URL of the DASH manifest
headers (Dict[str, str]): Headers to use for requests
"""
try:
# Download and parse MPD manifest
response = await self.client.get(mpd_url, headers=headers)
response.raise_for_status()
mpd_content = response.text
# Parse MPD XML
mpd_dict = xmltodict.parse(mpd_content)
# Store manifest in cache
self.manifest_cache[mpd_url] = mpd_dict
# Extract initialization segments and first few segments
await self._extract_and_prebuffer_segments(mpd_dict, mpd_url, headers)
logger.info(f"Pre-buffered DASH manifest: {mpd_url}")
except Exception as e:
logger.warning(f"Failed to pre-buffer DASH manifest {mpd_url}: {e}")
async def _extract_and_prebuffer_segments(self, mpd_dict: dict, base_url: str, headers: Dict[str, str]) -> None:
"""
Extract and pre-buffer segments from MPD manifest.
Args:
mpd_dict (dict): Parsed MPD manifest
base_url (str): Base URL for resolving relative URLs
headers (Dict[str, str]): Headers to use for requests
"""
try:
# Extract Period and AdaptationSet information
mpd = mpd_dict.get('MPD', {})
periods = mpd.get('Period', [])
if not isinstance(periods, list):
periods = [periods]
for period in periods:
adaptation_sets = period.get('AdaptationSet', [])
if not isinstance(adaptation_sets, list):
adaptation_sets = [adaptation_sets]
for adaptation_set in adaptation_sets:
# Extract initialization segment
init_segment = adaptation_set.get('SegmentTemplate', {}).get('@initialization')
if init_segment:
init_url = urljoin(base_url, init_segment)
await self._download_init_segment(init_url, headers)
# Extract segment template
segment_template = adaptation_set.get('SegmentTemplate', {})
if segment_template:
await self._prebuffer_template_segments(segment_template, base_url, headers)
# Extract segment list
segment_list = adaptation_set.get('SegmentList', {})
if segment_list:
await self._prebuffer_list_segments(segment_list, base_url, headers)
except Exception as e:
logger.warning(f"Failed to extract segments from MPD: {e}")
async def _download_init_segment(self, init_url: str, headers: Dict[str, str]) -> None:
"""
Download and cache initialization segment.
Args:
init_url (str): URL of the initialization segment
headers (Dict[str, str]): Headers to use for request
"""
try:
# Check memory usage before downloading
memory_percent = self._get_memory_usage_percent()
if memory_percent > self.max_memory_percent:
logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping init segment download")
return
response = await self.client.get(init_url, headers=headers)
response.raise_for_status()
# Cache the init segment
self.init_segment_cache[init_url] = response.content
# Check for emergency cleanup
if self._check_memory_threshold():
self._emergency_cache_cleanup()
logger.debug(f"Cached init segment: {init_url}")
except Exception as e:
logger.warning(f"Failed to download init segment {init_url}: {e}")
async def _prebuffer_template_segments(self, segment_template: dict, base_url: str, headers: Dict[str, str]) -> None:
"""
Pre-buffer segments using segment template.
Args:
segment_template (dict): Segment template from MPD
base_url (str): Base URL for resolving relative URLs
headers (Dict[str, str]): Headers to use for requests
"""
try:
media_template = segment_template.get('@media')
if not media_template:
return
# Extract template parameters
start_number = int(segment_template.get('@startNumber', 1))
# duration/timescale are parsed for completeness but not yet used for timing-based selection
duration = float(segment_template.get('@duration', 0))
timescale = float(segment_template.get('@timescale', 1))
# Pre-buffer first few segments
for i in range(self.prebuffer_segments):
segment_number = start_number + i
segment_url = media_template.replace('$Number$', str(segment_number))
full_url = urljoin(base_url, segment_url)
await self._download_segment(full_url, headers)
except Exception as e:
logger.warning(f"Failed to pre-buffer template segments: {e}")
async def _prebuffer_list_segments(self, segment_list: dict, base_url: str, headers: Dict[str, str]) -> None:
"""
Pre-buffer segments from segment list.
Args:
segment_list (dict): Segment list from MPD
base_url (str): Base URL for resolving relative URLs
headers (Dict[str, str]): Headers to use for requests
"""
try:
segments = segment_list.get('SegmentURL', [])
if not isinstance(segments, list):
segments = [segments]
# Pre-buffer first few segments
for segment in segments[:self.prebuffer_segments]:
segment_url = segment.get('@src')
if segment_url:
full_url = urljoin(base_url, segment_url)
await self._download_segment(full_url, headers)
except Exception as e:
logger.warning(f"Failed to pre-buffer list segments: {e}")
async def _download_segment(self, segment_url: str, headers: Dict[str, str]) -> None:
"""
Download a single segment and cache it.
Args:
segment_url (str): URL of the segment to download
headers (Dict[str, str]): Headers to use for request
"""
try:
# Check memory usage before downloading
memory_percent = self._get_memory_usage_percent()
if memory_percent > self.max_memory_percent:
logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping segment download")
return
response = await self.client.get(segment_url, headers=headers)
response.raise_for_status()
# Cache the segment
self.segment_cache[segment_url] = response.content
# Check for emergency cleanup
if self._check_memory_threshold():
self._emergency_cache_cleanup()
# Maintain cache size
elif len(self.segment_cache) > self.max_cache_size:
# Remove oldest entries (simple FIFO)
oldest_key = next(iter(self.segment_cache))
del self.segment_cache[oldest_key]
logger.debug(f"Cached DASH segment: {segment_url}")
except Exception as e:
logger.warning(f"Failed to download DASH segment {segment_url}: {e}")
async def get_segment(self, segment_url: str, headers: Dict[str, str]) -> Optional[bytes]:
"""
Get a segment from cache or download it.
Args:
segment_url (str): URL of the segment
headers (Dict[str, str]): Headers to use for request
Returns:
Optional[bytes]: Cached segment data or None if not available
"""
# Check segment cache first
if segment_url in self.segment_cache:
logger.debug(f"DASH cache hit for segment: {segment_url}")
return self.segment_cache[segment_url]
# Check init segment cache
if segment_url in self.init_segment_cache:
logger.debug(f"DASH cache hit for init segment: {segment_url}")
return self.init_segment_cache[segment_url]
# Check memory usage before downloading
memory_percent = self._get_memory_usage_percent()
if memory_percent > self.max_memory_percent:
logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping download")
return None
# Download if not in cache
try:
response = await self.client.get(segment_url, headers=headers)
response.raise_for_status()
segment_data = response.content
# Determine if it's an init segment or regular segment
if 'init' in segment_url.lower() or segment_url.endswith('.mp4'):
self.init_segment_cache[segment_url] = segment_data
else:
self.segment_cache[segment_url] = segment_data
# Check for emergency cleanup
if self._check_memory_threshold():
self._emergency_cache_cleanup()
# Maintain cache size
elif len(self.segment_cache) > self.max_cache_size:
oldest_key = next(iter(self.segment_cache))
del self.segment_cache[oldest_key]
logger.debug(f"Downloaded and cached DASH segment: {segment_url}")
return segment_data
except Exception as e:
logger.warning(f"Failed to get DASH segment {segment_url}: {e}")
return None
async def get_manifest(self, mpd_url: str, headers: Dict[str, str]) -> Optional[dict]:
"""
Get MPD manifest from cache or download it.
Args:
mpd_url (str): URL of the MPD manifest
headers (Dict[str, str]): Headers to use for request
Returns:
Optional[dict]: Cached manifest data or None if not available
"""
# Check cache first
if mpd_url in self.manifest_cache:
logger.debug(f"DASH cache hit for manifest: {mpd_url}")
return self.manifest_cache[mpd_url]
# Download if not in cache
try:
response = await self.client.get(mpd_url, headers=headers)
response.raise_for_status()
mpd_content = response.text
mpd_dict = xmltodict.parse(mpd_content)
# Cache the manifest
self.manifest_cache[mpd_url] = mpd_dict
logger.debug(f"Downloaded and cached DASH manifest: {mpd_url}")
return mpd_dict
except Exception as e:
logger.warning(f"Failed to get DASH manifest {mpd_url}: {e}")
return None
def clear_cache(self) -> None:
"""Clear the DASH cache."""
self.segment_cache.clear()
self.init_segment_cache.clear()
self.manifest_cache.clear()
self.adaptation_segments.clear()
logger.info("DASH pre-buffer cache cleared")
async def close(self) -> None:
"""Close the pre-buffer system."""
await self.client.aclose()
# Global DASH pre-buffer instance
dash_prebuffer = DASHPreBuffer()
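A minimal consumption sketch, assuming an MPD URL and request headers are at hand (hypothetical values):

import asyncio

async def warm_up():
    headers = {"User-Agent": "Mozilla/5.0"}
    # Parse the manifest and warm the cache with init segments plus the first media segments
    await dash_prebuffer.prebuffer_dash_manifest("https://cdn.example.com/live.mpd", headers)
    data = await dash_prebuffer.get_segment("https://cdn.example.com/video/chunk_1.m4s", headers)
    print(len(data) if data else "segment unavailable")

asyncio.run(warm_up())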

View File

@@ -0,0 +1,321 @@
import asyncio
import logging
import psutil
from typing import Dict, Optional, List
from urllib.parse import urljoin
from mediaflow_proxy.utils.http_utils import create_httpx_client
from mediaflow_proxy.configs import settings
logger = logging.getLogger(__name__)
class HLSPreBuffer:
"""
Pre-buffer system for HLS streams to reduce latency and improve streaming performance.
"""
def __init__(self, max_cache_size: Optional[int] = None, prebuffer_segments: Optional[int] = None):
"""
Initialize the HLS pre-buffer system.
Args:
max_cache_size (int): Maximum number of segments to cache (uses config if None)
prebuffer_segments (int): Number of segments to pre-buffer ahead (uses config if None)
"""
self.max_cache_size = max_cache_size or settings.hls_prebuffer_cache_size
self.prebuffer_segments = prebuffer_segments or settings.hls_prebuffer_segments
self.max_memory_percent = settings.hls_prebuffer_max_memory_percent
self.emergency_threshold = settings.hls_prebuffer_emergency_threshold
self.segment_cache: Dict[str, bytes] = {}
self.segment_urls: Dict[str, List[str]] = {}
self.client = create_httpx_client()
async def prebuffer_playlist(self, playlist_url: str, headers: Dict[str, str]) -> None:
"""
Pre-buffer segments from an HLS playlist.
Args:
playlist_url (str): URL of the HLS playlist
headers (Dict[str, str]): Headers to use for requests
"""
try:
logger.debug(f"Starting pre-buffer for playlist: {playlist_url}")
# Download and parse playlist
response = await self.client.get(playlist_url, headers=headers)
response.raise_for_status()
playlist_content = response.text
# Check if this is a master playlist (contains variants)
if "#EXT-X-STREAM-INF" in playlist_content:
logger.debug(f"Master playlist detected, finding first variant")
# Extract variant URLs
variant_urls = self._extract_variant_urls(playlist_content, playlist_url)
if variant_urls:
# Pre-buffer the first variant
first_variant_url = variant_urls[0]
logger.debug(f"Pre-buffering first variant: {first_variant_url}")
await self.prebuffer_playlist(first_variant_url, headers)
else:
logger.warning("No variants found in master playlist")
return
# Extract segment URLs
segment_urls = self._extract_segment_urls(playlist_content, playlist_url)
# Store segment URLs for this playlist
self.segment_urls[playlist_url] = segment_urls
# Pre-buffer first few segments
await self._prebuffer_segments(segment_urls[:self.prebuffer_segments], headers)
logger.info(f"Pre-buffered {min(self.prebuffer_segments, len(segment_urls))} segments for {playlist_url}")
except Exception as e:
logger.warning(f"Failed to pre-buffer playlist {playlist_url}: {e}")
def _extract_segment_urls(self, playlist_content: str, base_url: str) -> List[str]:
"""
Extract segment URLs from HLS playlist content.
Args:
playlist_content (str): Content of the HLS playlist
base_url (str): Base URL for resolving relative URLs
Returns:
List[str]: List of segment URLs
"""
segment_urls = []
lines = playlist_content.split('\n')
logger.debug(f"Analyzing playlist with {len(lines)} lines")
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
# Absolute URLs are used as-is; anything else is resolved against the playlist URL
if line.startswith(('http://', 'https://')):
segment_urls.append(line)
logger.debug(f"Found absolute URL: {line}")
else:
segment_url = urljoin(base_url, line)
segment_urls.append(segment_url)
logger.debug(f"Found relative path: {line} -> {segment_url}")
logger.debug(f"Extracted {len(segment_urls)} segment URLs from playlist")
if segment_urls:
logger.debug(f"First segment URL: {segment_urls[0]}")
else:
logger.debug("No segment URLs found in playlist")
# Log first few lines for debugging
for i, line in enumerate(lines[:10]):
logger.debug(f"Line {i}: {line}")
return segment_urls
def _extract_variant_urls(self, playlist_content: str, base_url: str) -> List[str]:
"""
Extract variant URLs from master playlist content.
Args:
playlist_content (str): Content of the master playlist
base_url (str): Base URL for resolving relative URLs
Returns:
List[str]: List of variant URLs
"""
variant_urls = []
lines = playlist_content.split('\n')
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
# Variant URIs may be absolute or relative; resolve both against the master playlist URL
variant_urls.append(urljoin(base_url, line))
logger.debug(f"Extracted {len(variant_urls)} variant URLs from master playlist")
if variant_urls:
logger.debug(f"First variant URL: {variant_urls[0]}")
return variant_urls
async def _prebuffer_segments(self, segment_urls: List[str], headers: Dict[str, str]) -> None:
"""
Pre-buffer specific segments.
Args:
segment_urls (List[str]): List of segment URLs to pre-buffer
headers (Dict[str, str]): Headers to use for requests
"""
tasks = []
for url in segment_urls:
if url not in self.segment_cache:
tasks.append(self._download_segment(url, headers))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
def _get_memory_usage_percent(self) -> float:
"""
Get current memory usage percentage.
Returns:
float: Memory usage percentage
"""
try:
memory = psutil.virtual_memory()
return memory.percent
except Exception as e:
logger.warning(f"Failed to get memory usage: {e}")
return 0.0
def _check_memory_threshold(self) -> bool:
"""
Check if memory usage exceeds the emergency threshold.
Returns:
bool: True if emergency cleanup is needed
"""
memory_percent = self._get_memory_usage_percent()
return memory_percent > self.emergency_threshold
def _emergency_cache_cleanup(self) -> None:
"""
Perform emergency cache cleanup when memory usage is high.
"""
if self._check_memory_threshold():
logger.warning("Emergency cache cleanup triggered due to high memory usage")
# Clear 50% of cache
cache_size = len(self.segment_cache)
keys_to_remove = list(self.segment_cache.keys())[:cache_size // 2]
for key in keys_to_remove:
del self.segment_cache[key]
logger.info(f"Emergency cleanup removed {len(keys_to_remove)} segments from cache")
async def _download_segment(self, segment_url: str, headers: Dict[str, str]) -> None:
"""
Download a single segment and cache it.
Args:
segment_url (str): URL of the segment to download
headers (Dict[str, str]): Headers to use for request
"""
try:
# Check memory usage before downloading
memory_percent = self._get_memory_usage_percent()
if memory_percent > self.max_memory_percent:
logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping download")
return
response = await self.client.get(segment_url, headers=headers)
response.raise_for_status()
# Cache the segment
self.segment_cache[segment_url] = response.content
# Check for emergency cleanup
if self._check_memory_threshold():
self._emergency_cache_cleanup()
# Maintain cache size
elif len(self.segment_cache) > self.max_cache_size:
# Remove oldest entries (simple FIFO)
oldest_key = next(iter(self.segment_cache))
del self.segment_cache[oldest_key]
logger.debug(f"Cached segment: {segment_url}")
except Exception as e:
logger.warning(f"Failed to download segment {segment_url}: {e}")
async def get_segment(self, segment_url: str, headers: Dict[str, str]) -> Optional[bytes]:
"""
Get a segment from cache or download it.
Args:
segment_url (str): URL of the segment
headers (Dict[str, str]): Headers to use for request
Returns:
Optional[bytes]: Cached segment data or None if not available
"""
# Check cache first
if segment_url in self.segment_cache:
logger.debug(f"Cache hit for segment: {segment_url}")
return self.segment_cache[segment_url]
# Check memory usage before downloading
memory_percent = self._get_memory_usage_percent()
if memory_percent > self.max_memory_percent:
logger.warning(f"Memory usage {memory_percent}% exceeds limit {self.max_memory_percent}%, skipping download")
return None
# Download if not in cache
try:
response = await self.client.get(segment_url, headers=headers)
response.raise_for_status()
segment_data = response.content
# Cache the segment
self.segment_cache[segment_url] = segment_data
# Check for emergency cleanup
if self._check_memory_threshold():
self._emergency_cache_cleanup()
# Maintain cache size
elif len(self.segment_cache) > self.max_cache_size:
oldest_key = next(iter(self.segment_cache))
del self.segment_cache[oldest_key]
logger.debug(f"Downloaded and cached segment: {segment_url}")
return segment_data
except Exception as e:
logger.warning(f"Failed to get segment {segment_url}: {e}")
return None
async def prebuffer_next_segments(self, playlist_url: str, current_segment_index: int, headers: Dict[str, str]) -> None:
"""
Pre-buffer next segments based on current playback position.
Args:
playlist_url (str): URL of the playlist
current_segment_index (int): Index of current segment
headers (Dict[str, str]): Headers to use for requests
"""
if playlist_url not in self.segment_urls:
return
segment_urls = self.segment_urls[playlist_url]
next_segments = segment_urls[current_segment_index + 1:current_segment_index + 1 + self.prebuffer_segments]
if next_segments:
await self._prebuffer_segments(next_segments, headers)
def clear_cache(self) -> None:
"""Clear the segment cache."""
self.segment_cache.clear()
self.segment_urls.clear()
logger.info("HLS pre-buffer cache cleared")
async def close(self) -> None:
"""Close the pre-buffer system."""
await self.client.aclose()
# Global pre-buffer instance
hls_prebuffer = HLSPreBuffer()
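The HLS counterpart works the same way; a sketch that also warms the window after a served segment (hypothetical URLs):

import asyncio

async def demo():
    headers = {"Referer": "https://origin.example/"}  # hypothetical upstream header
    await hls_prebuffer.prebuffer_playlist("https://cdn.example.com/index.m3u8", headers)
    # After serving segment 0, opportunistically warm the next prebuffer_segments entries
    await hls_prebuffer.prebuffer_next_segments("https://cdn.example.com/index.m3u8", 0, headers)

asyncio.run(demo())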

View File

@@ -1,3 +1,4 @@
import asyncio
import codecs
import re
from typing import AsyncGenerator
@@ -6,6 +7,7 @@ from urllib import parse
from mediaflow_proxy.configs import settings
from mediaflow_proxy.utils.crypto_utils import encryption_handler
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, encode_stremio_proxy_url, get_original_scheme
from mediaflow_proxy.utils.hls_prebuffer import hls_prebuffer
class M3U8Processor:
@@ -24,6 +26,7 @@ class M3U8Processor:
self.mediaflow_proxy_url = str(
request.url_for("hls_manifest_proxy").replace(scheme=get_original_scheme(request))
)
self.playlist_url = None # Will be set when processing starts
async def process_m3u8(self, content: str, base_url: str) -> str:
"""
@@ -36,6 +39,9 @@ class M3U8Processor:
Returns:
str: The processed m3u8 content.
"""
# Store the playlist URL for prebuffering
self.playlist_url = base_url
lines = content.splitlines()
processed_lines = []
for line in lines:
@@ -45,6 +51,23 @@ class M3U8Processor:
processed_lines.append(await self.proxy_content_url(line, base_url))
else:
processed_lines.append(line)
# Pre-buffer segments if enabled and this is a playlist
if (settings.enable_hls_prebuffer and
"#EXTM3U" in content and
self.playlist_url):
# Extract headers from request for pre-buffering
headers = {}
for key, value in self.request.query_params.items():
if key.startswith("h_"):
headers[key[2:]] = value
# Start pre-buffering in background using the actual playlist URL
asyncio.create_task(
hls_prebuffer.prebuffer_playlist(self.playlist_url, headers)
)
return "\n".join(processed_lines)
async def process_m3u8_streaming(
@@ -52,6 +75,7 @@ class M3U8Processor:
) -> AsyncGenerator[str, None]:
"""
Processes the m3u8 content on-the-fly, yielding processed lines as they are read.
Optimized to avoid accumulating the entire playlist content in memory.
Args:
content_iterator: An async iterator that yields chunks of the m3u8 content.
@@ -60,8 +84,13 @@ class M3U8Processor:
Yields:
str: Processed lines of the m3u8 content.
"""
# Store the playlist URL for prebuffering
self.playlist_url = base_url
buffer = "" # String buffer for decoded content
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
is_playlist_detected = False
is_prebuffer_started = False
# Process the content chunk by chunk
async for chunk in content_iterator:
@@ -72,6 +101,10 @@ class M3U8Processor:
decoded_chunk = decoder.decode(chunk)
buffer += decoded_chunk
# Check for playlist marker early to avoid accumulating content
if not is_playlist_detected and "#EXTM3U" in buffer:
is_playlist_detected = True
# Process complete lines
lines = buffer.split("\n")
if len(lines) > 1:
@@ -84,6 +117,25 @@ class M3U8Processor:
# Keep the last line in the buffer (it might be incomplete)
buffer = lines[-1]
# Start pre-buffering early once we detect this is a playlist
# This avoids waiting until the entire playlist is processed
if (settings.enable_hls_prebuffer and
is_playlist_detected and
not is_prebuffer_started and
self.playlist_url):
# Extract headers from request for pre-buffering
headers = {}
for key, value in self.request.query_params.items():
if key.startswith("h_"):
headers[key[2:]] = value
# Start pre-buffering in background using the actual playlist URL
asyncio.create_task(
hls_prebuffer.prebuffer_playlist(self.playlist_url, headers)
)
is_prebuffer_started = True
# Process any remaining data in the buffer plus final bytes
final_chunk = decoder.decode(b"", final=True)
if final_chunk:
@@ -209,4 +261,4 @@ class M3U8Processor:
full_url,
query_params=query_params,
encryption_handler=encryption_handler if has_encrypted else None,
)
)