Added code that generates playlist and epg
All checks were successful
AWS Deploy on Push / build (push) Successful in 1m14s
All checks were successful
AWS Deploy on Push / build (push) Successful in 1m14s
This commit is contained in:
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
@@ -3,6 +3,7 @@
|
|||||||
"altinstall",
|
"altinstall",
|
||||||
"awscliv",
|
"awscliv",
|
||||||
"certbot",
|
"certbot",
|
||||||
|
"certifi",
|
||||||
"devel",
|
"devel",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"fastapi",
|
"fastapi",
|
||||||
|
|||||||
0
app/cabletv/__init__.py
Normal file
0
app/cabletv/__init__.py
Normal file
147
app/cabletv/createEpg.py
Normal file
147
app/cabletv/createEpg.py
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
import os
|
||||||
|
import re
|
||||||
|
import gzip
|
||||||
|
import json
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
import requests
|
||||||
|
import argparse
|
||||||
|
from utils.config import IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL
|
||||||
|
|
||||||
|
def parse_arguments(argv=None):
    """Parse command-line options for the EPG grabber.

    Args:
        argv: Optional list of argument strings; defaults to
            sys.argv[1:] (backward-compatible — existing callers pass
            nothing). Accepting a list makes the parser unit-testable.

    Returns:
        argparse.Namespace with playlist, output, epg_sources and
        save_as_gz attributes.
    """
    base_dir = os.path.dirname(os.path.dirname(__file__))
    parser = argparse.ArgumentParser(description='EPG Grabber')
    parser.add_argument('--playlist',
                        default=os.path.join(base_dir, 'playlist.m3u8'),
                        help='Path to playlist file')
    parser.add_argument('--output',
                        default=os.path.join(base_dir, 'epg.xml'),
                        help='Path to output EPG XML file')
    parser.add_argument('--epg-sources',
                        default=os.path.join(base_dir, 'epg_sources.json'),
                        help='Path to EPG sources JSON configuration file')
    # BUG FIX: the original combined action='store_true' with
    # default=True, so save_as_gz was always True and the flag could
    # never be disabled.  BooleanOptionalAction keeps --save-as-gz
    # working and adds --no-save-as-gz to turn it off.
    parser.add_argument('--save-as-gz',
                        action=argparse.BooleanOptionalAction,
                        default=True,
                        help='Save an additional gzipped version of the EPG file')
    return parser.parse_args(argv)
