updated to newest version, dlhd support

This commit is contained in:
UrloMythus
2025-05-18 22:09:07 +02:00
parent 1bdfe198b5
commit 4b5891457e
9 changed files with 486 additions and 24 deletions

View File

@@ -6,6 +6,7 @@ from urllib import parse
from urllib.parse import urlencode
import anyio
import h11
import httpx
import tenacity
from fastapi import Response
@@ -168,6 +169,20 @@ class Streamer:
except httpx.TimeoutException:
logger.warning("Timeout while streaming")
raise DownloadError(409, "Timeout while streaming")
except httpx.RemoteProtocolError as e:
# Special handling for connection closed errors
if "peer closed connection without sending complete message body" in str(e):
logger.warning(f"Remote server closed connection prematurely: {e}")
# If we've received some data, just log the warning and return normally
if self.bytes_transferred > 0:
logger.info(f"Partial content received ({self.bytes_transferred} bytes). Continuing with available data.")
return
else:
# If we haven't received any data, raise an error
raise DownloadError(502, f"Remote server closed connection without sending any data: {e}")
else:
logger.error(f"Protocol error while streaming: {e}")
raise DownloadError(502, f"Protocol error while streaming: {e}")
except GeneratorExit:
logger.info("Streaming session stopped by the user")
except Exception as e:
@@ -432,6 +447,7 @@ class EnhancedStreamingResponse(Response):
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)
self.actual_content_length = 0
@staticmethod
async def listen_for_disconnect(receive: Receive) -> None:
@@ -446,41 +462,109 @@ class EnhancedStreamingResponse(Response):
async def stream_response(self, send: Send) -> None:
try:
# Initialize headers
headers = list(self.raw_headers)
# Set the transfer-encoding to chunked for streamed responses with content-length
# when content-length is present. This ensures we don't hit protocol errors
# if the upstream connection is closed prematurely.
for i, (name, _) in enumerate(headers):
if name.lower() == b"content-length":
# Replace content-length with transfer-encoding: chunked for streaming
headers[i] = (b"transfer-encoding", b"chunked")
headers = [h for h in headers if h[0].lower() != b"content-length"]
logger.debug("Switched from content-length to chunked transfer-encoding for streaming")
break
# Start the response
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
"headers": headers,
}
)
async for chunk in self.body_iterator:
if not isinstance(chunk, (bytes, memoryview)):
chunk = chunk.encode(self.charset)
try:
await send({"type": "http.response.body", "body": chunk, "more_body": True})
except (ConnectionResetError, anyio.BrokenResourceError):
logger.info("Client disconnected during streaming")
return
await send({"type": "http.response.body", "body": b"", "more_body": False})
# Track if we've sent any data
data_sent = False
try:
async for chunk in self.body_iterator:
if not isinstance(chunk, (bytes, memoryview)):
chunk = chunk.encode(self.charset)
try:
await send({"type": "http.response.body", "body": chunk, "more_body": True})
data_sent = True
self.actual_content_length += len(chunk)
except (ConnectionResetError, anyio.BrokenResourceError):
logger.info("Client disconnected during streaming")
return
# Successfully streamed all content
await send({"type": "http.response.body", "body": b"", "more_body": False})
except (httpx.RemoteProtocolError, h11._util.LocalProtocolError) as e:
# Handle connection closed errors
if data_sent:
# We've sent some data to the client, so try to complete the response
logger.warning(f"Remote protocol error after partial streaming: {e}")
try:
await send({"type": "http.response.body", "body": b"", "more_body": False})
logger.info(f"Response finalized after partial content ({self.actual_content_length} bytes transferred)")
except Exception as close_err:
logger.warning(f"Could not finalize response after remote error: {close_err}")
else:
# No data was sent, re-raise the error
logger.error(f"Protocol error before any data was streamed: {e}")
raise
except Exception as e:
logger.exception(f"Error in stream_response: {str(e)}")
if not isinstance(e, (ConnectionResetError, anyio.BrokenResourceError)):
try:
# Try to send an error response if client is still connected
await send(
{
"type": "http.response.start",
"status": 502,
"headers": [(b"content-type", b"text/plain")],
}
)
error_message = f"Streaming error: {str(e)}".encode("utf-8")
await send({"type": "http.response.body", "body": error_message, "more_body": False})
except Exception:
# If we can't send an error response, just log it
pass
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
async with anyio.create_task_group() as task_group:
streaming_completed = False
stream_func = partial(self.stream_response, send)
listen_func = partial(self.listen_for_disconnect, receive)
async def wrap(func: typing.Callable[[], typing.Awaitable[None]]) -> None:
try:
await func()
# If this is the stream_response function and it completes successfully, mark as done
if func == stream_func:
nonlocal streaming_completed
streaming_completed = True
except Exception as e:
if not isinstance(e, anyio.get_cancelled_exc_class()):
if isinstance(e, (httpx.RemoteProtocolError, h11._util.LocalProtocolError)):
# Handle protocol errors more gracefully
logger.warning(f"Protocol error during streaming: {e}")
elif not isinstance(e, anyio.get_cancelled_exc_class()):
logger.exception("Error in streaming task")
raise
# Only re-raise if it's not a protocol error or cancellation
raise
finally:
task_group.cancel_scope.cancel()
# Only cancel the task group if we're in disconnect listener or
# if streaming_completed is True (meaning we finished normally)
if func == listen_func or streaming_completed:
task_group.cancel_scope.cancel()
task_group.start_soon(wrap, partial(self.stream_response, send))
await wrap(partial(self.listen_for_disconnect, receive))
# Start the streaming response in a separate task
task_group.start_soon(wrap, stream_func)
# Listen for disconnect events
await wrap(listen_func)
if self.background is not None:
await self.background()