import argparse import json import logging import os from datetime import datetime import requests from utils.check_streams import StreamValidator from utils.constants import ( EPG_URL, IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL, ) def parse_arguments(): parser = argparse.ArgumentParser(description="IPTV playlist generator") parser.add_argument( "--output", default=os.path.join( os.path.dirname(os.path.dirname(__file__)), "playlist.m3u8" ), help="Path to output playlist file", ) parser.add_argument( "--channels", default=os.path.join( os.path.dirname(os.path.dirname(__file__)), "channels.json" ), help="Path to channels definition JSON file", ) parser.add_argument( "--dead-channels-log", default=os.path.join( os.path.dirname(os.path.dirname(__file__)), "dead_channels.log" ), help="Path to log file to store a list of dead channels", ) return parser.parse_args() def find_working_stream(validator, urls): """Test all URLs and return the first working one""" for url in urls: valid, message = validator.validate_stream(url) if valid: return url return None def create_playlist(channels_file, output_file): # Read channels from JSON file with open(channels_file, encoding="utf-8") as f: channels = json.load(f) # Initialize validator validator = StreamValidator(timeout=45) # Prepare M3U8 header m3u8_content = f'#EXTM3U url-tvg="{EPG_URL}"\n' for channel in channels: if "urls" in channel: # Check if channel has URLs # Find first working stream working_url = find_working_stream(validator, channel["urls"]) if working_url: # Add channel to playlist m3u8_content += f'#EXTINF:-1 tvg-id="{channel.get("tvg-id", "")}" ' m3u8_content += f'tvg-name="{channel.get("tvg-name", "")}" ' m3u8_content += f'tvg-logo="{channel.get("tvg-logo", "")}" ' m3u8_content += f'group-title="{channel.get("group-title", "")}", ' m3u8_content += f"{channel.get('name', '')}\n" m3u8_content += f"{working_url}\n" else: # Log dead channel logging.info( f"Dead channel: {channel.get('name', 'Unknown')} - " "No working streams found" ) # Write playlist file with open(output_file, "w", encoding="utf-8") as f: f.write(m3u8_content) def upload_playlist(file_path): """Uploads playlist file to IPTV server using HTTP Basic Auth""" try: with open(file_path, "rb") as f: response = requests.post( IPTV_SERVER_URL + "/admin/playlist", auth=requests.auth.HTTPBasicAuth( IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD ), files={"file": (os.path.basename(file_path), f)}, ) if response.status_code == 200: print("Playlist successfully uploaded to server") else: print(f"Upload failed ({response.status_code}): {response.text}") except Exception as e: print(f"Upload error: {str(e)}") def main(): args = parse_arguments() channels_file = args.channels output_file = args.output dead_channels_log_file = args.dead_channels_log # Clear previous log file with open(dead_channels_log_file, "w") as f: f.write(f"Log created on {datetime.now()}\n") # Configure logging logging.basicConfig( filename=dead_channels_log_file, level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) # Create playlist create_playlist(channels_file, output_file) # upload playlist to server upload_playlist(output_file) print("Playlist creation completed!") if __name__ == "__main__": main()