#!/usr/bin/env python3
"""Tornado service that resolves provider pages to stream URLs.

A request path plus a ``provider`` query argument is mapped to an upstream
page, the page is probed through the configured SOCKS5 proxies, and the
resulting stream is either redirected to, rewritten as an HLS playlist, or
rendered as an HTML player page (``render=true``).
"""
import asyncio
import base64
import json
import logging
import os
import re
import sys
import urllib.parse

import aiohttp
import aiohttp_socks
import streamlink
import tornado.ioloop
import tornado.routing
import tornado.template
import tornado.web

logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)

# Base URL per supported provider; the request path is appended to it.
providers = {}
providers["nrk"] = "https://tv.nrk.no"
providers["svt"] = "https://svtplay.se"
providers["youtube"] = "https://www.youtube.com/watch?v="
providers["twitch"] = "https://twitch.tv"

icecast_server = os.environ.get("ICECAST_SERVER")
stream_server = os.environ.get("STREAM_SERVER")
proxy_server = os.environ.get("PROXY_SERVER")


class ProxyElem():
    """One optional SOCKS5 proxy address plus factories for sessions using it.

    ``proxy`` is the address without scheme (``socks5://`` is prepended), or
    ``None`` for a direct connection.
    """

    def __init__(self, proxy):
        self.proxy = proxy

    def stream(self):
        """Return a new streamlink session (2 s HTTP timeout) using the proxy."""
        session = streamlink.Streamlink()
        session.set_option("http-timeout", 2.0)
        if self.proxy is not None:
            proxy_url = "socks5://" + self.proxy
            session.set_option("https-proxy", proxy_url)
            session.set_option("http-proxy", proxy_url)
        return session

    def local(self):
        """Return an un-proxied aiohttp session with a 1 s total timeout."""
        return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=1))

    def session(self):
        """Return an aiohttp session (2 s total timeout) routed via the proxy."""
        connector = None
        if self.proxy is not None:
            connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + self.proxy)
        return aiohttp.ClientSession(connector=connector, timeout=aiohttp.ClientTimeout(total=2))

    def __repr__(self):
        return str(self.proxy)

    async def content_type(self, url):
        """Return the Content-Type reported by a HEAD request to ``url``.

        Falls back to ``binary/octet-stream`` when the header is missing or
        the request fails (the error is logged, not raised).
        """
        ctype = "binary/octet-stream"
        try:
            async with self.session() as session:
                resp = await session.head(url)
                ctype = resp.headers.get("Content-Type", "binary/octet-stream")
        except Exception as e:
            logger.info(e)
        return ctype

    async def proxy_url(self, current, path):
        """Ask ``PROXY_SERVER`` for a proxied form of an upstream URL.

        ``path`` is joined onto ``current`` unless it is ``None``.  The
        upstream URL itself is returned when no proxy server is configured,
        when the request fails, or when the reply is not a one-element list.
        """
        upstream = current if path is None else urllib.parse.urljoin(current, path)
        data = {"upstream": upstream, "proxied": self.proxy is not None}
        if self.proxy is not None:
            data["proxy"] = self.proxy
        if proxy_server is None:
            return upstream

        jdata = None
        try:
            async with self.local() as session:
                resp = await session.post(proxy_server, json=[data])
                jdata = json.loads(await resp.text())
        except Exception as e:
            logger.info(e)
        if isinstance(jdata, list) and len(jdata) == 1:
            return jdata[0]
        return upstream


class AsyncSessionData():
    """Pairs a response (or an awaitable producing one) with its ProxyElem."""

    def __init__(self, resp, current):
        self.resp = resp
        self.current = current


class AsyncSession():
    """Wraps a pending request so its result stays tied to its ProxyElem."""

    def __init__(self, resp, current):
        self.sdata = AsyncSessionData(resp, current)

    async def task(self):
        """Await the pending request and return it with the same ProxyElem."""
        resp = await self.sdata.resp
        return AsyncSessionData(resp, self.sdata.current)


