Added cognito authentication - Fix 12
All checks were successful
AWS Deploy on Push / build (push) Successful in 1m10s

This commit is contained in:
2025-05-15 17:27:51 -05:00
parent 795a25961f
commit 8d1997fa5a
2 changed files with 19 additions and 24 deletions

View File

@@ -1,8 +1,7 @@
import os import os
import boto3
import requests import requests
import jwt import jwt
from fastapi import Depends, HTTPException, status, Request from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer from fastapi.security import OAuth2AuthorizationCodeBearer
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
@@ -10,47 +9,42 @@ REGION = "us-east-2"
USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID") USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID")
CLIENT_ID = os.getenv("COGNITO_CLIENT_ID") CLIENT_ID = os.getenv("COGNITO_CLIENT_ID")
DOMAIN = f"https://iptv-updater.auth.{REGION}.amazoncognito.com" DOMAIN = f"https://iptv-updater.auth.{REGION}.amazoncognito.com"
REDIRECT_URI = f"http://localhost:8000/auth/callback"
# Remove the hardcoded REDIRECT_URI, we'll make it dynamic based on the request oauth2_scheme = OAuth2AuthorizationCodeBearer(
class DynamicOAuth2(OAuth2AuthorizationCodeBearer):
async def __call__(self, request: Request):
self.redirect_uri = str(request.base_url) + "auth/callback"
return await super().__call__(request)
oauth2_scheme = DynamicOAuth2(
authorizationUrl=f"{DOMAIN}/oauth2/authorize", authorizationUrl=f"{DOMAIN}/oauth2/authorize",
tokenUrl=f"{DOMAIN}/oauth2/token" tokenUrl=f"{DOMAIN}/oauth2/token"
) )
def exchange_code_for_token(code: str, redirect_uri: str): def exchange_code_for_token(code: str):
token_url = f"{DOMAIN}/oauth2/token" token_url = f"{DOMAIN}/oauth2/token"
data = { data = {
'grant_type': 'authorization_code', 'grant_type': 'authorization_code',
'client_id': CLIENT_ID, 'client_id': CLIENT_ID,
'code': code, 'code': code,
'redirect_uri': redirect_uri 'redirect_uri': REDIRECT_URI
} }
response = requests.post(token_url, data=data) response = requests.post(token_url, data=data)
if response.status_code == 200: if response.status_code == 200:
return response.json() return response.json()
print(f"Token exchange failed: {response.text}") print(f"Token exchange failed: {response.text}") # Add logging
raise HTTPException(status_code=400, detail="Failed to exchange code for token") raise HTTPException(status_code=400, detail="Failed to exchange code for token")
async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)): async def get_current_user(token: str = Depends(oauth2_scheme)):
if not token: if not token:
redirect_uri = str(request.base_url) + "auth/callback"
return RedirectResponse( return RedirectResponse(
f"{DOMAIN}/login?client_id={CLIENT_ID}" f"{DOMAIN}/login?client_id={CLIENT_ID}"
f"&response_type=code" f"&response_type=code"
f"&scope=openid+email+profile" f"&scope=openid+email+profile" # Added more scopes
f"&redirect_uri={redirect_uri}" f"&redirect_uri={REDIRECT_URI}"
) )
try: try:
# Decode JWT token instead of using get_user
decoded = jwt.decode( decoded = jwt.decode(
token, token,
options={"verify_signature": False} options={"verify_signature": False} # We trust tokens from Cognito
) )
return { return {
"Username": decoded.get("email") or decoded.get("sub"), "Username": decoded.get("email") or decoded.get("sub"),
@@ -60,7 +54,7 @@ async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)
] ]
} }
except Exception as e: except Exception as e:
print(f"Token verification failed: {str(e)}") print(f"Token verification failed: {str(e)}") # Add logging
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials", detail="Invalid authentication credentials",

View File

@@ -1,6 +1,6 @@
from fastapi import FastAPI, Depends, HTTPException, Request from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse from fastapi.responses import RedirectResponse, JSONResponse
from app.cabletv.utils.auth import exchange_code_for_token, get_current_user, DOMAIN, CLIENT_ID from app.cabletv.utils.auth import get_current_user, exchange_code_for_token
app = FastAPI() app = FastAPI()
@@ -15,16 +15,17 @@ async def protected_route(user = Depends(get_current_user)):
return {"message": "Protected content", "user": user['Username']} return {"message": "Protected content", "user": user['Username']}
@app.get("/auth/callback") @app.get("/auth/callback")
async def auth_callback(request: Request, code: str): async def auth_callback(code: str):
try: try:
redirect_uri = str(request.base_url) + "auth/callback" tokens = exchange_code_for_token(code)
tokens = exchange_code_for_token(code, redirect_uri)
# Use id_token instead of access_token
response = JSONResponse(content={ response = JSONResponse(content={
"message": "Authentication successful", "message": "Authentication successful",
"id_token": tokens["id_token"] # Changed from access_token "id_token": tokens["id_token"] # Changed from access_token
}) })
# Store id_token in cookie
response.set_cookie( response.set_cookie(
key="token", key="token",
value=tokens["id_token"], # Changed from access_token value=tokens["id_token"], # Changed from access_token