Updated to newest version

This commit is contained in:
UrloMythus
2025-04-29 18:52:23 +02:00
parent c54be91e39
commit 323ca2d1b6
11 changed files with 358 additions and 237 deletions

View File

@@ -29,11 +29,13 @@ class EncryptionHandler:
iv = get_random_bytes(16)
cipher = AES.new(self.secret_key, AES.MODE_CBC, iv)
encrypted_data = cipher.encrypt(pad(json_data, AES.block_size))
return base64.urlsafe_b64encode(iv + encrypted_data).decode("utf-8")
return base64.urlsafe_b64encode(iv + encrypted_data).decode("utf-8").rstrip("=")
def decrypt_data(self, token: str, client_ip: str) -> dict:
try:
encrypted_data = base64.urlsafe_b64decode(token.encode("utf-8"))
padding_needed = (4 - len(token) % 4) % 4
encrypted_token_b64_padded = token + ("=" * padding_needed)
encrypted_data = base64.urlsafe_b64decode(encrypted_token_b64_padded.encode("utf-8"))
iv = encrypted_data[:16]
cipher = AES.new(self.secret_key, AES.MODE_CBC, iv)
decrypted_data = unpad(cipher.decrypt(encrypted_data[16:]), AES.block_size)
@@ -60,14 +62,55 @@ class EncryptionMiddleware(BaseHTTPMiddleware):
self.encryption_handler = encryption_handler
async def dispatch(self, request: Request, call_next):
encrypted_token = request.query_params.get("token")
path = request.url.path
token_marker = "/_token_"
encrypted_token = None
# Check for token in path
if token_marker in path and self.encryption_handler:
try:
# Extract token from path
token_start = path.find(token_marker) + len(token_marker)
token_end = path.find("/", token_start)
if token_end == -1: # No trailing slash (no filename after token)
token_end = len(path)
filename_part = ""
else:
# There's something after the token (likely a filename)
filename_part = path[token_end:]
# Get the encrypted token
encrypted_token = path[token_start:token_end]
# Modify the path to remove the token part but preserve the filename
original_path = path[: path.find(token_marker)]
original_path += filename_part # Add back the filename part
request.scope["path"] = original_path
# Update the raw path as well
request.scope["raw_path"] = original_path.encode()
except Exception as e:
logging.error(f"Error processing token in path: {str(e)}")
return JSONResponse(content={"error": f"Invalid token in path: {str(e)}"}, status_code=400)
# Check for token in query parameters (original method)
if not encrypted_token: # Only check if we didn't already find a token in the path
encrypted_token = request.query_params.get("token")
# Process the token if found (from either source)
if encrypted_token and self.encryption_handler:
try:
client_ip = self.get_client_ip(request)
decrypted_data = self.encryption_handler.decrypt_data(encrypted_token, client_ip)
# Modify request query parameters with decrypted data
query_params = dict(request.query_params)
query_params.pop("token") # Remove the encrypted token from query params
if "token" in query_params:
query_params.pop("token") # Remove the encrypted token from query params
query_params.update(decrypted_data) # Add decrypted data to query params
query_params["has_encrypted"] = True
@@ -75,8 +118,12 @@ class EncryptionMiddleware(BaseHTTPMiddleware):
new_query_string = urlencode(query_params)
request.scope["query_string"] = new_query_string.encode()
request._query_params = query_params
except HTTPException as e:
return JSONResponse(content={"error": str(e.detail)}, status_code=e.status_code)
except Exception as e:
logging.error(f"Error decrypting token: {str(e)}")
return JSONResponse(content={"error": f"Invalid token: {str(e)}"}, status_code=400)
try:
response = await call_next(request)