#!/usr/bin/env python3 import json import sys import urllib.parse import re import os import time import base64 import logging import asyncio import tornado.web import tornado.routing import aiohttp import aiohttp_socks import stream_providers logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', stream=sys.stdout, level=logging.INFO) logger = logging.getLogger(__name__) providers = {} providers["nrk"] = "https://tv.nrk.no" providers["svt"] = "https://svtplay.se" providers["youtube"] = "https://www.youtube.com/watch?v=" providers["twitch"] = "https://twitch.tv" nextcloud_server = os.environ.get("NEXTCLOUD_SERVER") if nextcloud_server is not None: providers["nextcloud"] = nextcloud_server playlist = None icecast_server = os.environ.get("ICECAST_SERVER") stream_server = os.environ.get("STREAM_SERVER") proxy_server = os.environ.get("PROXY_SERVER") class ProxyElem(): def __init__(self, proxy): self.proxy = proxy def local(self): timeout = aiohttp.ClientTimeout(total=1) return aiohttp.ClientSession(timeout=timeout) def session(self): connector = None if self.proxy is not None: connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + self.proxy) timeout = aiohttp.ClientTimeout(total=2) return aiohttp.ClientSession(connector=connector, timeout=timeout) def __repr__(self): return str(self.proxy) async def content_type(self, url): ctype = None async with self.session() as session: for i in range(5): try: resp = await session.head(url) ctype = resp.headers.get("Content-Type", None) except Exception as e: logger.info(e) else: if isinstance(ctype, str): return ctype return "binary/octet-type" async def proxy_url(self, current, path): jdata = None data = {} data_list = [data] if path is None: data["upstream"] = current else: data["upstream"] = urllib.parse.urljoin(current, path) data["proxied"] = True if self.proxy is None: data["proxied"] = False else: data["proxy"] = self.proxy if proxy_server is None: return data["upstream"] try: async with self.local() as session: resp = await session.post(proxy_server, json=data_list) text = await resp.text() jdata = json.loads(text) except Exception as e: logger.info(e) if isinstance(jdata, list) and len(jdata) == 1: return jdata[0] else: return data["upstream"] class AsyncSessionData(): def __init__(self, resp, current): self.resp = resp self.current = current class AsyncSession(): def __init__(self, resp, current): self.sdata = AsyncSessionData(resp, current) async def task(self): resp = await self.sdata.resp return AsyncSessionData(resp, self.sdata.current) proxies = {} for key in providers: proxies[key] = [] current = [] for i in range(0,9): proxy = os.environ.get(f'{key}_proxy{i}'.upper()) if proxy is not None: current.append(proxy) if len(current) == 0: proxies[key].append(ProxyElem(None)) else: for proxy in current: proxies[key].append(ProxyElem(proxy)) class UpstreamHandler(): def __init__(self): self.provider = None self.render_url = None self.stream_url = None self.proxy = None self.upstream = None self.upstream_safe = None self.render = False self.stream = False async def setup(self, handler): self.provider = handler.get_query_argument("provider", None) render_str = handler.get_query_argument("render", "false") if self.provider in providers.keys(): if render_str.lower() == "true": self.render = True else: self.stream = True path = handler.request.path if self.provider == "nextcloud": path = path.removesuffix("/").removesuffix("download").removesuffix("/") elif self.provider == "youtube": path = path.removeprefix("/") src = providers[self.provider] + path proxy_list = proxies.get(self.provider) if isinstance(proxy_list, list): futures = [] sessions = [] for current in proxy_list: session = current.session() sessions.append(session) future = AsyncSession(session.head(src), current) task = asyncio.create_task(future.task()) futures.append(task) done = False for future in asyncio.as_completed(futures): try: result = await future resp = result.resp except Exception as e: logger.info(e) else: done = True new_url = str(resp.url) if new_url.lower().startswith("https://consent.youtube.com"): self.upstream = src self.upstream_safe = urllib.parse.quote(src) else: self.upstream = new_url self.upstream_safe = urllib.parse.quote(new_url) self.proxy = result.current break for future in futures: if not future.done(): future.cancel() for session in sessions: await session.close() if icecast_server is not None and stream_server is not None: try: with open("/app/sources.json", "r") as f: data = json.loads(f.read()) playlist = "#EXTM3U\n" for key in data: current = data[key] name = current["name"] radio = current["radio"] if radio: playlist += f'#EXTINF:0 radio="true", {name}\n' playlist += icecast_server + key + "\n" else: playlist += f'#EXTINF:0 radio="false", {name}\n' playlist += stream_server + key + "\n" except Exception as e: logger.info(e) template_html = None script_file = None videojs_version = None font_awesome_version = None custom_style = None favicon = None try: with open("/app/index.html", "r") as f: template_html = tornado.template.Template(f.read().strip()) with open("/app/script.js", "r") as f: script_raw = bytes(f.read().strip(), "utf-8") b64 = str(base64.b64encode(script_raw), "ascii") script_file = f'data:text/javascript;charset=utf-8;base64,{b64}' with open("/app/version/video.js.txt", "r") as f: videojs_version = f.read().strip() with open("/app/version/chromecast.txt", "r") as f: chromecast_version = f.read().strip() with open("/app/version/font-awesome.txt", "r") as f: font_awesome_version = f.read().strip() with open("/app/favicon.png", "rb") as f: favicon = f.read() with open("/app/style.css", "r") as f: custom_style_raw = bytes(f.read().strip(), "utf-8") b64 = str(base64.b64encode(custom_style_raw), "ascii") custom_style = f'data:text/css;charset=utf-8;base64,{b64}' except Exception as e: logger.info(e) async def rewrite(upstream, current, proxy): rewrite_start = (time.time_ns() // 1_000_000) ndata = None text = None try: async with proxy.session() as session: resp = await session.get(upstream) text = await resp.text() except Exception as e: logger.info(e) if text is not None: lines = text.splitlines() links = [] for line in lines: if line.startswith("#EXT-X-KEY:METHOD="): matches = re.findall(r'(?<=URI=").+(?=")', line) if len(matches) == 1: ldata = {} ldata["upstream"] = urllib.parse.urljoin(current, matches[0]) ldata["proxy"] = proxy.proxy ldata["proxied"] = isinstance(proxy.proxy, str) links.append(ldata) elif line.startswith("#"): pass else: ldata = {} ldata["upstream"] = urllib.parse.urljoin(current, line) ldata["proxy"] = proxy.proxy ldata["proxied"] = isinstance(proxy.proxy, str) links.append(ldata) if isinstance(proxy_server, str): ndata = "" try: async with proxy.local() as session: resp = await session.post(proxy_server, json=links) link_text = await resp.text() except Exception as e: logger.info(e) else: if isinstance(link_text, str): links = json.loads(link_text) for line in lines: if line.startswith("#EXT-X-KEY:METHOD="): matches = re.findall(r'(?<=URI=").+(?=")', line) if len(matches) == 1: new_url = links.pop(0) ndata += re.sub(r'URI=".+"', f'URI="{new_url}"', line) elif line.startswith("#"): ndata += line else: ndata += links.pop(0) ndata += "\n" rewrite_stop = (time.time_ns() // 1_000_000) logger.info(str(rewrite_stop - rewrite_start)) return ndata class MainHandler(tornado.web.RequestHandler): async def handle_any(self, redir): handler = UpstreamHandler() await handler.setup(self) if handler.render: await self.handle_render(handler) elif handler.stream: await self.handle_stream(handler, redir) else: logger.info(f'provider missing {self.request.uri}') self.set_status(404) self.write("Stream not found. (provider missing)") async def handle_render(self, handler): if script_file is not None and template_html is not None: provider_data = await stream_providers.get_any(handler.upstream, handler.proxy, logger) data["script"] = script_file data["videojs_version"] = videojs_version data["chromecast_version"] = chromecast_version data["font_awesome_version"] = font_awesome_version data["custom_style"] = custom_style rendered_html = template_html.generate(data=data, meta=provider_data.meta(), title=provider_data.title()) self.write(rendered_html) else: self.set_status(404) self.write("HTML template missing.") async def handle_stream(self, handler, redir): upstream = None if handler.provider == "nextcloud": upstream = handler.upstream + "/download" else: provider_data = await stream_providers.get_any(handler.upstream, handler.proxy, logger) upstream = provider_data.upstream() if isinstance(provider_data.thumbnail(), str): image = await handler.proxy.proxy_url(provider_data.thumbnail(), None) if isinstance(image, str): self.set_header("Custom-Poster", image) if upstream is None: logger.info(f'invalid upstream ({handler.provider})') self.set_status(404) self.write("Stream not found. (invalid upstream)") else: upstream_proxy = await handler.proxy.proxy_url(upstream, None) ctype = await handler.proxy.content_type(upstream_proxy) data = None if "mpegurl" in ctype.lower(): data = await rewrite(upstream_proxy, upstream, handler.proxy) if isinstance(data, str): self.set_header("Content-Type", "application/vnd.apple.mpegurl") self.write(data) else: self.set_header("Content-Type", ctype) if redir: self.redirect(upstream_proxy, status=303) async def get(self): await self.handle_any(True) async def head(self): await self.handle_any(False) class FileHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/plain; charset=utf-8") self.write(playlist) class IconHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "image/png") self.write(favicon) try: handlers = [] handlers.append((tornado.routing.PathMatches("/sources.m3u8"), FileHandler)) handlers.append((tornado.routing.PathMatches("/favicon.ico"), IconHandler)) handlers.append((tornado.routing.AnyMatches(), MainHandler)) app_web = tornado.web.Application(handlers) app_web.listen(8080) tornado.ioloop.IOLoop.current().start() except KeyboardInterrupt: print()