import youtube_dl
import streamlink
import requests
import asyncio
import html.parser
import urllib.parse
import expiringdict
import subprocess
import json
import re
import config


class DummyLogger:
    """Logger that discards every message.

    Passed as the ``logger`` option to ``youtube_dl.YoutubeDL`` by
    ``YoutubeRunner`` so the extractor's output is silenced.
    """

    def debug(self, msg):
        pass

    def warning(self, msg):
        pass

    def error(self, msg):
        pass


class MetaParser(html.parser.HTMLParser):
    """HTML parser collecting ``<meta>`` and ``<input>`` key/value pairs.

    After ``feed()``, ``meta_data`` maps each ``<meta property=...>`` to its
    ``content`` and each ``<input name=...>`` to its ``value``.
    """

    def __init__(self):
        self.meta_data = {}
        super().__init__()

    def _store_pair(self, attrs, key_attr, value_attr):
        """Store ``attrs[value_attr]`` in ``meta_data`` under ``attrs[key_attr]``."""
        name = None
        # The attribute list is walked twice so the value attribute is still
        # found when it appears before the key attribute in the tag.
        for attr in attrs + attrs:
            if len(attr) != 2:
                continue
            if isinstance(name, str):
                if attr[0] == value_attr:
                    self.meta_data[name] = attr[1]
                    return
            elif attr[0] == key_attr:
                name = attr[1]

    def handle_starttag_meta(self, attrs):
        """Record a ``<meta property=... content=...>`` tag."""
        self._store_pair(attrs, "property", "content")

    def handle_starttag_input(self, attrs):
        """Record an ``<input name=... value=...>`` tag."""
        self._store_pair(attrs, "name", "value")

    def handle_starttag(self, tag, attrs):
        """Dispatch ``meta`` and ``input`` tags; every other tag is ignored."""
        if tag == "meta":
            return self.handle_starttag_meta(attrs)
        elif tag == "input":
            return self.handle_starttag_input(attrs)


class StreamData:
    """Resolved information about one stream.

    ``values`` holds the slots ``upstream``, ``ctype``, ``proxy_ctype``,
    ``thumbnail``, ``title`` and ``description``; a slot is ``None`` when it is
    unknown. ``override`` marks whether this object's values may replace
    already-known values when merged through ``update()``.
    """

    def __init__(
        self, upstream, ctype, proxy_ctype, thumbnail, title, description, override
    ):
        self.values = {}
        self.values["upstream"] = upstream
        self.values["ctype"] = ctype
        self.values["proxy_ctype"] = proxy_ctype
        self.values["thumbnail"] = thumbnail
        self.values["title"] = title
        self.values["description"] = description
        self.override = override

    def update(self, key, value, override):
        """Set ``key`` to ``value`` when the slot is empty or may be overridden.

        A slot counts as empty when it does not hold a string. An existing
        string is only replaced when ``override`` is true and ``value`` is
        itself a string.
        """
        missing = not isinstance(self.values.get(key), str)
        override = override and isinstance(value, str)
        if missing or override:
            self.values[key] = value

    def upstream(self):
        return self.values.get("upstream")

    def ctype(self):
        """Return the content type, preferring ``proxy_ctype`` over non-media types.

        When both types are strings and ``ctype`` is neither ``audio/*`` nor
        ``video/*``, ``proxy_ctype`` is returned instead.
        """
        ctype = self.values.get("ctype")
        proxy_ctype = self.values.get("proxy_ctype")
        if isinstance(ctype, str) and isinstance(proxy_ctype, str):
            if not ctype.startswith(("audio/", "video/")):
                return proxy_ctype
        return ctype

    def proxy_ctype(self):
        return self.values.get("proxy_ctype")

    def thumbnail(self):
        return self.values.get("thumbnail")

    def title(self):
        return self.values.get("title")

    def description(self):
        return self.values.get("description")

    def complete(self):
        """Return True when every slot except ``proxy_ctype`` is filled.

        ``proxy_ctype`` is only a fallback consulted by ``ctype()`` and is left
        ``None`` by most providers. Counting it here would make a merged result
        permanently incomplete.
        """
        return all(
            value is not None
            for key, value in self.values.items()
            if key != "proxy_ctype"
        )

    def has_data(self):
        """Return True when at least one slot holds a string."""
        return any(isinstance(elem, str) for elem in self.values.values())

    def meta(self):
        """Return the known OpenGraph fields as ``(property, content)`` tuples."""
        data = []
        if isinstance(self.values.get("thumbnail"), str):
            data.append(("og:image", self.values.get("thumbnail")))
        if isinstance(self.values.get("title"), str):
            data.append(("og:title", self.values.get("title")))
        if isinstance(self.values.get("description"), str):
            data.append(("og:description", self.values.get("description")))
        return data


