Files
iptv-manager-service/app/cabletv/utils/auth.py
Stefano 30ccf86c86
All checks were successful
AWS Deploy on Push / build (push) Successful in 1m13s
Added cognito authentication - Fix 8
2025-05-15 16:52:54 -05:00

96 lines
3.4 KiB
Python

import os
from urllib.parse import urlencode

import boto3
import jwt
import requests
from fastapi import Depends, HTTPException, status, Request
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2AuthorizationCodeBearer
from fastapi.security.utils import get_authorization_scheme_param
# AWS region; interpolated into the Cognito domain below.
REGION = "us-east-2"

# Cognito identifiers come from the environment; each is None when unset.
USER_POOL_ID = os.environ.get("COGNITO_USER_POOL_ID")
CLIENT_ID = os.environ.get("COGNITO_CLIENT_ID")

# Base URL for the /login, /oauth2/authorize and /oauth2/token endpoints
# used throughout this module.
DOMAIN = "https://iptv-updater.auth." + REGION + ".amazoncognito.com"
class BrowserAwareOAuth2(OAuth2AuthorizationCodeBearer):
    """OAuth2 bearer scheme that redirects browsers to the Cognito login page.

    The credential is read from the ``token`` cookie first and from the
    ``Authorization`` header otherwise. When no bearer credential is present,
    requests that accept ``text/html`` are redirected to the login page and
    all other requests receive a 401.
    """

    async def __call__(self, request: Request) -> str:
        """Return the bearer token carried by ``request``.

        Args:
            request: The incoming request.

        Returns:
            The token string that follows the ``Bearer`` scheme.

        Raises:
            HTTPException: 302 with a ``Location`` header pointing at the
                login page for browser requests without a bearer credential;
                401 for any other request without one.
        """
        # A browser navigation advertises HTML in its Accept header.
        is_browser = "text/html" in request.headers.get("accept", "")

        # Cookie wins over the header.
        # NOTE(review): the cookie value must itself be "Bearer <token>"; a
        # bare token stored in the cookie is treated as unauthenticated.
        # Confirm this matches how the cookie is written.
        authorization = request.cookies.get("token") or request.headers.get(
            "Authorization"
        )
        scheme, param = get_authorization_scheme_param(authorization)
        if authorization and scheme.lower() == "bearer":
            return param

        if is_browser:
            # urlencode percent-escapes redirect_uri (it contains "://" and
            # "/") and renders the space-separated scope as
            # "openid+email+profile".
            query = urlencode(
                {
                    "client_id": CLIENT_ID,
                    "response_type": "code",
                    "scope": "openid email profile",
                    "redirect_uri": str(request.base_url) + "auth/callback",
                }
            )
            # The redirect has to be raised: a Response *returned* from a
            # dependency is injected into the caller as the dependency value
            # and never reaches the client.
            raise HTTPException(
                status_code=302,
                detail="Not authenticated",
                headers={"Location": f"{DOMAIN}/login?{query}"},
            )

        # Return 401 for API requests.
        raise HTTPException(
            status_code=401,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
# Module-wide scheme instance built from the custom class, pointed at the
# authorize and token endpoints under DOMAIN.
oauth2_scheme = BrowserAwareOAuth2(
    tokenUrl=DOMAIN + "/oauth2/token",
    authorizationUrl=DOMAIN + "/oauth2/authorize",
)
def exchange_code_for_token(code: str, redirect_uri: str, timeout: float = 10.0):
    """Exchange an authorization code for tokens at the token endpoint.

    Args:
        code: Authorization code received on the callback.
        redirect_uri: Redirect URI sent with the exchange request.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The parsed JSON body of a 200 response.

    Raises:
        HTTPException: 502 when the token endpoint cannot be reached or
            times out; 400 when it answers with a non-200 status.
    """
    token_url = f"{DOMAIN}/oauth2/token"
    payload = {
        'grant_type': 'authorization_code',
        'client_id': CLIENT_ID,
        'code': code,
        'redirect_uri': redirect_uri,
    }
    try:
        # Without a timeout, requests waits forever on an unresponsive
        # endpoint and ties up the worker handling the callback.
        response = requests.post(token_url, data=payload, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Token exchange request failed: {exc}")
        raise HTTPException(
            status_code=502,
            detail="Failed to exchange code for token",
        ) from exc

    if response.status_code == 200:
        return response.json()

    print(f"Token exchange failed: {response.text}")
    raise HTTPException(status_code=400, detail="Failed to exchange code for token")
# Lazily created so importing this module performs no network I/O.
_jwks_client = None


def _get_jwks_client():
    """Return the shared JWKS client for the user pool, creating it on first use."""
    global _jwks_client
    if _jwks_client is None:
        _jwks_client = jwt.PyJWKClient(
            f"https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}"
            "/.well-known/jwks.json"
        )
    return _jwks_client


def _verify_cognito_token(token: str) -> dict:
    """Verify signature, expiry, issuer and app client; return the claims."""
    issuer = f"https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}"
    # NOTE: the key lookup is a blocking HTTP fetch whenever the client's key
    # cache is cold or stale.
    signing_key = _get_jwks_client().get_signing_key_from_jwt(token)
    claims = jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],
        issuer=issuer,
        # The app client is checked by hand below because ID tokens carry it
        # in "aud" while access tokens carry it in "client_id".
        options={"verify_aud": False},
    )

    token_use = claims.get("token_use")
    if token_use == "id":
        issued_for = claims.get("aud")
    elif token_use == "access":
        issued_for = claims.get("client_id")
    else:
        raise jwt.InvalidTokenError("Unexpected token_use claim")
    if issued_for != CLIENT_ID:
        raise jwt.InvalidTokenError("Token was not issued for this app client")
    return claims


async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)):
    """Resolve the authenticated user from a verified bearer token.

    Args:
        request: The incoming request; its base URL is used to build the
            login redirect.
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        A dict with ``"Username"`` (the ``email`` claim, falling back to
        ``sub``) and ``"UserAttributes"`` (every claim as a
        ``{"Name": ..., "Value": ...}`` entry).

    Raises:
        HTTPException: a redirect to the login page when no token is
            available; 401 when the token fails verification.
    """
    # A Response returned by a dependency is injected as the value rather
    # than sent to the client, so a redirect may arrive here as the token.
    # Re-raise it so the client actually gets redirected.
    if isinstance(token, RedirectResponse):
        raise HTTPException(
            status_code=token.status_code,
            detail="Not authenticated",
            headers={"Location": token.headers["location"]},
        )

    if not token:
        query = urlencode(
            {
                "client_id": CLIENT_ID,
                "response_type": "code",
                "scope": "openid email profile",
                "redirect_uri": str(request.base_url) + "auth/callback",
            }
        )
        # Raised, not returned: returning would hand the Response object to
        # the route as if it were the user. 307 matches RedirectResponse's
        # default status.
        raise HTTPException(
            status_code=307,
            detail="Not authenticated",
            headers={"Location": f"{DOMAIN}/login?{query}"},
        )

    try:
        decoded = _verify_cognito_token(token)
    except jwt.PyJWTError as exc:
        print(f"Token verification failed: {str(exc)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    return {
        "Username": decoded.get("email") or decoded.get("sub"),
        "UserAttributes": [
            {"Name": name, "Value": value}
            for name, value in decoded.items()
        ],
    }