#!/usr/bin/env python3 import json import sys import base64 import logging import asyncio import tornado.web import tornado.routing import config import stream_providers from typing import Optional, cast, Any logging.basicConfig( format="[%(filename)s:%(lineno)d] %(message)s", stream=sys.stdout, level=logging.INFO, ) logger = logging.getLogger(__name__) class UpstreamHandler: def __init__(self): self.provider: Optional[str] = None self.raw: bool = False self.valid: bool = False self.proxy: config.ProxyElem = config.ProxyElem(None, None) self.direct: bool = False self.upstream: Optional[str] = None async def test_socks(self, proxy): if not hasattr(proxy, "proxy") or not isinstance(proxy.proxy, str): return (True, config.ProxyElem(None, None)) try: splitted = proxy.proxy.rsplit(":", 1) host = proxy port = 1080 if len(splitted) == 2: host = splitted[0] port = splitted[1] future = asyncio.open_connection(host=host, port=port) await asyncio.wait_for(future, timeout=1) except Exception: return (False, proxy) else: return (True, proxy) async def setup(self, handler): self.provider = handler.get_query_argument("provider", None) raw_str = handler.get_query_argument("raw", None) direct_str = handler.get_query_argument("direct", None) true_values = ["y", "yes", "t", "true", "on", "1"] if isinstance(direct_str, str): try: self.direct = direct_str.lower() in true_values except ValueError as e: logger.info(e) if isinstance(raw_str, str): try: self.raw = raw_str.lower() in true_values except ValueError as e: logger.info(e) if self.provider in config.providers: self.valid = True path = handler.request.path if self.provider.startswith("nextcloud"): path = path.removesuffix("/").removesuffix("download").removesuffix("/") elif self.provider.startswith("youtube"): path = path.removeprefix("/") self.upstream = config.providers[self.provider] + path if not self.direct: proxy_list = config.proxies.get(self.provider) if isinstance(proxy_list, list): futures = [] for current in proxy_list: future = asyncio.create_task(self.test_socks(current)) futures.append(future) for future in asyncio.as_completed(futures): success, current = await future if success: self.proxy = current break for future in futures: if not future.done(): future.cancel() class MainHandler(tornado.web.RequestHandler): async def handle_any(self): handler = UpstreamHandler() await handler.setup(self) if handler.valid: if handler.raw: await self.handle_raw(handler) else: await self.handle_render(handler) else: logger.info("provider missing %s", self.request.uri) self.set_status(404) self.write("Stream not found. (provider missing)") async def get_data(self, handler): video_info = None meta = None title = None if config.template_script is not None and config.template_html is not None: provider_data = None if handler.provider.startswith("nextcloud"): provider_data = await stream_providers.get_nextcloud( handler.upstream, handler.proxy, logger ) elif handler.provider.startswith("seafile"): provider_data = await stream_providers.get_seafile( handler.upstream, handler.proxy, logger ) else: provider_data = await stream_providers.get_any( handler.upstream, handler.proxy, logger ) video_info = {} if handler.direct: video_info["upstream"] = provider_data.upstream() video_info["poster"] = provider_data.thumbnail() else: proxied = await handler.proxy.proxy_url( [ (provider_data.upstream(), provider_data.proxy_ctype()), provider_data.thumbnail(), ] ) video_info["upstream"] = proxied[0] video_info["poster"] = proxied[1] video_info["ctype"] = provider_data.ctype() meta = provider_data.meta() title = provider_data.title() return (video_info, meta, title) async def handle_raw(self, handler): video_info = (await self.get_data(handler))[0] if video_info is not None: self.redirect(url=video_info["upstream"], status=303) else: self.set_status(404) self.write("HTML template missing.") async def handle_render(self, handler): video_info, meta, title = await self.get_data(handler) if video_info is not None: script = config.template_script.generate(info=json.dumps(video_info)) b64 = str(base64.b64encode(script), "ascii") data = {} script_file = f"data:text/javascript;charset=utf-8;base64,{b64}" data["script"] = script_file data["videojs_version"] = config.videojs_version data["chromecast_version"] = config.chromecast_version data["font_awesome_version"] = config.font_awesome_version rendered_html = config.template_html.generate( data=data, meta=meta, title=title ) self.write(rendered_html) else: self.set_status(404) self.write("HTML template missing.") async def get(self): await self.handle_any() async def head(self): await self.handle_any() def data_received(self, _): pass class PlaylistHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/plain; charset=utf-8") self.write(config.playlist) def data_received(self, _): pass class IconHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "image/png") self.write(config.favicon) def data_received(self, _): pass class StyleHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/css; charset=utf-8") self.write(config.custom_style) def data_received(self, _): pass try: handlers: list[tuple[tornado.routing.Matcher, tornado.web.RequestHandler]] = [] handlers.append( ( cast(tornado.routing.Matcher, tornado.routing.PathMatches("/sources.m3u8")), cast(tornado.web.RequestHandler, PlaylistHandler), ) ) handlers.append( ( cast(tornado.routing.Matcher, tornado.routing.PathMatches("/favicon.ico")), cast(tornado.web.RequestHandler, IconHandler), ) ) handlers.append( ( cast(tornado.routing.Matcher, tornado.routing.PathMatches("/style.css")), cast(tornado.web.RequestHandler, StyleHandler), ) ) handlers.append( ( cast(tornado.routing.Matcher, tornado.routing.AnyMatches()), cast(tornado.web.RequestHandler, MainHandler), ) ) app_web = tornado.web.Application(cast(Any, handlers)) app_web.listen(8080) tornado.ioloop.IOLoop.current().start() except KeyboardInterrupt: print()