108 lines
4.0 KiB
Python
108 lines
4.0 KiB
Python
import os
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import requests
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from utils.check_streams import StreamValidator
|
|
from utils.constants import EPG_URL, IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL
|
|
|
|
def parse_arguments():
    """Parse the command-line options of the playlist generator.

    Three optional flags are accepted: ``--output``, ``--channels`` and
    ``--dead-channels-log``.  Each one defaults to a file located in the
    parent of the directory that contains this script.

    Returns:
        The ``argparse.Namespace`` produced by parsing ``sys.argv``.
    """
    # Parent of the directory holding this file; all defaults live there.
    base_dir = os.path.dirname(os.path.dirname(__file__))

    # (flag, default file name, help text) for every supported option.
    option_specs = (
        ('--output', 'playlist.m3u8',
         'Path to output playlist file'),
        ('--channels', 'channels.json',
         'Path to channels definition JSON file'),
        ('--dead-channels-log', 'dead_channels.log',
         'Path to log file to store a list of dead channels'),
    )

    cli = argparse.ArgumentParser(description='IPTV playlist generator')
    for flag, default_name, help_text in option_specs:
        cli.add_argument(
            flag,
            default=os.path.join(base_dir, default_name),
            help=help_text,
        )
    return cli.parse_args()
|
|
|
|
def find_working_stream(validator, urls):
    """Return the first URL that the validator accepts, or ``None``.

    URLs are checked in order with ``validator.validate_stream(url)``, which
    must return a ``(valid, message)`` pair.  Checking stops at the first URL
    whose ``valid`` element is truthy; later URLs are not validated.
    """
    for candidate in urls:
        # The message part of the result is not used here.
        is_valid, _ = validator.validate_stream(candidate)
        if not is_valid:
            continue
        return candidate
    return None
|
|
|
|
def create_playlist(channels_file, output_file):
    """Build an M3U8 playlist from a JSON channel list and write it to disk.

    The channel definitions are loaded from ``channels_file`` (UTF-8 JSON).
    For every channel that has a ``urls`` key, the URLs are tried in order
    and the first working one is written to the playlist together with the
    channel's ``tvg-id``, ``tvg-name``, ``tvg-logo``, ``group-title`` and
    ``name`` values (missing values become empty strings).  A channel whose
    URLs all fail is reported through ``logging.info``.  Channels without a
    ``urls`` key are skipped without being logged.

    Args:
        channels_file: Path of the JSON file with the channel definitions.
        output_file: Path of the playlist file to write (UTF-8).
    """
    with open(channels_file, 'r', encoding='utf-8') as source:
        channel_entries = json.load(source)

    stream_checker = StreamValidator(timeout=45)

    # Collect the playlist pieces and join them once at the end; the first
    # piece is the M3U8 header carrying the EPG URL.
    playlist_parts = [f'#EXTM3U url-tvg="{EPG_URL}"\n']

    for entry in channel_entries:
        if 'urls' not in entry:
            continue

        live_url = find_working_stream(stream_checker, entry['urls'])
        if not live_url:
            # No URL of this channel validated: record it as dead.
            logging.info(f'Dead channel: {entry.get("name", "Unknown")} - No working streams found')
            continue

        playlist_parts.extend((
            f'#EXTINF:-1 tvg-id="{entry.get("tvg-id", "")}" ',
            f'tvg-name="{entry.get("tvg-name", "")}" ',
            f'tvg-logo="{entry.get("tvg-logo", "")}" ',
            f'group-title="{entry.get("group-title", "")}", ',
            f'{entry.get("name", "")}\n',
            f'{live_url}\n',
        ))

    with open(output_file, 'w', encoding='utf-8') as sink:
        sink.write(''.join(playlist_parts))
|
|
|
|
def upload_playlist(file_path, timeout=60):
    """Upload the playlist file to the IPTV server using HTTP Basic Auth.

    The file is sent as the multipart form field ``file`` in a POST request
    to ``IPTV_SERVER_URL + '/admin/playlist'``, authenticated with
    ``IPTV_SERVER_ADMIN_USER`` / ``IPTV_SERVER_ADMIN_PASSWORD``.  The result
    is reported with ``print``; any exception raised while opening the file
    or performing the request is caught and printed instead of propagating.

    Args:
        file_path: Path of the playlist file to upload.
        timeout: Value forwarded to ``requests.post`` as its ``timeout``
            argument (seconds).  Without a timeout the request could wait
            indefinitely on an unresponsive server.
    """
    try:
        with open(file_path, 'rb') as f:
            response = requests.post(
                IPTV_SERVER_URL + '/admin/playlist',
                auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD),
                files={'file': (os.path.basename(file_path), f)},
                timeout=timeout,
            )

        if response.status_code == 200:
            print("Playlist successfully uploaded to server")
        else:
            print(f"Upload failed ({response.status_code}): {response.text}")
    except Exception as e:
        # Deliberate best-effort boundary: an upload problem (including a
        # timeout) is reported but must not abort the script.
        print(f"Upload error: {str(e)}")
|
|
|
|
def main():
    """Run the playlist generator: parse options, build, then upload.

    Steps, in order:
      1. Parse the command-line arguments.
      2. Truncate the dead-channel log and write a creation timestamp.
      3. Configure the root logger to append INFO records to that log.
      4. Create the playlist file from the channel definitions.
      5. Upload the playlist file to the server.
    """
    args = parse_arguments()
    channels_file = args.channels
    output_file = args.output
    dead_channels_log_file = args.dead_channels_log

    # Clear the previous log file.  UTF-8 is used explicitly so the log
    # matches the encoding used for the channel JSON and the playlist.
    with open(dead_channels_log_file, 'w', encoding='utf-8') as f:
        f.write(f'Log created on {datetime.now()}\n')

    # Configure logging.  A FileHandler is passed explicitly (it appends by
    # default) so that the log encoding does not depend on the platform
    # locale; channel names may contain non-ASCII characters.
    logging.basicConfig(
        handlers=[logging.FileHandler(dead_channels_log_file, encoding='utf-8')],
        level=logging.INFO,
        format='%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    # Create playlist
    create_playlist(channels_file, output_file)

    # Upload playlist to server
    upload_playlist(output_file)

    print("Playlist creation completed!")
|
|
|
|
# Run the generator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()