diff --git a/.vscode/settings.json b/.vscode/settings.json index 1712ada..012dc88 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,10 +11,29 @@ "botocore", "BURSTABLE", "cabletv", + "CDUF", + "cduflogo", + "cdulogo", + "CDUNF", + "cdunflogo", "certbot", "certifi", + "cfulogo", + "CLEU", + "cleulogo", + "CLUF", + "cluflogo", + "clulogo", + "cpulogo", + "cuflgo", + "CUNF", + "cunflogo", + "cuulogo", "delenv", + "delogo", "devel", + "dflogo", + "dmlogo", "dotenv", "fastapi", "filterwarnings", @@ -23,6 +42,7 @@ "fullchain", "gitea", "iptv", + "lclogo", "LETSENCRYPT", "nohup", "ondelete", @@ -42,10 +62,14 @@ "sqlalchemy", "starlette", "stefano", + "testadmin", "testdb", "testpass", "testpaths", + "uflogo", + "umlogo", "uvicorn", - "venv" + "venv", + "wrongpass" ] } \ No newline at end of file diff --git a/tests/routers/test_auth.py b/tests/routers/test_auth.py new file mode 100644 index 0000000..a88513b --- /dev/null +++ b/tests/routers/test_auth.py @@ -0,0 +1,107 @@ +from unittest.mock import patch +import pytest +from fastapi.testclient import TestClient +from fastapi import HTTPException, status +from app.main import app + +client = TestClient(app) + +@pytest.fixture +def mock_successful_auth(): + return { + "AccessToken": "mock_access_token", + "IdToken": "mock_id_token", + "RefreshToken": "mock_refresh_token" + } + +@pytest.fixture +def mock_successful_auth_no_refresh(): + return { + "AccessToken": "mock_access_token", + "IdToken": "mock_id_token" + } + +def test_signin_success(mock_successful_auth): + """Test successful signin with all tokens""" + with patch('app.routers.auth.initiate_auth', return_value=mock_successful_auth): + response = client.post( + "/auth/signin", + json={"username": "testuser", "password": "testpass"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["access_token"] == "mock_access_token" + assert data["id_token"] == "mock_id_token" + assert data["refresh_token"] == "mock_refresh_token" + assert data["token_type"] == "Bearer" + +def test_signin_success_no_refresh(mock_successful_auth_no_refresh): + """Test successful signin without refresh token""" + with patch('app.routers.auth.initiate_auth', return_value=mock_successful_auth_no_refresh): + response = client.post( + "/auth/signin", + json={"username": "testuser", "password": "testpass"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["access_token"] == "mock_access_token" + assert data["id_token"] == "mock_id_token" + assert data["refresh_token"] is None + assert data["token_type"] == "Bearer" + +def test_signin_invalid_input(): + """Test signin with invalid input format""" + # Missing password + response = client.post( + "/auth/signin", + json={"username": "testuser"} + ) + assert response.status_code == 422 + + # Missing username + response = client.post( + "/auth/signin", + json={"password": "testpass"} + ) + assert response.status_code == 422 + + # Empty payload + response = client.post( + "/auth/signin", + json={} + ) + assert response.status_code == 422 + +def test_signin_auth_failure(): + """Test signin with authentication failure""" + with patch('app.routers.auth.initiate_auth') as mock_auth: + mock_auth.side_effect = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid username or password" + ) + response = client.post( + "/auth/signin", + json={"username": "testuser", "password": "wrongpass"} + ) + + assert response.status_code == 401 + data = response.json() + assert data["detail"] == "Invalid username or password" + +def test_signin_user_not_found(): + """Test signin with non-existent user""" + with patch('app.routers.auth.initiate_auth') as mock_auth: + mock_auth.side_effect = HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found" + ) + response = client.post( + "/auth/signin", + json={"username": "nonexistent", "password": "testpass"} + ) + + assert response.status_code == 404 + data = response.json() + assert data["detail"] == "User not found" \ No newline at end of file diff --git a/tests/routers/test_channels.py b/tests/routers/test_channels.py new file mode 100644 index 0000000..6ac11a9 --- /dev/null +++ b/tests/routers/test_channels.py @@ -0,0 +1,815 @@ +import pytest +import uuid +from fastapi.testclient import TestClient +from fastapi import FastAPI, status +from sqlalchemy import String +from sqlalchemy.orm import Session + +from app.auth.dependencies import get_current_user +from app.routers.channels import router as channels_router +from app.models.auth import CognitoUser +from app.utils.database import get_db + +# Import mocks from db_mocks +from tests.utils.db_mocks import ( + MockBase, + engine_mock, + session_mock as TestingSessionLocal, + mock_get_db, + MockChannelDB, + MockChannelURL, + MockPriority +) + +# Create a FastAPI instance for testing +app = FastAPI() + +# Mock current user +def mock_get_current_user_admin(): + return CognitoUser( + username="testadmin", + email="testadmin@example.com", + roles=["admin"], + user_status="CONFIRMED", + enabled=True + ) + +def mock_get_current_user_non_admin(): + return CognitoUser( + username="testuser", + email="testuser@example.com", + roles=["user"], # Or any role other than admin + user_status="CONFIRMED", + enabled=True + ) + +# Override dependencies +app.dependency_overrides[get_db] = mock_get_db +app.include_router(channels_router) + +client = TestClient(app) + +@pytest.fixture(scope="function") +def db_session(): + # Create tables for each test function + MockBase.metadata.create_all(bind=engine_mock) + db = TestingSessionLocal() + try: + yield db + finally: + db.close() + # Drop tables after each test function + MockBase.metadata.drop_all(bind=engine_mock) + +@pytest.fixture(scope="function") +def admin_user_client(db_session: Session): + """Yields a TestClient configured with an admin user.""" + test_app = FastAPI() + test_app.include_router(channels_router) + test_app.dependency_overrides[get_db] = mock_get_db + test_app.dependency_overrides[get_current_user] = mock_get_current_user_admin + with TestClient(test_app) as test_client: + yield test_client + +@pytest.fixture(scope="function") +def non_admin_user_client(db_session: Session): + """Yields a TestClient configured with a non-admin user.""" + test_app = FastAPI() + test_app.include_router(channels_router) + test_app.dependency_overrides[get_db] = mock_get_db + test_app.dependency_overrides[get_current_user] = mock_get_current_user_non_admin + with TestClient(test_app) as test_client: + yield test_client + +# --- Test Cases For Channel Creation --- + +def test_create_channel_success(db_session: Session, admin_user_client: TestClient): + # Setup a priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + channel_data = { + "tvg_id": "channel1.tv", + "name": "Test Channel 1", + "group_title": "Test Group", + "tvg_name": "TestChannel1", + "tvg_logo": "logo.png", + "urls": [ + {"url": "http://stream1.com/test", "priority_id": 100} + ] + } + response = admin_user_client.post("/channels/", json=channel_data) # No headers needed now + assert response.status_code == status.HTTP_201_CREATED + data = response.json() + assert data["name"] == "Test Channel 1" + assert data["group_title"] == "Test Group" + assert data["tvg_id"] == "channel1.tv" + assert len(data["urls"]) == 1 + assert data["urls"][0]["url"] == "http://stream1.com/test" + assert data["urls"][0]["priority_id"] == 100 + + # Verify in DB + db_channel = db_session.query(MockChannelDB).filter(MockChannelDB.name == "Test Channel 1").first() + assert db_channel is not None + assert db_channel.group_title == "Test Group" + + # Query URLs using exact string comparison + db_urls = db_session.query(MockChannelURL).filter( + MockChannelURL.channel_id.cast(String()) == db_channel.id + ).all() + + assert len(db_urls) == 1 + assert db_urls[0].url == "http://stream1.com/test" + +def test_create_channel_duplicate(db_session: Session, admin_user_client: TestClient): + # Setup a priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + # Create initial channel + initial_channel_data = { + "tvg_id": "channel_dup.tv", + "name": "Duplicate Channel", + "group_title": "Duplicate Group", + "tvg_name": "DuplicateChannelName", + "tvg_logo": "duplicate_logo.png", + "urls": [{"url": "http://stream_dup.com/test", "priority_id": 100}] + } + response1 = admin_user_client.post("/channels/", json=initial_channel_data) + assert response1.status_code == status.HTTP_201_CREATED + + # Attempt to create the same channel again + response2 = admin_user_client.post("/channels/", json=initial_channel_data) + assert response2.status_code == status.HTTP_409_CONFLICT + assert "already exists" in response2.json()["detail"] + +def test_create_channel_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient): + # Setup a priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + channel_data = { + "tvg_id": "channel_forbidden.tv", + "name": "Forbidden Channel", + "group_title": "Forbidden Group", + "tvg_name": "ForbiddenChannelName", + "tvg_logo": "forbidden_logo.png", + "urls": [{"url": "http://stream_forbidden.com/test", "priority_id": 100}] + } + response = non_admin_user_client.post("/channels/", json=channel_data) + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] + +# --- Test Cases For Get Channel --- + +def test_get_channel_success(db_session: Session, admin_user_client: TestClient): + # Setup a priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + # Create a channel first + channel_data_create = { + "tvg_id": "get_me.tv", + "name": "Get Me Channel", + "group_title": "Get Group", + "tvg_name": "GetMeChannelName", + "tvg_logo": "get_me_logo.png", + "urls": [{"url": "http://get_me.com/stream", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=channel_data_create) + assert create_response.status_code == status.HTTP_201_CREATED + created_channel_id = create_response.json()["id"] + + app.dependency_overrides[get_current_user] = mock_get_current_user_admin # Or a generic authenticated user + get_response = admin_user_client.get(f"/channels/{created_channel_id}") + assert get_response.status_code == status.HTTP_200_OK + data = get_response.json() + assert data["id"] == created_channel_id + assert data["name"] == "Get Me Channel" + assert data["group_title"] == "Get Group" + assert len(data["urls"]) == 1 + app.dependency_overrides.pop(get_current_user, None) + + +def test_get_channel_not_found(db_session: Session, admin_user_client: TestClient): + app.dependency_overrides[get_current_user] = mock_get_current_user_admin + random_uuid = uuid.uuid4() + response = admin_user_client.get(f"/channels/{random_uuid}") + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "Channel not found" in response.json()["detail"] + app.dependency_overrides.pop(get_current_user, None) + +# --- Test Cases For Update Channel --- + +def test_update_channel_success(db_session: Session, admin_user_client: TestClient): + # Setup priority and create initial channel + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + initial_channel_data = { + "tvg_id": "update_me.tv", + "name": "Update Me Channel", + "group_title": "Update Group", + "tvg_name": "UpdateMeChannelName", + "tvg_logo": "update_me_logo.png", + "urls": [{"url": "http://update_me.com/stream", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=initial_channel_data) + assert create_response.status_code == status.HTTP_201_CREATED + created_channel_id = create_response.json()["id"] + + update_data = { + "name": "Updated Channel Name", + "tvg_logo": "new_logo.png" + } + response = admin_user_client.put(f"/channels/{created_channel_id}", json=update_data) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == created_channel_id + assert data["name"] == "Updated Channel Name" + assert data["group_title"] == "Update Group" + assert data["tvg_logo"] == "new_logo.png" + + # Verify in DB + db_channel = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first() + assert db_channel is not None + assert db_channel.name == "Updated Channel Name" + assert db_channel.tvg_logo == "new_logo.png" + +def test_update_channel_conflict(db_session: Session, admin_user_client: TestClient): + # Setup priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + # Create channel 1 + channel1_data = { + "tvg_id": "c1.tv", "name": "Channel One", "group_title": "Group A", + "tvg_name": "C1Name", "tvg_logo": "c1logo.png", + "urls": [{"url": "http://c1.com", "priority_id": 100}] + } + admin_user_client.post("/channels/", json=channel1_data) + + # Create channel 2 + channel2_data = { + "tvg_id": "c2.tv", "name": "Channel Two", "group_title": "Group B", + "tvg_name": "C2Name", "tvg_logo": "c2logo.png", + "urls": [{"url": "http://c2.com", "priority_id": 100}] + } + response_c2 = admin_user_client.post("/channels/", json=channel2_data) + channel2_id = response_c2.json()["id"] + + # Attempt to update channel 2 to conflict with channel 1 + update_conflict_data = {"name": "Channel One", "group_title": "Group A"} + response = admin_user_client.put(f"/channels/{channel2_id}", json=update_conflict_data) + assert response.status_code == status.HTTP_409_CONFLICT + assert "already exists" in response.json()["detail"] + +def test_update_channel_not_found(db_session: Session, admin_user_client: TestClient): + random_uuid = uuid.uuid4() + update_data = {"name": "Non Existent Update"} + response = admin_user_client.put(f"/channels/{random_uuid}", json=update_data) + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "Channel not found" in response.json()["detail"] + +def test_update_channel_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient): + # Setup priority and create initial channel with admin + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + initial_channel_data = { + "tvg_id": "update_forbidden.tv", "name": "Update Forbidden", "group_title": "Forbidden Update Group", + "tvg_name": "UFName", "tvg_logo": "uflogo.png", + "urls": [{"url": "http://update_forbidden.com", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=initial_channel_data) + created_channel_id = create_response.json()["id"] + + update_data = {"name": "Attempted Update"} + response = non_admin_user_client.put(f"/channels/{created_channel_id}", json=update_data) + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] +# --- Test Cases For Delete Channel --- + +def test_delete_channel_success(db_session: Session, admin_user_client: TestClient): + # Setup priority and create initial channel + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + initial_channel_data = { + "tvg_id": "delete_me.tv", + "name": "Delete Me Channel", + "group_title": "Delete Group", + "tvg_name": "DMName", "tvg_logo": "dmlogo.png", + "urls": [{"url": "http://delete_me.com/stream", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=initial_channel_data) + assert create_response.status_code == status.HTTP_201_CREATED + created_channel_id = create_response.json()["id"] + + # Verify it exists before delete + db_channel_before_delete = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first() + assert db_channel_before_delete is not None + + delete_response = admin_user_client.delete(f"/channels/{created_channel_id}") + assert delete_response.status_code == status.HTTP_204_NO_CONTENT + + # Verify it's gone from DB + db_channel_after_delete = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first() + assert db_channel_after_delete is None + + # Also verify associated URLs are deleted (due to CASCADE in mock model) + db_urls_after_delete = db_session.query(MockChannelURL).filter(MockChannelURL.channel_id.cast(String()) == uuid.UUID(created_channel_id)).all() + assert len(db_urls_after_delete) == 0 + + +def test_delete_channel_not_found(db_session: Session, admin_user_client: TestClient): + random_uuid = uuid.uuid4() + response = admin_user_client.delete(f"/channels/{random_uuid}") + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "Channel not found" in response.json()["detail"] + +def test_delete_channel_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient): + # Setup priority and create initial channel with admin + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + initial_channel_data = { + "tvg_id": "delete_forbidden.tv", "name": "Delete Forbidden", "group_title": "Forbidden Delete Group", + "tvg_name": "DFName", "tvg_logo": "dflogo.png", + "urls": [{"url": "http://delete_forbidden.com", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=initial_channel_data) + created_channel_id = create_response.json()["id"] + + response = non_admin_user_client.delete(f"/channels/{created_channel_id}") + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] + + # Ensure channel was not deleted + db_channel_not_deleted = db_session.query(MockChannelDB).filter(MockChannelDB.id.cast(String()) == uuid.UUID(created_channel_id)).first() + assert db_channel_not_deleted is not None +# --- Test Cases For List Channels --- + +def test_list_channels_empty(db_session: Session, admin_user_client: TestClient): + response = admin_user_client.get("/channels/") + assert response.status_code == status.HTTP_200_OK + assert response.json() == [] + +def test_list_channels_with_data_and_pagination(db_session: Session, admin_user_client: TestClient): + # Setup priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + # Create some channels + for i in range(5): + channel_data = { + "tvg_id": f"list_c{i}.tv", "name": f"List Channel {i}", "group_title": "List Group", + "tvg_name": f"LCName{i}", "tvg_logo": f"lclogo{i}.png", + "urls": [{"url": f"http://list_c{i}.com", "priority_id": 100}] + } + admin_user_client.post("/channels/", json=channel_data) + + # Test default pagination (limit 100) + response_all = admin_user_client.get("/channels/") + assert response_all.status_code == status.HTTP_200_OK + data_all = response_all.json() + assert len(data_all) == 5 + assert data_all[0]["name"] == "List Channel 0" + + # Test limit + response_limit = admin_user_client.get("/channels/?limit=2") + assert response_limit.status_code == status.HTTP_200_OK + data_limit = response_limit.json() + assert len(data_limit) == 2 + assert data_limit[0]["name"] == "List Channel 0" + assert data_limit[1]["name"] == "List Channel 1" + + # Test skip and limit + response_skip_limit = admin_user_client.get("/channels/?skip=2&limit=2") + assert response_skip_limit.status_code == status.HTTP_200_OK + data_skip_limit = response_skip_limit.json() + assert len(data_skip_limit) == 2 + assert data_skip_limit[0]["name"] == "List Channel 2" + assert data_skip_limit[1]["name"] == "List Channel 3" + + # Test skip beyond data + response_skip_beyond = admin_user_client.get("/channels/?skip=10") + assert response_skip_beyond.status_code == status.HTTP_200_OK + assert response_skip_beyond.json() == [] + + +def test_list_channels_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient): + response = non_admin_user_client.get("/channels/") + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] +# --- Test Cases For Add Channel URL --- + +def test_add_channel_url_success(db_session: Session, admin_user_client: TestClient): + # Setup priority and create a channel + priority1 = MockPriority(id=100, description="High") + priority2 = MockPriority(id=200, description="Medium") + db_session.add_all([priority1, priority2]) + db_session.commit() + + channel_data = { + "tvg_id": "channel_for_url.tv", "name": "Channel For URL", "group_title": "URL Group", + "tvg_name": "CFUName", "tvg_logo": "cfulogo.png", + "urls": [{"url": "http://initial.com/stream", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=channel_data) + assert create_response.status_code == status.HTTP_201_CREATED + created_channel_id = create_response.json()["id"] + + url_data = {"url": "http://new_stream.com/live", "priority_id": 200} + response = admin_user_client.post(f"/channels/{created_channel_id}/urls", json=url_data) + assert response.status_code == status.HTTP_201_CREATED + data = response.json() + assert data["url"] == "http://new_stream.com/live" + assert data["priority_id"] == 200 + # assert data["channel_id"] == created_channel_id # ChannelURLResponse does not include channel_id + assert data["in_use"] is False # Default + + # Verify in DB + db_url = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(data["id"])).first() + assert db_url is not None + assert db_url.url == "http://new_stream.com/live" + assert db_url.priority_id == 200 + assert db_url.channel_id == uuid.UUID(created_channel_id) + + # Check the channel now has two URLs + # Re-fetch channel to get updated URLs list + db_session.expire_all() # Expire to ensure fresh data from DB if ChannelResponse is not dynamic + + # Let's verify by querying the database directly for the count of URLs for the channel + url_count = db_session.query(MockChannelURL).filter(MockChannelURL.channel_id.cast(String()) == uuid.UUID(created_channel_id)).count() + assert url_count == 2 + + # And also check the response from get_channel + channel_get_response = admin_user_client.get(f"/channels/{created_channel_id}") + assert channel_get_response.status_code == status.HTTP_200_OK + channel_details = channel_get_response.json() + assert len(channel_details["urls"]) == 2 + + +def test_add_channel_url_channel_not_found(db_session: Session, admin_user_client: TestClient): + # Setup priority + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + random_channel_uuid = uuid.uuid4() + url_data = {"url": "http://stream_no_channel.com", "priority_id": 100} + response = admin_user_client.post(f"/channels/{random_channel_uuid}/urls", json=url_data) + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "Channel not found" in response.json()["detail"] + +def test_add_channel_url_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient): + # Setup priority and create a channel with admin + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + channel_data = { + "tvg_id": "url_forbidden.tv", "name": "URL Forbidden", "group_title": "URL Forbidden Group", + "tvg_name": "UFName2", "tvg_logo": "uflogo2.png", + "urls": [{"url": "http://url_forbidden.com", "priority_id": 100}] + } + create_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_response.json()["id"] + + url_data = {"url": "http://new_stream_forbidden.com", "priority_id": 100} + response = non_admin_user_client.post(f"/channels/{created_channel_id}/urls", json=url_data) + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] + +# --- Test Cases For Update Channel URL --- + +def test_update_channel_url_success(db_session: Session, admin_user_client: TestClient): + # Setup priorities and create a channel with a URL + priority1 = MockPriority(id=100, description="High") + priority2 = MockPriority(id=200, description="Medium") + priority3 = MockPriority(id=300, description="Low") # New priority for update, Use valid priority ID + db_session.add_all([priority1, priority2, priority3]) + db_session.commit() + + channel_data = { + "tvg_id": "ch_update_url.tv", "name": "Channel Update URL", "group_title": "URL Update Group", + "tvg_name": "CUUName", "tvg_logo": "cuulogo.png", + "urls": [{"url": "http://original_url.com/stream", "priority_id": 100}] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + # Get the ID of the initially created URL + initial_url_id = create_ch_response.json()["urls"][0]["id"] + + + update_url_data = { + "url": "http://updated_url.com/live", + "priority_id": 300, + "in_use": True + } + response = admin_user_client.put(f"/channels/{created_channel_id}/urls/{initial_url_id}", json=update_url_data) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == initial_url_id + assert data["url"] == "http://updated_url.com/live" + assert data["priority_id"] == 300 + assert data["in_use"] is True + + # Verify in DB + db_url = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(initial_url_id)).first() + assert db_url is not None + assert db_url.url == "http://updated_url.com/live" + assert db_url.priority_id == 300 + assert db_url.in_use is True + +def test_update_channel_url_partial_success(db_session: Session, admin_user_client: TestClient): + # Setup priorities and create a channel with a URL + priority1 = MockPriority(id=100, description="High") + db_session.add_all([priority1]) + db_session.commit() + + channel_data = { + "tvg_id": "ch_partial_update_url.tv", "name": "Channel Partial Update URL", "group_title": "URL Partial Update Group", + "tvg_name": "CPUName", "tvg_logo": "cpulogo.png", + "urls": [{"url": "http://partial_original.com/stream", "priority_id": 100}] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + initial_url_id = create_ch_response.json()["urls"][0]["id"] + + # Update only 'in_use' + update_url_data = {"in_use": True} + response = admin_user_client.put(f"/channels/{created_channel_id}/urls/{initial_url_id}", json=update_url_data) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == initial_url_id + assert data["url"] == "http://partial_original.com/stream" + assert data["priority_id"] == 100 + assert data["in_use"] is True + + # Verify in DB + db_url = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(initial_url_id)).first() + assert db_url is not None + assert db_url.in_use is True + assert db_url.url == "http://partial_original.com/stream" + assert db_url.priority_id == 100 + + +def test_update_channel_url_url_not_found(db_session: Session, admin_user_client: TestClient): + # Setup priority and create a channel + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + channel_data = { + "tvg_id": "ch_url_not_found.tv", "name": "Channel URL Not Found", "group_title": "URL Not Found Group", + "tvg_name": "CUNFName", "tvg_logo": "cunflogo.png", + "urls": [] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + + random_url_uuid = uuid.uuid4() + update_data = {"url": "http://does_not_matter.com"} + response = admin_user_client.put(f"/channels/{created_channel_id}/urls/{random_url_uuid}", json=update_data) + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "URL not found" in response.json()["detail"] + +def test_update_channel_url_channel_id_mismatch_is_url_not_found(db_session: Session, admin_user_client: TestClient): + # This tests if a URL ID exists but is not associated with the given channel_id in the path + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + # Create channel 1 with a URL + ch1_data = {"tvg_id": "ch1_url_mismatch.tv", "name": "CH1 URL Mismatch", "group_title": "G1", "tvg_name":"C1UMName", "tvg_logo":"c1umlogo.png", "urls": [{"url":"http://ch1.url", "priority_id":100}]} + ch1_resp = admin_user_client.post("/channels/", json=ch1_data) + url_id_from_ch1 = ch1_resp.json()["urls"][0]["id"] + + # Create channel 2 + ch2_data = {"tvg_id": "ch2_url_mismatch.tv", "name": "CH2 URL Mismatch", "group_title": "G2", "tvg_name":"C2UMName", "tvg_logo":"c2umlogo.png", "urls": []} # priority_id not needed here + ch2_resp = admin_user_client.post("/channels/", json=ch2_data) + ch2_id = ch2_resp.json()["id"] + + # Try to update URL from CH1 using CH2's ID in path + update_data = {"url": "http://mismatch_update.com"} + response = admin_user_client.put(f"/channels/{ch2_id}/urls/{url_id_from_ch1}", json=update_data) + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "URL not found" in response.json()["detail"] + + +def test_update_channel_url_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient): + # Setup priority and create channel with URL using admin + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + channel_data = { + "tvg_id": "ch_update_url_forbidden.tv", "name": "Channel Update URL Forbidden", "group_title": "URL Update Forbidden Group", + "tvg_name": "CUFName", "tvg_logo": "cuflgo.png", + "urls": [{"url": "http://original_forbidden.com/stream", "priority_id": 100}] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + initial_url_id = create_ch_response.json()["urls"][0]["id"] + + update_url_data = {"url": "http://attempted_update_forbidden.com"} + response = non_admin_user_client.put(f"/channels/{created_channel_id}/urls/{initial_url_id}", json=update_url_data) + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] + +# --- Test Cases For Delete Channel URL --- + +def test_delete_channel_url_success(db_session: Session, admin_user_client: TestClient): + # Setup priority and create a channel with a URL + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + channel_data = { + "tvg_id": "ch_delete_url.tv", "name": "Channel Delete URL", "group_title": "URL Delete Group", + "tvg_name": "CDUName", "tvg_logo": "cdulogo.png", + "urls": [{"url": "http://delete_this_url.com/stream", "priority_id": 100}] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + url_to_delete_id = create_ch_response.json()["urls"][0]["id"] + + # Verify URL exists before delete + db_url_before = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(url_to_delete_id)).first() + assert db_url_before is not None + + delete_response = admin_user_client.delete(f"/channels/{created_channel_id}/urls/{url_to_delete_id}") + assert delete_response.status_code == status.HTTP_204_NO_CONTENT + + # Verify URL is gone from DB + db_url_after = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(url_to_delete_id)).first() + assert db_url_after is None + + # Verify channel still exists and has no URLs + channel_response = admin_user_client.get(f"/channels/{created_channel_id}") + assert channel_response.status_code == status.HTTP_200_OK + assert len(channel_response.json()["urls"]) == 0 + + +def test_delete_channel_url_url_not_found(db_session: Session, admin_user_client: TestClient): + # Setup priority and create a channel + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + channel_data = { + "tvg_id": "ch_del_url_not_found.tv", "name": "Channel Del URL Not Found", "group_title": "URL Del Not Found Group", + "tvg_name": "CDUNFName", "tvg_logo": "cdunflogo.png", + "urls": [] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + + random_url_uuid = uuid.uuid4() + response = admin_user_client.delete(f"/channels/{created_channel_id}/urls/{random_url_uuid}") + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "URL not found" in response.json()["detail"] + +def test_delete_channel_url_channel_id_mismatch_is_url_not_found(db_session: Session, admin_user_client: TestClient): + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + + # Create channel 1 with a URL + ch1_data = { + "name": "CH1 Del URL Mismatch", + "tvg_id": "ch1_del_url_mismatch.tv", + "tvg_name": "CH1 Del URL Mismatch", + "tvg_logo": "ch1delogo.png", + "group_title": "G1Del", + "urls": [{"url":"http://ch1del.url", "priority_id":100}]} + ch1_resp = admin_user_client.post("/channels/", json=ch1_data) + print(ch1_resp.json()) + url_id_from_ch1 = ch1_resp.json()["urls"][0]["id"] + + # Create channel 2 + ch2_data = { + "tvg_id": "ch2_del_url_mismatch.tv", + "name": "CH2 Del URL Mismatch", + "tvg_name": "CH2 Del URL Mismatch", + "tvg_logo": "ch2delogo.png", + "group_title": "G2Del", + "urls": [] + } + ch2_resp = admin_user_client.post("/channels/", json=ch2_data) + ch2_id = ch2_resp.json()["id"] + + # Try to delete URL from CH1 using CH2's ID in path + response = admin_user_client.delete(f"/channels/{ch2_id}/urls/{url_id_from_ch1}") + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "URL not found" in response.json()["detail"] + + # Ensure the original URL on CH1 was not deleted + db_url_ch1 = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(url_id_from_ch1)).first() + assert db_url_ch1 is not None + + +def test_delete_channel_url_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient): + # Setup priority and create channel with URL using admin + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + channel_data = { + "tvg_id": "ch_del_url_forbidden.tv", "name": "Channel Del URL Forbidden", "group_title": "URL Del Forbidden Group", + "tvg_name": "CDUFName", "tvg_logo": "cduflogo.png", + "urls": [{"url": "http://original_del_forbidden.com/stream", "priority_id": 100}] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + initial_url_id = create_ch_response.json()["urls"][0]["id"] + + response = non_admin_user_client.delete(f"/channels/{created_channel_id}/urls/{initial_url_id}") + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] + + # Ensure URL was not deleted + db_url_not_deleted = db_session.query(MockChannelURL).filter(MockChannelURL.id.cast(String()) == uuid.UUID(initial_url_id)).first() + assert db_url_not_deleted is not None + +# --- Test Cases For List Channel URLs --- + +def test_list_channel_urls_success(db_session: Session, admin_user_client: TestClient): + # Setup priorities and create a channel with multiple URLs + priority1 = MockPriority(id=100, description="High") + priority2 = MockPriority(id=200, description="Medium") + db_session.add_all([priority1, priority2]) + db_session.commit() + + channel_data = { + "tvg_id": "ch_list_urls.tv", "name": "Channel List URLs", "group_title": "URL List Group", + "tvg_name": "CLUName", "tvg_logo": "clulogo.png", + "urls": [ + {"url": "http://list_url1.com/stream", "priority_id": 100}, + {"url": "http://list_url2.com/live", "priority_id": 200} + ] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + + # URLs are added during channel creation, let's get their IDs for assertion if needed + # For now, we'll just check the count and content based on what was provided. + + response = admin_user_client.get(f"/channels/{created_channel_id}/urls") + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert len(data) == 2 + + # Check if the URLs returned match what we expect (order might not be guaranteed by default) + returned_urls_set = {(item["url"], item["priority_id"]) for item in data} + expected_urls_set = { + ("http://list_url1.com/stream", 100), + ("http://list_url2.com/live", 200) + } + assert returned_urls_set == expected_urls_set + +def test_list_channel_urls_empty(db_session: Session, admin_user_client: TestClient): + # Create a channel with no URLs initially + # No need to set up MockPriority if no URLs with priority_id are being created. + channel_data = { + "tvg_id": "ch_list_empty_urls.tv", "name": "Channel List Empty URLs", "group_title": "URL List Empty Group", + "tvg_name": "CLEUName", "tvg_logo": "cleulogo.png", + "urls": [] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + + response = admin_user_client.get(f"/channels/{created_channel_id}/urls") + assert response.status_code == status.HTTP_200_OK + assert response.json() == [] + +def test_list_channel_urls_channel_not_found(db_session: Session, admin_user_client: TestClient): + random_channel_uuid = uuid.uuid4() + response = admin_user_client.get(f"/channels/{random_channel_uuid}/urls") + assert response.status_code == status.HTTP_404_NOT_FOUND + assert "Channel not found" in response.json()["detail"] + +def test_list_channel_urls_forbidden_for_non_admin(db_session: Session, non_admin_user_client: TestClient, admin_user_client: TestClient): + # Setup priority and create channel with admin + priority1 = MockPriority(id=100, description="High") + db_session.add(priority1) + db_session.commit() + channel_data = { + "tvg_id": "ch_list_url_forbidden.tv", "name": "Channel List URL Forbidden", "group_title": "URL List Forbidden Group", + "tvg_name": "CLUFName", "tvg_logo": "cluflogo.png", + "urls": [{"url": "http://list_url_forbidden.com", "priority_id": 100}] + } + create_ch_response = admin_user_client.post("/channels/", json=channel_data) + created_channel_id = create_ch_response.json()["id"] + + response = non_admin_user_client.get(f"/channels/{created_channel_id}/urls") + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] + + assert response.status_code == status.HTTP_403_FORBIDDEN + assert "required roles" in response.json()["detail"] diff --git a/tests/utils/db_mocks.py b/tests/utils/db_mocks.py new file mode 100644 index 0000000..39de62c --- /dev/null +++ b/tests/utils/db_mocks.py @@ -0,0 +1,95 @@ +import os +import uuid +from datetime import datetime, timezone +from unittest.mock import patch, MagicMock +from sqlalchemy.pool import StaticPool +from sqlalchemy.orm import Session, sessionmaker, declarative_base +from sqlalchemy import create_engine, TypeDecorator, TEXT, Column, String, DateTime, UniqueConstraint, ForeignKey, Boolean, Integer +import pytest + +# Create a mock-specific Base class for testing +MockBase = declarative_base() + +class SQLiteUUID(TypeDecorator): + """Enables UUID support for SQLite.""" + impl = TEXT + cache_ok = True + + def process_bind_param(self, value, dialect): + if value is None: + return value + return str(value) + + def process_result_value(self, value, dialect): + if value is None: + return value + return uuid.UUID(value) + +# Model classes for testing - prefix with Mock to avoid pytest collection +class MockPriority(MockBase): + __tablename__ = "priorities" + id = Column(Integer, primary_key=True) + description = Column(String, nullable=False) + +class MockChannelDB(MockBase): + __tablename__ = "channels" + id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4) + tvg_id = Column(String, nullable=False) + name = Column(String, nullable=False) + group_title = Column(String, nullable=False) + tvg_name = Column(String) + __table_args__ = ( + UniqueConstraint('group_title', 'name', name='uix_group_title_name'), + ) + tvg_logo = Column(String) + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + +class MockChannelURL(MockBase): + __tablename__ = "channels_urls" + id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4) + channel_id = Column(SQLiteUUID(), ForeignKey('channels.id', ondelete='CASCADE'), nullable=False) + url = Column(String, nullable=False) + in_use = Column(Boolean, default=False, nullable=False) + priority_id = Column(Integer, ForeignKey('priorities.id'), nullable=False) + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + +# Create test engine +engine_mock = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool +) + +# Create test session +session_mock = sessionmaker(autocommit=False, autoflush=False, bind=engine_mock) + +# Mock the actual database functions +def mock_get_db(): + db = session_mock() + try: + yield db + finally: + db.close() + +@pytest.fixture(autouse=True) +def mock_env(monkeypatch): + """Fixture for mocking environment variables""" + monkeypatch.setenv("MOCK_AUTH", "true") + monkeypatch.setenv("DB_USER", "testuser") + monkeypatch.setenv("DB_PASSWORD", "testpass") + monkeypatch.setenv("DB_HOST", "localhost") + monkeypatch.setenv("DB_NAME", "testdb") + monkeypatch.setenv("AWS_REGION", "us-east-1") + +@pytest.fixture +def mock_ssm(): + """Fixture for mocking boto3 SSM client""" + with patch('boto3.client') as mock_client: + mock_ssm = MagicMock() + mock_client.return_value = mock_ssm + mock_ssm.get_parameter.return_value = { + 'Parameter': {'Value': 'mocked_value'} + } + yield mock_ssm \ No newline at end of file diff --git a/tests/utils/test_database.py b/tests/utils/test_database.py index 164c338..999ab70 100644 --- a/tests/utils/test_database.py +++ b/tests/utils/test_database.py @@ -1,100 +1,15 @@ import os import pytest -import uuid -from datetime import datetime, timezone -from unittest.mock import patch, MagicMock -from sqlalchemy.pool import StaticPool -from sqlalchemy.orm import Session, sessionmaker, declarative_base -from sqlalchemy import create_engine, TypeDecorator, TEXT, Column, String, DateTime, UniqueConstraint, ForeignKey, Boolean, Integer +from unittest.mock import patch +from sqlalchemy.orm import Session from app.utils.database import get_db_credentials, get_db - -# Create a mock-specific Base class for testing -MockBase = declarative_base() - -class SQLiteUUID(TypeDecorator): - """Enables UUID support for SQLite.""" - impl = TEXT - cache_ok = True - - def process_bind_param(self, value, dialect): - if value is None: - return value - return str(value) - - def process_result_value(self, value, dialect): - if value is None: - return value - return uuid.UUID(value) - -# Model classes for testing - prefix with Mock to avoid pytest collection -class MockPriority(MockBase): - __tablename__ = "priorities" - id = Column(Integer, primary_key=True) - description = Column(String, nullable=False) - -class MockChannelDB(MockBase): - __tablename__ = "channels" - id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4) - tvg_id = Column(String, nullable=False) - name = Column(String, nullable=False) - group_title = Column(String, nullable=False) - tvg_name = Column(String) - __table_args__ = ( - UniqueConstraint('group_title', 'name', name='uix_group_title_name'), - ) - tvg_logo = Column(String) - created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) - updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) - -class MockChannelURL(MockBase): - __tablename__ = "channels_urls" - id = Column(SQLiteUUID(), primary_key=True, default=uuid.uuid4) - channel_id = Column(SQLiteUUID(), ForeignKey('channels.id', ondelete='CASCADE'), nullable=False) - url = Column(String, nullable=False) - in_use = Column(Boolean, default=False, nullable=False) - priority_id = Column(Integer, ForeignKey('priorities.id'), nullable=False) - created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) - updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) - -# Create test engine -engine_mock = create_engine( - "sqlite:///:memory:", - connect_args={"check_same_thread": False}, - poolclass=StaticPool +from tests.utils.db_mocks import ( + session_mock, + mock_get_db, + mock_env, + mock_ssm ) -# Create test session -session_mock = sessionmaker(autocommit=False, autoflush=False, bind=engine_mock) - -# Mock the actual database functions -def mock_get_db(): - db = session_mock() - try: - yield db - finally: - db.close() - -@pytest.fixture(autouse=True) -def mock_env(monkeypatch): - """Fixture for mocking environment variables""" - monkeypatch.setenv("MOCK_AUTH", "true") - monkeypatch.setenv("DB_USER", "testuser") - monkeypatch.setenv("DB_PASSWORD", "testpass") - monkeypatch.setenv("DB_HOST", "localhost") - monkeypatch.setenv("DB_NAME", "testdb") - monkeypatch.setenv("AWS_REGION", "us-east-1") - -@pytest.fixture -def mock_ssm(): - """Fixture for mocking boto3 SSM client""" - with patch('boto3.client') as mock_client: - mock_ssm = MagicMock() - mock_client.return_value = mock_ssm - mock_ssm.get_parameter.return_value = { - 'Parameter': {'Value': 'mocked_value'} - } - yield mock_ssm - def test_get_db_credentials_env(mock_env): """Test getting DB credentials from environment variables""" conn_str = get_db_credentials()