68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
import os
from urllib.parse import urlencode

import boto3
import jwt
import requests
from fastapi import Depends, HTTPException, status, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2AuthorizationCodeBearer
|
|
|
|
# AWS region used to build the Cognito hosted-UI domain.
REGION = "us-east-2"

# Cognito identifiers come from the environment; each is None when its
# variable is unset.
USER_POOL_ID = os.environ.get("COGNITO_USER_POOL_ID")
CLIENT_ID = os.environ.get("COGNITO_CLIENT_ID")

# Base URL under which the OAuth2 authorize/token/login endpoints are addressed.
DOMAIN = f"https://iptv-updater.auth.{REGION}.amazoncognito.com"
|
|
|
|
# The redirect URI is intentionally not hardcoded: it is derived per request
# from the request's base URL.
|
|
class DynamicOAuth2(OAuth2AuthorizationCodeBearer):
    """Authorization-code bearer scheme whose redirect URI follows the request.

    Each call stores ``<request base URL>auth/callback`` on the instance as
    ``redirect_uri`` and then defers to the parent scheme.

    NOTE(review): ``redirect_uri`` is written on the scheme instance itself,
    so if one instance serves concurrent requests with different base URLs
    the last writer wins -- confirm nothing reads it across requests.
    """

    async def __call__(self, request: Request):
        """Record this request's callback URL, then delegate to the parent scheme."""
        callback_url = f"{request.base_url}auth/callback"
        self.redirect_uri = callback_url
        return await super().__call__(request)
|
|
|
|
# Module-wide scheme instance; both OAuth2 endpoints are addressed under DOMAIN.
# NOTE(review): auto_error is not passed, so the parent class's default applies
# (presumably requests without a bearer header are rejected before any
# dependant runs -- confirm against the FastAPI version in use).
oauth2_scheme = DynamicOAuth2(
    tokenUrl=f"{DOMAIN}/oauth2/token",
    authorizationUrl=f"{DOMAIN}/oauth2/authorize",
)
|
|
|
|
def exchange_code_for_token(code: str, redirect_uri: str, timeout: float = 10.0):
    """Exchange an OAuth2 authorization code for tokens at the token endpoint.

    Args:
        code: Authorization code to redeem.
        redirect_uri: Redirect URI sent along with the exchange request
            (presumably it must match the one used when the code was
            issued -- verify against the caller).
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout ``requests`` waits indefinitely, so a stalled
            endpoint would hang the calling request handler.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        HTTPException: 400 when the endpoint answers with a non-200 status.
        requests.RequestException: On network failure or timeout.
    """
    response = requests.post(
        f"{DOMAIN}/oauth2/token",
        data={
            'grant_type': 'authorization_code',
            'client_id': CLIENT_ID,
            'code': code,
            'redirect_uri': redirect_uri,
        },
        timeout=timeout,
    )

    if response.status_code != 200:
        print(f"Token exchange failed: {response.text}")
        raise HTTPException(status_code=400, detail="Failed to exchange code for token")

    return response.json()
|
|
|
|
# Issuer of tokens minted by the user pool; the pool's signing keys (JWKS)
# are published underneath it.
_COGNITO_ISSUER = f"https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}"

# Created on first use so importing this module never touches the network.
_jwks_client = None


def _get_jwks_client():
    """Return the lazily created, process-wide JWKS client for the user pool."""
    global _jwks_client
    if _jwks_client is None:
        # PyJWKClient requires PyJWT >= 2.0; RS256 needs PyJWT's crypto extra.
        _jwks_client = jwt.PyJWKClient(f"{_COGNITO_ISSUER}/.well-known/jwks.json")
    return _jwks_client


def _verify_cognito_token(token: str) -> dict:
    """Verify a token's signature, expiry, issuer and app client; return its claims."""
    signing_key = _get_jwks_client().get_signing_key_from_jwt(token)
    claims = jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],
        issuer=_COGNITO_ISSUER,
        # The audience is checked by hand below: ID tokens carry the app
        # client in "aud", access tokens carry it in "client_id".
        options={"verify_aud": False},
    )
    # Fail closed when CLIENT_ID is unset, otherwise None would match a
    # missing claim.
    if not CLIENT_ID or CLIENT_ID not in (claims.get("aud"), claims.get("client_id")):
        raise jwt.InvalidTokenError("Token was not issued for this app client")
    return claims


def _build_login_url(base_url: str) -> str:
    """Build the hosted-UI login URL with a properly URL-encoded query string."""
    query = urlencode({
        "client_id": CLIENT_ID,
        "response_type": "code",
        "scope": "openid email profile",
        "redirect_uri": f"{base_url}auth/callback",
    })
    return f"{DOMAIN}/login?{query}"


async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)):
    """Resolve the authenticated user from a verified bearer token.

    Args:
        request: Incoming request; its base URL is used to build the login
            redirect when no token is supplied.
        token: Bearer token provided by ``oauth2_scheme``.

    Returns:
        A dict with ``"Username"`` (the ``email`` claim, falling back to
        ``sub``) and ``"UserAttributes"`` (one ``{"Name", "Value"}`` entry
        per claim).

    Raises:
        HTTPException: 307 with a ``Location`` header pointing at the login
            page when the token is empty; 401 when the token fails
            verification.
    """
    if not token:
        # A Response *returned* from a dependency is injected into the
        # endpoint as the "user" value and never sent to the client; raising
        # is what actually produces the redirect.
        raise HTTPException(
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
            headers={"Location": _build_login_url(str(request.base_url))},
        )

    try:
        # Verification may fetch the JWKS over HTTP; run it off the event loop.
        claims = await run_in_threadpool(_verify_cognito_token, token)
    except Exception as e:
        print(f"Token verification failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    return {
        "Username": claims.get("email") or claims.get("sub"),
        "UserAttributes": [
            {"Name": name, "Value": value}
            for name, value in claims.items()
        ],
    }