|
||||||
|
|
||||||
|
def load_epg_sources(config_path):
    """Read the list of EPG source URLs from a JSON configuration file.

    Args:
        config_path: Path to a JSON file with an 'epg_sources' key.

    Returns:
        The 'epg_sources' list, or an empty list when the file is
        missing, unreadable as JSON, or lacks the key.
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as handle:
            sources = json.load(handle).get('epg_sources', [])
    except (FileNotFoundError, json.JSONDecodeError) as err:
        # Best-effort: report the problem and let the caller decide
        # what an empty source list means.
        print(f"Error loading EPG sources: {err}")
        sources = []
    return sources
|
||||||
|
|
||||||
|
def get_tvg_ids(playlist_path):
    """Collect the unique tvg-id attribute values from an M3U playlist.

    Args:
        playlist_path (str): Path to the M3U playlist file.

    Returns:
        list[str]: Unique, non-empty tvg-id strings (order unspecified).
    """
    # Capture the text between the quotes of tvg-id="...".
    pattern = re.compile(r'tvg-id="([^"]*)"')
    ids = set()
    with open(playlist_path, 'r', encoding='utf-8') as handle:
        for row in handle:
            # Only #EXTINF metadata lines carry channel attributes.
            if not row.startswith('#EXTINF'):
                continue
            found = pattern.search(row)
            # Skip lines without the attribute and empty tvg-id="" values.
            if found and found.group(1):
                ids.add(found.group(1))
    return list(ids)
|
||||||
|
|
||||||
|
def fetch_and_extract_xml(url):
    """Download an EPG document and parse it into an XML element tree.

    Gzipped sources (URLs ending in .gz) are decompressed before
    parsing.

    Args:
        url: HTTP(S) URL of an XMLTV document, optionally gzip-compressed.

    Returns:
        The parsed root Element, or None on any fetch, decompression or
        parse failure (errors are printed, never raised).
    """
    try:
        # BUG FIX: a timeout was added — the original call could block
        # indefinitely on an unresponsive EPG host.  Connection-level
        # failures are also caught now, matching this function's
        # print-and-return-None contract.
        response = requests.get(url, timeout=60)
    except requests.RequestException as e:
        print(f"Failed to fetch {url}: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch {url}")
        return None

    if url.endswith('.gz'):
        try:
            decompressed_data = gzip.decompress(response.content)
            return ET.fromstring(decompressed_data)
        except Exception as e:
            print(f"Failed to decompress and parse XML from {url}: {e}")
            return None
    else:
        try:
            return ET.fromstring(response.content)
        except Exception as e:
            print(f"Failed to parse XML from {url}: {e}")
            return None
|
||||||
|
|
||||||
|
def filter_and_build_epg(urls, tvg_ids, output_file, save_as_gz=True):
    """Merge several EPG sources into one XMLTV file, filtered by channel.

    Only <channel> elements whose id — and <programme> elements whose
    channel attribute — appear in tvg_ids are kept.

    Args:
        urls: Iterable of EPG source URLs (plain or .gz XMLTV).
        tvg_ids: Iterable of tvg-id strings to keep.
        output_file: Path of the merged EPG XML file to write.
        save_as_gz: Also write a gzipped copy next to output_file.
    """
    # PERF FIX: membership is tested once per channel/programme across
    # every source; a set makes each test O(1) instead of scanning the
    # original list.
    wanted = set(tvg_ids)
    root = ET.Element('tv')

    for url in urls:
        epg_data = fetch_and_extract_xml(url)
        if epg_data is None:
            # Source failed to download/parse; skip it, keep the rest.
            continue

        for channel in epg_data.findall('channel'):
            if channel.get('id') in wanted:
                root.append(channel)

        for programme in epg_data.findall('programme'):
            if programme.get('channel') in wanted:
                root.append(programme)

    tree = ET.ElementTree(root)
    tree.write(output_file, encoding='utf-8', xml_declaration=True)
    print(f"New EPG saved to {output_file}")

    if save_as_gz:
        output_file_gz = output_file + '.gz'
        with gzip.open(output_file_gz, 'wb') as f:
            tree.write(f, encoding='utf-8', xml_declaration=True)
        print(f"New EPG saved to {output_file_gz}")
|
||||||
|
|
||||||
|
def upload_epg(file_path):
    """Upload a gzipped EPG file to the IPTV server using HTTP Basic Auth.

    Args:
        file_path: Path of the file to POST to <server>/admin/epg.

    Prints the outcome; never raises — all errors are caught and
    reported to stdout so the calling script can continue.
    """
    try:
        with open(file_path, 'rb') as f:
            response = requests.post(
                IPTV_SERVER_URL + '/admin/epg',
                auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD),
                files={'file': (os.path.basename(file_path), f)},
                # BUG FIX: timeout added so a stalled upload cannot
                # hang the whole run indefinitely.
                timeout=120
            )

        if response.status_code == 200:
            print("EPG successfully uploaded to server")
        else:
            print(f"Upload failed ({response.status_code}): {response.text}")
    except Exception as e:
        print(f"Upload error: {str(e)}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Script entry point: extract the channel ids from the playlist,
    # build a filtered EPG from the configured sources, and upload the
    # gzipped copy when one was produced.
    args = parse_arguments()

    wanted_ids = get_tvg_ids(args.playlist)
    source_urls = load_epg_sources(args.epg_sources)

    if not source_urls:
        print("No EPG URLs loaded - check configuration file")
        exit(1)

    filter_and_build_epg(source_urls, wanted_ids, args.output, args.save_as_gz)

    if args.save_as_gz:
        upload_epg(args.output + '.gz')
|
||||||
108
app/cabletv/createPlaylist.py
Normal file
108
app/cabletv/createPlaylist.py
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import requests
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
from utils.check_streams import StreamValidator
|
||||||
|
from utils.config import EPG_URL, IPTV_SERVER_ADMIN_PASSWORD, IPTV_SERVER_ADMIN_USER, IPTV_SERVER_URL
|
||||||
|
|
||||||
|
def parse_arguments(argv=None):
    """Parse command-line options for the playlist generator.

    Args:
        argv: Optional list of argument strings; defaults to
            sys.argv[1:] (backward-compatible generalization that also
            makes the parser unit-testable).

    Returns:
        argparse.Namespace with output, channels and dead_channels_log
        attributes.
    """
    base_dir = os.path.dirname(os.path.dirname(__file__))
    parser = argparse.ArgumentParser(description='IPTV playlist generator')
    parser.add_argument('--output',
                        default=os.path.join(base_dir, 'playlist.m3u8'),
                        help='Path to output playlist file')
    parser.add_argument('--channels',
                        default=os.path.join(base_dir, 'channels.json'),
                        help='Path to channels definition JSON file')
    parser.add_argument('--dead-channels-log',
                        default=os.path.join(base_dir, 'dead_channels.log'),
                        help='Path to log file to store a list of dead channels')
    return parser.parse_args(argv)
|
||||||
|
|
||||||
|
def find_working_stream(validator, urls):
    """Return the first URL in *urls* the validator accepts, or None.

    Each candidate is probed via validator.validate_stream, which
    returns a (valid, message) pair; the message is ignored here.
    """
    return next(
        (candidate for candidate in urls
         if validator.validate_stream(candidate)[0]),
        None,
    )
|
||||||
|
|
||||||
|
def create_playlist(channels_file, output_file):
    """Build an M3U8 playlist from a channel-definition JSON file.

    Each channel's candidate URLs are probed in order; the first live
    stream wins.  Channels whose streams are all dead are reported via
    logging.info (routed to the dead-channel log by the caller).

    Args:
        channels_file: Path to a JSON array of channel objects.
        output_file: Path of the M3U8 playlist to write.
    """
    with open(channels_file, 'r', encoding='utf-8') as src:
        channel_defs = json.load(src)

    probe = StreamValidator(timeout=45)

    # Assemble the playlist pieces in a list and join once at the end.
    pieces = [f'#EXTM3U url-tvg="{EPG_URL}"\n']

    for entry in channel_defs:
        # Entries without a 'urls' list are silently skipped.
        if 'urls' not in entry:
            continue

        live_url = find_working_stream(probe, entry['urls'])

        if live_url:
            pieces.append(f'#EXTINF:-1 tvg-id="{entry.get("tvg-id", "")}" ')
            pieces.append(f'tvg-name="{entry.get("tvg-name", "")}" ')
            pieces.append(f'tvg-logo="{entry.get("tvg-logo", "")}" ')
            pieces.append(f'group-title="{entry.get("group-title", "")}", ')
            pieces.append(f'{entry.get("name", "")}\n')
            pieces.append(f'{live_url}\n')
        else:
            logging.info(f'Dead channel: {entry.get("name", "Unknown")} - No working streams found')

    with open(output_file, 'w', encoding='utf-8') as dst:
        dst.write(''.join(pieces))
|
||||||
|
|
||||||
|
def upload_playlist(file_path):
    """Upload a playlist file to the IPTV server using HTTP Basic Auth.

    Args:
        file_path: Path of the file to POST to <server>/admin/playlist.

    Prints the outcome; never raises — all errors are caught and
    reported to stdout so the calling script can continue.
    """
    try:
        with open(file_path, 'rb') as f:
            response = requests.post(
                IPTV_SERVER_URL + '/admin/playlist',
                auth=requests.auth.HTTPBasicAuth(IPTV_SERVER_ADMIN_USER, IPTV_SERVER_ADMIN_PASSWORD),
                files={'file': (os.path.basename(file_path), f)},
                # BUG FIX: timeout added so a stalled upload cannot
                # hang the whole run indefinitely.
                timeout=120
            )

        if response.status_code == 200:
            print("Playlist successfully uploaded to server")
        else:
            print(f"Upload failed ({response.status_code}): {response.text}")
    except Exception as e:
        print(f"Upload error: {str(e)}")
|
||||||
|
|
||||||
|
def main():
    """Entry point: build the playlist, upload it, report completion."""
    args = parse_arguments()

    # Start the dead-channel log fresh on every run.
    with open(args.dead_channels_log, 'w') as log_file:
        log_file.write(f'Log created on {datetime.now()}\n')

    # Route logging.info calls (dead-channel reports from
    # create_playlist) into the same log file.
    logging.basicConfig(
        filename=args.dead_channels_log,
        level=logging.INFO,
        format='%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    create_playlist(args.channels, args.output)

    # Push the freshly generated playlist to the IPTV server.
    upload_playlist(args.output)

    print("Playlist creation completed!")


if __name__ == "__main__":
    main()
|
||||||
0
app/cabletv/utils/__init__.py
Normal file
0
app/cabletv/utils/__init__.py
Normal file
135
app/cabletv/utils/check_streams.py
Normal file
135
app/cabletv/utils/check_streams.py
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
import requests
|
||||||
|
import logging
|
||||||
|
from requests.exceptions import RequestException, Timeout, ConnectionError, HTTPError
|
||||||
|
|
||||||
|
class StreamValidator:
    """Checks whether media-stream URLs are alive and serving media.

    Uses a shared requests.Session with a desktop-browser User-Agent
    (some stream hosts reject unfamiliar clients).
    """

    def __init__(self, timeout=10, user_agent=None):
        """
        Args:
            timeout: Per-request timeout in seconds.
            user_agent: Optional User-Agent override; defaults to a
                desktop Chrome string.
        """
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        })

    def validate_stream(self, url):
        """Validate a media stream URL with multiple fallback checks.

        Checks, in order: HTTP status (200/206), Content-Type against a
        known media-type list, and that at least one chunk of payload
        can actually be read.

        Returns:
            tuple[bool, str]: (is_valid, human-readable detail).
        """
        try:
            # Ask only for the first KB; compliant servers answer 206.
            headers = {'Range': 'bytes=0-1024'}
            with self.session.get(
                url,
                headers=headers,
                timeout=self.timeout,
                stream=True,
                allow_redirects=True
            ) as response:
                if response.status_code not in [200, 206]:
                    return False, f"Invalid status code: {response.status_code}"

                content_type = response.headers.get('Content-Type', '')
                if not self._is_valid_content_type(content_type):
                    return False, f"Invalid content type: {content_type}"

                try:
                    next(response.iter_content(chunk_size=1024))
                    return True, "Stream is valid"
                except StopIteration:
                    # BUG FIX: an empty response body raises
                    # StopIteration from next(); previously that fell
                    # through to the generic handler and surfaced as
                    # "Validation error: " with an empty message.
                    return False, "Empty response body"
                except (ConnectionError, Timeout):
                    return False, "Connection failed during content read"

        except HTTPError as e:
            return False, f"HTTP Error: {str(e)}"
        except ConnectionError as e:
            return False, f"Connection Error: {str(e)}"
        except Timeout:
            return False, "Connection timeout"
        except RequestException as e:
            return False, f"Request Exception: {str(e)}"
        except Exception as e:
            return False, f"Validation error: {str(e)}"

    def _is_valid_content_type(self, content_type):
        """Substring-match the Content-Type against known media types.

        Substring matching tolerates parameters such as '; charset=...'.
        """
        valid_types = [
            'video/mp2t', 'application/vnd.apple.mpegurl',
            'application/dash+xml', 'video/mp4',
            'video/webm', 'application/octet-stream',
            'application/x-mpegURL'
        ]
        return any(ct in content_type for ct in valid_types)

    def parse_playlist(self, file_path):
        """Extract stream URLs from an M3U playlist file.

        Every non-blank line that is not a '#' comment/directive is
        treated as a URL.  Read errors are logged and re-raised.
        """
        urls = []
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#'):
                        urls.append(line)
        except Exception as e:
            logging.error(f"Error reading playlist file: {str(e)}")
            raise
        return urls
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: validate stream URLs supplied directly or via
    playlist files, printing per-URL results and writing dead streams
    to an output file."""
    parser = argparse.ArgumentParser(
        description='Validate streaming URLs from command line arguments or playlist files',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        'sources',
        nargs='+',
        help='List of URLs or file paths containing stream URLs'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=20,
        help='Timeout in seconds for stream checks'
    )
    parser.add_argument(
        '--output',
        default='deadstreams.txt',
        help='Output file name for inactive streams'
    )
    args = parser.parse_args()

    # Log to both a file and the console.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler('stream_check.log'), logging.StreamHandler()]
    )

    validator = StreamValidator(timeout=args.timeout)

    # Expand each source: existing files are parsed as playlists,
    # anything else is treated as a URL itself.
    collected = []
    for source in args.sources:
        if not os.path.isfile(source):
            collected.append(source)
            continue
        try:
            collected.extend(validator.parse_playlist(source))
            logging.info(f"Parsed playlist file: {source}")
        except Exception as e:
            logging.error(f"Failed to process file {source}: {str(e)}")

    # Probe every collected URL, remembering the dead ones.
    dead = []
    for url in collected:
        logging.info(f"Checking: {url}")
        ok, detail = validator.validate_stream(url)
        label = "ALIVE" if ok else "DEAD"
        print(f"{url}\nStatus: {label}\nDetails: {detail}\n")
        if not ok:
            dead.append(url)

    if dead:
        with open(args.output, 'w') as f:
            f.write('\n'.join(dead))
        logging.info(f"Found {len(dead)} dead streams. Saved to {args.output}.")


