diff --git a/.vscode/settings.json b/.vscode/settings.json index 4e10761..1e4a1e0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,6 +3,7 @@ "altinstall", "awscliv", "certbot", + "certifi", "devel", "dotenv", "fastapi", diff --git a/app/cabletv/__init__.py b/app/cabletv/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/cabletv/createEpg.py b/app/cabletv/createEpg.py new file mode 100644 index 0000000..d7f3129 --- /dev/null +++ b/app/cabletv/createEpg.py @@ -0,0 +1,147 @@ +import os +import re +import gzip +import json +import xml.etree.ElementTree as ET +import requests +import argparse +from utils.config import IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL + +def parse_arguments(): + parser = argparse.ArgumentParser(description='EPG Grabber') + parser.add_argument('--playlist', + default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'playlist.m3u8'), + help='Path to playlist file') + parser.add_argument('--output', + default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'epg.xml'), + help='Path to output EPG XML file') + parser.add_argument('--epg-sources', + default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'epg_sources.json'), + help='Path to EPG sources JSON configuration file') + parser.add_argument('--save-as-gz', + action='store_true', + default=True, + help='Save an additional gzipped version of the EPG file') + return parser.parse_args() + +def load_epg_sources(config_path): + """Load EPG sources from JSON configuration file""" + try: + with open(config_path, 'r', encoding='utf-8') as f: + config = json.load(f) + return config.get('epg_sources', []) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error loading EPG sources: {e}") + return [] + +def get_tvg_ids(playlist_path): + """ + Extracts unique tvg-id values from an M3U playlist file. + + Args: + playlist_path (str): Path to the M3U playlist file. + + Returns: + list[str]: A list of unique tvg-id strings. + """ + unique_tvg_ids = set() + # Regular expression to find tvg-id="" and capture the value + # It looks for tvg-id= followed by a double quote, + # then captures any characters that are NOT a double quote (non-greedy), + # and ends with a double quote. + tvg_id_pattern = re.compile(r'tvg-id="([^"]*)"') + + with open(playlist_path, 'r', encoding='utf-8') as file: + for line in file: + if line.startswith('#EXTINF'): + # Search for the tvg-id pattern in the line + match = tvg_id_pattern.search(line) + if match: + # Extract the captured group (the value inside the quotes) + tvg_id = match.group(1) + if tvg_id: # Ensure the extracted id is not empty + unique_tvg_ids.add(tvg_id) + + return list(unique_tvg_ids) + +def fetch_and_extract_xml(url): + response = requests.get(url) + if response.status_code != 200: + print(f"Failed to fetch {url}") + return None + + if url.endswith('.gz'): + try: + decompressed_data = gzip.decompress(response.content) + return ET.fromstring(decompressed_data) + except Exception as e: + print(f"Failed to decompress and parse XML from {url}: {e}") + return None + else: + try: + return ET.fromstring(response.content) + except Exception as e: + print(f"Failed to parse XML from {url}: {e}") + return None + +def filter_and_build_epg(urls, tvg_ids, output_file, save_as_gz=True): + root = ET.Element('tv') + + for url in urls: + epg_data = fetch_and_extract_xml(url) + if epg_data is None: + continue + + for channel in epg_data.findall('channel'): + tvg_id = channel.get('id') + if tvg_id in tvg_ids: + root.append(channel) + + for programme in epg_data.findall('programme'): + tvg_id = programme.get('channel') + if tvg_id in tvg_ids: + root.append(programme) + + tree = ET.ElementTree(root) + tree.write(output_file, encoding='utf-8', xml_declaration=True) + print(f"New EPG saved to {output_file}") + + if save_as_gz: + output_file_gz = output_file + '.gz' + with gzip.open(output_file_gz, 'wb') as f: + tree.write(f, encoding='utf-8', xml_declaration=True) + print(f"New EPG saved to {output_file_gz}") + +def upload_epg(file_path): + """Uploads gzipped EPG file to IPTV server using HTTP Basic Auth""" + try: + with open(file_path, 'rb') as f: + response = requests.post( + IPTV_SERVER_URL + '/admin/epg', + auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD), + files={'file': (os.path.basename(file_path), f)} + ) + + if response.status_code == 200: + print("EPG successfully uploaded to server") + else: + print(f"Upload failed ({response.status_code}): {response.text}") + except Exception as e: + print(f"Upload error: {str(e)}") + +if __name__ == "__main__": + args = parse_arguments() + playlist_file = args.playlist + output_file = args.output + + tvg_ids = get_tvg_ids(playlist_file) + urls = load_epg_sources(args.epg_sources) + + if not urls: + print("No EPG URLs loaded - check configuration file") + exit(1) + + filter_and_build_epg(urls, tvg_ids, output_file, args.save_as_gz) + + if args.save_as_gz: + upload_epg(output_file + '.gz') diff --git a/app/cabletv/createPlaylist.py b/app/cabletv/createPlaylist.py new file mode 100644 index 0000000..ddc032f --- /dev/null +++ b/app/cabletv/createPlaylist.py @@ -0,0 +1,108 @@ +import os +import argparse +import json +import logging +import requests +from pathlib import Path +from datetime import datetime +from utils.check_streams import StreamValidator +from utils.config import EPG_URL, IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL + +def parse_arguments(): + parser = argparse.ArgumentParser(description='IPTV playlist generator') + parser.add_argument('--output', + default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'playlist.m3u8'), + help='Path to output playlist file') + parser.add_argument('--channels', + default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'channels.json'), + help='Path to channels definition JSON file') + parser.add_argument('--dead-channels-log', + default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'dead_channels.log'), + help='Path to log file to store a list of dead channels') + return parser.parse_args() + +def find_working_stream(validator, urls): + """Test all URLs and return the first working one""" + for url in urls: + valid, message = validator.validate_stream(url) + if valid: + return url + return None + +def create_playlist(channels_file, output_file): + # Read channels from JSON file + with open(channels_file, 'r', encoding='utf-8') as f: + channels = json.load(f) + + # Initialize validator + validator = StreamValidator(timeout=45) + + # Prepare M3U8 header + m3u8_content = f'#EXTM3U url-tvg="{EPG_URL}"\n' + + for channel in channels: + if 'urls' in channel: # Check if channel has URLs + # Find first working stream + working_url = find_working_stream(validator, channel['urls']) + + if working_url: + # Add channel to playlist + m3u8_content += f'#EXTINF:-1 tvg-id="{channel.get("tvg-id", "")}" ' + m3u8_content += f'tvg-name="{channel.get("tvg-name", "")}" ' + m3u8_content += f'tvg-logo="{channel.get("tvg-logo", "")}" ' + m3u8_content += f'group-title="{channel.get("group-title", "")}", ' + m3u8_content += f'{channel.get("name", "")}\n' + m3u8_content += f'{working_url}\n' + else: + # Log dead channel + logging.info(f'Dead channel: {channel.get("name", "Unknown")} - No working streams found') + + # Write playlist file + with open(output_file, 'w', encoding='utf-8') as f: + f.write(m3u8_content) + +def upload_playlist(file_path): + """Uploads playlist file to IPTV server using HTTP Basic Auth""" + try: + with open(file_path, 'rb') as f: + response = requests.post( + IPTV_SERVER_URL + '/admin/playlist', + auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD), + files={'file': (os.path.basename(file_path), f)} + ) + + if response.status_code == 200: + print("Playlist successfully uploaded to server") + else: + print(f"Upload failed ({response.status_code}): {response.text}") + except Exception as e: + print(f"Upload error: {str(e)}") + +def main(): + args = parse_arguments() + channels_file = args.channels + output_file = args.output + dead_channels_log_file = args.dead_channels_log + + # Clear previous log file + with open(dead_channels_log_file, 'w') as f: + f.write(f'Log created on {datetime.now()}\n') + + # Configure logging + logging.basicConfig( + filename=dead_channels_log_file, + level=logging.INFO, + format='%(asctime)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Create playlist + create_playlist(channels_file, output_file) + + #upload playlist to server + upload_playlist(output_file) + + print("Playlist creation completed!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/app/cabletv/utils/__init__.py b/app/cabletv/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/cabletv/utils/check_streams.py b/app/cabletv/utils/check_streams.py new file mode 100644 index 0000000..026e9c9 --- /dev/null +++ b/app/cabletv/utils/check_streams.py @@ -0,0 +1,135 @@ +import os +import argparse +import requests +import logging +from requests.exceptions import RequestException, Timeout, ConnectionError, HTTPError + +class StreamValidator: + def __init__(self, timeout=10, user_agent=None): + self.timeout = timeout + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' + }) + + def validate_stream(self, url): + """Validate a media stream URL with multiple fallback checks""" + try: + headers = {'Range': 'bytes=0-1024'} + with self.session.get( + url, + headers=headers, + timeout=self.timeout, + stream=True, + allow_redirects=True + ) as response: + if response.status_code not in [200, 206]: + return False, f"Invalid status code: {response.status_code}" + + content_type = response.headers.get('Content-Type', '') + if not self._is_valid_content_type(content_type): + return False, f"Invalid content type: {content_type}" + + try: + next(response.iter_content(chunk_size=1024)) + return True, "Stream is valid" + except (ConnectionError, Timeout): + return False, "Connection failed during content read" + + except HTTPError as e: + return False, f"HTTP Error: {str(e)}" + except ConnectionError as e: + return False, f"Connection Error: {str(e)}" + except Timeout: + return False, "Connection timeout" + except RequestException as e: + return False, f"Request Exception: {str(e)}" + except Exception as e: + return False, f"Validation error: {str(e)}" + + def _is_valid_content_type(self, content_type): + valid_types = [ + 'video/mp2t', 'application/vnd.apple.mpegurl', + 'application/dash+xml', 'video/mp4', + 'video/webm', 'application/octet-stream', + 'application/x-mpegURL' + ] + return any(ct in content_type for ct in valid_types) + + def parse_playlist(self, file_path): + """Extract stream URLs from M3U playlist file""" + urls = [] + try: + with open(file_path, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + urls.append(line) + except Exception as e: + logging.error(f"Error reading playlist file: {str(e)}") + raise + return urls + +def main(): + parser = argparse.ArgumentParser( + description='Validate streaming URLs from command line arguments or playlist files', + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument( + 'sources', + nargs='+', + help='List of URLs or file paths containing stream URLs' + ) + parser.add_argument( + '--timeout', + type=int, + default=20, + help='Timeout in seconds for stream checks' + ) + parser.add_argument( + '--output', + default='deadstreams.txt', + help='Output file name for inactive streams' + ) + + args = parser.parse_args() + + # Configure logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[logging.FileHandler('stream_check.log'), logging.StreamHandler()] + ) + + validator = StreamValidator(timeout=args.timeout) + dead_streams = [] + all_urls = [] + + # Process command line arguments + for source in args.sources: + if os.path.isfile(source): + try: + all_urls.extend(validator.parse_playlist(source)) + logging.info(f"Parsed playlist file: {source}") + except Exception as e: + logging.error(f"Failed to process file {source}: {str(e)}") + else: + all_urls.append(source) + + # Validate all collected URLs + for url in all_urls: + logging.info(f"Checking: {url}") + valid, message = validator.validate_stream(url) + status = "ALIVE" if valid else "DEAD" + print(f"{url}\nStatus: {status}\nDetails: {message}\n") + if not valid: + dead_streams.append(url) + + # Save dead streams to file + if dead_streams: + with open(args.output, 'w') as f: + f.write('\n'.join(dead_streams)) + logging.info(f"Found {len(dead_streams)} dead streams. Saved to {args.output}.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/app/cabletv/utils/config.py b/app/cabletv/utils/config.py new file mode 100644 index 0000000..6552646 --- /dev/null +++ b/app/cabletv/utils/config.py @@ -0,0 +1,15 @@ +import os +from dotenv import load_dotenv + +# Load environment variables from a .env file if it exists +load_dotenv() + +IPTV_SERVER_URL = os.getenv("IPTV_SERVER_URL", "https://iptv.fiorinis.com") + +# Super iptv-server admin credentials for basic auth +# Reads from environment variables IPTV_SERVER_ADMIN_USER and IPTV_SERVER_ADMIN_PASSWORD +IPTV_SERVER_ADMIN_USER = os.getenv("IPTV_SERVER_ADMIN_USER", "admin") +IPTV_SERVER_ADMIN_PASSWORD = os.getenv("IPTV_SERVER_ADMIN_PASSWORD", "adminpassword") + +# URL for the EPG XML file to place in the playlist's header +EPG_URL = os.getenv("EPG_URL", "https://example.com/epg.xml.gz") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 8bd71cd..482c9b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ fastapi==0.103.2 aws-cdk-lib>=2.0.0 constructs>=10.0.0 +dotenv==0.9.9 python-dotenv==0.21.1 -uvicorn==0.22.0 \ No newline at end of file +uvicorn==0.22.0 +requests==2.31.0 \ No newline at end of file