import asyncio
import json
import logging
import os
import pprint
import re
import sys
from typing import Literal, Optional

import aiohttp
import pydantic
import streamlink
from tornado import template

logging.basicConfig(
    format="[%(filename)s:%(lineno)d] %(message)s",
    stream=sys.stdout,
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Base URL for each supported provider.
providers: dict[str, str] = {}
providers["nrk"] = "https://tv.nrk.no"
providers["nrk_web"] = "https://nrk.no"
providers["svt"] = "https://svtplay.se"
providers["youtube"] = "https://www.youtube.com/watch?v="
providers["twitch"] = "https://twitch.tv"
providers["twitter"] = "https://twitter.com"

# Optional self-hosted providers, enabled through the environment.
nextcloud_server = os.environ.get("NEXTCLOUD_SERVER")
if nextcloud_server is not None:
    providers["nextcloud"] = nextcloud_server

seafile_server = os.environ.get("SEAFILE_SERVER")
if seafile_server is not None:
    providers["seafile"] = seafile_server

playlist = None
icecast_server = os.environ.get("ICECAST_SERVER")
stream_server = os.environ.get("STREAM_SERVER")
proxy_server = os.environ.get("PROXY_SERVER")


class ProxyCreateLink(pydantic.BaseModel):
    upstream: Optional[pydantic.HttpUrl] = None
    ctype: Optional[pydantic.StrictStr] = None
    region: Optional[pydantic.StrictStr] = None


class ProxyRequest(pydantic.BaseModel):
    action: Literal["create-urls", "read-config"]
    urls: Optional[list[ProxyCreateLink]] = None


class ProxyResponse(pydantic.BaseModel):
    action: Literal["create-urls", "read-config"]
    urls: Optional[list[pydantic.HttpUrl]] = None


class ProxyElem:
    def __init__(self, proxy, region):
        self.proxy = proxy
        self.region = region

    def local(self):
        timeout = aiohttp.ClientTimeout(total=5)
        return aiohttp.ClientSession(timeout=timeout)

    def __repr__(self):
        return f"<ProxyElem region=<{self.region}> proxy=<{self.proxy}>>"

    async def proxy_url(self, urls):
        """Ask the proxy server for proxied versions of ``urls``.

        Each entry is either a URL or a ``(url, ctype)`` tuple. The plain
        upstream URLs are returned unchanged whenever no proxy server is
        configured or the request cannot be completed.
        """
        clean_urls = []
        link_requests: list[ProxyCreateLink] = []
        for url in urls:
            if isinstance(url, tuple):
                clean_urls.append(url[0])
            else:
                clean_urls.append(url)

        if not isinstance(proxy_server, str):
            return clean_urls

        try:
            for url in urls:
                if isinstance(url, tuple):
                    link_requests.append(
                        ProxyCreateLink.parse_obj(
                            {"upstream": url[0], "ctype": url[1], "region": self.region}
                        )
                    )
                else:
                    link_requests.append(
                        ProxyCreateLink.parse_obj(
                            {"upstream": url, "region": self.region}
                        )
                    )
        except pydantic.ValidationError as e:
            logger.info(e)
            return clean_urls

        request_data: ProxyRequest
        response_data: ProxyResponse
        try:
            request_data = ProxyRequest.parse_obj(
                {"urls": link_requests, "action": "create-urls"}
            )
        except pydantic.ValidationError as e:
            logger.info(e)
            return clean_urls

        try:
            async with self.local() as session:
                # A pydantic model is not JSON-serializable by aiohttp's
                # ``json=`` argument, so send the serialized body directly.
                resp = await session.post(
                    proxy_server,
                    data=request_data.json(),
                    headers={"Content-Type": "application/json"},
                )
                # The body is raw JSON text, so parse_raw (not parse_obj) applies.
                response_data = ProxyResponse.parse_raw(await resp.text())
        except (aiohttp.ClientError, asyncio.TimeoutError, pydantic.ValidationError) as e:
            # asyncio.TimeoutError covers the ClientTimeout(total=5) expiry.
            logger.info(e)
        else:
            ret_data = []
            if response_data.urls is not None:
                for src, dst in zip(clean_urls, response_data.urls):
                    if isinstance(src, str):
                        ret_data.append(dst)
                    else:
                        ret_data.append(None)
                return ret_data
        return clean_urls


# Build the per-provider proxy table from environment variables named
# <provider>_proxy[_cc]N and <provider>_region[_cc]N (cc = two-letter country).
proxies: dict[str, list[ProxyElem]] = {}
new_providers = {}
for key in providers:
    region_expr = re.compile(f"^{key}_region(_[a-z][a-z])?[0-9]+$", re.IGNORECASE)
    region_matches = list(filter(region_expr.match, os.environ.keys()))
    proxy_expr = re.compile(f"^{key}_proxy(_[a-z][a-z])?[0-9]+$", re.IGNORECASE)
    proxy_matches = list(filter(proxy_expr.match, os.environ.keys()))

    proxy_current = []
    proxy_current_keys = set()
    proxy_current_keys.add(key)
    proxy_countries = []
    proxy_empty = True

    region_current = []
    region_current_keys = set()
    region_current_keys.add(key)
    region_countries = []
    region_empty = True

    for match in proxy_matches:
        proxy_match = proxy_expr.match(match.lower())
        if proxy_match is None:
            continue
        proxy_country_groups = proxy_match.groups()
        proxy_country = None
        pos = len(proxy_country_groups) - 1
        if pos >= 0:
            country_temp = proxy_country_groups[pos]
            if isinstance(country_temp, str):
                proxy_country = country_temp.strip("_")
                proxy_current_keys.add(f"{key}_{proxy_country}")
        proxy_link = os.environ.get(match)
        if proxy_link is not None:
            proxy_current.append(proxy_link)
            proxy_countries.append(proxy_country)
            if proxy_country is None:
                proxy_empty = False

    for match in region_matches:
        region_match = region_expr.match(match.lower())
        if region_match is None:
            continue
        region_country_groups = region_match.groups()
        region_country = None
        pos = len(region_country_groups) - 1
        if pos >= 0:
            country_temp = region_country_groups[pos]
            if isinstance(country_temp, str):
                region_country = country_temp.strip("_")
                region_current_keys.add(f"{key}_{region_country}")
        region_name = os.environ.get(match)
        if region_name is not None:
            region_current.append(region_name)
            region_countries.append(region_country)
            if region_country is None:
                region_empty = False

    for elem in proxy_current_keys:
        proxies[elem] = []
        new_providers[elem] = providers[key]

    for proxy_link, region_name, country in zip(
        proxy_current, region_current, proxy_countries
    ):
        new_key = key
        if country is not None:
            new_key = f"{key}_{country}"
        proxies[new_key].append(ProxyElem(proxy_link, region_name))

    # Providers without any configured proxy get a direct (no-proxy) entry.
    for elem in proxy_current_keys:
        if len(proxies[elem]) == 0:
            proxies[elem].append(ProxyElem(None, None))

pprint.pp(proxies)
providers = new_providers

# One Streamlink session per SOCKS5 proxy, plus a default direct session.
streamlink_sessions: dict[str, streamlink.session.Streamlink] = {}
streamlink_default_session = streamlink.Streamlink()
proxy_keys: list[str] = []
for proxy_provider in proxies.values():
    for proxy_elem in proxy_provider:
        if isinstance(proxy_elem, ProxyElem) and isinstance(proxy_elem.proxy, str):
            proxy_keys.append(proxy_elem.proxy)
            streamlink_sessions[proxy_elem.proxy] = streamlink.Streamlink()
            streamlink_sessions[proxy_elem.proxy].set_option(
                "http-proxy", "socks5://" + proxy_elem.proxy
            )
            streamlink_sessions[proxy_elem.proxy].set_option(
                "https-proxy", "socks5://" + proxy_elem.proxy
            )

# Build an M3U playlist from the configured sources.
if icecast_server is not None and stream_server is not None:
    try:
        with open("/app/sources.json", "r", encoding="utf-8") as f:
            data = json.loads(f.read())
        playlist = "#EXTM3U\n"
        for key in data:
            current = data[key]
            name = current["name"]
            radio = current["radio"]
            if radio:
                playlist += f'#EXTINF:0 radio="true", {name}\n'
                playlist += icecast_server + key + "\n"
            else:
                playlist += f'#EXTINF:0 radio="false", {name}\n'
                playlist += stream_server + key + "\n"
    except OSError as e:
        logger.info(e)

# Static assets and templates; each stays None if its file cannot be read.
template_html = None
template_script = None
videojs_version = None
chromecast_version = None
font_awesome_version = None
custom_style = None
favicon = None
try:
    with open("/app/index.html", "r", encoding="utf-8") as f:
        template_html = template.Template(f.read().strip())
    with open("/app/script.js", "r", encoding="utf-8") as f:
        template_script = template.Template(f.read().strip())
    with open("/app/version/video.js.txt", "r", encoding="utf-8") as f:
        videojs_version = f.read().strip()
    with open("/app/version/chromecast.txt", "r", encoding="utf-8") as f:
        chromecast_version = f.read().strip()
    with open("/app/version/font-awesome.txt", "r", encoding="utf-8") as f:
        font_awesome_version = f.read().strip()
    with open("/app/favicon.png", "rb") as fb:
        favicon = fb.read()
    with open("/app/style.css", "r", encoding="utf-8") as f:
        custom_style = f.read()
except OSError as e:
    logger.info(e)