import os import argparse import json import logging import requests from pathlib import Path from datetime import datetime from utils.check_streams import StreamValidator from utils.config import EPG_URL, IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL def parse_arguments(): parser = argparse.ArgumentParser(description='IPTV playlist generator') parser.add_argument('--output', default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'playlist.m3u8'), help='Path to output playlist file') parser.add_argument('--channels', default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'channels.json'), help='Path to channels definition JSON file') parser.add_argument('--dead-channels-log', default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'dead_channels.log'), help='Path to log file to store a list of dead channels') return parser.parse_args() def find_working_stream(validator, urls): """Test all URLs and return the first working one""" for url in urls: valid, message = validator.validate_stream(url) if valid: return url return None def create_playlist(channels_file, output_file): # Read channels from JSON file with open(channels_file, 'r', encoding='utf-8') as f: channels = json.load(f) # Initialize validator validator = StreamValidator(timeout=45) # Prepare M3U8 header m3u8_content = f'#EXTM3U url-tvg="{EPG_URL}"\n' for channel in channels: if 'urls' in channel: # Check if channel has URLs # Find first working stream working_url = find_working_stream(validator, channel['urls']) if working_url: # Add channel to playlist m3u8_content += f'#EXTINF:-1 tvg-id="{channel.get("tvg-id", "")}" ' m3u8_content += f'tvg-name="{channel.get("tvg-name", "")}" ' m3u8_content += f'tvg-logo="{channel.get("tvg-logo", "")}" ' m3u8_content += f'group-title="{channel.get("group-title", "")}", ' m3u8_content += f'{channel.get("name", "")}\n' m3u8_content += f'{working_url}\n' else: # Log dead channel logging.info(f'Dead channel: {channel.get("name", "Unknown")} - No working streams found') # Write playlist file with open(output_file, 'w', encoding='utf-8') as f: f.write(m3u8_content) def upload_playlist(file_path): """Uploads playlist file to IPTV server using HTTP Basic Auth""" try: with open(file_path, 'rb') as f: response = requests.post( IPTV_SERVER_URL + '/admin/playlist', auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD), files={'file': (os.path.basename(file_path), f)} ) if response.status_code == 200: print("Playlist successfully uploaded to server") else: print(f"Upload failed ({response.status_code}): {response.text}") except Exception as e: print(f"Upload error: {str(e)}") def main(): args = parse_arguments() channels_file = args.channels output_file = args.output dead_channels_log_file = args.dead_channels_log # Clear previous log file with open(dead_channels_log_file, 'w') as f: f.write(f'Log created on {datetime.now()}\n') # Configure logging logging.basicConfig( filename=dead_channels_log_file, level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Create playlist create_playlist(channels_file, output_file) #upload playlist to server upload_playlist(output_file) print("Playlist creation completed!") if __name__ == "__main__": main()