def _proxies_for(provider):
    """Build the ProxyElem list for a provider from ``<PROVIDER>_PROXY0..8``.

    A single direct-connection element is returned when none are set.
    """
    configured = []
    for index in range(0, 9):
        address = os.environ.get(f'{provider}_proxy{index}'.upper())
        if address is not None:
            configured.append(address)
    if not configured:
        return [ProxyElem(None)]
    return [ProxyElem(address) for address in configured]


proxies = {provider: _proxies_for(provider) for provider in providers}


class UpstreamHandler():
    """Per-request state: chosen provider, resolved upstream URL and proxy."""

    def __init__(self):
        self.provider = None
        self.render_url = None
        self.stream_url = None
        self.proxy = None
        self.upstream = None
        self.upstream_safe = None
        self.render = False
        self.stream = True

    async def setup(self, handler):
        """Populate this object from a tornado request handler.

        Reads the ``provider`` and ``render`` query arguments, builds the
        render/stream URLs, then sends a HEAD request to the provider page
        through every configured proxy concurrently.  The first probe that
        succeeds decides ``upstream``, ``upstream_safe`` and ``proxy``; if
        none succeeds those stay ``None``.
        """
        self.provider = handler.get_query_argument("provider", None)
        render_str = handler.get_query_argument("render", "false")
        self.render = False
        self.stream = False
        if self.provider not in providers:
            return

        if render_str.lower() == "true":
            self.render = True
        else:
            self.stream = True

        path = handler.request.path
        if self.provider == "youtube":
            # The youtube base URL ends in "watch?v=", so the id must not
            # carry the path's slashes.
            path = path.strip("/")

        prefix = stream_server if isinstance(stream_server, str) else ""
        self.stream_url = f'{prefix}{handler.request.path}?provider={self.provider}'
        self.render_url = f'{self.stream_url}&render=true'

        src = providers[self.provider] + path
        proxy_list = proxies.get(self.provider)
        if not isinstance(proxy_list, list):
            return

        tasks = []
        sessions = []
        try:
            for candidate in proxy_list:
                session = candidate.session()
                sessions.append(session)
                # NOTE(review): no allow_redirects argument is passed here, so
                # whether resp.url can ever differ from src depends on
                # aiohttp's default for head() -- confirm.
                probe = AsyncSession(session.head(src), candidate)
                tasks.append(asyncio.create_task(probe.task()))

            for finished in asyncio.as_completed(tasks):
                try:
                    result = await finished
                except Exception as e:
                    logger.info(e)
                    continue
                new_url = str(result.resp.url)
                if new_url.lower().startswith("https://consent.youtube.com"):
                    # Landed on the consent page: keep the original URL.
                    new_url = src
                self.upstream = new_url
                self.upstream_safe = urllib.parse.quote(new_url)
                self.proxy = result.current
                break
        finally:
            # Always release probes and sessions, even if we were interrupted.
            for task in tasks:
                if not task.done():
                    task.cancel()
            # Retrieve every outcome so abandoned probes do not emit
            # "exception was never retrieved" warnings.
            await asyncio.gather(*tasks, return_exceptions=True)
            for session in sessions:
                await session.close()

    async def meta(self):
        """Return Open Graph ``(name, value)`` pairs fetched from noembed.com.

        Returns an empty list when the lookup fails or the reply is not a
        JSON object.  Values are passed through unvalidated and may be
        ``None`` when noembed omits a field.
        NOTE(review): confirm the HTML template tolerates non-string values.
        """
        data = []
        try:
            embed_url = f'https://noembed.com/embed?url={self.upstream_safe}'
            async with self.proxy.session() as session:
                resp = await session.get(embed_url)
                data_raw = json.loads(await resp.text())
            if isinstance(data_raw, dict):
                data = [
                    ("og:title", data_raw.get("title")),
                    ("og:description", data_raw.get("author_name")),
                    ("og:image", data_raw.get("thumbnail_url")),
                    ("og:video:height", data_raw.get("height")),
                    ("og:video:width", data_raw.get("width")),
                    ("og:image:height", data_raw.get("thumbnail_height")),
                    ("og:image:width", data_raw.get("thumbnail_width")),
                ]
        except Exception as e:
            logger.info(e)
        return data