class StreamProvider:
    """Base class for resolvers that turn a page URL into ``StreamData``.

    Subclasses implement ``stream()`` returning a ``StreamData``; ``process()``
    and ``run()`` build on it.
    """

    def __init__(self, upstream, proxy, logger):
        self.name = self.__class__.__name__
        self.upstream = upstream
        self.proxy = None
        self.logger = logger
        # Anything that is not a config.ProxyElem is treated as "no proxy".
        if isinstance(proxy, config.ProxyElem):
            self.proxy = proxy

    def extract_mime(self, upstream):
        """Return the MIME type named by the URL's ``mime`` query parameter.

        The query string is lower-cased before parsing. ``text*`` values are
        skipped, and any value containing ``mpegurl`` is normalised to
        ``application/vnd.apple.mpegurl``. Returns ``None`` when the URL cannot
        be parsed or carries no usable ``mime`` parameter.
        """
        try:
            url = urllib.parse.urlparse(upstream)
            query = urllib.parse.parse_qs(url.query.lower())
        except Exception as e:
            self.logger.info(e)
        else:
            mime_types = query.get("mime")
            if isinstance(mime_types, list):
                for mime in mime_types:
                    if isinstance(mime, str) and not mime.startswith("text"):
                        if "mpegurl" in mime:
                            return "application/vnd.apple.mpegurl"
                        return mime
        return None

    def init_stream(self):
        """Return an all-empty keyword dict matching ``StreamData.__init__``."""
        stream = {}
        stream["upstream"] = None
        stream["proxy_ctype"] = None
        stream["ctype"] = None
        stream["thumbnail"] = None
        stream["title"] = None
        stream["description"] = None
        stream["override"] = False
        return stream

    def process(self):
        """Resolve the stream and, if needed, probe its content type.

        Calls ``stream()``. When that yields an upstream URL without a content
        type, a ``HEAD`` request (through the SOCKS5 proxy if one is set) reads
        ``Content-Type``. ``text*`` answers are discarded and ``mpegurl``
        answers are normalised. Request failures are logged and leave the
        content type unset.
        """
        data = self.stream()
        proxy_ctype = data.proxy_ctype()
        if not isinstance(data.upstream(), str) or isinstance(data.ctype(), str):
            return data
        proxies = None
        if self.proxy is not None and self.proxy.proxy is not None:
            proxies = {}
            proxies["http"] = "socks5://" + self.proxy.proxy
            proxies["https"] = "socks5://" + self.proxy.proxy
        ctype = None
        try:
            resp = requests.head(
                data.upstream(), proxies=proxies, timeout=5, allow_redirects=True
            )
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        else:
            if resp.ok:
                # NOTE(review): the post-redirect URL (resp.url) used to be
                # stored in an unused local; the un-redirected upstream is what
                # gets returned. Confirm that is intended.
                ctype = resp.headers.get("Content-Type", "text/plain").lower()
                if ctype.startswith("text"):
                    ctype = None
                elif "mpegurl" in ctype:
                    ctype = "application/vnd.apple.mpegurl"
        return StreamData(
            data.upstream(),
            ctype,
            proxy_ctype,
            data.thumbnail(),
            data.title(),
            data.description(),
            data.override,
        )

    async def run(self):
        """Run ``process()`` in a worker thread, waiting at most 5 seconds.

        Returns the resulting ``StreamData``, or ``None`` when the call timed
        out or raised (the error is logged).
        """
        data = None
        try:
            future = asyncio.to_thread(self.process)
            data = await asyncio.wait_for(future, timeout=5)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        return data


class StreamlinkRunner(StreamProvider):
    """Resolve the upstream URL through a streamlink session from ``config``."""

    def stream(self):
        """Return the URL of the last-listed stream that exposes ``url``.

        Streams are walked in reverse order. Any failure is logged and yields
        an empty ``StreamData``.
        """
        try:
            session = None
            if self.proxy is None or self.proxy.proxy is None:
                session = config.streamlink_default_session
            else:
                session = config.streamlink_sessions.get(self.proxy.proxy)
            media = session.resolve_url(self.upstream)
            streams = None
            # resolve_url may return a tuple whose first item is called with
            # the second to build the plugin; otherwise the result is used as
            # the plugin directly.
            if isinstance(media, tuple):
                plugin = media[0](media[1])
                streams = plugin.streams()
            else:
                streams = media.streams()
            if streams is not None:
                for key in reversed(streams):
                    stream = streams.get(key)
                    if hasattr(stream, "url"):
                        return StreamData(
                            stream.url,
                            self.extract_mime(stream.url),
                            None,
                            None,
                            None,
                            None,
                            False,
                        )
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        return StreamData(None, None, None, None, None, None, False)


