from datetime import datetime, timezone from unittest.mock import Mock from fastapi import HTTPException, Request, status from app.iptv.scheduler import StreamScheduler from app.routers.scheduler import get_scheduler from app.routers.scheduler import router as scheduler_router from app.utils.database import get_db from tests.routers.mocks import MockScheduler, create_trigger_mock, mock_get_scheduler from tests.utils.auth_test_fixtures import ( admin_user_client, db_session, non_admin_user_client, ) from tests.utils.db_mocks import mock_get_db # Scheduler Health Check Tests def test_scheduler_health_success(admin_user_client, monkeypatch): """ Test case for successful scheduler health check when accessed by an admin user. It mocks the scheduler to be running and have a next scheduled job. """ # Define the expected next run time for the scheduler job. next_run = datetime.now(timezone.utc) # Create a mock job object that simulates an APScheduler job. mock_job = Mock() mock_job.next_run_time = next_run # Mock the `get_job` method to return our mock_job for a specific ID. def mock_get_job(job_id): if job_id == "daily_stream_validation": return mock_job return None # Create a custom mock for `get_scheduler` dependency. async def custom_mock_get_scheduler(request: Request) -> StreamScheduler: return await mock_get_scheduler( request, running=True, get_job=Mock(side_effect=mock_get_job), # Use the custom mock_get_job ) # Include the scheduler router in the test application. admin_user_client.app.include_router(scheduler_router) # Override dependencies for the test. admin_user_client.app.dependency_overrides[get_scheduler] = ( custom_mock_get_scheduler ) admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to the scheduler health endpoint. response = admin_user_client.get("/scheduler/health") # Assert the response status code and content. assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == "running" assert data["next_run"] == str(next_run) def test_scheduler_health_stopped(admin_user_client, monkeypatch): """ Test case for scheduler health check when the scheduler is in a stopped state. Ensures the API returns the correct status and no next run time. """ # Create a custom mock for `get_scheduler` dependency, # simulating a stopped scheduler. async def custom_mock_get_scheduler(request: Request) -> StreamScheduler: return await mock_get_scheduler( request, running=False, ) # Include the scheduler router in the test application. admin_user_client.app.include_router(scheduler_router) # Override dependencies for the test. admin_user_client.app.dependency_overrides[get_scheduler] = ( custom_mock_get_scheduler ) admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to the scheduler health endpoint. response = admin_user_client.get("/scheduler/health") # Assert the response status code and content. assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == "stopped" assert data["next_run"] is None def test_scheduler_health_forbidden_for_non_admin(non_admin_user_client, monkeypatch): """ Test case to ensure that non-admin users are forbidden from accessing the scheduler health endpoint. """ # Create a custom mock for `get_scheduler` dependency. async def custom_mock_get_scheduler(request: Request) -> StreamScheduler: return await mock_get_scheduler( request, running=False, ) # Include the scheduler router in the test application. non_admin_user_client.app.include_router(scheduler_router) # Override dependencies for the test. non_admin_user_client.app.dependency_overrides[get_scheduler] = ( custom_mock_get_scheduler ) non_admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to the scheduler health endpoint. response = non_admin_user_client.get("/scheduler/health") # Assert the response status code and error detail. assert response.status_code == status.HTTP_403_FORBIDDEN assert "required roles" in response.json()["detail"] def test_scheduler_health_check_exception(admin_user_client, monkeypatch): """ Test case for handling exceptions during the scheduler health check. Ensures the API returns a 500 Internal Server Error when an exception occurs. """ # Create a custom mock for `get_scheduler` dependency that raises an exception. async def custom_mock_get_scheduler(request: Request) -> StreamScheduler: return await mock_get_scheduler( request, running=True, get_job=Mock(side_effect=Exception("Test exception")) ) # Include the scheduler router in the test application. admin_user_client.app.include_router(scheduler_router) # Override dependencies for the test. admin_user_client.app.dependency_overrides[get_scheduler] = ( custom_mock_get_scheduler ) admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to the scheduler health endpoint. response = admin_user_client.get("/scheduler/health") # Assert the response status code and error detail. assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert "Failed to check scheduler health" in response.json()["detail"] # Scheduler Trigger Tests def test_trigger_validation_success(admin_user_client, monkeypatch): """ Test case for successful manual triggering of stream validation by an admin user. It verifies that the trigger method is called and the API returns a 202 Accepted status. """ # Use a mutable reference to check if the trigger method was called. triggered_ref = {"value": False} # Initialize a custom mock scheduler. custom_scheduler = MockScheduler(running=True) custom_scheduler.get_job = Mock(return_value=None) # Create a custom mock for `get_scheduler` dependency, # overriding `trigger_manual_validation`. async def custom_mock_get_scheduler(request: Request) -> StreamScheduler: scheduler = await mock_get_scheduler( request, running=True, ) # Replace the actual trigger method with our mock to track calls. scheduler.trigger_manual_validation = create_trigger_mock( triggered_ref=triggered_ref ) return scheduler # Include the scheduler router in the test application. admin_user_client.app.include_router(scheduler_router) # Override dependencies for the test. admin_user_client.app.dependency_overrides[get_scheduler] = ( custom_mock_get_scheduler ) admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to trigger stream validation. response = admin_user_client.post("/scheduler/trigger") # Assert the response status code, message, and that the trigger was called. assert response.status_code == status.HTTP_202_ACCEPTED assert response.json()["message"] == "Stream validation triggered" assert triggered_ref["value"] is True def test_trigger_validation_forbidden_for_non_admin(non_admin_user_client, monkeypatch): """ Test case to ensure that non-admin users are forbidden from manually triggering stream validation. """ # Create a custom mock for `get_scheduler` dependency. async def custom_mock_get_scheduler(request: Request) -> StreamScheduler: return await mock_get_scheduler( request, running=True, ) # Include the scheduler router in the test application. non_admin_user_client.app.include_router(scheduler_router) # Override dependencies for the test. non_admin_user_client.app.dependency_overrides[get_scheduler] = ( custom_mock_get_scheduler ) non_admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to trigger stream validation. response = non_admin_user_client.post("/scheduler/trigger") # Assert the response status code and error detail. assert response.status_code == status.HTTP_403_FORBIDDEN assert "required roles" in response.json()["detail"] def test_scheduler_initialized_in_app_state(admin_user_client): """ Test case for when the scheduler is initialized in the app state but its internal scheduler attribute is not set, which should still allow health check. """ scheduler = StreamScheduler() # Set the scheduler instance in the test client's app state. admin_user_client.app.state.scheduler = scheduler # Include the scheduler router in the test application. admin_user_client.app.include_router(scheduler_router) # Override only get_db, allowing the real get_scheduler to be tested. admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to the scheduler health endpoint. response = admin_user_client.get("/scheduler/health") # Assert the response status code. assert response.status_code == status.HTTP_200_OK def test_scheduler_not_initialized_in_app_state(admin_user_client): """ Test case for when the scheduler is not properly initialized in the app state. This simulates a scenario where the internal scheduler attribute is missing, leading to a 500 Internal Server Error on health check. """ scheduler = StreamScheduler() del ( scheduler.scheduler ) # Simulate uninitialized scheduler by deleting the attribute # Set the scheduler instance in the test client's app state. admin_user_client.app.state.scheduler = scheduler # Include the scheduler router in the test application. admin_user_client.app.include_router(scheduler_router) # Override only get_db, allowing the real get_scheduler to be tested. admin_user_client.app.dependency_overrides[get_db] = mock_get_db # Make the request to the scheduler health endpoint. response = admin_user_client.get("/scheduler/health") # Assert the response status code and error detail. assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert "Scheduler not initialized" in response.json()["detail"]