#!/usr/bin/env python3
"""Tornado front end that resolves a video on a configured provider.

Each request names a provider through the ``provider`` query argument.  The
request is answered either with a rendered HTML player page or, when
``raw`` is truthy, with a redirect to the resolved video URL.
"""

import json
import sys
import base64
import dataclasses
from typing import Optional, cast, Any
import logging
import asyncio
import tornado.ioloop
import tornado.web
import tornado.routing
import config
import stream_providers

logging.basicConfig(
    format="[%(filename)s:%(lineno)d] %(message)s",
    stream=sys.stdout,
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Query-string spellings (compared case-insensitively) that switch a flag on.
_TRUE_VALUES = ("y", "yes", "t", "true", "on", "1")


class UpstreamHandler:
    """Per-request state: provider, flags, upstream URL and chosen proxy."""

    def __init__(self):
        self.provider: Optional[str] = None
        self.raw: bool = False
        self.valid: bool = False
        # Placeholder element; replaced by setup() when a proxy probe succeeds.
        self.proxy: config.ProxyElem = config.ProxyElem(None, None)
        self.direct: bool = False
        self.upstream: Optional[str] = None

    async def test_socks(self, proxy):
        """Check that the proxy endpoint accepts a TCP connection.

        Only a plain TCP connect is attempted (one second timeout); no proxy
        handshake is performed.

        Args:
            proxy: Proxy element whose ``proxy`` attribute is expected to be
                a ``"host"`` or ``"host:port"`` string.  Port 1080 is used
                when none is given.

        Returns:
            ``(True, proxy)`` when the connection succeeded and
            ``(False, proxy)`` when it failed.  An element without a string
            ``proxy`` attribute has nothing to probe and yields
            ``(True, config.ProxyElem(None, None))``.
        """
        if not hasattr(proxy, "proxy") or not isinstance(proxy.proxy, str):
            return (True, config.ProxyElem(None, None))

        # The host must be the address string, not the element object itself;
        # otherwise an entry without an explicit port could never connect.
        host = proxy.proxy
        port = 1080
        pieces = proxy.proxy.rsplit(":", 1)
        if len(pieces) == 2:
            host, port = pieces

        try:
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host=host, port=port), timeout=1
            )
        except Exception:
            return (False, proxy)

        # The probe only needs to know that the connect worked; release the
        # socket instead of leaving it open until garbage collection.
        writer.close()
        return (True, proxy)

    async def setup(self, handler):
        """Populate this object from the query string and path of a request.

        Reads the ``provider``, ``raw`` and ``direct`` query arguments.  For
        a provider listed in ``config.providers`` the object is marked valid
        and the upstream URL is built from the provider base and the request
        path.  Unless ``direct`` is set, all proxies configured for the
        provider are probed concurrently and the first one that answers is
        stored in ``self.proxy``.

        Args:
            handler: The Tornado request handler being served.
        """
        self.provider = handler.get_query_argument("provider", None)
        raw_str = handler.get_query_argument("raw", None)
        direct_str = handler.get_query_argument("direct", None)

        # A missing argument leaves the corresponding flag untouched.
        if isinstance(direct_str, str):
            self.direct = direct_str.lower() in _TRUE_VALUES
        if isinstance(raw_str, str):
            self.raw = raw_str.lower() in _TRUE_VALUES

        if self.provider not in config.providers:
            return

        self.valid = True
        path = handler.request.path
        if self.provider.startswith("nextcloud"):
            # Drop a trailing "download" path component (and slashes around it).
            path = path.removesuffix("/").removesuffix("download").removesuffix("/")
        elif self.provider.startswith("youtube"):
            path = path.removeprefix("/")
        self.upstream = config.providers[self.provider] + path

        if self.direct:
            return
        proxy_list = config.proxies.get(self.provider)
        if not isinstance(proxy_list, list):
            return

        probes = [
            asyncio.create_task(self.test_socks(candidate))
            for candidate in proxy_list
        ]
        # Take whichever proxy answers first.
        for finished in asyncio.as_completed(probes):
            success, candidate = await finished
            if success:
                self.proxy = candidate
                break
        # Stop the probes that are still running once a winner is known.
        for probe in probes:
            if not probe.done():
                probe.cancel()


@dataclasses.dataclass
class VideoInfo:
    """Links and content type handed to the player script as JSON."""

    upstream: None | str = None
    download: None | str = None
    poster: None | str = None
    ctype: None | str = None


class NoDataError(Exception):
    """Raised when the stream provider returned no data for the request."""


class MissingTemplateError(Exception):
    """Raised when the script or HTML template is not available in config."""


