"""Resolve a playable stream URL and Open Graph style metadata for a page.

Three providers (streamlink, youtube_dl and a plain HTML ``<meta>`` scraper)
are queried concurrently and their answers are merged into one
``StreamData`` object.
"""

import asyncio
import html.parser

import expiringdict
import requests
import streamlink
import youtube_dl

# Seconds a single provider may take before its result is abandoned.
STREAM_TIMEOUT_SECONDS = 5

# One streamlink session per configured proxy, keyed by the proxy address.
streamlink_sessions = {}
streamlink_default_session = streamlink.Streamlink()


def setup(proxies):
    """Create a proxied streamlink session for every address in *proxies*.

    Each address is used as the host part of a ``socks5://`` URL for both
    the HTTP and the HTTPS proxy option of its session.
    """
    for proxy in proxies:
        session = streamlink.Streamlink()
        session.set_option("http-proxy", "socks5://" + proxy)
        session.set_option("https-proxy", "socks5://" + proxy)
        streamlink_sessions[proxy] = session


class DummyLogger():
    """Logger that discards everything; handed to youtube_dl to silence it."""

    def debug(self, msg):
        pass

    def warning(self, msg):
        pass

    def error(self, msg):
        pass


class MetaParser(html.parser.HTMLParser):
    """Collect selected ``<meta property=... content=...>`` values.

    After ``feed()``, ``meta_data`` maps each accepted property name that
    was seen to the value of the ``content`` attribute of the same tag.
    """

    def __init__(self):
        super().__init__()
        self.meta_data = {}
        self.accepted_attrs = ["og:title", "og:description", "og:image"]

    def handle_starttag(self, tag, attrs):
        """Record the content of an accepted ``<meta>`` tag.

        The attribute order inside the tag does not matter. If an attribute
        is repeated, the first occurrence is used.
        """
        if tag != "meta":
            return

        name = next(
            (value for key, value in attrs
             if key == "property" and value in self.accepted_attrs),
            None,
        )
        if name is None:
            return

        for key, value in attrs:
            if key == "content":
                # A valueless ``content`` attribute yields None, which is
                # stored as-is; StreamData treats non-strings as missing.
                self.meta_data[name] = value
                return


class StreamData():
    """Result of one provider, or the merged result of several.

    ``values`` holds the keys ``upstream``, ``thumbnail``, ``title`` and
    ``description``; a field that could not be determined is ``None``.
    ``override`` tells the merger whether this provider's values should
    replace values that are already present.
    """

    def __init__(self, upstream, thumbnail, title, description, override):
        self.values = {
            "upstream": upstream,
            "thumbnail": thumbnail,
            "title": title,
            "description": description,
        }
        self.override = override

    def update(self, key, value, override):
        """Store *value* under *key* if the slot is empty or may be replaced.

        The slot counts as empty when it does not hold a string. A filled
        slot is only replaced when *override* is true and *value* is a
        string, so an overriding provider never erases data with ``None``.
        """
        missing = not isinstance(self.values.get(key), str)
        replace = override and isinstance(value, str)
        if missing or replace:
            self.values[key] = value

    def upstream(self):
        return self.values.get("upstream")

    def thumbnail(self):
        return self.values.get("thumbnail")

    def title(self):
        return self.values.get("title")

    def description(self):
        return self.values.get("description")

    def complete(self):
        """Return True when no field is ``None``."""
        return None not in self.values.values()

    def meta(self):
        """Return ``(property, content)`` pairs for every string field.

        The order is image, title, description; ``upstream`` is not part
        of the result.
        """
        properties = (
            ("og:image", "thumbnail"),
            ("og:title", "title"),
            ("og:description", "description"),
        )
        return [
            (prop, self.values.get(key))
            for prop, key in properties
            if isinstance(self.values.get(key), str)
        ]


class StreamProvider():
    """Base class for providers; subclasses implement a blocking ``stream()``."""

    def __init__(self, upstream, proxy, logger):
        self.name = self.__class__.__name__
        self.upstream = upstream
        self.proxy = None
        self.logger = logger
        # Anything whose string form has five characters or fewer is treated
        # as "no proxy"; this covers None (str(None) == "None") and "".
        proxy = str(proxy)
        if len(proxy) > 5:
            self.proxy = proxy

    async def run(self):
        """Run ``stream()`` in a worker thread and return its result.

        Returns ``None`` when the call raises or takes longer than
        ``STREAM_TIMEOUT_SECONDS``. On timeout only the wait is abandoned;
        the worker thread itself is not interrupted.
        """
        data = None
        try:
            future = asyncio.to_thread(self.stream)
            data = await asyncio.wait_for(
                future, timeout=STREAM_TIMEOUT_SECONDS
            )
        except Exception as e:
            self.logger.info(e)
        return data


class StreamlinkRunner(StreamProvider):
    """Provider that resolves the stream URL through streamlink."""

    def stream(self):
        """Return a ``StreamData`` carrying only the upstream URL (or None)."""
        try:
            if self.proxy is None:
                session = streamlink_default_session
            else:
                # An address that was never passed to setup() yields None
                # here; the resulting AttributeError is logged below.
                session = streamlink_sessions.get(self.proxy)
            media = session.resolve_url(self.upstream)
            streams = media.streams()
            if streams is not None:
                # NOTE(review): walks the mapping back to front, presumably
                # because the preferred qualities come last - confirm.
                for key in reversed(streams):
                    stream = streams.get(key)
                    if hasattr(stream, "url"):
                        return StreamData(stream.url, None, None, None, False)
        except Exception as e:
            self.logger.info(e)
        return StreamData(None, None, None, None, False)


