Added more unit tests for routers
All checks were successful
AWS Deploy on Push / build (push) Successful in 1m10s

This commit is contained in:
2025-05-27 23:41:00 -05:00
parent 32af6bbdb5
commit 903f190ee2
5 changed files with 1049 additions and 93 deletions

26
.vscode/settings.json vendored
View File

@@ -11,10 +11,29 @@
"botocore",
"BURSTABLE",
"cabletv",
"CDUF",
"cduflogo",
"cdulogo",
"CDUNF",
"cdunflogo",
"certbot",
"certifi",
"cfulogo",
"CLEU",
"cleulogo",
"CLUF",
"cluflogo",
"clulogo",
"cpulogo",
"cuflgo",
"CUNF",
"cunflogo",
"cuulogo",
"delenv",
"delogo",
"devel",
"dflogo",
"dmlogo",
"dotenv",
"fastapi",
"filterwarnings",
@@ -23,6 +42,7 @@
"fullchain",
"gitea",
"iptv",
"lclogo",
"LETSENCRYPT",
"nohup",
"ondelete",
@@ -42,10 +62,14 @@
"sqlalchemy",
"starlette",
"stefano",
"testadmin",
"testdb",
"testpass",
"testpaths",
"uflogo",
"umlogo",
"uvicorn",
"venv"
"venv",
"wrongpass"
]
}

107
tests/routers/test_auth.py Normal file
View File

@@ -0,0 +1,107 @@
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from fastapi import HTTPException, status
from app.main import app
client = TestClient(app)
@pytest.fixture
def mock_successful_auth():
return {
"AccessToken": "mock_access_token",
"IdToken": "mock_id_token",
"RefreshToken": "mock_refresh_token"
}
@pytest.fixture
def mock_successful_auth_no_refresh():
return {
"AccessToken": "mock_access_token",
"IdToken": "mock_id_token"
}
def test_signin_success(mock_successful_auth):
"""Test successful signin with all tokens"""
with patch('app.routers.auth.initiate_auth', return_value=mock_successful_auth):
response = client.post(
"/auth/signin",
json={"username": "testuser", "password": "testpass"}
)
assert response.status_code == 200
data = response.json()
assert data["access_token"] == "mock_access_token"
assert data["id_token"] == "mock_id_token"
assert data["refresh_token"] == "mock_refresh_token"
assert data["token_type"] == "Bearer"
def test_signin_success_no_refresh(mock_successful_auth_no_refresh):
"""Test successful signin without refresh token"""
with patch('app.routers.auth.initiate_auth', return_value=mock_successful_auth_no_refresh):
response = client.post(
"/auth/signin",
json={"username": "testuser", "password": "testpass"}
)
assert response.status_code == 200
data = response.json()
assert data["access_token"] == "mock_access_token"
assert data["id_token"] == "mock_id_token"
assert data["refresh_token"] is None
assert data["token_type"] == "Bearer"
def test_signin_invalid_input():
"""Test signin with invalid input format"""
# Missing password
response = client.post(
"/auth/signin",
json={"username": "testuser"}
)
assert response.status_code == 422
# Missing username
response = client.post(
"/auth/signin",
json={"password": "testpass"}
)
assert response.status_code == 422
# Empty payload
response = client.post(
"/auth/signin",
json={}
)
assert response.status_code == 422
def test_signin_auth_failure():
"""Test signin with authentication failure"""
with patch('app.routers.auth.initiate_auth') as mock_auth:
mock_auth.side_effect = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
response = client.post(
"/auth/signin",
json={"username": "testuser", "password": "wrongpass"}
)
assert response.status_code == 401
data = response.json()
assert data["detail"] == "Invalid username or password"
def test_signin_user_not_found():
"""Test signin with non-existent user"""
with patch('app.routers.auth.initiate_auth') as mock_auth:
mock_auth.side_effect = HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
response = client.post(
"/auth/signin",
json={"username": "nonexistent", "password": "testpass"}
)
assert response.status_code == 404
data = response.json()
assert data["detail"] == "User not found"

View File