def _load_playlist():
    """Build the M3U playlist from ``/app/sources.json``.

    Returns ``None`` when either server variable is unset or the file cannot
    be read/parsed.  If an entry is malformed, the entries collected before
    it are still returned.
    """
    if icecast_server is None or stream_server is None:
        return None
    try:
        with open("/app/sources.json", "r") as f:
            sources = json.load(f)
    except Exception as e:
        logger.info(e)
        return None

    parts = ["#EXTM3U\n"]
    try:
        for key, entry in sources.items():
            name = entry["name"]
            if entry["radio"]:
                item = f'#EXTINF:0 radio="true", {name}\n' + icecast_server + key + "\n"
            else:
                item = f'#EXTINF:0 radio="false", {name}\n' + stream_server + key + "\n"
            parts.append(item)
    except Exception as e:
        logger.info(e)
    return "".join(parts)


playlist = _load_playlist()

template_html = None
template_js = None
template_embed = tornado.template.Template('')
videojs_version = None
castjs_version = None
# Must exist even if loading below fails: handle_render reads it.
chromecast_version = None
custom_style = None
try:
    with open("/app/index.html", "r") as f:
        template_html = tornado.template.Template(f.read().strip())
    with open("/app/script.js", "r") as f:
        template_js = tornado.template.Template(f.read().strip())
    with open("/app/videojs-version.txt", "r") as f:
        videojs_version = f.read().strip()
    with open("/app/chromecast-version.txt", "r") as f:
        chromecast_version = f.read().strip()
    with open("/app/style.css", "r") as f:
        custom_style_raw = bytes(f.read().strip(), "utf-8")
    b64 = str(base64.b64encode(custom_style_raw), "ascii")
    custom_style = f'data:text/css;charset=utf-8;base64,{b64}'
except Exception as e:
    logger.info(e)


_KEY_PREFIX = "#EXT-X-KEY:METHOD="
# Non-greedy on purpose: a key line may carry further quoted attributes
# after URI, and those must not be swallowed.
_KEY_URI_RE = re.compile(r'URI="([^"]+)"')


def _line_target(line):
    """Return the URI a playlist line refers to, or None if it has none."""
    if line.startswith(_KEY_PREFIX):
        uris = _KEY_URI_RE.findall(line)
        return uris[0] if len(uris) == 1 else None
    if line.startswith("#") or not line.strip():
        return None
    return line


async def rewrite(current, proxy):
    """Fetch the playlist at ``current`` and route its URIs via PROXY_SERVER.

    Every segment line and every ``#EXT-X-KEY`` URI is resolved against
    ``current`` and sent to the proxy server, whose reply (one URL per
    submitted link, in order) replaces them.  Tags, blank lines and key
    lines without a single URI are kept unchanged.

    Returns the rewritten playlist text, or ``None`` when no proxy server is
    configured or any step fails, so the caller can fall back to a redirect
    rather than serve an empty playlist.
    """
    text = None
    try:
        async with proxy.session() as session:
            resp = await session.get(current)
            text = await resp.text()
    except Exception as e:
        logger.info(e)
    if text is None or not isinstance(proxy_server, str):
        return None

    lines = text.splitlines()
    targets = [_line_target(line) for line in lines]
    links = [
        {
            "upstream": urllib.parse.urljoin(current, target),
            "proxy": proxy.proxy,
            "proxied": isinstance(proxy.proxy, str),
        }
        for target in targets
        if target is not None
    ]

    try:
        async with proxy.local() as session:
            resp = await session.post(proxy_server, json=links)
            new_links = json.loads(await resp.text())
    except Exception as e:
        logger.info(e)
        return None

    usable = (
        isinstance(new_links, list)
        and len(new_links) == len(links)
        and all(isinstance(url, str) for url in new_links)
    )
    if not usable:
        logger.info("unexpected reply from proxy server")
        return None

    replacements = iter(new_links)
    out = []
    for line, target in zip(lines, targets):
        if target is None:
            out.append(line)
        elif line.startswith(_KEY_PREFIX):
            new_url = next(replacements)
            # A callable replacement keeps backslashes in the URL literal.
            out.append(_KEY_URI_RE.sub(lambda _m, url=new_url: f'URI="{url}"', line))
        else:
            out.append(next(replacements))
    return "".join(f"{line}\n" for line in out)


