import json
import logging
import os
import pprint
import re
import sys

import aiohttp
import streamlink
from tornado import template

logging.basicConfig(
    format="[%(filename)s:%(lineno)d] %(message)s",
    stream=sys.stdout,
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Base URL for every supported provider, keyed by provider name.
providers: dict[str, str] = {
    "nrk": "https://tv.nrk.no",
    "nrk_web": "https://nrk.no",
    "svt": "https://svtplay.se",
    "youtube": "https://www.youtube.com/watch?v=",
    "twitch": "https://twitch.tv",
    "twitter": "https://twitter.com",
}

# Self-hosted providers are only registered when their server is configured.
nextcloud_server = os.environ.get("NEXTCLOUD_SERVER")
if nextcloud_server is not None:
    providers["nextcloud"] = nextcloud_server

seafile_server = os.environ.get("SEAFILE_SERVER")
if seafile_server is not None:
    providers["seafile"] = seafile_server

playlist = None
icecast_server = os.environ.get("ICECAST_SERVER")
stream_server = os.environ.get("STREAM_SERVER")
proxy_server = os.environ.get("PROXY_SERVER")


class ProxyElem:
    """A proxy address paired with the region name sent to PROXY_SERVER.

    Both attributes may be ``None``; such an element is used as the
    placeholder for providers that have no proxy configured.
    """

    def __init__(self, proxy, region):
        self.proxy = proxy
        self.region = region

    def local(self):
        """Return a new aiohttp client session with a 5 second total timeout."""
        timeout = aiohttp.ClientTimeout(total=5)
        return aiohttp.ClientSession(timeout=timeout)

    def __repr__(self):
        return f"<ProxyElem region=<{self.region}> proxy=<{self.proxy}>>"

    async def proxy_url(self, urls):
        """Rewrite *urls* through PROXY_SERVER for this element's region.

        Each item of *urls* is either a URL string or a ``(url, ctype)``
        tuple.  The return value is a list of plain URLs:

        * If PROXY_SERVER is not configured, or the request fails or does
          not answer with a JSON list, the upstream URLs are returned
          unchanged (tuples reduced to their first element).
        * Otherwise the entries of the JSON list are returned positionally,
          with ``None`` wherever the upstream URL was not a string.
        """
        clean_urls = [url[0] if isinstance(url, tuple) else url for url in urls]
        if not isinstance(proxy_server, str):
            return clean_urls

        # One request object per input URL, in the same order as ``urls``.
        data_list = []
        for url in urls:
            data = {}
            if isinstance(url, str):
                data["upstream"] = url
            elif isinstance(url, tuple):
                if isinstance(url[0], str):
                    data["upstream"] = url[0]
                if isinstance(url[1], str):
                    data["ctype"] = url[1]
            data["region"] = self.region
            data_list.append(data)

        jdata = None
        try:
            async with self.local() as session:
                # ``async with`` releases the response even if reading fails.
                async with session.post(proxy_server, json=data_list) as resp:
                    jdata = json.loads(await resp.text())
        except Exception as e:
            # Best effort: any failure falls back to the upstream URLs below.
            logger.info(e)

        if not isinstance(jdata, list):
            return clean_urls
        # zip() stops at the shorter sequence, so a short answer from the
        # proxy yields a short result.
        return [
            dst if isinstance(src, str) else None
            for src, dst in zip(clean_urls, jdata)
        ]


def _collect_env_entries(pattern):
    """Return ``(value, country)`` for each environment variable matching *pattern*.

    Variable names are matched in lower case, in ``os.environ`` order.
    *country* is the optional two-letter group of the pattern without its
    leading underscore, or ``None`` when the name carries no country part.
    """
    entries = []
    for name, value in os.environ.items():
        found = pattern.match(name.lower())
        if found is None:
            continue
        suffix = found.group(1)
        country = suffix.strip("_") if isinstance(suffix, str) else None
        entries.append((value, country))
    return entries


# Expand every provider into "<provider>" plus one "<provider>_<cc>" entry
# per country found in <PROVIDER>_PROXY[_CC]<n> environment variables.
proxies: dict[str, list[ProxyElem]] = {}
new_providers: dict[str, str] = {}
for key in providers:
    region_expr = re.compile(
        rf"^{re.escape(key)}_region(_[a-z][a-z])?[0-9]+$", re.IGNORECASE
    )
    proxy_expr = re.compile(
        rf"^{re.escape(key)}_proxy(_[a-z][a-z])?[0-9]+$", re.IGNORECASE
    )
    proxy_entries = _collect_env_entries(proxy_expr)
    region_entries = _collect_env_entries(region_expr)

    # An ordered list (not a set) keeps the resulting dict order stable
    # between runs: the plain provider first, then countries as discovered.
    proxy_current_keys = [key]
    for _, country in proxy_entries:
        if country is None:
            continue
        country_key = f"{key}_{country}"
        if country_key not in proxy_current_keys:
            proxy_current_keys.append(country_key)

    for elem in proxy_current_keys:
        proxies[elem] = []
        new_providers[elem] = providers[key]

    # NOTE(review): proxies and regions are paired purely by their order in
    # the environment, and unpaired trailing entries are dropped by zip();
    # confirm the deployment always defines them in matching order.
    for (proxy_link, country), (region_name, _) in zip(
        proxy_entries, region_entries
    ):
        new_key = key if country is None else f"{key}_{country}"
        proxies[new_key].append(ProxyElem(proxy_link, region_name))

    # Every key gets at least one element so consumers can always iterate.
    for elem in proxy_current_keys:
        if not proxies[elem]:
            proxies[elem].append(ProxyElem(None, None))

pprint.pp(proxies)
providers = new_providers

# One streamlink session per configured proxy address, plus a direct one.
streamlink_sessions: dict[str, streamlink.session.Streamlink] = {}
streamlink_default_session = streamlink.Streamlink()
proxy_keys: list[str] = []
for proxy_provider in proxies.values():
    for proxy_elem in proxy_provider:
        if not (
            isinstance(proxy_elem, ProxyElem) and isinstance(proxy_elem.proxy, str)
        ):
            continue
        proxy_keys.append(proxy_elem.proxy)
        proxy_session = streamlink.Streamlink()
        proxy_session.set_option("http-proxy", "socks5://" + proxy_elem.proxy)
        proxy_session.set_option("https-proxy", "socks5://" + proxy_elem.proxy)
        streamlink_sessions[proxy_elem.proxy] = proxy_session

# Build the M3U playlist only when both stream endpoints are configured.
if icecast_server is not None and stream_server is not None:
    try:
        with open("/app/sources.json", "r", encoding="utf-8") as f:
            data = json.load(f)
        playlist_parts = ["#EXTM3U\n"]
        for key in data:
            current = data[key]
            name = current["name"]
            if current["radio"]:
                playlist_parts.append(f'#EXTINF:0 radio="true", {name}\n')
                playlist_parts.append(icecast_server + key + "\n")
            else:
                playlist_parts.append(f'#EXTINF:0 radio="false", {name}\n')
                playlist_parts.append(stream_server + key + "\n")
        playlist = "".join(playlist_parts)
    except OSError as e:
        logger.info(e)

# Static assets; each stays None if it (or an earlier file) cannot be read.
template_html = None
template_script = None
videojs_version = None
# Initialised like its siblings so the name exists even when loading fails.
chromecast_version = None
font_awesome_version = None
custom_style = None
favicon = None
try:
    with open("/app/index.html", "r", encoding="utf-8") as f:
        template_html = template.Template(f.read().strip())
    with open("/app/script.js", "r", encoding="utf-8") as f:
        template_script = template.Template(f.read().strip())
    with open("/app/version/video.js.txt", "r", encoding="utf-8") as f:
        videojs_version = f.read().strip()
    with open("/app/version/chromecast.txt", "r", encoding="utf-8") as f:
        chromecast_version = f.read().strip()
    with open("/app/version/font-awesome.txt", "r", encoding="utf-8") as f:
        font_awesome_version = f.read().strip()
    with open("/app/favicon.png", "rb") as fb:
        favicon = fb.read()
    with open("/app/style.css", "r", encoding="utf-8") as f:
        custom_style = f.read()
except OSError as e:
    logger.info(e)