import argparse
import gzip
import json
import os
import re
import sys
import xml.etree.ElementTree as ET

import requests

from utils.constants import (
    IPTV_SERVER_ADMIN_PASSWORD,
    IPTV_SERVER_ADMIN_USER,
    IPTV_SERVER_URL,
)

# Seconds to wait on any single HTTP request. Without a timeout, requests
# waits indefinitely, so one unresponsive server would hang the whole run.
_REQUEST_TIMEOUT = 60

# Every gzip stream starts with these two bytes; checking them is more
# reliable than looking at the URL suffix (which may carry a query string).
_GZIP_MAGIC = b"\x1f\x8b"

# Captures the value of tvg-id="...": everything between the double quotes,
# i.e. any run of characters that are not a double quote.
_TVG_ID_PATTERN = re.compile(r'tvg-id="([^"]*)"')


def parse_arguments():
    """Parse the command-line options of the EPG grabber.

    Default file locations are resolved against the parent of the directory
    that contains this script.

    Returns:
        argparse.Namespace: Parsed options with the attributes ``playlist``,
        ``output``, ``epg_sources`` and ``save_as_gz``.
    """
    # abspath() makes the defaults independent of how the script was invoked
    # (a relative __file__ would otherwise collapse to the working directory).
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    parser = argparse.ArgumentParser(description="EPG Grabber")
    parser.add_argument(
        "--playlist",
        default=os.path.join(base_dir, "playlist.m3u8"),
        help="Path to playlist file",
    )
    parser.add_argument(
        "--output",
        default=os.path.join(base_dir, "epg.xml"),
        help="Path to output EPG XML file",
    )
    parser.add_argument(
        "--epg-sources",
        default=os.path.join(base_dir, "epg_sources.json"),
        help="Path to EPG sources JSON configuration file",
    )
    parser.add_argument(
        "--save-as-gz",
        dest="save_as_gz",
        action="store_true",
        default=True,
        help="Save an additional gzipped version of the EPG file",
    )
    # The flag above defaults to True, so on its own it could never be turned
    # off; this companion flag writes False to the same destination.
    parser.add_argument(
        "--no-save-as-gz",
        dest="save_as_gz",
        action="store_false",
        help="Do not save the additional gzipped version of the EPG file",
    )
    return parser.parse_args()


def load_epg_sources(config_path):
    """Load EPG sources from JSON configuration file.

    Args:
        config_path (str): Path to a JSON file whose top-level object has an
            ``epg_sources`` key.

    Returns:
        The value stored under ``epg_sources``, or an empty list when the key
        is absent or the file cannot be read or parsed.
    """
    try:
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and undecodable bytes.
        print(f"Error loading EPG sources: {e}")
        return []

    if not isinstance(config, dict):
        print("Error loading EPG sources: top-level JSON value must be an object")
        return []
    return config.get("epg_sources", [])


def get_tvg_ids(playlist_path):
    """Extract unique tvg-id values from an M3U playlist file.

    Only lines starting with ``#EXTINF`` are inspected, and empty tvg-id
    values are ignored.

    Args:
        playlist_path (str): Path to the M3U playlist file.

    Returns:
        list[str]: The unique tvg-id strings, sorted so that the result is
        deterministic between runs.

    Raises:
        OSError: If the playlist file cannot be opened.
    """
    unique_tvg_ids = set()
    with open(playlist_path, encoding="utf-8") as file:
        for line in file:
            if not line.startswith("#EXTINF"):
                continue
            match = _TVG_ID_PATTERN.search(line)
            # group(1) is the text between the quotes; skip tvg-id="".
            if match and match.group(1):
                unique_tvg_ids.add(match.group(1))
    return sorted(unique_tvg_ids)


