# File info: 174 lines, 5.4 KiB, Python
import argparse
|
|
import gzip
|
|
import json
|
|
import os
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import requests
|
|
from utils.constants import (
|
|
IPTV_SERVER_ADMIN_PASSWORD,
|
|
IPTV_SERVER_ADMIN_USER,
|
|
IPTV_SERVER_URL,
|
|
)
|
|
|
|
|
|
def parse_arguments(argv=None):
    """Parse the command-line options of the EPG grabber.

    Args:
        argv (list[str] | None): Argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is what the
            script entry point relies on.

    Returns:
        argparse.Namespace: Namespace with the path options ``playlist``,
        ``output`` and ``epg_sources`` and the boolean ``save_as_gz``.
    """
    # Default paths live in the parent of this file's directory. abspath()
    # keeps that true when __file__ is a bare relative name such as "epg.py",
    # for which dirname() would otherwise collapse to "" (the current dir).
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    parser = argparse.ArgumentParser(description="EPG Grabber")
    parser.add_argument(
        "--playlist",
        default=os.path.join(base_dir, "playlist.m3u8"),
        help="Path to playlist file",
    )
    parser.add_argument(
        "--output",
        default=os.path.join(base_dir, "epg.xml"),
        help="Path to output EPG XML file",
    )
    parser.add_argument(
        "--epg-sources",
        default=os.path.join(base_dir, "epg_sources.json"),
        help="Path to EPG sources JSON configuration file",
    )
    parser.add_argument(
        "--save-as-gz",
        dest="save_as_gz",
        action="store_true",
        default=True,
        help="Save an additional gzipped version of the EPG file (default)",
    )
    # A store_true flag whose default is already True can never become False,
    # so the gzipped copy could not be switched off. The negative flag below
    # shares the same destination and makes the option usable.
    parser.add_argument(
        "--no-save-as-gz",
        dest="save_as_gz",
        action="store_false",
        help="Do not save the gzipped version of the EPG file",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def load_epg_sources(config_path):
    """Load the EPG source list from a JSON configuration file.

    The file must contain a JSON object; the value stored under its
    ``"epg_sources"`` key is returned.

    Args:
        config_path (str): Path to the JSON configuration file.

    Returns:
        list: The ``"epg_sources"`` list, or an empty list when the key is
        absent or the file cannot be read, is not valid JSON, or does not
        have the expected structure. Problems are reported on stdout rather
        than raised.
    """
    try:
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # OSError covers a missing, unreadable or directory path; ValueError
        # covers json.JSONDecodeError and undecodable (non UTF-8) bytes.
        print(f"Error loading EPG sources: {e}")
        return []

    # A top-level JSON array/string/number has no .get(); report it instead
    # of crashing with AttributeError.
    if not isinstance(config, dict):
        print(
            "Error loading EPG sources: "
            f"expected a JSON object at the top level of {config_path}"
        )
        return []

    sources = config.get("epg_sources", [])
    if not isinstance(sources, list):
        print(
            'Error loading EPG sources: "epg_sources" must be a list '
            f"in {config_path}"
        )
        return []
    return sources
|
|
|
|
|
|
def get_tvg_ids(playlist_path):
    """
    Collect the distinct, non-empty tvg-id values of an M3U playlist.

    Only lines beginning with "#EXTINF" are inspected, and only the first
    tvg-id="..." attribute on each such line is considered.

    Args:
        playlist_path (str): Path to the M3U playlist file (read as UTF-8).

    Returns:
        list[str]: The unique tvg-id strings, in no particular order.
    """
    # tvg-id="<value>": the group captures everything up to the closing quote.
    id_regex = re.compile(r'tvg-id="([^"]*)"')

    with open(playlist_path, encoding="utf-8") as playlist:
        # One search result (match or None) per #EXTINF line, evaluated lazily.
        hits = (
            id_regex.search(entry)
            for entry in playlist
            if entry.startswith("#EXTINF")
        )
        # Drop lines without a tvg-id attribute and attributes that are empty.
        found_ids = {
            hit.group(1)
            for hit in hits
            if hit is not None and hit.group(1)
        }

    return list(found_ids)
|
|
|
|
|
|
def fetch_and_extract_xml(url, timeout=30):
    """Download an XML document and return its parsed root element.

    Gzip-compressed payloads are decompressed before parsing. All failures
    are reported on stdout and signalled by returning ``None`` so the caller
    can skip this source and carry on with the others.

    Args:
        url (str): Location of the (optionally gzipped) XML document.
        timeout (float | tuple): Timeout passed to ``requests.get``, in
            seconds. Without one a stalled server blocks the run forever.

    Returns:
        xml.etree.ElementTree.Element | None: Root element of the document,
        or ``None`` when it could not be fetched, decompressed or parsed.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        # DNS failures, refused connections, timeouts, ... must not abort the
        # whole run; they are treated like any other unusable source.
        print(f"Failed to fetch {url}: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch {url}")
        return None

    payload = response.content

    # Detect gzip by its magic number instead of the URL suffix: this also
    # handles ".gz" URLs carrying a query string, and ".gz" URLs whose body
    # was already decoded by requests (Content-Encoding: gzip).
    if payload[:2] == b"\x1f\x8b":
        try:
            return ET.fromstring(gzip.decompress(payload))
        except Exception as e:  # best-effort boundary: report and skip
            print(f"Failed to decompress and parse XML from {url}: {e}")
            return None

    try:
        return ET.fromstring(payload)
    except Exception as e:  # best-effort boundary: report and skip
        print(f"Failed to parse XML from {url}: {e}")
        return None
|
|
|
|
|
|
def filter_and_build_epg(urls, tvg_ids, output_file, save_as_gz=True):
    """Merge several EPG sources into one XML file limited to known channels.

    Every URL is fetched with ``fetch_and_extract_xml``; sources that cannot
    be loaded are skipped. From each source the ``channel`` elements whose
    ``id`` is in ``tvg_ids`` and the ``programme`` elements whose ``channel``
    attribute is in ``tvg_ids`` are kept.

    Args:
        urls (Iterable[str]): EPG source URLs, processed in order.
        tvg_ids (Iterable[str]): Channel ids to keep.
        output_file (str): Path of the XML file to write.
        save_as_gz (bool): Also write a gzipped copy to ``output_file + ".gz"``.

    Returns:
        None. The result is written to disk and progress is printed.
    """
    # Build the lookup once: membership is tested for every element of every
    # source, and a list would make each test linear in the playlist size.
    wanted_ids = set(tvg_ids)

    channels = []
    programmes = []
    seen_channel_ids = set()

    for url in urls:
        epg_data = fetch_and_extract_xml(url)
        if epg_data is None:
            continue

        for channel in epg_data.findall("channel"):
            channel_id = channel.get("id")
            # The first source that defines a channel wins; repeating the
            # same <channel id="..."> from later sources adds nothing.
            if channel_id in wanted_ids and channel_id not in seen_channel_ids:
                seen_channel_ids.add(channel_id)
                channels.append(channel)

        # Programmes from every source are kept as-is (sources may cover
        # different time ranges for the same channel).
        programmes.extend(
            programme
            for programme in epg_data.findall("programme")
            if programme.get("channel") in wanted_ids
        )

    # XMLTV's content model is (channel*, programme*): all channels must come
    # before the first programme, so they are emitted in two passes instead
    # of being interleaved source by source.
    root = ET.Element("tv")
    root.extend(channels)
    root.extend(programmes)

    tree = ET.ElementTree(root)
    tree.write(output_file, encoding="utf-8", xml_declaration=True)
    print(f"New EPG saved to {output_file}")

    if save_as_gz:
        output_file_gz = output_file + ".gz"
        with gzip.open(output_file_gz, "wb") as f:
            tree.write(f, encoding="utf-8", xml_declaration=True)
        print(f"New EPG saved to {output_file_gz}")
|
|
|
|
|
|
def upload_epg(file_path, timeout=60):
    """Upload an EPG file to the IPTV server using HTTP Basic Auth.

    The file is sent as the multipart form field ``file`` in a POST request
    to ``<IPTV_SERVER_URL>/admin/epg``, authenticated with
    ``IPTV_SERVER_ADMIN_USER`` / ``IPTV_SERVER_ADMIN_PASSWORD``. The upload is
    best-effort: the outcome is printed and no exception is propagated.

    Args:
        file_path (str): Path of the file to upload.
        timeout (float | tuple): Timeout passed to ``requests.post``, in
            seconds, so an unresponsive server cannot hang the script.

    Returns:
        None.
    """
    try:
        # Tolerate a base URL configured with a trailing slash; otherwise the
        # request would go to ".../ /admin/epg" with a doubled separator.
        endpoint = IPTV_SERVER_URL.rstrip("/") + "/admin/epg"

        with open(file_path, "rb") as f:
            response = requests.post(
                endpoint,
                # A (user, password) tuple is requests' shorthand for
                # HTTP Basic authentication.
                auth=(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD),
                files={"file": (os.path.basename(file_path), f)},
                timeout=timeout,
            )

        if response.status_code == 200:
            print("EPG successfully uploaded to server")
        else:
            print(f"Upload failed ({response.status_code}): {response.text}")
    except Exception as e:  # best-effort boundary: report, never crash
        print(f"Upload error: {e}")
|
|
|
|
|
|
# Script entry point: read the playlist's channel ids, build a filtered EPG
# from the configured sources and, when a gzipped copy was written, upload it.
if __name__ == "__main__":
    args = parse_arguments()
    playlist_file = args.playlist
    output_file = args.output

    try:
        tvg_ids = get_tvg_ids(playlist_file)
    except OSError as e:
        # A missing or unreadable playlist is a usage error, not a crash.
        print(f"Could not read playlist {playlist_file}: {e}")
        raise SystemExit(1)

    urls = load_epg_sources(args.epg_sources)

    if not urls:
        print("No EPG URLs loaded - check configuration file")
        # The bare exit() helper is injected by the `site` module for
        # interactive use and is missing under `python -S` or in frozen
        # builds; raising SystemExit always works and needs no import.
        raise SystemExit(1)

    filter_and_build_epg(urls, tvg_ids, output_file, args.save_as_gz)

    # Only the gzipped copy is uploaded, so there is nothing to send when
    # it was not produced.
    if args.save_as_gz:
        upload_epg(output_file + ".gz")
|