@@ -0,0 +1,815 @@
import pytest
import uuid
from fastapi.testclient import TestClient
from fastapi import FastAPI, status
from sqlalchemy import String
from sqlalchemy.orm import Session
from app.auth.dependencies import get_current_user
from app.routers.channels import router as channels_router
from app.models.auth import CognitoUser
from app.utils.database import get_db
# Import mocks from db_mocks
from tests.utils.db_mocks import (
MockBase,
engine_mock,
session_mock as TestingSessionLocal,
mock_get_db,
MockChannelDB,
MockChannelURL,
MockPriority
)
# Create a FastAPI instance for testing
app = FastAPI()
# Mock current user
def mock_get_current_user_admin():
return CognitoUser(
username="testadmin",
email="testadmin@example.com",
roles=["admin"],
user_status="CONFIRMED",
enabled=True
)
def mock_get_current_user_non_admin():
return CognitoUser(
username="testuser",
email="testuser@example.com",
roles=["user"], # Or any role other than admin
user_status="CONFIRMED",
enabled=True
)
# Override dependencies
app.dependency_overrides[get_db] = mock_get_db
app.include_router(channels_router)
client = TestClient(app)
@pytest.fixture(scope="function")
def db_session():
# Create tables for each test function
MockBase.metadata.create_all(bind=engine_mock)
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
# Drop tables after each test function
MockBase.metadata.drop_all(bind=engine_mock)
@pytest.fixture(scope="function")
def admin_user_client(db_session: Session):
"""Yields a TestClient configured with an admin user."""
test_app = FastAPI()
test_app.include_router(channels_router)
test_app.dependency_overrides[get_db] = mock_get_db
test_app.dependency_overrides[get_current_user] = mock_get_current_user_admin
with TestClient(test_app) as test_client:
yield test_client
@pytest.fixture(scope="function")
def non_admin_user_client(db_session: Session):
"""Yields a TestClient configured with a non-admin user."""
test_app = FastAPI()
test_app.include_router(channels_router)
test_app.dependency_overrides[get_db] = mock_get_db
test_app.dependency_overrides[get_current_user] = mock_get_current_user_non_admin
with TestClient(test_app) as test_client:
yield test_client
# --- Test Cases For Channel Creation ---
def test_create_channel_success(db_session: Session, admin_user_client: TestClient):
# Setup a priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "channel1.tv",
"name": "Test Channel 1",
"group_title": "Test Group",
"tvg_name": "TestChannel1",
"tvg_logo": "logo.png",
"urls": [
{"url": "http://stream1.com/test", "priority_id": 100}
]
}
response = admin_user_client.post("/channels/", json=channel_data) # No headers needed now
assert response.status_code == status.HTTP_201_CREATED
data = response.json()
assert data["name"] == "Test Channel 1"
assert data["group_title"] == "Test Group"
assert data["tvg_id"] == "channel1.tv"
assert len(data["urls"]) == 1
assert data["urls"][0]["url"] == "http://stream1.com/test"
assert data["urls"][0]["priority_id"] == 100
# Verify in DB
db_channel = db_session.query(MockChannelDB).filter(MockChannelDB.name == "Test Channel 1").first()
assert db_channel is not None
assert db_channel.group_title == "Test Group"
# Query URLs using exact string comparison
db_urls = db_session.query(MockChannelURL).filter(
MockChannelURL.channel_id.cast(String()) == db_channel.id
).all()
assert len(db_urls) == 1
assert db_urls[0].url == "http://stream1.com/test"
def test_create_channel_duplicate(db_session: Session, admin_user_client: TestClient):
# Setup a priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
# Create initial channel
initial_channel_data = {
"tvg_id": "channel_dup.tv",
"name": "Duplicate Channel",
"group_title": "Duplicate Group",
"tvg_name": "DuplicateChannelName",
"tvg_logo": "duplicate_logo.png",
"urls": [{"url": "http://stream_dup.com/test", "priority_id": 100}]
}
response1 = admin_user_client.post("/channels/", json=initial_channel_data)
assert response1.status_code == status.HTTP_201_CREATED
# Attempt to create the same channel again
response2 = admin_user_client.post("/channels/", json=initial_channel_data)
assert response2.status_code == status.HTTP_409_CONFLICT
assert "already exists" in response2.json()["detail"]
def test_create_channel_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient):
# Setup a priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "channel_forbidden.tv",
"name": "Forbidden Channel",
"group_title": "Forbidden Group",
"tvg_name": "ForbiddenChannelName",
"tvg_logo": "forbidden_logo.png",
"urls": [{"url": "http://stream_forbidden.com/test", "priority_id": 100}]
}
response = non_admin_user_client.post("/channels/", json=channel_data)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# --- Test Cases For Get Channel ---
def test_get_channel_success(db_session: Session, admin_user_client: TestClient):
# Setup a priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
# Create a channel first
channel_data_create = {
"tvg_id": "get_me.tv",
"name": "Get Me Channel",
"group_title": "Get Group",
"tvg_name": "GetMeChannelName",
"tvg_logo": "get_me_logo.png",
"urls": [{"url": "http://get_me.com/stream", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=channel_data_create)
assert create_response.status_code == status.HTTP_201_CREATED
created_channel_id = create_response.json()["id"]
app.dependency_overrides[get_current_user] = mock_get_current_user_admin # Or a generic authenticated user
get_response = admin_user_client.get(f"/channels/{created_channel_id}")
assert get_response.status_code == status.HTTP_200_OK
data = get_response.json()
assert data["id"] == created_channel_id
assert data["name"] == "Get Me Channel"
assert data["group_title"] == "Get Group"
assert len(data["urls"]) == 1
app.dependency_overrides.pop(get_current_user, None)
def test_get_channel_not_found(db_session: Session, admin_user_client: TestClient):
app.dependency_overrides[get_current_user] = mock_get_current_user_admin
random_uuid = uuid.uuid4()
response = admin_user_client.get(f"/channels/{random_uuid}")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "Channel not found" in response.json()["detail"]
app.dependency_overrides.pop(get_current_user, None)
# --- Test Cases For Update Channel ---
def test_update_channel_success(db_session: Session, admin_user_client: TestClient):
# Setup priority and create initial channel
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
initial_channel_data = {
"tvg_id": "update_me.tv",
"name": "Update Me Channel",
"group_title": "Update Group",
"tvg_name": "UpdateMeChannelName",
"tvg_logo": "update_me_logo.png",
"urls": [{"url": "http://update_me.com/stream", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=initial_channel_data)
assert create_response.status_code == status.HTTP_201_CREATED
created_channel_id = create_response.json()["id"]
update_data = {
"name": "Updated Channel Name",
"tvg_logo": "new_logo.png"
}
response = admin_user_client.put(f"/channels/{created_channel_id}", json=update_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == created_channel_id
assert data["name"] == "Updated Channel Name"
assert data["group_title"] == "Update Group"
assert data["tvg_logo"] == "new_logo.png"
# Verify in DB
db_channel = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first()
assert db_channel is not None
assert db_channel.name == "Updated Channel Name"
assert db_channel.tvg_logo == "new_logo.png"
def test_update_channel_conflict(db_session: Session, admin_user_client: TestClient):
# Setup priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
# Create channel 1
channel1_data = {
"tvg_id": "c1.tv", "name": "Channel One", "group_title": "Group A",
"tvg_name": "C1Name", "tvg_logo": "c1logo.png",
"urls": [{"url": "http://c1.com", "priority_id": 100}]
}
admin_user_client.post("/channels/", json=channel1_data)
# Create channel 2
channel2_data = {
"tvg_id": "c2.tv", "name": "Channel Two", "group_title": "Group B",
"tvg_name": "C2Name", "tvg_logo": "c2logo.png",
"urls": [{"url": "http://c2.com", "priority_id": 100}]
}
response_c2 = admin_user_client.post("/channels/", json=channel2_data)
channel2_id = response_c2.json()["id"]
# Attempt to update channel 2 to conflict with channel 1
update_conflict_data = {"name": "Channel One", "group_title": "Group A"}
response = admin_user_client.put(f"/channels/{channel2_id}", json=update_conflict_data)
assert response.status_code == status.HTTP_409_CONFLICT
assert "already exists" in response.json()["detail"]
def test_update_channel_not_found(db_session: Session, admin_user_client: TestClient):
random_uuid = uuid.uuid4()
update_data = {"name": "Non Existent Update"}
response = admin_user_client.put(f"/channels/{random_uuid}", json=update_data)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "Channel not found" in response.json()["detail"]
def test_update_channel_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient):
# Setup priority and create initial channel with admin
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
initial_channel_data = {
"tvg_id": "update_forbidden.tv", "name": "Update Forbidden", "group_title": "Forbidden Update Group",
"tvg_name": "UFName", "tvg_logo": "uflogo.png",
"urls": [{"url": "http://update_forbidden.com", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=initial_channel_data)
created_channel_id = create_response.json()["id"]
update_data = {"name": "Attempted Update"}
response = non_admin_user_client.put(f"/channels/{created_channel_id}", json=update_data)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# --- Test Cases For Delete Channel ---
def test_delete_channel_success(db_session: Session, admin_user_client: TestClient):
# Setup priority and create initial channel
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
initial_channel_data = {
"tvg_id": "delete_me.tv",
"name": "Delete Me Channel",
"group_title": "Delete Group",
"tvg_name": "DMName", "tvg_logo": "dmlogo.png",
"urls": [{"url": "http://delete_me.com/stream", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=initial_channel_data)
assert create_response.status_code == status.HTTP_201_CREATED
created_channel_id = create_response.json()["id"]
# Verify it exists before delete
db_channel_before_delete = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first()
assert db_channel_before_delete is not None
delete_response = admin_user_client.delete(f"/channels/{created_channel_id}")
assert delete_response.status_code == status.HTTP_204_NO_CONTENT
# Verify it's gone from DB
db_channel_after_delete = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first()
assert db_channel_after_delete is None
# Also verify associated URLs are deleted (due to CASCADE in mock model)
db_urls_after_delete = db_session.query(MockChannelURL).filter(MockChannelURL.channel_id.cast(String()) == uuid.UUID(created_channel_id)).all()
assert len(db_urls_after_delete) == 0
def test_delete_channel_not_found(db_session: Session, admin_user_client: TestClient):
random_uuid = uuid.uuid4()
response = admin_user_client.delete(f"/channels/{random_uuid}")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "Channel not found" in response.json()["detail"]
def test_delete_channel_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient):
# Setup priority and create initial channel with admin
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
initial_channel_data = {
"tvg_id": "delete_forbidden.tv", "name": "Delete Forbidden", "group_title": "Forbidden Delete Group",
"tvg_name": "DFName", "tvg_logo": "dflogo.png",
"urls": [{"url": "http://delete_forbidden.com", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=initial_channel_data)
created_channel_id = create_response.json()["id"]
response = non_admin_user_client.delete(f"/channels/{created_channel_id}")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# Ensure channel was not deleted
db_channel_not_deleted = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first()
assert db_channel_not_deleted is not None
# --- Test Cases For List Channels ---
def test_list_channels_empty(db_session: Session, admin_user_client: TestClient):
response = admin_user_client.get("/channels/")
assert response.status_code == status.HTTP_200_OK
assert response.json() == []
def test_list_channels_with_data_and_pagination(db_session: Session, admin_user_client: TestClient):
# Setup priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
# Create some channels
for i in range(5):
channel_data = {
"tvg_id": f"list_c{i}.tv", "name": f"List Channel {i}", "group_title": "List Group",
"tvg_name": f"LCName{i}", "tvg_logo": f"lclogo{i}.png",
"urls": [{"url": f"http://list_c{i}.com", "priority_id": 100}]
}
admin_user_client.post("/channels/", json=channel_data)
# Test default pagination (limit 100)
response_all = admin_user_client.get("/channels/")
assert response_all.status_code == status.HTTP_200_OK
data_all = response_all.json()
assert len(data_all) == 5
assert data_all[0]["name"] == "List Channel 0"
# Test limit
response_limit = admin_user_client.get("/channels/?limit=2")
assert response_limit.status_code == status.HTTP_200_OK
data_limit = response_limit.json()
assert len(data_limit) == 2
assert data_limit[0]["name"] == "List Channel 0"
assert data_limit[1]["name"] == "List Channel 1"
# Test skip and limit
response_skip_limit = admin_user_client.get("/channels/?skip=2&limit=2")
assert response_skip_limit.status_code == status.HTTP_200_OK
data_skip_limit = response_skip_limit.json()
assert len(data_skip_limit) == 2
assert data_skip_limit[0]["name"] == "List Channel 2"
assert data_skip_limit[1]["name"] == "List Channel 3"
# Test skip beyond data
response_skip_beyond = admin_user_client.get("/channels/?skip=10")
assert response_skip_beyond.status_code == status.HTTP_200_OK
assert response_skip_beyond.json() == []
def test_list_channels_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient):
response = non_admin_user_client.get("/channels/")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# --- Test Cases For Add Channel URL ---
def test_add_channel_url_success(db_session: Session, admin_user_client: TestClient):
# Setup priority and create a channel
priority1 = MockPriority(id=100, description="High")
priority2 = MockPriority(id=200, description="Medium")
db_session.add_all([priority1, priority2])
db_session.commit()
channel_data = {
"tvg_id": "channel_for_url.tv", "name": "Channel For URL", "group_title": "URL Group",
"tvg_name": "CFUName", "tvg_logo": "cfulogo.png",
"urls": [{"url": "http://initial.com/stream", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=channel_data)
assert create_response.status_code == status.HTTP_201_CREATED
created_channel_id = create_response.json()["id"]
url_data = {"url": "http://new_stream.com/live", "priority_id": 200}
response = admin_user_client.post(f"/channels/{created_channel_id}/urls", json=url_data)
assert response.status_code == status.HTTP_201_CREATED
data = response.json()
assert data["url"] == "http://new_stream.com/live"
assert data["priority_id"] == 200
# assert data["channel_id"] == created_channel_id # ChannelURLResponse does not include channel_id
assert data["in_use"] is False # Default
# Verify in DB
db_url = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(data["id"])).first()
assert db_url is not None
assert db_url.url == "http://new_stream.com/live"
assert db_url.priority_id == 200
assert db_url.channel_id == uuid.UUID(created_channel_id)
# Check the channel now has two URLs
# Re-fetch channel to get updated URLs list
db_session.expire_all() # Expire to ensure fresh data from DB if ChannelResponse is not dynamic
# Let's verify by querying the database directly for the count of URLs for the channel
url_count = db_session.query(MockChannelURL).filter(MockChannelURL.channel_id.cast(String()) == uuid.UUID(created_channel_id)).count()
assert url_count == 2
# And also check the response from get_channel
channel_get_response = admin_user_client.get(f"/channels/{created_channel_id}")
assert channel_get_response.status_code == status.HTTP_200_OK
channel_details = channel_get_response.json()
assert len(channel_details["urls"]) == 2
def test_add_channel_url_channel_not_found(db_session: Session, admin_user_client: TestClient):
# Setup priority
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
random_channel_uuid = uuid.uuid4()
url_data = {"url": "http://stream_no_channel.com", "priority_id": 100}
response = admin_user_client.post(f"/channels/{random_channel_uuid}/urls", json=url_data)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "Channel not found" in response.json()["detail"]
def test_add_channel_url_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient):
# Setup priority and create a channel with admin
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "url_forbidden.tv", "name": "URL Forbidden", "group_title": "URL Forbidden Group",
"tvg_name": "UFName2", "tvg_logo": "uflogo2.png",
"urls": [{"url": "http://url_forbidden.com", "priority_id": 100}]
}
create_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_response.json()["id"]
url_data = {"url": "http://new_stream_forbidden.com", "priority_id": 100}
response = non_admin_user_client.post(f"/channels/{created_channel_id}/urls", json=url_data)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# --- Test Cases For Update Channel URL ---
def test_update_channel_url_success(db_session: Session, admin_user_client: TestClient):
# Setup priorities and create a channel with a URL
priority1 = MockPriority(id=100, description="High")
priority2 = MockPriority(id=200, description="Medium")
priority3 = MockPriority(id=300, description="Low") # New priority for update, Use valid priority ID
db_session.add_all([priority1, priority2, priority3])
db_session.commit()
channel_data = {
"tvg_id": "ch_update_url.tv", "name": "Channel Update URL", "group_title": "URL Update Group",
"tvg_name": "CUUName", "tvg_logo": "cuulogo.png",
"urls": [{"url": "http://original_url.com/stream", "priority_id": 100}]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
# Get the ID of the initially created URL
initial_url_id = create_ch_response.json()["urls"][0]["id"]
update_url_data = {
"url": "http://updated_url.com/live",
"priority_id": 300,
"in_use": True
}
response = admin_user_client.put(f"/channels/{created_channel_id}/urls/{initial_url_id}", json=update_url_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == initial_url_id
assert data["url"] == "http://updated_url.com/live"
assert data["priority_id"] == 300
assert data["in_use"] is True
# Verify in DB
db_url = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(initial_url_id)).first()
assert db_url is not None
assert db_url.url == "http://updated_url.com/live"
assert db_url.priority_id == 300
assert db_url.in_use is True
def test_update_channel_url_partial_success(db_session: Session, admin_user_client: TestClient):
# Setup priorities and create a channel with a URL
priority1 = MockPriority(id=100, description="High")
db_session.add_all([priority1])
db_session.commit()
channel_data = {
"tvg_id": "ch_partial_update_url.tv", "name": "Channel Partial Update URL", "group_title": "URL Partial Update Group",
"tvg_name": "CPUName", "tvg_logo": "cpulogo.png",
"urls": [{"url": "http://partial_original.com/stream", "priority_id": 100}]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
initial_url_id = create_ch_response.json()["urls"][0]["id"]
# Update only 'in_use'
update_url_data = {"in_use": True}
response = admin_user_client.put(f"/channels/{created_channel_id}/urls/{initial_url_id}", json=update_url_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == initial_url_id
assert data["url"] == "http://partial_original.com/stream"
assert data["priority_id"] == 100
assert data["in_use"] is True
# Verify in DB
db_url = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(initial_url_id)).first()
assert db_url is not None
assert db_url.in_use is True
assert db_url.url == "http://partial_original.com/stream"
assert db_url.priority_id == 100
def test_update_channel_url_url_not_found(db_session: Session, admin_user_client: TestClient):
# Setup priority and create a channel
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "ch_url_not_found.tv", "name": "Channel URL Not Found", "group_title": "URL Not Found Group",
"tvg_name": "CUNFName", "tvg_logo": "cunflogo.png",
"urls": []
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
random_url_uuid = uuid.uuid4()
update_data = {"url": "http://does_not_matter.com"}
response = admin_user_client.put(f"/channels/{created_channel_id}/urls/{random_url_uuid}", json=update_data)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "URL not found" in response.json()["detail"]
def test_update_channel_url_channel_id_mismatch_is_url_not_found(db_session: Session, admin_user_client: TestClient):
# This tests if a URL ID exists but is not associated with the given channel_id in the path
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
# Create channel 1 with a URL
ch1_data = {"tvg_id": "ch1_url_mismatch.tv", "name": "CH1 URL Mismatch", "group_title": "G1", "tvg_name":"C1UMName", "tvg_logo":"c1umlogo.png", "urls": [{"url":"http://ch1.url", "priority_id":100}]}
ch1_resp = admin_user_client.post("/channels/", json=ch1_data)
url_id_from_ch1 = ch1_resp.json()["urls"][0]["id"]
# Create channel 2
ch2_data = {"tvg_id": "ch2_url_mismatch.tv", "name": "CH2 URL Mismatch", "group_title": "G2", "tvg_name":"C2UMName", "tvg_logo":"c2umlogo.png", "urls": []} # priority_id not needed here
ch2_resp = admin_user_client.post("/channels/", json=ch2_data)
ch2_id = ch2_resp.json()["id"]
# Try to update URL from CH1 using CH2's ID in path
update_data = {"url": "http://mismatch_update.com"}
response = admin_user_client.put(f"/channels/{ch2_id}/urls/{url_id_from_ch1}", json=update_data)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "URL not found" in response.json()["detail"]
def test_update_channel_url_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient):
# Setup priority and create channel with URL using admin
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "ch_update_url_forbidden.tv", "name": "Channel Update URL Forbidden", "group_title": "URL Update Forbidden Group",
"tvg_name": "CUFName", "tvg_logo": "cuflgo.png",
"urls": [{"url": "http://original_forbidden.com/stream", "priority_id": 100}]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
initial_url_id = create_ch_response.json()["urls"][0]["id"]
update_url_data = {"url": "http://attempted_update_forbidden.com"}
response = non_admin_user_client.put(f"/channels/{created_channel_id}/urls/{initial_url_id}", json=update_url_data)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# --- Test Cases For Delete Channel URL ---
def test_delete_channel_url_success(db_session: Session, admin_user_client: TestClient):
# Setup priority and create a channel with a URL
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "ch_delete_url.tv", "name": "Channel Delete URL", "group_title": "URL Delete Group",
"tvg_name": "CDUName", "tvg_logo": "cdulogo.png",
"urls": [{"url": "http://delete_this_url.com/stream", "priority_id": 100}]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
url_to_delete_id = create_ch_response.json()["urls"][0]["id"]
# Verify URL exists before delete
db_url_before = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(url_to_delete_id)).first()
assert db_url_before is not None
delete_response = admin_user_client.delete(f"/channels/{created_channel_id}/urls/{url_to_delete_id}")
assert delete_response.status_code == status.HTTP_204_NO_CONTENT
# Verify URL is gone from DB
db_url_after = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(url_to_delete_id)).first()
assert db_url_after is None
# Verify channel still exists and has no URLs
channel_response = admin_user_client.get(f"/channels/{created_channel_id}")
assert channel_response.status_code == status.HTTP_200_OK
assert len(channel_response.json()["urls"]) == 0
def test_delete_channel_url_url_not_found(db_session: Session, admin_user_client: TestClient):
# Setup priority and create a channel
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "ch_del_url_not_found.tv", "name": "Channel Del URL Not Found", "group_title": "URL Del Not Found Group",
"tvg_name": "CDUNFName", "tvg_logo": "cdunflogo.png",
"urls": []
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
random_url_uuid = uuid.uuid4()
response = admin_user_client.delete(f"/channels/{created_channel_id}/urls/{random_url_uuid}")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "URL not found" in response.json()["detail"]
def test_delete_channel_url_channel_id_mismatch_is_url_not_found(db_session: Session, admin_user_client: TestClient):
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
# Create channel 1 with a URL
ch1_data = {
"name": "CH1 Del URL Mismatch",
"tvg_id": "ch1_del_url_mismatch.tv",
"tvg_name": "CH1 Del URL Mismatch",
"tvg_logo": "ch1delogo.png",
"group_title": "G1Del",
"urls": [{"url":"http://ch1del.url", "priority_id":100}]}
ch1_resp = admin_user_client.post("/channels/", json=ch1_data)
print(ch1_resp.json())
url_id_from_ch1 = ch1_resp.json()["urls"][0]["id"]
# Create channel 2
ch2_data = {
"tvg_id": "ch2_del_url_mismatch.tv",
"name": "CH2 Del URL Mismatch",
"tvg_name": "CH2 Del URL Mismatch",
"tvg_logo": "ch2delogo.png",
"group_title": "G2Del",
"urls": []
}
ch2_resp = admin_user_client.post("/channels/", json=ch2_data)
ch2_id = ch2_resp.json()["id"]
# Try to delete URL from CH1 using CH2's ID in path
response = admin_user_client.delete(f"/channels/{ch2_id}/urls/{url_id_from_ch1}")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "URL not found" in response.json()["detail"]
# Ensure the original URL on CH1 was not deleted
db_url_ch1 = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(url_id_from_ch1)).first()
assert db_url_ch1 is not None
def test_delete_channel_url_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient):
# Setup priority and create channel with URL using admin
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "ch_del_url_forbidden.tv", "name": "Channel Del URL Forbidden", "group_title": "URL Del Forbidden Group",
"tvg_name": "CDUFName", "tvg_logo": "cduflogo.png",
"urls": [{"url": "http://original_del_forbidden.com/stream", "priority_id": 100}]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
initial_url_id = create_ch_response.json()["urls"][0]["id"]
response = non_admin_user_client.delete(f"/channels/{created_channel_id}/urls/{initial_url_id}")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
# Ensure URL was not deleted
db_url_not_deleted = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(initial_url_id)).first()
assert db_url_not_deleted is not None
# --- Test Cases For List Channel URLs ---
def test_list_channel_urls_success(db_session: Session, admin_user_client: TestClient):
# Setup priorities and create a channel with multiple URLs
priority1 = MockPriority(id=100, description="High")
priority2 = MockPriority(id=200, description="Medium")
db_session.add_all([priority1, priority2])
db_session.commit()
channel_data = {
"tvg_id": "ch_list_urls.tv", "name": "Channel List URLs", "group_title": "URL List Group",
"tvg_name": "CLUName", "tvg_logo": "clulogo.png",
"urls": [
{"url": "http://list_url1.com/stream", "priority_id": 100},
{"url": "http://list_url2.com/live", "priority_id": 200}
]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
# URLs are added during channel creation, let's get their IDs for assertion if needed
# For now, we'll just check the count and content based on what was provided.
response = admin_user_client.get(f"/channels/{created_channel_id}/urls")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 2
# Check if the URLs returned match what we expect (order might not be guaranteed by default)
returned_urls_set = {(item["url"], item["priority_id"]) for item in data}
expected_urls_set = {
("http://list_url1.com/stream", 100),
("http://list_url2.com/live", 200)
}
assert returned_urls_set == expected_urls_set
def test_list_channel_urls_empty(db_session: Session, admin_user_client: TestClient):
# Create a channel with no URLs initially
# No need to set up MockPriority if no URLs with priority_id are being created.
channel_data = {
"tvg_id": "ch_list_empty_urls.tv", "name": "Channel List Empty URLs", "group_title": "URL List Empty Group",
"tvg_name": "CLEUName", "tvg_logo": "cleulogo.png",
"urls": []
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
response = admin_user_client.get(f"/channels/{created_channel_id}/urls")
assert response.status_code == status.HTTP_200_OK
assert response.json() == []
def test_list_channel_urls_channel_not_found(db_session: Session, admin_user_client: TestClient):
random_channel_uuid = uuid.uuid4()
response = admin_user_client.get(f"/channels/{random_channel_uuid}/urls")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "Channel not found" in response.json()["detail"]
def test_list_channel_urls_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient):
# Setup priority and create channel with admin
priority1 = MockPriority(id=100, description="High")
db_session.add(priority1)
db_session.commit()
channel_data = {
"tvg_id": "ch_list_url_forbidden.tv", "name": "Channel List URL Forbidden", "group_title": "URL List Forbidden Group",
"tvg_name": "CLUFName", "tvg_logo": "cluflogo.png",
"urls": [{"url": "http://list_url_forbidden.com", "priority_id": 100}]
}
create_ch_response = admin_user_client.post("/channels/", json=channel_data)
created_channel_id = create_ch_response.json()["id"]
response = non_admin_user_client.get(f"/channels/{created_channel_id}/urls")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]
assert response.status_code == status.HTTP_403_FORBIDDEN
assert "required roles" in response.json()["detail"]

95
tests/utils/db_mocks.py Normal file
View File

@@ -0,0 +1,95 @@
import os
import uuid
from datetime import datetime, timezone
from unittest.mock import patch, MagicMock
from sqlalchemy.pool import StaticPool
from sqlalchemy.orm import Session, sessionmaker, declarative_base
from sqlalchemy import create_engine, TypeDecorator, TEXT, Column, String, DateTime, UniqueConstraint, ForeignKey, Boolean, Integer
import pytest
# Create a mock-specific Base class for testing
MockBase = declarative_base()
class SQLiteUUID(TypeDecorator):
"""Enables UUID support for SQLite."""
impl = TEXT
cache_ok = True
def process_bind_param(self, value, dialect):
if value is None:
return value
return str(value)
def process_result_value(self, value, dialect):
if value is None:
return value
return uuid.UUID(value)
# Model classes for testing - prefix with Mock to avoid pytest collection
class MockPriority(MockBase):
__tablename__ = "priorities"
id = Column(Integer, primary_key=True)
description = Column(String, nullable=False)
class MockChannelDB(MockBase):
__tablename__ = "channels"
id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4)
tvg_id = Column(String, nullable=False)
name = Column(String, nullable=False)
group_title = Column(String, nullable=False)
tvg_name = Column(String)
__table_args__ = (
UniqueConstraint('group_title', 'name', name='uix_group_title_name'),
)
tvg_logo = Column(String)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
class MockChannelURL(MockBase):
__tablename__ = "channels_urls"
id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4)
channel_id = Column(SQLiteUUID(), ForeignKey('channels.id', ondelete='CASCADE'), nullable=False)
url = Column(String, nullable=False)
in_use = Column(Boolean, default=False, nullable=False)
priority_id = Column(Integer, ForeignKey('priorities.id'), nullable=False)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Create test engine
engine_mock = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool
)
# Create test session
session_mock = sessionmaker(autocommit=False, autoflush=False, bind=engine_mock)
# Mock the actual database functions
def mock_get_db():
db = session_mock()
try:
yield db
finally:
db.close()
@pytest.fixture(autouse=True)
def mock_env(monkeypatch):
"""Fixture for mocking environment variables"""
monkeypatch.setenv("MOCK_AUTH", "true")
monkeypatch.setenv("DB_USER", "testuser")
monkeypatch.setenv("DB_PASSWORD", "testpass")
monkeypatch.setenv("DB_HOST", "localhost")
monkeypatch.setenv("DB_NAME", "testdb")
monkeypatch.setenv("AWS_REGION", "us-east-1")
@pytest.fixture
def mock_ssm():
"""Fixture for mocking boto3 SSM client"""
with patch('boto3.client') as mock_client:
mock_ssm = MagicMock()
mock_client.return_value = mock_ssm
mock_ssm.get_parameter.return_value = {
'Parameter': {'Value': 'mocked_value'}
}
yield mock_ssm

