From 8d1997fa5ae750fe852e362a5ceea211f8236dc6 Mon Sep 17 00:00:00 2001 From: Stefano Date: Thu, 15 May 2025 17:27:51 -0500 Subject: [PATCH] Added cognito authentication - Fix 12 --- app/cabletv/utils/auth.py | 30 ++++++++++++------------------ app/main.py | 13 +++++++------ 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/app/cabletv/utils/auth.py b/app/cabletv/utils/auth.py index 3fa4306..5090850 100644 --- a/app/cabletv/utils/auth.py +++ b/app/cabletv/utils/auth.py @@ -1,8 +1,7 @@ import os -import boto3 import requests import jwt -from fastapi import Depends, HTTPException, status, Request +from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2AuthorizationCodeBearer from fastapi.responses import RedirectResponse @@ -10,47 +9,42 @@ REGION = "us-east-2" USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID") CLIENT_ID = os.getenv("COGNITO_CLIENT_ID") DOMAIN = f"https://iptv-updater.auth.{REGION}.amazoncognito.com" +REDIRECT_URI = f"http://localhost:8000/auth/callback" -# Remove the hardcoded REDIRECT_URI, we'll make it dynamic based on the request -class DynamicOAuth2(OAuth2AuthorizationCodeBearer): - async def __call__(self, request: Request): - self.redirect_uri = str(request.base_url) + "auth/callback" - return await super().__call__(request) - -oauth2_scheme = DynamicOAuth2( +oauth2_scheme = OAuth2AuthorizationCodeBearer( authorizationUrl=f"{DOMAIN}/oauth2/authorize", tokenUrl=f"{DOMAIN}/oauth2/token" ) -def exchange_code_for_token(code: str, redirect_uri: str): +def exchange_code_for_token(code: str): token_url = f"{DOMAIN}/oauth2/token" data = { 'grant_type': 'authorization_code', 'client_id': CLIENT_ID, 'code': code, - 'redirect_uri': redirect_uri + 'redirect_uri': REDIRECT_URI } response = requests.post(token_url, data=data) if response.status_code == 200: return response.json() - print(f"Token exchange failed: {response.text}") + print(f"Token exchange failed: {response.text}") # Add logging raise HTTPException(status_code=400, detail="Failed to exchange code for token") -async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)): +async def get_current_user(token: str = Depends(oauth2_scheme)): if not token: - redirect_uri = str(request.base_url) + "auth/callback" return RedirectResponse( f"{DOMAIN}/login?client_id={CLIENT_ID}" f"&response_type=code" - f"&scope=openid+email+profile" - f"&redirect_uri={redirect_uri}" + f"&scope=openid+email+profile" # Added more scopes + f"&redirect_uri={REDIRECT_URI}" ) try: + # Decode JWT token instead of using get_user decoded = jwt.decode( token, - options={"verify_signature": False} + options={"verify_signature": False} # We trust tokens from Cognito ) return { "Username": decoded.get("email") or decoded.get("sub"), @@ -60,7 +54,7 @@ async def get_current_user(request: Request, token: str = Depends(oauth2_scheme) ] } except Exception as e: - print(f"Token verification failed: {str(e)}") + print(f"Token verification failed: {str(e)}") # Add logging raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", diff --git a/app/main.py b/app/main.py index 45deb18..dcfca00 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,6 @@ -from fastapi import FastAPI, Depends, HTTPException, Request -from fastapi.responses import JSONResponse, RedirectResponse -from app.cabletv.utils.auth import exchange_code_for_token, get_current_user, DOMAIN, CLIENT_ID +from fastapi import FastAPI, Depends, HTTPException +from fastapi.responses import RedirectResponse, JSONResponse +from app.cabletv.utils.auth import get_current_user, exchange_code_for_token app = FastAPI() @@ -15,16 +15,17 @@ async def protected_route(user = Depends(get_current_user)): return {"message": "Protected content", "user": user['Username']} @app.get("/auth/callback") -async def auth_callback(request: Request, code: str): +async def auth_callback(code: str): try: - redirect_uri = str(request.base_url) + "auth/callback" - tokens = exchange_code_for_token(code, redirect_uri) + tokens = exchange_code_for_token(code) + # Use id_token instead of access_token response = JSONResponse(content={ "message": "Authentication successful", "id_token": tokens["id_token"] # Changed from access_token }) + # Store id_token in cookie response.set_cookie( key="token", value=tokens["id_token"], # Changed from access_token