#!/usr/bin/env python3 import youtube_dl import streamlink import asyncio class DummyLogger(): def debug(self, msg): pass def warning(self, msg): pass def error(self, msg): pass class StreamProvider(): def __init__(self, upstream, proxy): self.upstream = upstream self.proxy = None proxy = str(proxy) if len(proxy) > 5: self.proxy = "socks5://" + proxy class StreamlinkRunner(StreamProvider): def stream(self): session = streamlink.Streamlink() if self.proxy is not None: session.set_option("https-proxy", self.proxy) session.set_option("http-proxy", self.proxy) streams = session.streams(self.upstream) if streams is not None: for key in reversed(streams): stream = streams.get(key) if hasattr(stream, "url"): return stream.url return None async def run(self): return await asyncio.to_thread(self.stream) class YoutubeRunner(StreamProvider): def stream(self): opts = {} opts["logger"] = DummyLogger() if isinstance(self.proxy, str): opts["proxy"] = self.proxy best_url = None with youtube_dl.YoutubeDL(opts) as ydl: info = ydl.extract_info(self.upstream, download=False) vformats = info.get("formats") best_format = {} best_format["width"] = 10 best_format["height"] = 10 if isinstance(vformats, list): for vformat in vformats: acodec = vformat.get("acodec") vcodec = vformat.get("vcodec") current_width = vformat.get("height") current_height = vformat.get("width") best_width = best_format.get("width") best_height = best_format.get("height") new_url = vformat.get("url") if (isinstance(best_width, int) and isinstance(best_height, int) and isinstance(current_width, int) and isinstance(current_height, int) and isinstance(new_url, str) and current_width > best_width and current_height > best_height and acodec != "none" and vcodec != "none"): best_format = vformat best_url = new_url return best_url async def run(self): return await asyncio.to_thread(self.stream) async def get_ytdl(upstream, proxy, logger): result = None try: runner = YoutubeRunner(upstream, proxy) result_temp = await runner.run() except Exception as e: logger.info(e) else: result = result_temp return result async def get_streamlink(upstream, proxy, logger): result = None try: runner = StreamlinkRunner(upstream, proxy) result_temp = await runner.run() except Exception as e: logger.info(e) else: result = result_temp return result async def get_any(upstream, proxy, logger): tasks = [] tasks.append(asyncio.create_task(get_streamlink(upstream, proxy, logger))) tasks.append(asyncio.create_task(get_ytdl(upstream, proxy, logger))) result = None for task in asyncio.as_completed(tasks, timeout=5.0): temp_result = None try: temp_result = await task except Exception as e: logger.info(e) if isinstance(temp_result, str): result = temp_result break for task in tasks: if not task.done(): task.cancel() return result