#!/usr/bin/env python3 import json import sys import urllib.parse import re import os import time import base64 import logging import asyncio import tornado.web import tornado.routing import aiohttp import aiohttp_socks import html.parser import stream_providers logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', stream=sys.stdout, level=logging.INFO) logger = logging.getLogger(__name__) providers = {} providers["nrk"] = "https://tv.nrk.no" providers["svt"] = "https://svtplay.se" providers["youtube"] = "https://www.youtube.com/watch?v=" providers["twitch"] = "https://twitch.tv" nextcloud_server = os.environ.get("NEXTCLOUD_SERVER") if nextcloud_server is not None: providers["nextcloud"] = nextcloud_server playlist = None icecast_server = os.environ.get("ICECAST_SERVER") stream_server = os.environ.get("STREAM_SERVER") proxy_server = os.environ.get("PROXY_SERVER") class ProxyElem(): def __init__(self, proxy): self.proxy = proxy def local(self): timeout = aiohttp.ClientTimeout(total=1) return aiohttp.ClientSession(timeout=timeout) def session(self): connector = None if self.proxy is not None: connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + self.proxy) timeout = aiohttp.ClientTimeout(total=2) return aiohttp.ClientSession(connector=connector, timeout=timeout) def __repr__(self): return str(self.proxy) async def content_type(self, url): ctype = None async with self.session() as session: for i in range(5): try: resp = await session.head(url) ctype = resp.headers.get("Content-Type", None) except Exception as e: logger.info(e) else: if isinstance(ctype, str): return ctype return "binary/octet-type" async def proxy_url(self, current, path): jdata = None data = {} data_list = [data] if path is None: data["upstream"] = current else: data["upstream"] = urllib.parse.urljoin(current, path) data["proxied"] = True if self.proxy is None: data["proxied"] = False else: data["proxy"] = self.proxy if proxy_server is None: return data["upstream"] try: async with self.local() as session: resp = await session.post(proxy_server, json=data_list) text = await resp.text() jdata = json.loads(text) except Exception as e: logger.info(e) if isinstance(jdata, list) and len(jdata) == 1: return jdata[0] else: return data["upstream"] class AsyncSessionData(): def __init__(self, resp, current): self.resp = resp self.current = current class AsyncSession(): def __init__(self, resp, current): self.sdata = AsyncSessionData(resp, current) async def task(self): resp = await self.sdata.resp return AsyncSessionData(resp, self.sdata.current) proxies = {} for key in providers: proxies[key] = [] current = [] for i in range(0,9): proxy = os.environ.get(f'{key}_proxy{i}'.upper()) if proxy is not None: current.append(proxy) if len(current) == 0: proxies[key].append(ProxyElem(None)) else: for proxy in current: proxies[key].append(ProxyElem(proxy)) class MetaParser(html.parser.HTMLParser): def __init__(self): self.meta_data = {} self.accepted_attrs = [] self.accepted_attrs.append("og:title") self.accepted_attrs.append("og:description") self.accepted_attrs.append("og:image") self.accepted_attrs.append("og:video:height") self.accepted_attrs.append("og:video:width") self.accepted_attrs.append("og:image:height") self.accepted_attrs.append("og:image:width") super().__init__() def handle_starttag(self, tag, attrs): if tag == "meta": name = None for attr in (attrs + attrs): if len(attr) == 2: if isinstance(name, str): if attr[0] == "content": self.meta_data[name] = attr[1] return elif attr[0] == "property" and attr[1] in self.accepted_attrs: name = attr[1] class UpstreamHandler(): def __init__(self): self.provider = None self.render_url = None self.stream_url = None self.proxy = None self.upstream = None self.upstream_safe = None self.render = False self.stream = False async def setup(self, handler): self.provider = handler.get_query_argument("provider", None) render_str = handler.get_query_argument("render", "false") if self.provider in providers.keys(): if render_str.lower() == "true": self.render = True else: self.stream = True path = handler.request.path if self.provider == "youtube": path = path.strip("/") if isinstance(stream_server, str): self.render_url = f'{stream_server}{handler.request.path}?provider={self.provider}&render=true' self.stream_url = f'{stream_server}{handler.request.path}?provider={self.provider}' else: self.render_url = f'{handler.request.path}?provider={self.provider}&render=true' self.stream_url = f'{handler.request.path}?provider={self.provider}' src = providers[self.provider] + path proxy_list = proxies.get(self.provider) if isinstance(proxy_list, list): futures = [] sessions = [] for current in proxy_list: session = current.session() sessions.append(session) future = AsyncSession(session.head(src), current) task = asyncio.create_task(future.task()) futures.append(task) done = False for future in asyncio.as_completed(futures): try: result = await future resp = result.resp except Exception as e: logger.info(e) else: done = True new_url = str(resp.url) if new_url.lower().startswith("https://consent.youtube.com"): self.upstream = src self.upstream_safe = urllib.parse.quote(src) else: self.upstream = new_url self.upstream_safe = urllib.parse.quote(new_url) self.proxy = result.current break for future in futures: if not future.done(): future.cancel() for session in sessions: await session.close() async def meta(self): data = {} logger.info(type(self.proxy)) try: embed_url = f'https://noembed.com/embed?url={self.upstream_safe}' async with self.proxy.session() as session: resp = await session.get(embed_url) text = await resp.text() data_raw = json.loads(text) if isinstance(data_raw, dict): data_new = {} data_valid = True data_new["og:title"] = data_raw.get("title") data_new["og:description"] = data_raw.get("author_name") data_new["og:image"] = data_raw.get("thumbnail_url") data_new["og:video:height"] = data_raw.get("height") data_new["og:video:width"] = data_raw.get("width") data_new["og:image:height"] = data_raw.get("thumbnail_height") data_new["og:image:width"] = data_raw.get("thumbnail_width") data_filtered = {} for key in data_new: value = data_new.get(key) if isinstance(value, str): data_filtered[key] = value if len(data_filtered) == 0: resp = await session.get(self.upstream) text = await resp.text() parser = MetaParser() parser.feed(text) data = parser.meta_data else: data = data_filtered except Exception as e: logger.info(e) return data if icecast_server is not None and stream_server is not None: try: with open("/app/sources.json", "r") as f: data = json.loads(f.read()) playlist = "#EXTM3U\n" for key in data: current = data[key] name = current["name"] radio = current["radio"] if radio: playlist += f'#EXTINF:0 radio="true", {name}\n' playlist += icecast_server + key + "\n" else: playlist += f'#EXTINF:0 radio="false", {name}\n' playlist += stream_server + key + "\n" except Exception as e: logger.info(e) template_html = None script_file = None videojs_version = None font_awesome_version = None custom_style = None favicon = None try: with open("/app/index.html", "r") as f: template_html = tornado.template.Template(f.read().strip()) with open("/app/script.js", "r") as f: script_raw = bytes(f.read().strip(), "utf-8") b64 = str(base64.b64encode(script_raw), "ascii") script_file = f'data:text/javascript;charset=utf-8;base64,{b64}' with open("/app/version/video.js.txt", "r") as f: videojs_version = f.read().strip() with open("/app/version/chromecast.txt", "r") as f: chromecast_version = f.read().strip() with open("/app/version/font-awesome.txt", "r") as f: font_awesome_version = f.read().strip() with open("/app/favicon.png", "rb") as f: favicon = f.read() with open("/app/style.css", "r") as f: custom_style_raw = bytes(f.read().strip(), "utf-8") b64 = str(base64.b64encode(custom_style_raw), "ascii") custom_style = f'data:text/css;charset=utf-8;base64,{b64}' except Exception as e: logger.info(e) async def rewrite(current, proxy): ndata = None text = None try: async with proxy.session() as session: resp = await session.get(current) text = await resp.text() except Exception as e: logger.info(e) if text is not None: lines = text.splitlines() links = [] for line in lines: if line.startswith("#EXT-X-KEY:METHOD="): matches = re.findall(r'(?<=URI=").+(?=")', line) if len(matches) == 1: ldata = {} ldata["upstream"] = urllib.parse.urljoin(current, matches[0]) ldata["proxy"] = proxy.proxy ldata["proxied"] = isinstance(proxy.proxy, str) links.append(ldata) elif line.startswith("#"): pass else: ldata = {} ldata["upstream"] = urllib.parse.urljoin(current, line) ldata["proxy"] = proxy.proxy ldata["proxied"] = isinstance(proxy.proxy, str) links.append(ldata) if isinstance(proxy_server, str): ndata = "" try: async with proxy.local() as session: resp = await session.post(proxy_server, json=links) link_text = await resp.text() except Exception as e: logger.info(e) else: if isinstance(link_text, str): links = json.loads(link_text) for line in lines: if line.startswith("#EXT-X-KEY:METHOD="): matches = re.findall(r'(?<=URI=").+(?=")', line) if len(matches) == 1: new_url = links.pop(0) ndata += re.sub(r'URI=".+"', f'URI="{new_url}"', line) elif line.startswith("#"): ndata += line else: ndata += links.pop(0) ndata += "\n" return ndata class MainHandler(tornado.web.RequestHandler): async def handle_any(self, redir): handler = UpstreamHandler() await handler.setup(self) if handler.render: await self.handle_render(handler) elif handler.stream: await self.handle_stream(handler, redir) else: logger.info(f'provider missing {self.request.uri}') self.set_status(404) self.write("Stream not found. (provider missing)") async def handle_render(self, handler): if script_file is not None and template_html is not None: meta = await handler.meta() meta_list = list(meta.items()) title = meta.get("og:title") data["script"] = script_file data["videojs_version"] = videojs_version data["chromecast_version"] = chromecast_version data["font_awesome_version"] = font_awesome_version data["custom_style"] = custom_style rendered_html = template_html.generate(data=data, meta=meta_list, title=title) self.write(rendered_html) else: self.set_status(404) self.write("HTML template missing.") async def handle_stream(self, handler, redir): if handler.provider != "nextcloud": meta = await handler.meta() image = meta.get("og:image") if isinstance(image, str): if handler.provider == "youtube" and image.endswith("hqdefault.jpg"): image = image.removesuffix("hqdefault.jpg") + "maxresdefault.jpg" image = await handler.proxy.proxy_url(image, None) if isinstance(image, str): self.set_header("Custom-Poster", image) upstream = None if handler.provider == "nextcloud": upstream = urllib.parse.urljoin(handler.upstream, "/download") else: upstream = await stream_providers.get_any(handler.upstream, handler.proxy, logger) if upstream is None: logger.info(f'invalid upstream ({handler.provider})') self.set_status(404) self.write("Stream not found. (invalid upstream)") else: upstream_proxy = await handler.proxy.proxy_url(upstream, None) ctype = await handler.proxy.content_type(upstream_proxy) data = None if "mpegurl" in ctype.lower(): data = await rewrite(upstream, handler.proxy) if isinstance(data, str): self.set_header("Content-Type", "application/vnd.apple.mpegurl") self.write(data) else: self.set_header("Content-Type", ctype) if redir: self.redirect(upstream_proxy, status=303) async def get(self): await self.handle_any(True) async def head(self): await self.handle_any(False) class FileHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/plain; charset=utf-8") self.write(playlist) class IconHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "image/png") self.write(favicon) try: handlers = [] handlers.append((tornado.routing.PathMatches("/sources.m3u8"), FileHandler)) handlers.append((tornado.routing.PathMatches("/favicon.ico"), IconHandler)) handlers.append((tornado.routing.AnyMatches(), MainHandler)) app_web = tornado.web.Application(handlers) app_web.listen(8080) tornado.ioloop.IOLoop.current().start() except KeyboardInterrupt: print()