diff --git a/.vscode/settings.json b/.vscode/settings.json index 78efe5e..69daef7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -65,6 +65,7 @@ "levelname", "mpegurl", "nohup", + "nopriority", "ondelete", "onupdate", "passlib", diff --git a/app/models/__init__.py b/app/models/__init__.py index d3a14da..0b5874a 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -1,4 +1,4 @@ -from .db import Base, ChannelDB, ChannelURL, Group +from .db import Base, ChannelDB, ChannelURL, Group, Priority from .schemas import ( ChannelCreate, ChannelResponse, @@ -20,6 +20,7 @@ __all__ = [ "ChannelURLCreate", "ChannelURLResponse", "Group", + "Priority", "GroupCreate", "GroupResponse", "GroupUpdate", diff --git a/app/models/db.py b/app/models/db.py index fb570c5..7c687da 100644 --- a/app/models/db.py +++ b/app/models/db.py @@ -39,6 +39,8 @@ class SQLiteUUID(TypeDecorator): def process_result_value(self, value, dialect): if value is None: return value + if isinstance(value, uuid.UUID): + return value return uuid.UUID(value) def compare_values(self, x, y): diff --git a/app/routers/channels.py b/app/routers/channels.py index 67c3189..ad46a95 100644 --- a/app/routers/channels.py +++ b/app/routers/channels.py @@ -14,6 +14,7 @@ from app.models import ( ChannelURLCreate, ChannelURLResponse, Group, + Priority, # Added Priority import ) from app.models.auth import CognitoUser from app.models.schemas import ChannelURLUpdate @@ -149,6 +150,41 @@ def update_channel( return db_channel +@router.delete("/", status_code=status.HTTP_200_OK) +@require_roles("admin") +def delete_channels( + db: Session = Depends(get_db), + user: CognitoUser = Depends(get_current_user), +): + """Delete all channels""" + count = 0 + try: + count = db.query(ChannelDB).count() + + # First delete all channels + db.query(ChannelDB).delete() + + # Then delete any URLs that are now orphaned (no channel references) + db.query(ChannelURL).filter( + ~ChannelURL.channel_id.in_(db.query(ChannelDB.id)) + ).delete(synchronize_session=False) + + # Then delete any groups that are now empty + db.query(Group).filter(~Group.id.in_(db.query(ChannelDB.group_id))).delete( + synchronize_session=False + ) + + db.commit() + except Exception as e: + print(f"Error deleting channels: {e}") + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to delete channels", + ) + return {"deleted": count} + + @router.delete("/{channel_id}", status_code=status.HTTP_204_NO_CONTENT) @require_roles("admin") def delete_channel( @@ -241,6 +277,130 @@ def update_channel_group( return channel +# Bulk Upload and Reset Endpoints +@router.post("/bulk-upload", status_code=status.HTTP_200_OK) +@require_roles("admin") +def bulk_upload_channels( + channels: list[dict], + db: Session = Depends(get_db), + user: CognitoUser = Depends(get_current_user), +): + """Bulk upload channels from JSON array""" + processed = 0 + + # Fetch all priorities from the database, ordered by id + priorities = db.query(Priority).order_by(Priority.id).all() + priority_map = {i: p.id for i, p in enumerate(priorities)} + + # Get the highest priority_id (which corresponds to the lowest priority level) + max_priority_id = None + if priorities: + max_priority_id = db.query(Priority.id).order_by(Priority.id.desc()).first()[0] + + for channel_data in channels: + try: + # Get or create group + group_name = channel_data.get("group-title") + if not group_name: + continue + + group = db.query(Group).filter(Group.name == group_name).first() + if not group: + group = Group(name=group_name) + db.add(group) + db.flush() # Use flush to make the group available in the session + db.refresh(group) + + # Prepare channel data + urls = channel_data.get("urls", []) + if not isinstance(urls, list): + urls = [urls] + + # Assign priorities dynamically based on fetched priorities + url_objects = [] + for i, url in enumerate(urls): # Process all URLs + priority_id = priority_map.get(i) + if priority_id is None: + # If index is out of bounds, + # assign the highest priority_id (lowest priority) + if max_priority_id is not None: + priority_id = max_priority_id + else: + print( + f"Warning: No priorities defined in database. " + f"Skipping URL {url}" + ) + continue + url_objects.append({"url": url, "priority_id": priority_id}) + + # Create channel object with required fields + channel_obj = ChannelDB( + tvg_id=channel_data.get("tvg-id", ""), + name=channel_data.get("name", ""), + group_id=group.id, + tvg_name=channel_data.get("tvg-name", ""), + tvg_logo=channel_data.get("tvg-logo", ""), + ) + + # Upsert channel + existing_channel = ( + db.query(ChannelDB) + .filter( + and_( + ChannelDB.group_id == group.id, + ChannelDB.name == channel_obj.name, + ) + ) + .first() + ) + + if existing_channel: + # Update existing + existing_channel.tvg_id = channel_obj.tvg_id + existing_channel.tvg_name = channel_obj.tvg_name + existing_channel.tvg_logo = channel_obj.tvg_logo + + # Clear and recreate URLs + db.query(ChannelURL).filter( + ChannelURL.channel_id == existing_channel.id + ).delete() + + for url in url_objects: + db_url = ChannelURL( + channel_id=existing_channel.id, + url=url["url"], + priority_id=url["priority_id"], + in_use=False, + ) + db.add(db_url) + else: + # Create new + db.add(channel_obj) + db.flush() # Flush to get the new channel's ID + db.refresh(channel_obj) + + # Add URLs for new channel + for url in url_objects: + db_url = ChannelURL( + channel_id=channel_obj.id, + url=url["url"], + priority_id=url["priority_id"], + in_use=False, + ) + db.add(db_url) + + db.commit() # Commit all changes for this channel atomically + processed += 1 + + except Exception as e: + print(f"Error processing channel: {channel_data.get('name', 'Unknown')}") + print(f"Exception details: {e}") + db.rollback() # Rollback the entire transaction for the failed channel + continue + + return {"processed": processed} + + # URL Management Endpoints @router.post( "/{channel_id}/urls", diff --git a/app/routers/groups.py b/app/routers/groups.py index 14f6f0a..06e057e 100644 --- a/app/routers/groups.py +++ b/app/routers/groups.py @@ -86,6 +86,28 @@ def update_group( return db_group +@router.delete("/", status_code=status.HTTP_200_OK) +@require_roles("admin") +def delete_groups( + db: Session = Depends(get_db), + user: CognitoUser = Depends(get_current_user), +): + """Delete all groups that have no channels (skip groups with channels)""" + groups = db.query(Group).all() + deleted = 0 + skipped = 0 + + for group in groups: + if not group.channels: + db.delete(group) + deleted += 1 + else: + skipped += 1 + + db.commit() + return {"deleted": deleted, "skipped": skipped} + + @router.delete("/{group_id}", status_code=status.HTTP_204_NO_CONTENT) @require_roles("admin") def delete_group( diff --git a/app/routers/priorities.py b/app/routers/priorities.py index 1e43f82..94b38d5 100644 --- a/app/routers/priorities.py +++ b/app/routers/priorities.py @@ -59,6 +59,34 @@ def get_priority( return priority +@router.delete("/", status_code=status.HTTP_200_OK) +@require_roles("admin") +def delete_priorities( + db: Session = Depends(get_db), + user: CognitoUser = Depends(get_current_user), +): + """Delete all priorities not in use by channel URLs""" + from app.models.db import ChannelURL + + priorities = db.query(Priority).all() + deleted = 0 + skipped = 0 + + for priority in priorities: + in_use = db.scalar( + select(ChannelURL).where(ChannelURL.priority_id == priority.id).limit(1) + ) + + if not in_use: + db.delete(priority) + deleted += 1 + else: + skipped += 1 + + db.commit() + return {"deleted": deleted, "skipped": skipped} + + @router.delete("/{priority_id}", status_code=status.HTTP_204_NO_CONTENT) @require_roles("admin") def delete_priority( diff --git a/tests/models/test_db.py b/tests/models/test_db.py index 15e2cac..8c103c9 100644 --- a/tests/models/test_db.py +++ b/tests/models/test_db.py @@ -53,6 +53,15 @@ def test_sqliteuuid_process_result_value_valid_string(): assert result == test_uuid +def test_sqliteuuid_process_result_value_uuid_object(): + """Test SQLiteUUID.process_result_value: UUID object returns itself.""" + uuid_type = SQLiteUUID() + test_uuid = uuid.uuid4() + result = uuid_type.process_result_value(test_uuid, None) + assert isinstance(result, uuid.UUID) + assert result is test_uuid # Ensure it's the same object, not a new one + + def test_sqliteuuid_compare_values_none(): """Test SQLiteUUID.compare_values handles None values""" uuid_type = SQLiteUUID() diff --git a/tests/routers/test_channels.py b/tests/routers/test_channels.py index 349ad20..b4d4048 100644 --- a/tests/routers/test_channels.py +++ b/tests/routers/test_channels.py @@ -1,10 +1,8 @@ import uuid -from datetime import datetime, timezone +from unittest.mock import patch -import pytest from fastapi import status from fastapi.testclient import TestClient -from sqlalchemy import String from sqlalchemy.orm import Session from app.auth.dependencies import get_current_user @@ -338,7 +336,69 @@ def test_update_channel_forbidden_for_non_admin( assert "required roles" in response.json()["detail"] -# --- Test Cases For Delete Channel --- +# --- Test Cases For Delete Channels --- + + +def test_delete_all_channels_success(db_session, admin_user_client): + """Test reset channels endpoint""" + # Create test data + create_mock_priorities_and_group(db_session, [(100, "High")], "Test Group") + channel_data = [ + { + "name": "Channel 1", + "group-title": "Group A", + "tvg-id": "channel1.tv", + "tvg-name": "Channel One", + "tvg-logo": "logo1.png", + "urls": ["http://stream1.com", "http://backup1.com"], + } + ] + admin_user_client.post("/channels/bulk-upload", json=channel_data) + + # Verify data exists + assert db_session.query(MockChannelDB).count() == 1 + assert db_session.query(MockChannelURL).count() == 2 + + # Reset channels + response = admin_user_client.delete("/channels") + assert response.status_code == status.HTTP_200_OK + assert response.json()["deleted"] == 1 + + # Verify data is gone + assert db_session.query(MockChannelDB).count() == 0 + assert db_session.query(MockChannelURL).count() == 0 + + +def test_delete_all_channels_forbidden_for_non_admin(db_session, non_admin_user_client): + """Test delete all channels requires admin role""" + response = non_admin_user_client.delete("/channels") + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_delete_all_channels_error_handling(db_session, admin_user_client): + """Test error handling when deleting channels fails""" + # Create some test data + group_id = create_mock_priorities_and_group( + db_session, [(100, "High")], "Error Group" + ) + channel = MockChannelDB( + name="Error Channel", + group_id=group_id, + tvg_id="error.tv", + urls=[MockChannelURL(url="http://error.com/stream", priority_id=100)], + ) + db_session.add(channel) + db_session.commit() + + # To test the error handling of the endpoint, we need to simulate a failure + # during the database operation. We patch the 'commit' method of the session + # class to raise an exception. This patch will be active during the API call. + with patch.object( + type(db_session), "commit", side_effect=Exception("Mock database commit error") + ): + response = admin_user_client.delete("/channels/") + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "Failed to delete channels" in response.json()["detail"] def test_delete_channel_success(db_session: Session, admin_user_client: TestClient): @@ -1275,3 +1335,254 @@ def test_list_channel_urls_forbidden_for_non_admin( assert response.status_code == status.HTTP_403_FORBIDDEN assert "required roles" in response.json()["detail"] + + +# --- Test Cases For Bulk Upload Channels --- + + +def test_bulk_upload_channels_success( + db_session: Session, admin_user_client: TestClient +): + """Test successful bulk upload of channels""" + # Create priorities + priorities = [(100, "High"), (200, "Medium"), (300, "Low")] + for id, desc in priorities: + db_session.add(MockPriority(id=id, description=desc)) + db_session.commit() + + # Test data matching channels.json format + channels = [ + { + "name": "Channel 1", + "group-title": "Group A", + "tvg-id": "channel1.tv", + "tvg-name": "Channel One", + "tvg-logo": "logo1.png", + "urls": ["http://stream1.com", "http://backup1.com"], + }, + { + "name": "Channel 2", + "group-title": "Group B", + "tvg-id": "channel2.tv", + "tvg-name": "Channel Two", + "tvg-logo": "logo2.png", + "urls": ["http://stream2.com"], + }, + ] + + response = admin_user_client.post("/channels/bulk-upload", json=channels) + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 2 + + # Verify groups created + groups = db_session.query(MockGroup).all() + assert len(groups) == 2 + assert {g.name for g in groups} == {"Group A", "Group B"} + + # Verify channels and URLs + channel1 = db_session.query(MockChannelDB).filter_by(name="Channel 1").first() + assert channel1 is not None + assert len(channel1.urls) == 2 + assert channel1.urls[0].url == "http://stream1.com" + assert channel1.urls[0].priority_id == 100 + assert channel1.urls[1].url == "http://backup1.com" + assert channel1.urls[1].priority_id == 200 + + channel2 = db_session.query(MockChannelDB).filter_by(name="Channel 2").first() + assert channel2 is not None + assert len(channel2.urls) == 1 + assert channel2.urls[0].url == "http://stream2.com" + assert channel2.urls[0].priority_id == 100 + + +def test_bulk_upload_empty(db_session, admin_user_client): + """Test bulk upload with empty list""" + response = admin_user_client.post("/channels/bulk-upload", json=[]) + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 0 + + +def test_bulk_upload_forbidden_for_non_admin(db_session, non_admin_user_client): + """Test bulk upload requires admin role""" + response = non_admin_user_client.post("/channels/bulk-upload", json=[{}]) + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_bulk_upload_skip_missing_group_name(db_session, admin_user_client): + """Test bulk upload skips channels with missing group name""" + channel_data = [{"name": "No Group Channel", "urls": ["http://no-group.com"]}] + + response = admin_user_client.post("/channels/bulk-upload", json=channel_data) + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 0 + assert db_session.query(MockChannelDB).count() == 0 + + +def test_bulk_upload_single_url_conversion(db_session, admin_user_client): + """Test bulk upload converts single URL string to list""" + # Create priorities + priorities = [(100, "High"), (200, "Medium"), (300, "Low")] + for id, desc in priorities: + db_session.add(MockPriority(id=id, description=desc)) + db_session.commit() + + channel_data = [ + { + "name": "Single URL Channel", + "group-title": "Test Group", + "urls": ["http://single-url.com"], + } + ] + + response = admin_user_client.post("/channels/bulk-upload", json=channel_data) + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 1 + + channel = db_session.query(MockChannelDB).first() + assert channel is not None + urls = db_session.query(MockChannelURL).filter_by(channel_id=channel.id).all() + assert len(urls) == 1 + assert urls[0].url == "http://single-url.com" + + +def test_bulk_upload_update_existing_channel(db_session, admin_user_client): + """Test bulk upload updates existing channel and recreates URLs""" + # First create a group and channel + group_id = create_mock_priorities_and_group( + db_session, [(100, "High")], "Existing Group" + ) + channel = MockChannelDB( + name="Existing Channel", + group_id=group_id, + tvg_id="existing.tv", + tvg_name="Existing", + tvg_logo="existing.png", + ) + db_session.add(channel) + db_session.commit() + db_session.refresh(channel) + + # Add some URLs + db_url1 = MockChannelURL( + channel_id=channel.id, url="http://old1.com", priority_id=100, in_use=False + ) + db_url2 = MockChannelURL( + channel_id=channel.id, url="http://old2.com", priority_id=200, in_use=False + ) + db_session.add_all([db_url1, db_url2]) + db_session.commit() + + # Bulk upload with same group and channel name but different URLs + channel_data = [ + { + "name": "Existing Channel", + "group-title": "Existing Group", + "tvg-id": "updated.tv", + "tvg-name": "Updated", + "tvg-logo": "updated.png", + "urls": ["http://new1.com", "http://new2.com"], + } + ] + + response = admin_user_client.post("/channels/bulk-upload", json=channel_data) + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 1 + + # Verify channel was updated + print( + "Channel after update:", + db_session.query(MockChannelDB).filter().first().id, + channel.tvg_id, + ) + updated_channel = db_session.query(MockChannelDB).filter_by(id=channel.id).first() + assert updated_channel.tvg_id == "updated.tv" + assert updated_channel.tvg_name == "Updated" + assert updated_channel.tvg_logo == "updated.png" + + # Verify old URLs were deleted and new ones created + urls = db_session.query(MockChannelURL).filter_by(channel_id=channel.id).all() + assert len(urls) == 2 + assert {url.url for url in urls} == {"http://new1.com", "http://new2.com"} + + +def test_bulk_upload_error_handling(db_session, admin_user_client): + """Test error handling in bulk upload continues processing""" + # Create a group first + create_mock_priorities_and_group(db_session, [(100, "High")], "Error Group") + + channel_data = [ + { + "name": "Good Channel", + "group-title": "Error Group", + "urls": ["http://good.com"], + }, + { + "name": "Bad Channel", + "group-title": "Error Group", + "urls": None, # This will cause an error + }, + { + "name": "Another Good Channel", + "group-title": "Error Group", + "urls": ["http://another-good.com"], + }, + ] + + response = admin_user_client.post("/channels/bulk-upload", json=channel_data) + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 2 # Only 2 successful + + # Verify the good channels were processed + channels = db_session.query(MockChannelDB).all() + assert len(channels) == 2 + assert {c.name for c in channels} == {"Good Channel", "Another Good Channel"} + + +def test_bulk_upload_channels_no_priorities( + db_session: Session, admin_user_client: TestClient +): + """Test bulk upload when no priorities are defined in the database. + The channel should be processed, but its URLs should be skipped. + """ + # Ensure no priorities exist in the database + db_session.query(MockPriority).delete() + db_session.commit() + + # Create a group for the channel + group = MockGroup(name="No Priority Group") + db_session.add(group) + db_session.commit() + db_session.refresh(group) + + channel_data = [ + { + "name": "Channel With No Priority URL", + "group-title": "No Priority Group", + "tvg-id": "nopriority.tv", + "tvg-name": "NoPriorityChannel", + "tvg-logo": "nopriority.png", + "urls": ["http://nopriority.com/stream"], + } + ] + + # Call the bulk upload endpoint + response = admin_user_client.post("/channels/bulk-upload", json=channel_data) + + # Assert the response indicates 1 processed channel + assert response.status_code == status.HTTP_200_OK + assert response.json()["processed"] == 1 + + # Verify the channel was added to the database + db_channel = ( + db_session.query(MockChannelDB) + .filter_by(name="Channel With No Priority URL") + .first() + ) + assert db_channel is not None + assert db_channel.group_id == group.id + + # Verify no URLs were added for this channel + assert ( + db_session.query(MockChannelURL).filter_by(channel_id=db_channel.id).count() + == 0 + ) diff --git a/tests/routers/test_groups.py b/tests/routers/test_groups.py index 16f785b..363b856 100644 --- a/tests/routers/test_groups.py +++ b/tests/routers/test_groups.py @@ -1,12 +1,10 @@ import uuid from datetime import datetime, timezone -import pytest from fastapi import status from sqlalchemy.orm import Session from app.auth.dependencies import get_current_user -from app.routers.groups import router as groups_router from app.utils.database import get_db # Import mocks and fixtures @@ -15,7 +13,11 @@ from tests.utils.auth_test_fixtures import ( db_session, non_admin_user_client, ) -from tests.utils.db_mocks import MockChannelDB, MockGroup, SQLiteUUID +from tests.utils.db_mocks import ( + MockChannelDB, + MockGroup, + create_mock_priorities_and_group, +) # --- Test Cases For Group Creation --- @@ -188,6 +190,43 @@ def test_update_group_forbidden_for_non_admin( # --- Test Cases For Delete Group --- +def test_delete_all_groups_success(db_session, admin_user_client): + """Test reset groups endpoint""" + # Create test data + group1_id = create_mock_priorities_and_group(db_session, [], "Group A") + group2_id = create_mock_priorities_and_group(db_session, [], "Group B") + + # Add channel to group2 + channel_data = [ + { + "group-title": "Group A", + "tvg_id": "channel1.tv", + "name": "Channel One", + "url": ["http://test.com", "http://example.com"], + } + ] + admin_user_client.post("/channels/bulk-upload", json=channel_data) + + # Reset groups + response = admin_user_client.delete("/groups") + assert response.status_code == status.HTTP_200_OK + assert response.json()["deleted"] == 1 # Only group2 should be deleted + assert response.json()["skipped"] == 1 # group1 has channels + + # Verify group2 deleted, group1 remains + assert ( + db_session.query(MockGroup).filter(MockGroup.id == group1_id).first() + is not None + ) + assert db_session.query(MockGroup).filter(MockGroup.id == group2_id).first() is None + + +def test_delete_all_groups_forbidden_for_non_admin(db_session, non_admin_user_client): + """Test reset groups requires admin role""" + response = non_admin_user_client.delete("/groups") + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete_group_success(db_session: Session, admin_user_client): # Create group group_id = uuid.uuid4() diff --git a/tests/routers/test_priorities.py b/tests/routers/test_priorities.py index 1390520..32a5c8c 100644 --- a/tests/routers/test_priorities.py +++ b/tests/routers/test_priorities.py @@ -1,11 +1,9 @@ import uuid from datetime import datetime, timezone -import pytest from fastapi import status from sqlalchemy.orm import Session -from app.models.db import ChannelURL, Priority from app.routers.priorities import router as priorities_router # Import fixtures and mocks @@ -14,7 +12,13 @@ from tests.utils.auth_test_fixtures import ( db_session, non_admin_user_client, ) -from tests.utils.db_mocks import MockChannelDB, MockChannelURL, MockGroup, MockPriority +from tests.utils.db_mocks import ( + MockChannelDB, + MockChannelURL, + MockGroup, + MockPriority, + create_mock_priorities_and_group, +) # --- Test Cases For Priority Creation --- @@ -127,6 +131,44 @@ def test_get_priority_forbidden_for_non_admin( # --- Test Cases For Delete Priority --- +def test_delete_all_priorities_success(db_session, admin_user_client): + """Test reset priorities endpoint""" + # Create test data + priorities = [(100, "High"), (200, "Medium"), (300, "Low")] + for id, desc in priorities: + db_session.add(MockPriority(id=id, description=desc)) + db_session.commit() + + # Create channel using priority 100 + create_mock_priorities_and_group(db_session, [], "Test Group") + channel_data = [ + { + "group-title": "Test Group", + "tvg_id": "test.tv", + "name": "Test Channel", + "urls": ["http://test.com"], + } + ] + admin_user_client.post("/channels/bulk-upload", json=channel_data) + + # Delete all priorities + response = admin_user_client.delete("/priorities") + assert response.status_code == status.HTTP_200_OK + assert response.json()["deleted"] == 2 # Medium and Low priorities + assert response.json()["skipped"] == 1 # High priority is in use + + # Verify only priority 100 remains + priorities = db_session.query(MockPriority).all() + assert len(priorities) == 1 + assert priorities[0].id == 100 + + +def test_reset_priorities_forbidden_for_non_admin(db_session, non_admin_user_client): + """Test reset priorities requires admin role""" + response = non_admin_user_client.delete("/priorities") + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete_priority_success(db_session: Session, admin_user_client): # Create a test priority priority = MockPriority(id=100, description="To Delete") diff --git a/tests/utils/db_mocks.py b/tests/utils/db_mocks.py index ab22477..3e09ecf 100644 --- a/tests/utils/db_mocks.py +++ b/tests/utils/db_mocks.py @@ -60,6 +60,9 @@ class MockChannelDB(MockBase): onupdate=lambda: datetime.now(timezone.utc), ) group = relationship("MockGroup", back_populates="channels") + urls = relationship( + "MockChannelURL", back_populates="channel", cascade="all, delete-orphan" + ) class MockChannelURL(MockBase): @@ -77,6 +80,7 @@ class MockChannelURL(MockBase): default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), ) + channel = relationship("MockChannelDB", back_populates="urls") def create_mock_priorities_and_group(db_session, priorities, group_name):