stream-api/backend/stream_providers.py

import youtube_dl
try:
    import streamlink
except Exception as e:
    print(e)
import requests
import asyncio
import html.parser
import urllib.parse
import expiringdict
import subprocess
import json
import re

try:
    streamlink_sessions = {}
    streamlink_default_session = streamlink.Streamlink()
except Exception as e:
    print(e)

def setup(proxies):
    try:
        for proxy in proxies:
            streamlink_sessions[proxy] = streamlink.Streamlink()
            streamlink_sessions[proxy].set_option("http-proxy", "socks5://" + proxy)
            streamlink_sessions[proxy].set_option("https-proxy", "socks5://" + proxy)
    except Exception as e:
        print(e)

class DummyLogger():
    # no-op logger handed to youtube_dl to silence its output
    def debug(self, msg):
        pass
    def warning(self, msg):
        pass
    def error(self, msg):
        pass

class MetaParser(html.parser.HTMLParser):
    def __init__(self):
        self.meta_data = {}
        super().__init__()
    def handle_starttag_meta(self, attrs):
        name = None
        # walk the attribute list twice so "content" is still picked up when it
        # appears before the matching "property" attribute
        for attr in (attrs + attrs):
            if len(attr) == 2:
                if isinstance(name, str):
                    if attr[0] == "content":
                        self.meta_data[name] = attr[1]
                        return
                elif attr[0] == "property":
                    name = attr[1]
    def handle_starttag_input(self, attrs):
        name = None
        for attr in (attrs + attrs):
            if len(attr) == 2:
                if isinstance(name, str):
                    if attr[0] == "value":
                        self.meta_data[name] = attr[1]
                        return
                elif attr[0] == "name":
                    name = attr[1]
    def handle_starttag(self, tag, attrs):
        if tag == "meta":
            return self.handle_starttag_meta(attrs)
        elif tag == "input":
            return self.handle_starttag_input(attrs)

class StreamData():
    def __init__(self, upstream, ctype, proxy_ctype, thumbnail, title, description, override):
        self.values = {}
        self.values["upstream"] = upstream
        self.values["ctype"] = ctype
        self.values["proxy_ctype"] = proxy_ctype
        self.values["thumbnail"] = thumbnail
        self.values["title"] = title
        self.values["description"] = description
        self.override = override
    def update(self, key, value, override):
        missing = not isinstance(self.values.get(key), str)
        override = override and isinstance(value, str)
        if missing or override:
            self.values[key] = value
    def upstream(self):
        return self.values.get("upstream")
    def ctype(self):
        # fall back to the proxy content type when the detected one is
        # neither audio nor video
        ctype = self.values.get("ctype")
        proxy_ctype = self.values.get("proxy_ctype")
        if isinstance(ctype, str) and isinstance(proxy_ctype, str):
            if not ctype.startswith("audio/") and not ctype.startswith("video/"):
                return proxy_ctype
        return ctype
    def proxy_ctype(self):
        return self.values.get("proxy_ctype")
    def thumbnail(self):
        return self.values.get("thumbnail")
    def title(self):
        return self.values.get("title")
    def description(self):
        return self.values.get("description")
    def complete(self):
        return None not in self.values.values()
    def has_data(self):
        for elem in self.values.values():
            if isinstance(elem, str):
                return True
        return False
    def meta(self):
        data = []
        if isinstance(self.values.get("thumbnail"), str):
            data.append(("og:image", self.values.get("thumbnail")))
        if isinstance(self.values.get("title"), str):
            data.append(("og:title", self.values.get("title")))
        if isinstance(self.values.get("description"), str):
            data.append(("og:description", self.values.get("description")))
        return data

class StreamProvider():
    def __init__(self, upstream, proxy, logger):
        self.name = self.__class__.__name__
        self.upstream = upstream
        self.proxy = None
        self.logger = logger
        proxy = str(proxy)
        if len(proxy) > 5:
            self.proxy = proxy
    def extract_mime(self, upstream):
        # try to derive a content type from a "mime" query parameter in the URL
        try:
            url = urllib.parse.urlparse(upstream)
            query = urllib.parse.parse_qs(url.query.lower())
        except Exception as e:
            self.logger.info(e)
        else:
            mime_types = query.get("mime")
            if isinstance(mime_types, list):
                for mime in mime_types:
                    if isinstance(mime, str) and not mime.startswith("text"):
                        if "mpegurl" in mime:
                            return "application/vnd.apple.mpegurl"
                        return mime
        return None
    def init_stream(self):
        stream = {}
        stream["upstream"] = None
        stream["proxy_ctype"] = None
        stream["ctype"] = None
        stream["thumbnail"] = None
        stream["title"] = None
        stream["description"] = None
        stream["override"] = False
        return stream
    def process(self):
        data = self.stream()
        proxy_ctype = data.proxy_ctype()
        if not isinstance(data.upstream(), str) or isinstance(data.ctype(), str):
            return data
        proxies = None
        if isinstance(self.proxy, str):
            proxies = {}
            proxies["http"] = "socks5://" + self.proxy
            proxies["https"] = "socks5://" + self.proxy
        ctype = None
        upstream = data.upstream()
        # probe the upstream URL to resolve redirects and detect its content type
        try:
            resp = requests.head(data.upstream(), proxies=proxies, timeout=5, allow_redirects=True)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        else:
            if resp.ok:
                upstream = resp.url
                ctype = resp.headers.get("Content-Type", "text/plain").lower()
                if ctype.startswith("text"):
                    ctype = None
                elif "mpegurl" in ctype:
                    ctype = "application/vnd.apple.mpegurl"
        return StreamData(upstream, ctype, proxy_ctype, data.thumbnail(), data.title(), data.description(), data.override)
    async def run(self):
        data = None
        try:
            future = asyncio.to_thread(self.process)
            data = await asyncio.wait_for(future, timeout=5)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        return data

class StreamlinkRunner(StreamProvider):
    def stream(self):
        try:
            session = None
            if self.proxy is None:
                session = streamlink_default_session
            else:
                session = streamlink_sessions.get(self.proxy)
            media = session.resolve_url(self.upstream)
            streams = None
            # resolve_url may hand back a (plugin class, url) tuple or a plugin
            # object, depending on the streamlink version
            if isinstance(media, tuple):
                plugin = media[0](media[1])
                streams = plugin.streams()
            else:
                streams = media.streams()
            if streams is not None:
                # walk the qualities from the end, which is normally the best one
                for key in reversed(streams):
                    stream = streams.get(key)
                    if hasattr(stream, "url"):
                        return StreamData(stream.url, self.extract_mime(stream.url), None, None, None, None, False)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        return StreamData(None, None, None, None, None, None, False)

class YoutubeRunner(StreamProvider):
    def stream(self):
        best_stream = self.init_stream()
        try:
            opts = {}
            opts["logger"] = DummyLogger()
            if isinstance(self.proxy, str):
                opts["proxy"] = "socks5://" + self.proxy
            with youtube_dl.YoutubeDL(opts) as ydl:
                info = ydl.extract_info(self.upstream, download=False)
                vformats = info.get("formats")
                best_stream["thumbnail"] = info.get("thumbnail")
                best_stream["description"] = info.get("channel")
                best_stream["title"] = info.get("title")
                # pick the largest format that carries both audio and video
                best_format = {}
                best_format["width"] = 10
                best_format["height"] = 10
                if isinstance(vformats, list):
                    for vformat in vformats:
                        acodec = vformat.get("acodec")
                        vcodec = vformat.get("vcodec")
                        current_width = vformat.get("width")
                        current_height = vformat.get("height")
                        best_width = best_format.get("width")
                        best_height = best_format.get("height")
                        new_url = vformat.get("url")
                        if (isinstance(best_width, int) and
                                isinstance(best_height, int) and
                                isinstance(current_width, int) and
                                isinstance(current_height, int) and
                                isinstance(new_url, str) and
                                current_width > best_width and
                                current_height > best_height and
                                acodec != "none" and
                                vcodec != "none"):
                            best_format = vformat
                            best_stream["override"] = True
                            best_stream["upstream"] = new_url
                            best_stream["ctype"] = self.extract_mime(new_url)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        return StreamData(**best_stream)

class SeafileRunner(StreamProvider):
    def stream(self):
        stream_data = self.init_stream()
        json_data = None
        # delegate to the seafile helper script, which prints JSON on stdout
        proc = subprocess.run(["/app/seafile.js", self.upstream], capture_output=True, encoding="utf-8")
        try:
            json_data = json.loads(proc.stdout)
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        else:
            file_path = json_data.get("filePath")
            stream_data["title"] = file_path
            stream_data["upstream"] = json_data.get("rawPath")
            if isinstance(file_path, str) and file_path.lower().endswith(".mp4"):
                stream_data["proxy_ctype"] = "video/mp4"
        return StreamData(**stream_data)

class MetaProvider(StreamProvider):
    def parse_web(self):
        # fetch the page and collect meta/input tags for OpenGraph and download info
        stream_data = self.init_stream()
        data = {}
        try:
            resp = requests.get(self.upstream)
            parser = MetaParser()
            parser.feed(resp.text)
            data = parser.meta_data
        except Exception as e:
            self.logger.info("%s <%s>", e, self.upstream)
        else:
            stream_data["upstream"] = data.get("downloadURL")
            stream_data["ctype"] = data.get("mimetype")
            stream_data["title"] = data.get("og:title")
            stream_data["thumbnail"] = data.get("og:image")
            stream_data["description"] = data.get("og:description")
        return stream_data

class MetaRunner(MetaProvider):
    def stream(self):
        stream_data = self.parse_web()
        stream_data["upstream"] = None
        stream_data["ctype"] = None
        return StreamData(**stream_data)

class NextcloudRunner(MetaProvider):
    def stream(self):
        stream_data = self.parse_web()
        stream_data["thumbnail"] = None
        return StreamData(**stream_data)

upstream_cache = expiringdict.ExpiringDict(max_len=512, max_age_seconds=18000)

async def get_from_runner(cache_key, runner, logger):
    result = None
    cached = upstream_cache.get(cache_key)
    if isinstance(cached, StreamData):
        return cached
    try:
        result_temp = await runner.run()
    except Exception as e:
        logger.info(e)
    else:
        if isinstance(result_temp, StreamData) and result_temp.has_data():
            upstream_cache[cache_key] = result_temp
            result = result_temp
    return result

async def get_streamlink(upstream, proxy, logger):
    return await get_from_runner((0, upstream), StreamlinkRunner(upstream, proxy, logger), logger)

async def get_ytdl(upstream, proxy, logger):
    return await get_from_runner((1, upstream), YoutubeRunner(upstream, proxy, logger), logger)

async def get_meta(upstream, proxy, logger):
    return await get_from_runner((2, upstream), MetaRunner(upstream, proxy, logger), logger)

async def get_nextcloud(upstream, proxy, logger):
    return await get_from_runner((3, upstream), NextcloudRunner(upstream, proxy, logger), logger)

async def get_seafile(upstream, proxy, logger):
    # use a cache-key prefix that is not shared with the other getters
    return await get_from_runner((5, upstream), SeafileRunner(upstream, proxy, logger), logger)

async def get_any(upstream, proxy, logger):
    # race streamlink, youtube_dl and the meta parser, merging whatever they return
    cache_key = (4, upstream)
    cached = upstream_cache.get(cache_key)
    if isinstance(cached, StreamData):
        return cached
    tasks = []
    tasks.append(asyncio.create_task(get_streamlink(upstream, proxy, logger)))
    tasks.append(asyncio.create_task(get_ytdl(upstream, proxy, logger)))
    tasks.append(asyncio.create_task(get_meta(upstream, proxy, logger)))
    result = StreamData(None, None, None, None, None, None, False)
    for task in asyncio.as_completed(tasks):
        temp_result = await task
        if isinstance(temp_result, StreamData):
            result.update("upstream", temp_result.upstream(), temp_result.override)
            result.update("ctype", temp_result.ctype(), temp_result.override)
            result.update("thumbnail", temp_result.thumbnail(), temp_result.override)
            result.update("title", temp_result.title(), temp_result.override)
            result.update("description", temp_result.description(), temp_result.override)
        if result.complete():
            upstream_cache[cache_key] = result
            break
    for task in tasks:
        if not task.done():
            task.cancel()
    return result
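
# Minimal usage sketch: how a caller might resolve a URL with get_any() from a plain
# asyncio entry point. The example URL and the logging setup are illustrative
# assumptions; the real backend supplies its own logger and, optionally, a SOCKS5
# proxy string ("host:port") instead of None.
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    _logger = logging.getLogger("stream_providers")

    async def _demo():
        data = await get_any("https://example.com/some-video", None, _logger)
        if isinstance(data, StreamData):
            print(data.upstream(), data.ctype(), data.meta())

    asyncio.run(_demo())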