From 38351f27e00c9039ca18688871da5f5758a75d28 Mon Sep 17 00:00:00 2001 From: Roy Olav Purser Date: Fri, 14 May 2021 15:10:12 +0200 Subject: [PATCH] replace requests with aiohttp --- Dockerfile | 2 +- stream.py | 140 ++++++++++++++++++++++++++++++----------------------- 2 files changed, 80 insertions(+), 62 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5b62769..6f5738e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM alpine:edge as base -RUN ["apk", "add", "--no-cache", "--repository", "https://dl-cdn.alpinelinux.org/alpine/edge/testing", "streamlink", "py3-tornado"] +RUN ["apk", "add", "--no-cache", "--repository", "https://dl-cdn.alpinelinux.org/alpine/edge/testing", "streamlink", "py3-tornado", "py3-aiohttp", "py3-aiohttp-socks"] RUN ["mkdir", "/app"] COPY ["stream.py", "/app/stream.py"] COPY ["sources.py", "/app/sources.py"] diff --git a/stream.py b/stream.py index 15ce084..467b60e 100755 --- a/stream.py +++ b/stream.py @@ -3,12 +3,13 @@ import json import urllib.parse import re import os +import base64 +import logging import streamlink import tornado.web import tornado.routing -import requests -import base64 -import logging +import aiohttp +import aiohttp_socks logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) @@ -26,10 +27,6 @@ proxy_server = os.environ.get("PROXY_SERVER") class ProxyElem(): def __init__(self, proxy): self.proxy = proxy - self.req = {} - if proxy is not None: - self.req["http"] = "socks5://" + proxy - self.req["https"] = "socks5://" + proxy def stream(self): session = streamlink.Streamlink() session.set_option("http-timeout", 2.0) @@ -37,8 +34,38 @@ class ProxyElem(): session.set_option("https-proxy", "socks5://" + self.proxy) session.set_option("http-proxy", "socks5://" + self.proxy) return session + def session(self): + connector = None + if self.proxy is not None: + connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + self.proxy) + timeout = aiohttp.ClientTimeout(total=2) + return aiohttp.ClientSession(connector=connector, timeout=timeout) def __repr__(self): return str(self.proxy) + async def content_type(self, url): + async with self.session() as session: + resp = await session.head(url) + return resp.headers.get("Content-Type", "binary/octet-stream") + async def proxy_url(self, current, path): + data = {} + data["upstream"] = urllib.parse.urljoin(current, path) + data["proxied"] = True + ret = None + if self.proxy is None: + data["proxied"] = False + else: + data["proxy"] = self.proxy + if proxy_server is None: + return data["upstream"] + async with self.session() as session: + resp = await session.post(proxy_server, json=data) + text = await resp.text() + return json.loads(text) + +class AsyncSession(): + def __init__(self, session, future): + self.session = session + self.future = future proxies = {} for key in providers: @@ -55,7 +82,8 @@ for key in providers: proxies[key].append(ProxyElem(proxy)) class UpstreamHandler(): - def __init__(self, handler): + def __init__(self): + self.provider = None self.render_url = None self.stream_url = None self.proxy = None @@ -63,55 +91,65 @@ class UpstreamHandler(): self.upstream_safe = None self.render = False self.stream = True - provider = handler.get_query_argument("provider", None) + async def setup(self, handler): + self.provider = handler.get_query_argument("provider", None) render_str = handler.get_query_argument("render", "false") if render_str.lower() == "true": self.render = True self.stream = False - if provider in providers.keys(): + if self.provider in providers.keys(): path = handler.request.path - if provider == "youtube": + if self.provider == "youtube": path = path.strip("/") if isinstance(stream_server, str): - self.render_url = f'{stream_server}{handler.request.path}?provider={provider}&render=true' - self.stream_url = f'{stream_server}{handler.request.path}?provider={provider}' + self.render_url = f'{stream_server}{handler.request.path}?provider={self.provider}&render=true' + self.stream_url = f'{stream_server}{handler.request.path}?provider={self.provider}' else: - self.render_url = f'{handler.request.path}?provider={provider}&render=true' - self.stream_url = f'{handler.request.path}?provider={provider}' + self.render_url = f'{handler.request.path}?provider={self.provider}&render=true' + self.stream_url = f'{handler.request.path}?provider={self.provider}' - src = providers[provider] + path + src = providers[self.provider] + path proxy_list = None proxy_iter = None - proxy_list_orig = proxies.get(provider) + proxy_list_orig = proxies.get(self.provider) if isinstance(proxy_list_orig, list): proxy_list = proxy_list_orig.copy() proxy_iter = proxy_list_orig.copy() if isinstance(proxy_list, list): + delays = [] for i in proxy_iter: current_list = proxy_list.copy() current = proxy_list.pop() proxy_list = [current] + proxy_list + session = current.session() + delays.append(AsyncSession(session, session.head(src))) + for delay in delays: try: - resp = requests.head(src, allow_redirects=True, proxies=current.req, timeout=2) + resp = await delay.future except Exception as e: logger.info(e) else: - if resp.url.lower().startswith("https://consent.youtube.com"): + new_url = str(resp.url) + if new_url.lower().startswith("https://consent.youtube.com"): self.upstream = src self.upstream_safe = urllib.parse.quote(src) else: - self.upstream = resp.url - self.upstream_safe = urllib.parse.quote(resp.url) + self.upstream = new_url + self.upstream_safe = urllib.parse.quote(new_url) self.proxy = current - proxies[provider] = current_list - return - def meta(self): + proxies[self.provider] = current_list + break + for delay in delays: + await delay.session.close() + async def meta(self): data = [] try: embed_url = f'https://noembed.com/embed?url={self.upstream_safe}' logger.info(embed_url) - resp = requests.get(embed_url) - data_raw = json.loads(resp.text) + async with self.proxy.session() as session: + resp = await session.get(embed_url) + text = await resp.text() + data_raw = json.loads(text) if isinstance(data_raw, dict): data_new = {} data_valid = True @@ -173,27 +211,6 @@ try: except Exception as e: logger.info(e) -def get_proxy_url(proxy, current, path): - data = {} - data["upstream"] = urllib.parse.urljoin(current, path) - data["proxied"] = True - ret = None - if proxy is None: - data["proxied"] = False - else: - data["proxy"] = proxy - if proxy_server is None: - return data["upstream"] - presp = requests.post(proxy_server, json=data) - return presp.text - -def upstream_type(current, proxy): - if proxy is None: - resp = requests.head(current) - else: - resp = requests.head(current, proxies=proxy.req) - return resp.headers.get("Content-Type", "binary/octet-stream") - def rewrite(current, provider, proxy): resp = requests.get(current, proxies=proxy.req) ndata = None @@ -234,23 +251,24 @@ def rewrite(current, provider, proxy): ndata += "\n" return ndata class MainHandler(tornado.web.RequestHandler): - def handle_any(self): - handler = UpstreamHandler(self) + async def handle_any(self): + handler = UpstreamHandler() + await handler.setup(self) if handler.render: - self.handle_render(handler) + await self.handle_render(handler) elif handler.stream: - self.handle_stream(handler) + await self.handle_stream(handler) else: logger.info(f'provider missing {self.request.uri}') self.set_status(404) self.write("Stream not found. (provider missing)") - def handle_render(self, handler): + async def handle_render(self, handler): if template_js is not None and template_html is not None: rendered_js = template_js.generate(stream=handler.stream_url); b64_js = str(base64.b64encode(rendered_js), "ascii") script = f'data:text/javascript;charset=utf-8;base64,{b64_js}' - meta = handler.meta() + meta = await handler.meta() data["script"] = script data["videojs_version"] = videojs_version data["chromecast_version"] = chromecast_version @@ -263,7 +281,7 @@ class MainHandler(tornado.web.RequestHandler): self.set_status(404) self.write("HTML template missing.") - def handle_stream(self, handler): + async def handle_stream(self, handler): upstream = None if handler.proxy is not None: try: @@ -279,16 +297,16 @@ class MainHandler(tornado.web.RequestHandler): upstream = stream.url break else: - logger.info(f'invalid provider ({provider})') + logger.info(f'invalid provider ({handler.provider})') self.set_status(404) self.write("Stream not found. (invalid provider)") return if upstream is None: - logger.info(f'invalid upstream ({provider})') + logger.info(f'invalid upstream ({handler.provider})') self.set_status(404) self.write("Stream not found. (invalid upstream)") else: - ctype = upstream_type(upstream, proxy) + ctype = await handler.proxy.content_type(upstream) data = None if "mpegurl" in ctype.lower(): data = rewrite(upstream, provider, proxy) @@ -312,10 +330,10 @@ class MainHandler(tornado.web.RequestHandler): else: self.set_header("Content-Type", "application/vnd.apple.mpegurl") self.write(data) - def get(self): - self.handle_any() - def head(self): - self.handle_any() + async def get(self): + await self.handle_any() + async def head(self): + await self.handle_any() class FileHandler(tornado.web.RequestHandler): def get(self):