class YoutubeRunner(StreamProvider):
    """Resolve the upstream URL and its metadata through youtube_dl."""

    def stream(self):
        """Return metadata plus the largest format carrying audio and video.

        A format replaces the current best only when both its width and its
        height exceed the best seen so far (starting from 10x10), it has a
        string ``url``, and neither codec is ``"none"``. The description slot
        is filled from the ``channel`` field. Failures are logged and whatever
        was collected up to that point is returned.
        """
        best_stream = self.init_stream()
        try:
            opts = {}
            opts["logger"] = DummyLogger()
            if self.proxy is not None and self.proxy.proxy is not None:
                opts["proxy"] = "socks5://" + self.proxy.proxy
            with youtube_dl.YoutubeDL(opts) as ydl:
                info = ydl.extract_info(self.upstream, download=False)
                vformats = info.get("formats")
                best_stream["thumbnail"] = info.get("thumbnail")
                best_stream["description"] = info.get("channel")
                best_stream["title"] = info.get("title")
                best_format = {}
                best_format["width"] = 10
                best_format["height"] = 10
                if isinstance(vformats, list):
                    for vformat in vformats:
                        acodec = vformat.get("acodec")
                        vcodec = vformat.get("vcodec")
                        # Width is compared with width and height with height.
                        # The two were previously read crosswise, so a
                        # 1920x1080 format lost against 1280x720.
                        current_width = vformat.get("width")
                        current_height = vformat.get("height")
                        best_width = best_format.get("width")
                        best_height = best_format.get("height")
                        new_url = vformat.get("url")
                        if (
                            isinstance(best_width, int)
                            and isinstance(best_height, int)
                            and isinstance(current_width, int)
                            and isinstance(current_height, int)
                            and isinstance(new_url, str)
                            and current_width > best_width
                            and current_height > best_height
                            and acodec != "none"
                            and vcodec != "none"
                        ):
                            best_format = vformat
                            best_stream["override"] = True
                            best_stream["upstream"] = new_url
                            best_stream["ctype"] = self.extract_mime(new_url)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        return StreamData(**best_stream)


class SeafileRunner(StreamProvider):
    """Resolve a Seafile share link via the ``/app/seafile.js`` helper."""

    def stream(self):
        """Run the helper and map its JSON output onto ``StreamData``.

        The helper's stdout is parsed as JSON. ``filePath`` becomes the title
        and ``rawPath`` the upstream URL. A ``filePath`` ending in ``.mp4``
        sets the fallback content type to ``video/mp4``. A failing helper,
        unparsable output or a non-object payload is logged and yields an
        empty ``StreamData``.
        """
        stream_data = self.init_stream()
        json_data = None
        try:
            proc = subprocess.run(
                ["/app/seafile.js", self.upstream],
                capture_output=True,
                encoding="utf-8",
            )
            json_data = json.loads(proc.stdout)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        else:
            if isinstance(json_data, dict):
                file_path = json_data.get("filePath")
                stream_data["title"] = file_path
                stream_data["upstream"] = json_data.get("rawPath")
                # filePath may be absent; only inspect the extension when it
                # is really a string so a usable rawPath is not thrown away.
                if isinstance(file_path, str) and file_path.lower().endswith(
                    ".mp4"
                ):
                    stream_data["proxy_ctype"] = "video/mp4"
            else:
                self.logger.info(
                    "%s <%s>", "unexpected seafile.js output", self.upstream
                )
        return StreamData(**stream_data)


class MetaProvider(StreamProvider):
    """Base for providers that scrape metadata from the upstream HTML page."""

    def parse_web(self):
        """Fetch the upstream page and return a ``StreamData`` keyword dict.

        ``downloadURL`` and ``mimetype`` inputs fill the upstream and content
        type; ``og:title``, ``og:image`` and ``og:description`` fill the
        metadata. Fetch or parse failures are logged and leave the dict empty.
        """
        stream_data = self.init_stream()
        data = {}
        try:
            # Bounded like the HEAD probe in process(): without a timeout the
            # worker thread could hang forever after run() stopped waiting.
            # NOTE(review): unlike process(), this request does not go through
            # the configured proxy; confirm whether that is intended.
            resp = requests.get(self.upstream, timeout=5)
            parser = MetaParser()
            parser.feed(resp.text)
            data = parser.meta_data
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        else:
            stream_data["upstream"] = data.get("downloadURL")
            stream_data["ctype"] = data.get("mimetype")
            stream_data["title"] = data.get("og:title")
            stream_data["thumbnail"] = data.get("og:image")
            stream_data["description"] = data.get("og:description")
        return stream_data


