Files
iptv-manager-service/app/iptv/createPlaylist.py

137 lines
4.0 KiB
Python

import argparse
import json
import logging
import os
from datetime import datetime
import requests
from utils.check_streams import StreamValidator
from utils.constants import (
EPG_URL,
IPTV_SERVER_ADMIN_PASSWORD,
IPTV_SERVER_ADMIN_USER,
IPTV_SERVER_URL,
)
def parse_arguments():
parser = argparse.ArgumentParser(description="IPTV playlist generator")
parser.add_argument(
"--output",
default=os.path.join(
os.path.dirname(os.path.dirname(__file__)), "playlist.m3u8"
),
help="Path to output playlist file",
)
parser.add_argument(
"--channels",
default=os.path.join(
os.path.dirname(os.path.dirname(__file__)), "channels.json"
),
help="Path to channels definition JSON file",
)
parser.add_argument(
"--dead-channels-log",
default=os.path.join(
os.path.dirname(os.path.dirname(__file__)), "dead_channels.log"
),
help="Path to log file to store a list of dead channels",
)
return parser.parse_args()
def find_working_stream(validator, urls):
"""Test all URLs and return the first working one"""
for url in urls:
valid, message = validator.validate_stream(url)
if valid:
return url
return None
def create_playlist(channels_file, output_file):
# Read channels from JSON file
with open(channels_file, encoding="utf-8") as f:
channels = json.load(f)
# Initialize validator
validator = StreamValidator(timeout=45)
# Prepare M3U8 header
m3u8_content = f'#EXTM3U url-tvg="{EPG_URL}"\n'
for channel in channels:
if "urls" in channel: # Check if channel has URLs
# Find first working stream
working_url = find_working_stream(validator, channel["urls"])
if working_url:
# Add channel to playlist
m3u8_content += f'#EXTINF:-1 tvg-id="{channel.get("tvg-id", "")}" '
m3u8_content += f'tvg-name="{channel.get("tvg-name", "")}" '
m3u8_content += f'tvg-logo="{channel.get("tvg-logo", "")}" '
m3u8_content += f'group-title="{channel.get("group-title", "")}", '
m3u8_content += f"{channel.get('name', '')}\n"
m3u8_content += f"{working_url}\n"
else:
# Log dead channel
logging.info(
f"Dead channel: {channel.get('name', 'Unknown')} - "
"No working streams found"
)
# Write playlist file
with open(output_file, "w", encoding="utf-8") as f:
f.write(m3u8_content)
def upload_playlist(file_path):
"""Uploads playlist file to IPTV server using HTTP Basic Auth"""
try:
with open(file_path, "rb") as f:
response = requests.post(
IPTV_SERVER_URL + "/admin/playlist",
auth=requests.auth.HTTPBasicAuth(
IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD
),
files={"file": (os.path.basename(file_path), f)},
)
if response.status_code == 200:
print("Playlist successfully uploaded to server")
else:
print(f"Upload failed ({response.status_code}): {response.text}")
except Exception as e:
print(f"Upload error: {str(e)}")
def main():
args = parse_arguments()
channels_file = args.channels
output_file = args.output
dead_channels_log_file = args.dead_channels_log
# Clear previous log file
with open(dead_channels_log_file, "w") as f:
f.write(f"Log created on {datetime.now()}\n")
# Configure logging
logging.basicConfig(
filename=dead_channels_log_file,
level=logging.INFO,
format="%(asctime)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Create playlist
create_playlist(channels_file, output_file)
# upload playlist to server
upload_playlist(output_file)
print("Playlist creation completed!")
if __name__ == "__main__":
main()