import json import logging import os import pprint import re import sys import dataclasses from typing import Literal, cast import pydantic import aiohttp import streamlink from tornado import template logging.basicConfig( format="[%(filename)s:%(lineno)d] %(message)s", stream=sys.stdout, level=logging.INFO, ) logger = logging.getLogger(__name__) providers: dict[str, str] = {} providers["nrk"] = "https://tv.nrk.no" providers["nrk_web"] = "https://nrk.no" providers["svt"] = "https://svtplay.se" providers["youtube"] = "https://www.youtube.com/watch?v=" providers["twitch"] = "https://twitch.tv" providers["twitter"] = "https://twitter.com" nextcloud_server = os.environ.get("NEXTCLOUD_SERVER") if nextcloud_server is not None: providers["nextcloud"] = nextcloud_server seafile_server = os.environ.get("SEAFILE_SERVER") if seafile_server is not None: providers["seafile"] = seafile_server playlist = None icecast_server = os.environ.get("ICECAST_SERVER") stream_server = os.environ.get("STREAM_SERVER") proxy_server = os.environ.get("PROXY_SERVER") @dataclasses.dataclass class LinkWithType: upstream: str download: bool = False ctype: None | str = None class ProxyCreateLink(pydantic.BaseModel): upstream: None | pydantic.HttpUrl download: None | pydantic.StrictBool ctype: None | pydantic.StrictStr region: None | pydantic.StrictStr class ProxyRequest(pydantic.BaseModel): action: Literal["create-urls", "read-config"] urls: None | list[ProxyCreateLink] class ProxyResponse(pydantic.BaseModel): action: Literal["create-urls", "read-config"] urls: None | list[pydantic.HttpUrl] class ProxyElem: def __init__(self, proxy, region): self.proxy = proxy self.region = region def local(self): timeout = aiohttp.ClientTimeout(total=5) return aiohttp.ClientSession(timeout=timeout) def __repr__(self): return f" proxy=<{str(self.proxy)}>>" async def proxy_url(self, urls: list[LinkWithType]): clean_urls = [] link_requests: list[ProxyCreateLink] = [] for url in urls: clean_urls.append(url.upstream if isinstance(url.upstream, str) else None) if not isinstance(proxy_server, str): return clean_urls try: for url in urls: link_requests.append( ProxyCreateLink.parse_obj( { "upstream": url.upstream, "ctype": url.ctype, "download": url.download, "region": self.region, } ) ) except pydantic.ValidationError as e: logger.info(e) return clean_urls request_data: ProxyRequest response_data: ProxyResponse try: request_data = ProxyRequest.parse_obj( {"urls": link_requests, "action": "create-urls"} ) except pydantic.ValidationError as e: logger.info(e) return clean_urls try: async with self.local() as session: logger.log(request_data.json()) logger.log(request_data) resp = await session.post(proxy_server, json=request_data.dict()) response_data = cast( ProxyResponse, ProxyResponse.parse_raw(await resp.text()) ) except (aiohttp.ClientError, pydantic.ValidationError) as e: logger.info(e) else: ret_data: list[None | str] = [] if response_data.urls is not None: for src, dst in zip(clean_urls, response_data.urls): if isinstance(src, str): ret_data.append(dst) else: ret_data.append(None) return ret_data return clean_urls proxies: dict[str, list[ProxyElem]] = {} new_providers = {} for key in providers: region_expr = re.compile(f"^{key}_region(_[a-z][a-z])?[0-9]+$", re.IGNORECASE) region_matches = list(filter(region_expr.match, os.environ.keys())) proxy_expr = re.compile(f"^{key}_proxy(_[a-z][a-z])?[0-9]+$", re.IGNORECASE) proxy_matches = list(filter(proxy_expr.match, os.environ.keys())) proxy_current = [] proxy_current_keys = set() proxy_current_keys.add(key) proxy_countries = [] proxy_empty = True region_current = [] region_current_keys = set() region_current_keys.add(key) region_countries = [] region_empty = True for match in proxy_matches: proxy_match = proxy_expr.match(match.lower()) if proxy_match is None: continue proxy_country_groups = proxy_match.groups() proxy_country = None pos = len(proxy_country_groups) - 1 if pos >= 0: country_temp = proxy_country_groups[pos] if isinstance(country_temp, str): proxy_country = country_temp.strip("_") proxy_current_keys.add(f"{key}_{proxy_country}") proxy_link = os.environ.get(match) if proxy_link is not None: proxy_current.append(proxy_link) proxy_countries.append(proxy_country) if proxy_country is None: proxy_empty = False for match in region_matches: region_match = region_expr.match(match.lower()) if region_match is None: continue region_country_groups = region_match.groups() region_country = None pos = len(region_country_groups) - 1 if pos >= 0: country_temp = region_country_groups[pos] if isinstance(country_temp, str): region_country = country_temp.strip("_") region_current_keys.add(f"{key}_{region_country}") region_name = os.environ.get(match) if region_name is not None: region_current.append(region_name) region_countries.append(region_country) if region_country is None: region_empty = False for elem in proxy_current_keys: proxies[elem] = [] new_providers[elem] = providers[key] for proxy_link, region_name, country in zip( proxy_current, region_current, proxy_countries ): new_key = key if country is not None: new_key = f"{key}_{country}" proxies[new_key].append(ProxyElem(proxy_link, region_name)) for elem in proxy_current_keys: if len(proxies[elem]) == 0: proxies[elem].append(ProxyElem(None, None)) pprint.pp(proxies) providers = new_providers streamlink_sessions: dict[str, streamlink.session.Streamlink] = {} streamlink_default_session = streamlink.Streamlink() proxy_keys: list[str] = [] for proxy_provider in proxies.values(): for proxy_elem in proxy_provider: if isinstance(proxy_elem, ProxyElem) and isinstance(proxy_elem.proxy, str): proxy_keys.append(proxy_elem.proxy) streamlink_sessions[proxy_elem.proxy] = streamlink.Streamlink() streamlink_sessions[proxy_elem.proxy].set_option( "http-proxy", "socks5://" + proxy_elem.proxy ) streamlink_sessions[proxy_elem.proxy].set_option( "https-proxy", "socks5://" + proxy_elem.proxy ) if icecast_server is not None and stream_server is not None: try: with open("/app/sources.json", "r", encoding="utf-8") as f: data = json.loads(f.read()) playlist = "#EXTM3U\n" for key in data: current = data[key] name = current["name"] radio = current["radio"] if radio: playlist += f'#EXTINF:0 radio="true", {name}\n' playlist += icecast_server + key + "\n" else: playlist += f'#EXTINF:0 radio="false", {name}\n' playlist += stream_server + key + "\n" except OSError as e: logger.info(e) template_html = None template_script = None videojs_version = None font_awesome_version = None custom_style = None favicon = None try: with open("/app/index.html", "r", encoding="utf-8") as f: template_html = template.Template(f.read().strip()) with open("/app/script.js", "r", encoding="utf-8") as f: template_script = template.Template(f.read().strip()) with open("/app/version/video.js.txt", "r", encoding="utf-8") as f: videojs_version = f.read().strip() with open("/app/version/chromecast.txt", "r", encoding="utf-8") as f: chromecast_version = f.read().strip() with open("/app/version/font-awesome.txt", "r", encoding="utf-8") as f: font_awesome_version = f.read().strip() with open("/app/favicon.png", "rb") as fb: favicon = fb.read() with open("/app/style.css", "r", encoding="utf-8") as f: custom_style = f.read() except OSError as e: logger.info(e)