class YoutubeRunner(StreamProvider):
    """Provider that extracts URL and metadata through youtube_dl."""

    def stream(self):
        """Return a ``StreamData`` for the largest combined audio+video format.

        A format is a candidate only if it has integer ``width`` and
        ``height``, a string ``url`` and neither codec is ``"none"``. It
        replaces the current best when both dimensions are strictly larger.
        The channel name is reported as the description. The result is
        marked as overriding.
        """
        best_url = None
        thumbnail = None
        title = None
        description = None
        try:
            opts = {"logger": DummyLogger()}
            if isinstance(self.proxy, str):
                opts["proxy"] = "socks5://" + self.proxy
            with youtube_dl.YoutubeDL(opts) as ydl:
                info = ydl.extract_info(self.upstream, download=False)
                vformats = info.get("formats")
                thumbnail = info.get("thumbnail")
                description = info.get("channel")
                title = info.get("title")

                # Minimum size a format must exceed to be considered.
                best_width = 10
                best_height = 10
                if isinstance(vformats, list):
                    for vformat in vformats:
                        width = vformat.get("width")
                        height = vformat.get("height")
                        new_url = vformat.get("url")
                        if not (isinstance(width, int)
                                and isinstance(height, int)
                                and isinstance(new_url, str)):
                            continue
                        if (vformat.get("acodec") == "none"
                                or vformat.get("vcodec") == "none"):
                            continue
                        if width > best_width and height > best_height:
                            best_width = width
                            best_height = height
                            best_url = new_url
        except Exception as e:
            self.logger.info(e)
        return StreamData(best_url, thumbnail, title, description, True)


class MetaRunner(StreamProvider):
    """Provider that scrapes Open Graph ``<meta>`` tags from the page."""

    def stream(self):
        """Return a ``StreamData`` with thumbnail, title and description."""
        data = {}
        try:
            # NOTE(review): this request does not go through self.proxy,
            # unlike the other providers - confirm whether that is intended.
            # The timeout keeps the worker thread from hanging forever on an
            # unresponsive server after run() has already given up.
            resp = requests.get(self.upstream, timeout=STREAM_TIMEOUT_SECONDS)
            parser = MetaParser()
            parser.feed(resp.text)
            data = parser.meta_data
        except Exception as e:
            self.logger.info(e)
        return StreamData(
            None,
            data.get("og:image"),
            data.get("og:title"),
            data.get("og:description"),
            False,
        )


upstream_cache = expiringdict.ExpiringDict(max_len=4096, max_age_seconds=1800)


async def get_from_runner(cache_key, runner, logger):
    """Return the cached result for *cache_key* or run *runner* for it.

    Only ``StreamData`` results are cached. ``None`` is returned when the
    runner produced nothing or raised.
    """
    cached = upstream_cache.get(cache_key)
    if isinstance(cached, StreamData):
        return cached

    try:
        result = await runner.run()
    except Exception as e:
        logger.info(e)
        return None

    if isinstance(result, StreamData):
        upstream_cache[cache_key] = result
    return result


async def get_streamlink(upstream, proxy, logger):
    """Resolve *upstream* with streamlink (cache namespace 0)."""
    runner = StreamlinkRunner(upstream, proxy, logger)
    return await get_from_runner((0, upstream), runner, logger)


async def get_ytdl(upstream, proxy, logger):
    """Resolve *upstream* with youtube_dl (cache namespace 1)."""
    runner = YoutubeRunner(upstream, proxy, logger)
    return await get_from_runner((1, upstream), runner, logger)


async def get_meta(upstream, proxy, logger):
    """Resolve *upstream* by scraping meta tags (cache namespace 2)."""
    runner = MetaRunner(upstream, proxy, logger)
    return await get_from_runner((2, upstream), runner, logger)


async def get_any(upstream, proxy, logger):
    """Query all providers concurrently and merge what they return.

    Results are merged in completion order. Merging stops as soon as every
    field is filled; providers that are still running are then cancelled.
    """
    tasks = [
        asyncio.create_task(get_streamlink(upstream, proxy, logger)),
        asyncio.create_task(get_ytdl(upstream, proxy, logger)),
        asyncio.create_task(get_meta(upstream, proxy, logger)),
    ]
    result = StreamData(None, None, None, None, False)
    try:
        for task in asyncio.as_completed(tasks):
            temp_result = await task
            if not isinstance(temp_result, StreamData):
                continue
            override = temp_result.override
            result.update("upstream", temp_result.upstream(), override)
            result.update("thumbnail", temp_result.thumbnail(), override)
            result.update("title", temp_result.title(), override)
            result.update("description", temp_result.description(), override)
            if result.complete():
                break
    finally:
        # Also runs when this coroutine is cancelled, so no provider task
        # is left running without an owner.
        for task in tasks:
            if not task.done():
                task.cancel()
    return result