class MetaRunner(MetaProvider):
    """Metadata-only provider: keeps OpenGraph fields, drops URL and type."""

    def stream(self):
        stream_data = self.parse_web()
        stream_data["upstream"] = None
        stream_data["ctype"] = None
        return StreamData(**stream_data)
class NextcloudRunner(MetaProvider):
    """Page-scraping provider that keeps URL and type but drops the thumbnail."""

    def stream(self):
        stream_data = self.parse_web()
        stream_data["thumbnail"] = None
        return StreamData(**stream_data)


# Resolved streams, keyed by (provider tag, upstream URL). Holds at most 512
# entries, each for up to 18000 seconds (5 hours).
upstream_cache = expiringdict.ExpiringDict(max_len=512, max_age_seconds=18000)


async def get_from_runner(cache_key, runner, logger):
    """Return cached data for ``cache_key`` or obtain it from ``runner``.

    A fresh result is cached and returned only when it is a ``StreamData``
    holding at least one string value. Otherwise, or when the runner raises
    (the error is logged), ``None`` is returned.
    """
    result = None
    cached = upstream_cache.get(cache_key)
    if isinstance(cached, StreamData):
        return cached
    try:
        result_temp = await runner.run()
    except Exception as e:
        logger.info(e)
    else:
        if isinstance(result_temp, StreamData) and result_temp.has_data():
            upstream_cache[cache_key] = result_temp
            result = result_temp
    return result


async def get_streamlink(upstream, proxy, logger):
    """Resolve ``upstream`` with streamlink (cache tag 0)."""
    return await get_from_runner(
        (0, upstream), StreamlinkRunner(upstream, proxy, logger), logger
    )


async def get_ytdl(upstream, proxy, logger):
    """Resolve ``upstream`` with youtube_dl (cache tag 1)."""
    return await get_from_runner(
        (1, upstream), YoutubeRunner(upstream, proxy, logger), logger
    )


async def get_meta(upstream, proxy, logger):
    """Fetch only the page metadata of ``upstream`` (cache tag 2)."""
    return await get_from_runner(
        (2, upstream), MetaRunner(upstream, proxy, logger), logger
    )


async def get_nextcloud(upstream, proxy, logger):
    """Resolve ``upstream`` via NextcloudRunner (cache tag 3)."""
    return await get_from_runner(
        (3, upstream), NextcloudRunner(upstream, proxy, logger), logger
    )


async def get_seafile(upstream, proxy, logger):
    """Resolve a Seafile share link (cache tag 5).

    The tag must differ from the one used by ``get_nextcloud`` (3) and
    ``get_any`` (4); sharing a tag would let one provider return another
    provider's cached entry for the same URL.
    """
    return await get_from_runner(
        (5, upstream), SeafileRunner(upstream, proxy, logger), logger
    )


async def get_any(upstream, proxy, logger):
    """Merge the results of streamlink, youtube_dl and the metadata scraper.

    The three lookups run concurrently and are merged in completion order
    through ``StreamData.update``; a provider's values replace existing ones
    only when its ``override`` flag is set. As soon as the merged result
    reports ``complete()`` it is cached under tag 4 and the remaining lookups
    are cancelled. The (possibly partial) merged result is always returned.
    """
    cache_key = (4, upstream)
    cached = upstream_cache.get(cache_key)
    if isinstance(cached, StreamData):
        return cached
    tasks = [
        asyncio.create_task(get_streamlink(upstream, proxy, logger)),
        asyncio.create_task(get_ytdl(upstream, proxy, logger)),
        asyncio.create_task(get_meta(upstream, proxy, logger)),
    ]
    result = StreamData(None, None, None, None, None, None, False)
    for task in asyncio.as_completed(tasks):
        temp_result = await task
        if isinstance(temp_result, StreamData):
            result.update("upstream", temp_result.upstream(), temp_result.override)
            result.update("ctype", temp_result.ctype(), temp_result.override)
            result.update("thumbnail", temp_result.thumbnail(), temp_result.override)
            result.update("title", temp_result.title(), temp_result.override)
            result.update(
                "description", temp_result.description(), temp_result.override
            )
            if result.complete():
                upstream_cache[cache_key] = result
                break
    # Stop whatever is still running once the loop is done or was left early.
    for task in tasks:
        if not task.done():
            task.cancel()
    return result