View File

@@ -1,100 +1,15 @@
import os
import pytest
import uuid
from datetime import datetime, timezone
from unittest.mock import patch, MagicMock
from sqlalchemy.pool import StaticPool
from sqlalchemy.orm import Session, sessionmaker, declarative_base
from sqlalchemy import create_engine, TypeDecorator, TEXT, Column, String, DateTime, UniqueConstraint, ForeignKey, Boolean, Integer
from unittest.mock import patch
from sqlalchemy.orm import Session
from app.utils.database import get_db_credentials, get_db
# Create a mock-specific Base class for testing
MockBase = declarative_base()
class SQLiteUUID(TypeDecorator):
"""Enables UUID support for SQLite."""
impl = TEXT
cache_ok = True
def process_bind_param(self, value, dialect):
if value is None:
return value
return str(value)
def process_result_value(self, value, dialect):
if value is None:
return value
return uuid.UUID(value)
# Model classes for testing - prefix with Mock to avoid pytest collection
class MockPriority(MockBase):
__tablename__ = "priorities"
id = Column(Integer, primary_key=True)
description = Column(String, nullable=False)
class MockChannelDB(MockBase):
__tablename__ = "channels"
id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4)
tvg_id = Column(String, nullable=False)
name = Column(String, nullable=False)
group_title = Column(String, nullable=False)
tvg_name = Column(String)
__table_args__ = (
UniqueConstraint('group_title', 'name', name='uix_group_title_name'),
)
tvg_logo = Column(String)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
class MockChannelURL(MockBase):
__tablename__ = "channels_urls"
id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4)
channel_id = Column(SQLiteUUID(), ForeignKey('channels.id', ondelete='CASCADE'), nullable=False)
url = Column(String, nullable=False)
in_use = Column(Boolean, default=False, nullable=False)
priority_id = Column(Integer, ForeignKey('priorities.id'), nullable=False)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Create test engine
engine_mock = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool
from tests.utils.db_mocks import (
session_mock,
mock_get_db,
mock_env,
mock_ssm
)
# Create test session
session_mock = sessionmaker(autocommit=False, autoflush=False, bind=engine_mock)
# Mock the actual database functions
def mock_get_db():
db = session_mock()
try:
yield db
finally:
db.close()
@pytest.fixture(autouse=True)
def mock_env(monkeypatch):
"""Fixture for mocking environment variables"""
monkeypatch.setenv("MOCK_AUTH", "true")
monkeypatch.setenv("DB_USER", "testuser")
monkeypatch.setenv("DB_PASSWORD", "testpass")
monkeypatch.setenv("DB_HOST", "localhost")
monkeypatch.setenv("DB_NAME", "testdb")
monkeypatch.setenv("AWS_REGION", "us-east-1")
@pytest.fixture
def mock_ssm():
"""Fixture for mocking boto3 SSM client"""
with patch('boto3.client') as mock_client:
mock_ssm = MagicMock()
mock_client.return_value = mock_ssm
mock_ssm.get_parameter.return_value = {
'Parameter': {'Value': 'mocked_value'}
}
yield mock_ssm
def test_get_db_credentials_env(mock_env):
"""Test getting DB credentials from environment variables"""
conn_str = get_db_credentials()