class MainHandler(tornado.web.RequestHandler):
    """Catch-all handler that resolves a video and renders or redirects."""

    async def handle_any(self):
        """Serve the request, or answer 404 when the provider is unknown."""
        handler = UpstreamHandler()
        await handler.setup(self)
        if not handler.valid:
            logger.info("provider missing %s", self.request.uri)
            self.set_status(404)
            self.write("Stream not found. (provider missing)")
            return
        if handler.raw:
            await self.handle_raw(handler)
        else:
            await self.handle_render(handler)

    async def get_data(self, handler) -> tuple[dict[str, None | str], str, str]:
        """Fetch provider data and build the video description.

        Args:
            handler: A set-up ``UpstreamHandler`` for the current request.

        Returns:
            A tuple of the ``VideoInfo`` fields as a dict, the provider's
            ``meta()`` value and the provider's ``title()`` value.

        Raises:
            MissingTemplateError: If ``config.template_script`` or
                ``config.template_html`` is ``None``.
            NoDataError: If the provider lookup returned ``None``.
        """
        if config.template_script is None or config.template_html is None:
            raise MissingTemplateError()

        if handler.provider.startswith("nextcloud"):
            provider_data = await stream_providers.get_nextcloud(
                handler.upstream, handler.proxy, logger
            )
        elif handler.provider.startswith("seafile"):
            provider_data = await stream_providers.get_seafile(
                handler.upstream, handler.proxy, logger
            )
        else:
            provider_data = await stream_providers.get_any(
                handler.upstream, handler.proxy, logger
            )
        if provider_data is None:
            raise NoDataError()

        video_info = VideoInfo()
        video_info.ctype = provider_data.ctype()
        if handler.direct:
            video_info.upstream = provider_data.upstream()
            video_info.poster = provider_data.thumbnail()
        else:
            # One request yields three links, in order: stream, download,
            # poster.
            proxied = await handler.proxy.proxy_url(
                [
                    config.LinkWithType(
                        upstream=provider_data.upstream(),
                        ctype=provider_data.proxy_ctype(),
                    ),
                    config.LinkWithType(
                        upstream=provider_data.upstream(),
                        ctype=provider_data.proxy_ctype(),
                        download=True,
                    ),
                    config.LinkWithType(
                        upstream=provider_data.thumbnail(),
                    ),
                ]
            )
            video_info.upstream = proxied[0]
            video_info.download = proxied[1]
            if (
                isinstance(video_info.ctype, str)
                and "mpegurl" in video_info.ctype.lower()
            ):
                # No download link is offered for "mpegurl" content types.
                video_info.download = None
            video_info.poster = proxied[2]
        return (
            dataclasses.asdict(video_info),
            provider_data.meta(),
            provider_data.title(),
        )

    async def _load_data(self, handler):
        """Return ``get_data(handler)``, or write a 404 and return ``None``."""
        try:
            return await self.get_data(handler)
        except NoDataError:
            self.set_status(404)
            self.write("Video not found.")
        except MissingTemplateError:
            self.set_status(404)
            self.write("HTML template missing.")
        return None

    async def handle_raw(self, handler):
        """Redirect (303) to the resolved video URL."""
        loaded = await self._load_data(handler)
        if loaded is None:
            return
        target = loaded[0]["upstream"]
        if not isinstance(target, str):
            # Without a URL there is nothing to redirect to; answer like any
            # other missing video instead of failing inside redirect().
            self.set_status(404)
            self.write("Video not found.")
            return
        self.redirect(url=target, status=303)

    async def handle_render(self, handler):
        """Render the HTML player page for the resolved video."""
        loaded = await self._load_data(handler)
        if loaded is None:
            return
        video_info, meta, title = loaded

        script = config.template_script.generate(info=json.dumps(video_info))
        # The generated script is embedded in the page as a base64 data: URL.
        b64 = str(base64.b64encode(script), "ascii")
        data = {
            "script": f"data:text/javascript;charset=utf-8;base64,{b64}",
            "videojs_version": config.videojs_version,
            "chromecast_version": config.chromecast_version,
            "font_awesome_version": config.font_awesome_version,
        }
        rendered_html = config.template_html.generate(
            data=data, meta=meta, title=title
        )
        self.write(rendered_html)

    async def get(self):
        await self.handle_any()

    async def head(self):
        await self.handle_any()

    def data_received(self, _):
        pass


class PlaylistHandler(tornado.web.RequestHandler):
    """Serve ``config.playlist`` as UTF-8 plain text."""

    def get(self):
        self.set_header("Content-Type", "text/plain; charset=utf-8")
        self.write(config.playlist)

    def data_received(self, _):
        pass


class IconHandler(tornado.web.RequestHandler):
    """Serve ``config.favicon`` as a PNG image."""

    def get(self):
        self.set_header("Content-Type", "image/png")
        self.write(config.favicon)

    def data_received(self, _):
        pass


class StyleHandler(tornado.web.RequestHandler):
    """Serve ``config.custom_style`` as a UTF-8 stylesheet."""

    def get(self):
        self.set_header("Content-Type", "text/css; charset=utf-8")
        self.write(config.custom_style)

    def data_received(self, _):
        pass


try:
    # The fixed paths are listed first; every other path falls through to
    # MainHandler.  The casts only adapt the entries to the annotated type.
    handlers: list[tuple[tornado.routing.Matcher, tornado.web.RequestHandler]] = [
        (
            cast(tornado.routing.Matcher, tornado.routing.PathMatches("/sources.m3u8")),
            cast(tornado.web.RequestHandler, PlaylistHandler),
        ),
        (
            cast(tornado.routing.Matcher, tornado.routing.PathMatches("/favicon.ico")),
            cast(tornado.web.RequestHandler, IconHandler),
        ),
        (
            cast(tornado.routing.Matcher, tornado.routing.PathMatches("/style.css")),
            cast(tornado.web.RequestHandler, StyleHandler),
        ),
        (
            cast(tornado.routing.Matcher, tornado.routing.AnyMatches()),
            cast(tornado.web.RequestHandler, MainHandler),
        ),
    ]
    app_web = tornado.web.Application(cast(Any, handlers))
    app_web.listen(8080)
    tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
    # Ctrl-C ends the server; only a bare newline is printed.
    print()