def fetch_and_extract_xml(url):
    """Download an EPG document and parse it as XML.

    Gzip-compressed payloads are detected by their leading magic bytes and
    decompressed before parsing.

    Args:
        url (str): Location of the (optionally gzipped) XML document.

    Returns:
        xml.etree.ElementTree.Element | None: The root element, or ``None``
        when the download, decompression or parsing fails. Failures are
        reported on stdout rather than raised.
    """
    try:
        response = requests.get(url, timeout=_REQUEST_TIMEOUT)
    except requests.RequestException as e:
        # A dead or slow source must not abort the remaining sources.
        print(f"Failed to fetch {url}: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch {url}")
        return None

    payload = response.content
    if payload.startswith(_GZIP_MAGIC):
        try:
            return ET.fromstring(gzip.decompress(payload))
        except Exception as e:  # best effort: report and skip this source
            print(f"Failed to decompress and parse XML from {url}: {e}")
            return None

    try:
        return ET.fromstring(payload)
    except Exception as e:  # best effort: report and skip this source
        print(f"Failed to parse XML from {url}: {e}")
        return None


def filter_and_build_epg(urls, tvg_ids, output_file, save_as_gz=True):
    """Merge several EPG sources into one file limited to the given channels.

    Each URL is fetched with ``fetch_and_extract_xml``; sources that fail are
    skipped. ``channel`` elements are kept when their ``id`` attribute is in
    ``tvg_ids`` and ``programme`` elements when their ``channel`` attribute
    is.

    Args:
        urls: Iterable of EPG source URLs.
        tvg_ids: Iterable of channel ids to keep.
        output_file (str): Path of the XML file to write.
        save_as_gz (bool): When true, also write ``output_file + ".gz"``.
    """
    # A set gives O(1) membership tests; the ids are checked once per
    # channel and once per programme element.
    wanted_ids = set(tvg_ids)

    channels = []
    programmes = []
    for url in urls:
        epg_data = fetch_and_extract_xml(url)
        if epg_data is None:
            continue
        channels.extend(
            channel
            for channel in epg_data.findall("channel")
            if channel.get("id") in wanted_ids
        )
        programmes.extend(
            programme
            for programme in epg_data.findall("programme")
            if programme.get("channel") in wanted_ids
        )

    # The XMLTV DTD expects every <channel> before the first <programme>,
    # so the two kinds are collected separately instead of being appended
    # source by source.
    root = ET.Element("tv")
    root.extend(channels)
    root.extend(programmes)

    tree = ET.ElementTree(root)
    tree.write(output_file, encoding="utf-8", xml_declaration=True)
    print(f"New EPG saved to {output_file}")

    if save_as_gz:
        output_file_gz = output_file + ".gz"
        with gzip.open(output_file_gz, "wb") as f:
            tree.write(f, encoding="utf-8", xml_declaration=True)
        print(f"New EPG saved to {output_file_gz}")


def upload_epg(file_path):
    """Upload a gzipped EPG file to the IPTV server using HTTP Basic Auth.

    The file is sent as the multipart field ``file`` in a POST to
    ``IPTV_SERVER_URL + "/admin/epg"``. The outcome is reported on stdout;
    errors are caught and printed, never raised.

    Args:
        file_path (str): Path of the file to upload.
    """
    try:
        with open(file_path, "rb") as f:
            response = requests.post(
                IPTV_SERVER_URL + "/admin/epg",
                auth=requests.auth.HTTPBasicAuth(
                    IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD
                ),
                files={"file": (os.path.basename(file_path), f)},
                timeout=_REQUEST_TIMEOUT,
            )
        if response.status_code == 200:
            print("EPG successfully uploaded to server")
        else:
            print(f"Upload failed ({response.status_code}): {response.text}")
    except Exception as e:  # best effort: the upload must not crash the run
        print(f"Upload error: {e}")


def main():
    """Build the filtered EPG and, if a gzipped copy was written, upload it."""
    args = parse_arguments()

    try:
        tvg_ids = get_tvg_ids(args.playlist)
    except OSError as e:
        print(f"Error reading playlist: {e}")
        sys.exit(1)

    urls = load_epg_sources(args.epg_sources)
    if not urls:
        print("No EPG URLs loaded - check configuration file")
        sys.exit(1)

    filter_and_build_epg(urls, tvg_ids, args.output, args.save_as_gz)
    # Only the gzipped copy is uploaded, so there is nothing to send when it
    # was not written.
    if args.save_as_gz:
        upload_epg(args.output + ".gz")


if __name__ == "__main__":
    main()