#!/usr/bin/env python3 import json import sys import urllib.parse import os import time import base64 import logging import asyncio import tornado.web import tornado.routing import aiohttp import aiohttp_socks import stream_providers import expiringdict logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', stream=sys.stdout, level=logging.INFO) logger = logging.getLogger(__name__) providers = {} providers["nrk"] = "https://tv.nrk.no" providers["svt"] = "https://svtplay.se" providers["youtube"] = "https://www.youtube.com/watch?v=" providers["twitch"] = "https://twitch.tv" nextcloud_server = os.environ.get("NEXTCLOUD_SERVER") if nextcloud_server is not None: providers["nextcloud"] = nextcloud_server playlist = None icecast_server = os.environ.get("ICECAST_SERVER") stream_server = os.environ.get("STREAM_SERVER") proxy_server = os.environ.get("PROXY_SERVER") ctype_cache = expiringdict.ExpiringDict(max_len=128, max_age_seconds=1800) class ProxyElem(): def __init__(self, proxy): self.proxy = proxy def local(self): timeout = aiohttp.ClientTimeout(total=1) return aiohttp.ClientSession(timeout=timeout) def session(self): connector = None if self.proxy is not None: connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + self.proxy) timeout = aiohttp.ClientTimeout(total=2) return aiohttp.ClientSession(connector=connector, timeout=timeout) def __repr__(self): return str(self.proxy) async def proxy_url(self, urls): if not isinstance(proxy_server, str): return urls jdata = None data_list = [] for url in urls: data = {} data["upstream"] = url data["proxy"] = self.proxy data["proxied"] = isinstance(self.proxy, str) data_list.append(data) try: async with self.local() as session: resp = await session.post(proxy_server, json=data_list) text = await resp.text() jdata = json.loads(text) except Exception as e: logger.info(e) if isinstance(jdata, list): ret_data = [] for src, dst in zip(urls, jdata): if isinstance(src, str): ret_data.append(dst) else: ret_data.append(None) return ret_data else: return urls class AsyncSessionData(): def __init__(self, resp, current): self.resp = resp self.current = current class AsyncSession(): def __init__(self, resp, current): self.sdata = AsyncSessionData(resp, current) async def task(self): resp = await self.sdata.resp return AsyncSessionData(resp, self.sdata.current) proxies = {} for key in providers: proxies[key] = [] current = [] for i in range(0,9): proxy = os.environ.get(f'{key}_proxy{i}'.upper()) if proxy is not None: current.append(proxy) if len(current) == 0: proxies[key].append(ProxyElem(None)) else: for proxy in current: proxies[key].append(ProxyElem(proxy)) proxy_keys = [] for proxy_provider in proxies.values(): for proxy in proxy_provider: if isinstance(proxy, ProxyElem) and isinstance(proxy.proxy, str): proxy_keys.append(proxy.proxy) stream_providers.setup(proxy_keys) class UpstreamHandler(): def __init__(self): self.provider = None self.valid = False self.proxy = None self.upstream = None async def setup(self, handler): self.provider = handler.get_query_argument("provider", None) if self.provider in providers.keys(): self.valid = True path = handler.request.path if self.provider == "nextcloud": path = path.removesuffix("/").removesuffix("download").removesuffix("/") elif self.provider == "youtube": path = path.removeprefix("/") src = providers[self.provider] + path proxy_list = proxies.get(self.provider) if isinstance(proxy_list, list): futures = [] sessions = [] for current in proxy_list: session = current.session() sessions.append(session) future = AsyncSession(session.head(src), current) task = asyncio.create_task(future.task()) futures.append(task) done = False for future in asyncio.as_completed(futures): try: result = await future resp = result.resp except Exception as e: logger.info(e) else: done = True new_url = str(resp.url) if new_url.lower().startswith("https://consent.youtube.com"): self.upstream = src else: self.upstream = new_url self.proxy = result.current break for future in futures: if not future.done(): future.cancel() for session in sessions: await session.close() if icecast_server is not None and stream_server is not None: try: with open("/app/sources.json", "r") as f: data = json.loads(f.read()) playlist = "#EXTM3U\n" for key in data: current = data[key] name = current["name"] radio = current["radio"] if radio: playlist += f'#EXTINF:0 radio="true", {name}\n' playlist += icecast_server + key + "\n" else: playlist += f'#EXTINF:0 radio="false", {name}\n' playlist += stream_server + key + "\n" except Exception as e: logger.info(e) template_html = None template_script = None videojs_version = None font_awesome_version = None custom_style = None favicon = None try: with open("/app/index.html", "r") as f: template_html = tornado.template.Template(f.read().strip()) with open("/app/script.js", "r") as f: template_script = tornado.template.Template(f.read().strip()) with open("/app/version/video.js.txt", "r") as f: videojs_version = f.read().strip() with open("/app/version/chromecast.txt", "r") as f: chromecast_version = f.read().strip() with open("/app/version/font-awesome.txt", "r") as f: font_awesome_version = f.read().strip() with open("/app/favicon.png", "rb") as f: favicon = f.read() with open("/app/style.css", "r") as f: custom_style_raw = bytes(f.read().strip(), "utf-8") b64 = str(base64.b64encode(custom_style_raw), "ascii") custom_style = f'data:text/css;charset=utf-8;base64,{b64}' except Exception as e: logger.info(e) class MainHandler(tornado.web.RequestHandler): async def handle_any(self, redir): handler = UpstreamHandler() await handler.setup(self) if handler.valid: await self.handle_render(handler) else: logger.info(f'provider missing {self.request.uri}') self.set_status(404) self.write("Stream not found. (provider missing)") async def handle_render(self, handler): if template_script is not None and template_html is not None: provider_data = None if handler.provider == "nextcloud": provider_data = await stream_providers.get_nextcloud(handler.upstream, handler.proxy, logger) else: provider_data = await stream_providers.get_any(handler.upstream, handler.proxy, logger) proxied = await handler.proxy.proxy_url([provider_data.upstream(), provider_data.thumbnail()]) video_info = {} video_info["upstream"] = proxied[0] video_info["poster"] = proxied[1] video_info["ctype"] = provider_data.ctype() script = template_script.generate(info=json.dumps(video_info)) b64 = str(base64.b64encode(script), "ascii") script_file = f'data:text/javascript;charset=utf-8;base64,{b64}' data["script"] = script_file data["videojs_version"] = videojs_version data["chromecast_version"] = chromecast_version data["font_awesome_version"] = font_awesome_version data["custom_style"] = custom_style rendered_html = template_html.generate(data=data, meta=provider_data.meta(), title=provider_data.title()) self.write(rendered_html) else: self.set_status(404) self.write("HTML template missing.") async def get(self): await self.handle_any(True) async def head(self): await self.handle_any(False) class FileHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/plain; charset=utf-8") self.write(playlist) class IconHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "image/png") self.write(favicon) try: handlers = [] handlers.append((tornado.routing.PathMatches("/sources.m3u8"), FileHandler)) handlers.append((tornado.routing.PathMatches("/favicon.ico"), IconHandler)) handlers.append((tornado.routing.AnyMatches(), MainHandler)) app_web = tornado.web.Application(handlers) app_web.listen(8080) tornado.ioloop.IOLoop.current().start() except KeyboardInterrupt: print()