Switch to cognito user/password authentication. Major code refactor.
Some checks failed
AWS Deploy on Push / build (push) Failing after 48s

This commit is contained in:
2025-05-16 11:05:54 -05:00
parent 8d1997fa5a
commit c221a8cded
17 changed files with 220 additions and 109 deletions

0
app/iptv/__init__.py Normal file
View File

147
app/iptv/createEpg.py Normal file
View File

@@ -0,0 +1,147 @@
import os
import re
import gzip
import json
import xml.etree.ElementTree as ET
import requests
import argparse
from utils.constants import IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL
def parse_arguments():
parser = argparse.ArgumentParser(description='EPG Grabber')
parser.add_argument('--playlist',
default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'playlist.m3u8'),
help='Path to playlist file')
parser.add_argument('--output',
default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'epg.xml'),
help='Path to output EPG XML file')
parser.add_argument('--epg-sources',
default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'epg_sources.json'),
help='Path to EPG sources JSON configuration file')
parser.add_argument('--save-as-gz',
action='store_true',
default=True,
help='Save an additional gzipped version of the EPG file')
return parser.parse_args()
def load_epg_sources(config_path):
"""Load EPG sources from JSON configuration file"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('epg_sources', [])
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Error loading EPG sources: {e}")
return []
def get_tvg_ids(playlist_path):
"""
Extracts unique tvg-id values from an M3U playlist file.
Args:
playlist_path (str): Path to the M3U playlist file.
Returns:
list[str]: A list of unique tvg-id strings.
"""
unique_tvg_ids = set()
# Regular expression to find tvg-id="<value>" and capture the value
# It looks for tvg-id= followed by a double quote,
# then captures any characters that are NOT a double quote (non-greedy),
# and ends with a double quote.
tvg_id_pattern = re.compile(r'tvg-id="([^"]*)"')
with open(playlist_path, 'r', encoding='utf-8') as file:
for line in file:
if line.startswith('#EXTINF'):
# Search for the tvg-id pattern in the line
match = tvg_id_pattern.search(line)
if match:
# Extract the captured group (the value inside the quotes)
tvg_id = match.group(1)
if tvg_id: # Ensure the extracted id is not empty
unique_tvg_ids.add(tvg_id)
return list(unique_tvg_ids)
def fetch_and_extract_xml(url):
response = requests.get(url)
if response.status_code != 200:
print(f"Failed to fetch {url}")
return None
if url.endswith('.gz'):
try:
decompressed_data = gzip.decompress(response.content)
return ET.fromstring(decompressed_data)
except Exception as e:
print(f"Failed to decompress and parse XML from {url}: {e}")
return None
else:
try:
return ET.fromstring(response.content)
except Exception as e:
print(f"Failed to parse XML from {url}: {e}")
return None
def filter_and_build_epg(urls, tvg_ids, output_file, save_as_gz=True):
root = ET.Element('tv')
for url in urls:
epg_data = fetch_and_extract_xml(url)
if epg_data is None:
continue
for channel in epg_data.findall('channel'):
tvg_id = channel.get('id')
if tvg_id in tvg_ids:
root.append(channel)
for programme in epg_data.findall('programme'):
tvg_id = programme.get('channel')
if tvg_id in tvg_ids:
root.append(programme)
tree = ET.ElementTree(root)
tree.write(output_file, encoding='utf-8', xml_declaration=True)
print(f"New EPG saved to {output_file}")
if save_as_gz:
output_file_gz = output_file + '.gz'
with gzip.open(output_file_gz, 'wb') as f:
tree.write(f, encoding='utf-8', xml_declaration=True)
print(f"New EPG saved to {output_file_gz}")
def upload_epg(file_path):
"""Uploads gzipped EPG file to IPTV server using HTTP Basic Auth"""
try:
with open(file_path, 'rb') as f:
response = requests.post(
IPTV_SERVER_URL + '/admin/epg',
auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD),
files={'file': (os.path.basename(file_path), f)}
)
if response.status_code == 200:
print("EPG successfully uploaded to server")
else:
print(f"Upload failed ({response.status_code}): {response.text}")
except Exception as e:
print(f"Upload error: {str(e)}")
if __name__ == "__main__":
args = parse_arguments()
playlist_file = args.playlist
output_file = args.output
tvg_ids = get_tvg_ids(playlist_file)
urls = load_epg_sources(args.epg_sources)
if not urls:
print("No EPG URLs loaded - check configuration file")
exit(1)
filter_and_build_epg(urls, tvg_ids, output_file, args.save_as_gz)
if args.save_as_gz:
upload_epg(output_file + '.gz')

108
app/iptv/createPlaylist.py Normal file
View File

@@ -0,0 +1,108 @@
import os
import argparse
import json
import logging
import requests
from pathlib import Path
from datetime import datetime
from utils.check_streams import StreamValidator
from utils.constants import EPG_URL, IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL
def parse_arguments():
parser = argparse.ArgumentParser(description='IPTV playlist generator')
parser.add_argument('--output',
default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'playlist.m3u8'),
help='Path to output playlist file')
parser.add_argument('--channels',
default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'channels.json'),
help='Path to channels definition JSON file')
parser.add_argument('--dead-channels-log',
default=os.path.join(os.path.dirname(os.path.dirname(__file__)), 'dead_channels.log'),
help='Path to log file to store a list of dead channels')
return parser.parse_args()
def find_working_stream(validator, urls):
"""Test all URLs and return the first working one"""
for url in urls:
valid, message = validator.validate_stream(url)
if valid:
return url
return None
def create_playlist(channels_file, output_file):
# Read channels from JSON file
with open(channels_file, 'r', encoding='utf-8') as f:
channels = json.load(f)
# Initialize validator
validator = StreamValidator(timeout=45)
# Prepare M3U8 header
m3u8_content = f'#EXTM3U url-tvg="{EPG_URL}"\n'
for channel in channels:
if 'urls' in channel: # Check if channel has URLs
# Find first working stream
working_url = find_working_stream(validator, channel['urls'])
if working_url:
# Add channel to playlist
m3u8_content += f'#EXTINF:-1 tvg-id="{channel.get("tvg-id", "")}" '
m3u8_content += f'tvg-name="{channel.get("tvg-name", "")}" '
m3u8_content += f'tvg-logo="{channel.get("tvg-logo", "")}" '
m3u8_content += f'group-title="{channel.get("group-title", "")}", '
m3u8_content += f'{channel.get("name", "")}\n'
m3u8_content += f'{working_url}\n'
else:
# Log dead channel
logging.info(f'Dead channel: {channel.get("name", "Unknown")} - No working streams found')
# Write playlist file
with open(output_file, 'w', encoding='utf-8') as f:
f.write(m3u8_content)
def upload_playlist(file_path):
"""Uploads playlist file to IPTV server using HTTP Basic Auth"""
try:
with open(file_path, 'rb') as f:
response = requests.post(
IPTV_SERVER_URL + '/admin/playlist',
auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD),
files={'file': (os.path.basename(file_path), f)}
)
if response.status_code == 200:
print("Playlist successfully uploaded to server")
else:
print(f"Upload failed ({response.status_code}): {response.text}")
except Exception as e:
print(f"Upload error: {str(e)}")
def main():
args = parse_arguments()
channels_file = args.channels
output_file = args.output
dead_channels_log_file = args.dead_channels_log
# Clear previous log file
with open(dead_channels_log_file, 'w') as f:
f.write(f'Log created on {datetime.now()}\n')
# Configure logging
logging.basicConfig(
filename=dead_channels_log_file,
level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Create playlist
create_playlist(channels_file, output_file)
#upload playlist to server
upload_playlist(output_file)
print("Playlist creation completed!")
if __name__ == "__main__":
main()