def _resolve_streams(proxy, url):
    """Blocking streamlink lookup; meant to run in a worker thread."""
    return proxy.stream().streams(url)


class MainHandler(tornado.web.RequestHandler):
    """Catch-all handler: renders the player page or serves the stream."""

    STREAM_ATTEMPTS = 5

    async def handle_any(self, redir):
        """Dispatch to render/stream handling; ``redir`` allows redirects."""
        handler = UpstreamHandler()
        await handler.setup(self)
        if handler.render:
            await self.handle_render(handler)
        elif handler.stream:
            await self.handle_stream(handler, redir)
        else:
            logger.info(f'provider missing {self.request.uri}')
            self.set_status(404)
            self.write("Stream not found. (provider missing)")

    async def handle_render(self, handler):
        """Write the HTML player page, or 404 if the templates are missing."""
        if template_js is None or template_html is None:
            self.set_status(404)
            self.write("HTML template missing.")
            return

        rendered_js = template_js.generate(stream=handler.stream_url)
        b64_js = str(base64.b64encode(rendered_js), "ascii")
        meta = await handler.meta()
        # Fresh dict per request; nothing module-level is touched.
        data = {
            "script": f'data:text/javascript;charset=utf-8;base64,{b64_js}',
            "videojs_version": videojs_version,
            "chromecast_version": chromecast_version,
            "custom_style": custom_style,
            "stream_url": handler.stream_url,
            "render_url": handler.render_url,
        }
        self.write(template_html.generate(data=data, meta=meta))

    async def handle_stream(self, handler, redir):
        """Resolve the stream URL and serve it.

        HLS playlists are rewritten and written to the response when
        possible; otherwise the Content-Type is set and, if ``redir`` is
        true, a 303 redirect to the (proxied) upstream is issued.  Responds
        404 when no stream URL can be resolved.
        """
        upstream = None
        streams = None
        if handler.proxy is not None:
            loop = asyncio.get_running_loop()
            for _ in range(self.STREAM_ATTEMPTS):
                try:
                    # streamlink is synchronous; keep it off the event loop.
                    streams = await loop.run_in_executor(
                        None, _resolve_streams, handler.proxy, handler.upstream
                    )
                except Exception as e:
                    logger.info(e)
                else:
                    break

        if streams is not None:
            # Walk the streams in reverse and take the first with a URL.
            for key in reversed(streams):
                stream = streams.get(key)
                if hasattr(stream, "url"):
                    upstream = stream.url
                    break

        if upstream is None:
            logger.info(f'invalid upstream ({handler.provider})')
            self.set_status(404)
            self.write("Stream not found. (invalid upstream)")
            return

        upstream_proxy = await handler.proxy.proxy_url(upstream, None)
        ctype = await handler.proxy.content_type(upstream_proxy)
        logger.info(upstream_proxy)
        data = None
        if "mpegurl" in ctype.lower():
            data = await rewrite(upstream, handler.proxy)
        if isinstance(data, str):
            self.set_header("Content-Type", "application/vnd.apple.mpegurl")
            self.write(data)
        else:
            self.set_header("Content-Type", ctype)
            if redir:
                self.redirect(upstream_proxy, status=303)

    async def get(self):
        await self.handle_any(True)

    async def head(self):
        await self.handle_any(False)


class FileHandler(tornado.web.RequestHandler):
    """Serves the generated M3U playlist."""

    def get(self):
        self.set_header("Content-Type", "text/plain; charset=utf-8")
        if playlist is None:
            # No playlist was built at startup; write(None) would raise.
            self.set_status(404)
            self.write("Playlist not found.")
            return
        self.write(playlist)


class IconHandler(tornado.web.RequestHandler):
    """Answers favicon requests with an empty 204."""

    def get(self):
        self.set_header("Content-Type", "text/plain; charset=utf-8")
        self.set_status(204)


try:
    handlers = [
        (tornado.routing.PathMatches("/sources.m3u8"), FileHandler),
        (tornado.routing.PathMatches("/favicon.ico"), IconHandler),
        (tornado.routing.AnyMatches(), MainHandler),
    ]
    app_web = tornado.web.Application(handlers)
    app_web.listen(8080)
    tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
    print()