"""Validate streaming media URLs given on the command line or in M3U playlists."""

import argparse
import logging
import os

import requests
from requests.exceptions import ConnectionError, HTTPError, RequestException, Timeout


class StreamValidator:
    """Checks whether media stream URLs are alive and serve plausible content."""

    # Accepted Content-Type substrings (HLS, DASH, MPEG-TS, common containers).
    # Class-level constant so the list is not rebuilt on every validation call.
    _VALID_CONTENT_TYPES = (
        "video/mp2t",
        "application/vnd.apple.mpegurl",
        "application/dash+xml",
        "video/mp4",
        "video/webm",
        "application/octet-stream",
        "application/x-mpegURL",
    )

    def __init__(self, timeout=10, user_agent=None):
        """Create a validator.

        Args:
            timeout: Per-request timeout in seconds.
            user_agent: Optional User-Agent override; defaults to a
                browser-like string, since some stream hosts reject
                obvious non-browser clients with 403.
        """
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": user_agent
                or (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                )
            }
        )

    def validate_stream(self, url):
        """Validate a media stream URL with multiple fallback checks.

        Checks status code, Content-Type, and that at least one chunk of
        body data is actually readable.

        Args:
            url: The stream URL to probe.

        Returns:
            Tuple ``(valid, message)`` — ``valid`` is a bool, ``message``
            a human-readable explanation.
        """
        try:
            # Ask for only the first KiB so we never download a whole stream.
            headers = {"Range": "bytes=0-1024"}
            with self.session.get(
                url,
                headers=headers,
                timeout=self.timeout,
                stream=True,
                allow_redirects=True,
            ) as response:
                # 206 is expected for a honored Range request; 200 if ignored.
                if response.status_code not in (200, 206):
                    return False, f"Invalid status code: {response.status_code}"

                content_type = response.headers.get("Content-Type", "")
                if not self._is_valid_content_type(content_type):
                    return False, f"Invalid content type: {content_type}"

                # Confirm the body is actually readable, not just advertised.
                try:
                    next(response.iter_content(chunk_size=1024))
                    return True, "Stream is valid"
                except StopIteration:
                    # Fix: an empty body previously fell through to the
                    # generic handler and reported "Validation error: "
                    # with no detail.
                    return False, "Stream returned an empty response body"
                except (ConnectionError, Timeout):
                    return False, "Connection failed during content read"
        except HTTPError as e:
            return False, f"HTTP Error: {str(e)}"
        except ConnectionError as e:
            return False, f"Connection Error: {str(e)}"
        except Timeout:
            return False, "Connection timeout"
        except RequestException as e:
            return False, f"Request Exception: {str(e)}"
        except Exception as e:
            # Last-resort guard so one bad URL never aborts a batch run.
            return False, f"Validation error: {str(e)}"

    def _is_valid_content_type(self, content_type):
        """Return True if *content_type* contains a known stream media type.

        Substring match, since servers often append parameters such as
        ``; charset=...`` to the media type.
        """
        if content_type is None:
            return False
        return any(ct in content_type for ct in self._VALID_CONTENT_TYPES)

    def parse_playlist(self, file_path):
        """Extract stream URLs from an M3U playlist file.

        Args:
            file_path: Path to the playlist file.

        Returns:
            List of URL strings (every non-blank, non-comment line).

        Raises:
            OSError / UnicodeDecodeError: re-raised after logging, so the
            caller can decide whether to continue with other sources.
        """
        urls = []
        try:
            # M3U playlists are conventionally UTF-8; don't rely on the
            # platform default encoding.
            with open(file_path, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines and #EXTINF/#EXTM3U directives.
                    if line and not line.startswith("#"):
                        urls.append(line)
        except Exception as e:
            logging.error("Error reading playlist file: %s", str(e))
            raise
        return urls


def main():
    """CLI entry point: collect URLs, validate each, save dead ones."""
    parser = argparse.ArgumentParser(
        description=(
            "Validate streaming URLs from command line arguments or playlist files"
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "sources", nargs="+", help="List of URLs or file paths containing stream URLs"
    )
    parser.add_argument(
        "--timeout", type=int, default=20, help="Timeout in seconds for stream checks"
    )
    parser.add_argument(
        "--output",
        default="deadstreams.txt",
        help="Output file name for inactive streams",
    )
    args = parser.parse_args()

    # Log to both a file and the console.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler("stream_check.log"), logging.StreamHandler()],
    )

    validator = StreamValidator(timeout=args.timeout)
    dead_streams = []
    all_urls = []

    # Each source is either a playlist file on disk or a bare URL.
    for source in args.sources:
        if os.path.isfile(source):
            try:
                all_urls.extend(validator.parse_playlist(source))
                logging.info("Parsed playlist file: %s", source)
            except Exception as e:
                # Best-effort: a bad file shouldn't stop the other sources.
                logging.error("Failed to process file %s: %s", source, str(e))
        else:
            all_urls.append(source)

    # Validate all collected URLs.
    for url in all_urls:
        logging.info("Checking: %s", url)
        valid, message = validator.validate_stream(url)
        status = "ALIVE" if valid else "DEAD"
        print(f"{url}\nStatus: {status}\nDetails: {message}\n")
        if not valid:
            dead_streams.append(url)

    # Save dead streams to file (with a trailing newline so the file is a
    # well-formed text file for downstream tools).
    if dead_streams:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write("\n".join(dead_streams) + "\n")
        logging.info(
            "Found %d dead streams. Saved to %s.", len(dead_streams), args.output
        )


if __name__ == "__main__":
    main()