New version

This commit is contained in:
UrloMythus
2025-06-10 22:42:56 +02:00
parent 4b5891457e
commit 1b1458e7f3
58 changed files with 1843 additions and 847 deletions

View File

@@ -14,9 +14,7 @@ from typing import Optional, Union, Any
import aiofiles
import aiofiles.os
from pydantic import ValidationError
from mediaflow_proxy.speedtest.models import SpeedTestTask
from mediaflow_proxy.utils.http_utils import download_file_with_retry, DownloadError
from mediaflow_proxy.utils.mpd_utils import parse_mpd, parse_mpd_dict
@@ -270,12 +268,6 @@ MPD_CACHE = AsyncMemoryCache(
max_memory_size=100 * 1024 * 1024, # 100MB for MPD files
)
SPEEDTEST_CACHE = HybridCache(
cache_dir_name="speedtest_cache",
ttl=3600, # 1 hour
max_memory_size=50 * 1024 * 1024,
)
EXTRACTOR_CACHE = HybridCache(
cache_dir_name="extractor_cache",
ttl=5 * 60, # 5 minutes
@@ -335,27 +327,6 @@ async def get_cached_mpd(
raise error
async def get_cached_speedtest(task_id: str) -> Optional[SpeedTestTask]:
"""Get speed test results from cache."""
cached_data = await SPEEDTEST_CACHE.get(task_id)
if cached_data is not None:
try:
return SpeedTestTask.model_validate_json(cached_data.decode())
except ValidationError as e:
logger.error(f"Error parsing cached speed test data: {e}")
await SPEEDTEST_CACHE.delete(task_id)
return None
async def set_cache_speedtest(task_id: str, task: SpeedTestTask) -> bool:
"""Cache speed test results."""
try:
return await SPEEDTEST_CACHE.set(task_id, task.model_dump_json().encode())
except Exception as e:
logger.error(f"Error caching speed test data: {e}")
return False
async def get_cached_extractor_result(key: str) -> Optional[dict]:
"""Get extractor result from cache."""
cached_data = await EXTRACTOR_CACHE.get(key)

View File

@@ -175,7 +175,9 @@ class Streamer:
logger.warning(f"Remote server closed connection prematurely: {e}")
# If we've received some data, just log the warning and return normally
if self.bytes_transferred > 0:
logger.info(f"Partial content received ({self.bytes_transferred} bytes). Continuing with available data.")
logger.info(
f"Partial content received ({self.bytes_transferred} bytes). Continuing with available data."
)
return
else:
# If we haven't received any data, raise an error
@@ -375,6 +377,70 @@ def encode_mediaflow_proxy_url(
return url
def encode_stremio_proxy_url(
stremio_proxy_url: str,
destination_url: str,
request_headers: typing.Optional[dict] = None,
response_headers: typing.Optional[dict] = None,
) -> str:
"""
Encodes a Stremio proxy URL with destination URL and headers.
Format: http://127.0.0.1:11470/proxy/d=<encoded_origin>&h=<headers>&r=<response_headers>/<path><query>
Args:
stremio_proxy_url (str): The base Stremio proxy URL.
destination_url (str): The destination URL to proxy.
request_headers (dict, optional): Headers to include as query parameters. Defaults to None.
response_headers (dict, optional): Response headers to include as query parameters. Defaults to None.
Returns:
str: The encoded Stremio proxy URL.
"""
# Parse the destination URL to separate origin, path, and query
parsed_dest = parse.urlparse(destination_url)
dest_origin = f"{parsed_dest.scheme}://{parsed_dest.netloc}"
dest_path = parsed_dest.path.lstrip("/")
dest_query = parsed_dest.query
# Prepare query parameters list for proper handling of multiple headers
query_parts = []
# Add destination origin (scheme + netloc only) with proper encoding
query_parts.append(f"d={parse.quote_plus(dest_origin)}")
# Add request headers
if request_headers:
for key, value in request_headers.items():
header_string = f"{key}:{value}"
query_parts.append(f"h={parse.quote_plus(header_string)}")
# Add response headers
if response_headers:
for key, value in response_headers.items():
header_string = f"{key}:{value}"
query_parts.append(f"r={parse.quote_plus(header_string)}")
# Ensure base_url doesn't end with a slash for consistent handling
base_url = stremio_proxy_url.rstrip("/")
# Construct the URL path with query string
query_string = "&".join(query_parts)
# Build the final URL: /proxy/{opts}/{pathname}{search}
url_path = f"/proxy/{query_string}"
# Append the path from destination URL
if dest_path:
url_path = f"{url_path}/{dest_path}"
# Append the query string from destination URL
if dest_query:
url_path = f"{url_path}?{dest_query}"
return f"{base_url}{url_path}"
def get_original_scheme(request: Request) -> str:
"""
Determines the original scheme (http or https) of the request.
@@ -509,7 +575,9 @@ class EnhancedStreamingResponse(Response):
logger.warning(f"Remote protocol error after partial streaming: {e}")
try:
await send({"type": "http.response.body", "body": b"", "more_body": False})
logger.info(f"Response finalized after partial content ({self.actual_content_length} bytes transferred)")
logger.info(
f"Response finalized after partial content ({self.actual_content_length} bytes transferred)"
)
except Exception as close_err:
logger.warning(f"Could not finalize response after remote error: {close_err}")
else:

View File

@@ -3,8 +3,9 @@ import re
from typing import AsyncGenerator
from urllib import parse
from mediaflow_proxy.configs import settings
from mediaflow_proxy.utils.crypto_utils import encryption_handler
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, get_original_scheme
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, encode_stremio_proxy_url, get_original_scheme
class M3U8Processor:
@@ -39,7 +40,7 @@ class M3U8Processor:
if "URI=" in line:
processed_lines.append(await self.process_key_line(line, base_url))
elif not line.startswith("#") and line.strip():
processed_lines.append(await self.proxy_url(line, base_url))
processed_lines.append(await self.proxy_content_url(line, base_url))
else:
processed_lines.append(line)
return "\n".join(processed_lines)
@@ -104,7 +105,7 @@ class M3U8Processor:
if "URI=" in line:
return await self.process_key_line(line, base_url)
elif not line.startswith("#") and line.strip():
return await self.proxy_url(line, base_url)
return await self.proxy_content_url(line, base_url)
else:
return line
@@ -129,9 +130,9 @@ class M3U8Processor:
line = line.replace(f'URI="{original_uri}"', f'URI="{new_uri}"')
return line
async def proxy_url(self, url: str, base_url: str) -> str:
async def proxy_content_url(self, url: str, base_url: str) -> str:
"""
Proxies a URL, encoding it with the MediaFlow proxy URL.
Proxies a content URL based on the configured routing strategy.
Args:
url (str): The URL to proxy.
@@ -141,6 +142,51 @@ class M3U8Processor:
str: The proxied URL.
"""
full_url = parse.urljoin(base_url, url)
# Determine routing strategy based on configuration
routing_strategy = settings.m3u8_content_routing
# For playlist URLs, always use MediaFlow proxy regardless of strategy
if ".m3u" in full_url:
return await self.proxy_url(full_url, base_url, use_full_url=True)
# Route non-playlist content URLs based on strategy
if routing_strategy == "direct":
# Return the URL directly without any proxying
return full_url
elif routing_strategy == "stremio" and settings.stremio_proxy_url:
# Use Stremio proxy for content URLs
query_params = dict(self.request.query_params)
request_headers = {k[2:]: v for k, v in query_params.items() if k.startswith("h_")}
response_headers = {k[2:]: v for k, v in query_params.items() if k.startswith("r_")}
return encode_stremio_proxy_url(
settings.stremio_proxy_url,
full_url,
request_headers=request_headers if request_headers else None,
response_headers=response_headers if response_headers else None,
)
else:
# Default to MediaFlow proxy (routing_strategy == "mediaflow" or fallback)
return await self.proxy_url(full_url, base_url, use_full_url=True)
async def proxy_url(self, url: str, base_url: str, use_full_url: bool = False) -> str:
"""
Proxies a URL, encoding it with the MediaFlow proxy URL.
Args:
url (str): The URL to proxy.
base_url (str): The base URL to resolve relative URLs.
use_full_url (bool): Whether to use the URL as-is (True) or join with base_url (False).
Returns:
str: The proxied URL.
"""
if use_full_url:
full_url = url
else:
full_url = parse.urljoin(base_url, url)
query_params = dict(self.request.query_params)
has_encrypted = query_params.pop("has_encrypted", False)
# Remove the response headers from the query params to avoid it being added to the consecutive requests

View File

@@ -253,6 +253,16 @@ def parse_representation(
profile["frameRate"] = round(int(frame_rate.split("/")[0]) / int(frame_rate.split("/")[1]), 3)
profile["sar"] = representation.get("@sar", "1:1")
# Extract segment template start number for adaptive sequence calculation
segment_template_data = adaptation.get("SegmentTemplate") or representation.get("SegmentTemplate")
if segment_template_data:
try:
profile["segment_template_start_number"] = int(segment_template_data.get("@startNumber", 1))
except (ValueError, TypeError):
profile["segment_template_start_number"] = 1
else:
profile["segment_template_start_number"] = 1
if parse_segment_profile_id is None or profile["id"] != parse_segment_profile_id:
return profile
@@ -502,6 +512,12 @@ def create_segment_data(segment: Dict, item: dict, profile: dict, source: str, t
"number": segment["number"],
}
# Add time and duration metadata for adaptive sequence calculation
if "time" in segment:
segment_data["time"] = segment["time"]
if "duration" in segment:
segment_data["duration_mpd_timescale"] = segment["duration"]
if "start_time" in segment and "end_time" in segment:
segment_data.update(
{