if __name__ == "__main__":
    main()
|
||||||
15
app/cabletv/utils/config.py
Normal file
15
app/cabletv/utils/config.py
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# Load environment variables from a .env file if it exists
load_dotenv()

# Base URL of the IPTV server that receives playlist/EPG uploads.
IPTV_SERVER_URL = os.getenv("IPTV_SERVER_URL", "https://iptv.fiorinis.com")

# Super iptv-server admin credentials for basic auth
# Reads from environment variables IPTV_SERVER_ADMIN_USER and IPTV_SERVER_ADMIN_PASSWORD
# NOTE(review): the hard-coded fallbacks ("admin"/"adminpassword") are a
# security risk if the env vars are ever missing in production — consider
# failing fast instead. TODO confirm the deployment always sets them.
IPTV_SERVER_ADMIN_USER = os.getenv("IPTV_SERVER_ADMIN_USER", "admin")
IPTV_SERVER_ADMIN_PASSWORD = os.getenv("IPTV_SERVER_ADMIN_PASSWORD", "adminpassword")

# URL for the EPG XML file to place in the playlist's header
EPG_URL = os.getenv("EPG_URL", "https://example.com/epg.xml.gz")
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
fastapi==0.103.2
|
fastapi==0.103.2
|
||||||
aws-cdk-lib>=2.0.0
|
aws-cdk-lib>=2.0.0
|
||||||
constructs>=10.0.0
|
constructs>=10.0.0
|
||||||
|
dotenv==0.9.9  # NOTE(review): PyPI 'dotenv' is NOT python-dotenv (pinned below) and installs a conflicting 'dotenv' module; the code's `from dotenv import load_dotenv` comes from python-dotenv — verify and remove this line.
|
||||||
python-dotenv==0.21.1
|
python-dotenv==0.21.1
|
||||||
uvicorn==0.22.0
|
uvicorn==0.22.0
|
||||||
|
requests==2.31.0
|
||||||
Reference in New Issue
Block a user