replace requests with aiohttp

Roy Olav Purser 2021-05-14 15:10:12 +02:00
parent 3d46f1bb64
commit 38351f27e0
Signed by: roypur
GPG Key ID: E14D26A036F21656
2 changed files with 80 additions and 62 deletions

Dockerfile

@@ -1,5 +1,5 @@
 FROM alpine:edge as base
-RUN ["apk", "add", "--no-cache", "--repository", "https://dl-cdn.alpinelinux.org/alpine/edge/testing", "streamlink", "py3-tornado"]
+RUN ["apk", "add", "--no-cache", "--repository", "https://dl-cdn.alpinelinux.org/alpine/edge/testing", "streamlink", "py3-tornado", "py3-aiohttp", "py3-aiohttp-socks"]
 RUN ["mkdir", "/app"]
 COPY ["stream.py", "/app/stream.py"]
 COPY ["sources.py", "/app/sources.py"]

stream.py (140 changed lines)

@@ -3,12 +3,13 @@ import json
 import urllib.parse
 import re
 import os
+import base64
+import logging
 import streamlink
 import tornado.web
 import tornado.routing
-import requests
-import base64
-import logging
+import aiohttp
+import aiohttp_socks
 
 logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -26,10 +27,6 @@ proxy_server = os.environ.get("PROXY_SERVER")
 class ProxyElem():
     def __init__(self, proxy):
         self.proxy = proxy
-        self.req = {}
-        if proxy is not None:
-            self.req["http"] = "socks5://" + proxy
-            self.req["https"] = "socks5://" + proxy
     def stream(self):
         session = streamlink.Streamlink()
         session.set_option("http-timeout", 2.0)
@@ -37,8 +34,38 @@
             session.set_option("https-proxy", "socks5://" + self.proxy)
             session.set_option("http-proxy", "socks5://" + self.proxy)
         return session
+    def session(self):
+        connector = None
+        if self.proxy is not None:
+            connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + self.proxy)
+        timeout = aiohttp.ClientTimeout(total=2)
+        return aiohttp.ClientSession(connector=connector, timeout=timeout)
     def __repr__(self):
         return str(self.proxy)
+    async def content_type(self, url):
+        async with self.session() as session:
+            resp = await session.head(url)
+            return resp.headers.get("Content-Type", "binary/octet-stream")
+    async def proxy_url(self, current, path):
+        data = {}
+        data["upstream"] = urllib.parse.urljoin(current, path)
+        data["proxied"] = True
+        ret = None
+        if self.proxy is None:
+            data["proxied"] = False
+        else:
+            data["proxy"] = self.proxy
+        if proxy_server is None:
+            return data["upstream"]
+        async with self.session() as session:
+            resp = await session.post(proxy_server, json=data)
+            text = await resp.text()
+            return json.loads(text)
+
+class AsyncSession():
+    def __init__(self, session, future):
+        self.session = session
+        self.future = future
 
 proxies = {}
 for key in providers:
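For reference, a minimal standalone sketch of the session pattern this hunk introduces, assuming a SOCKS5 proxy given as host:port (the proxy address and URL below are placeholders, not values from the commit): aiohttp_socks.ProxyConnector routes the ClientSession through the proxy, and using the session as an async context manager guarantees it gets closed, which is what ProxyElem.content_type() relies on.

import asyncio
import aiohttp
import aiohttp_socks

async def head_content_type(url, proxy=None):
    # Mirror ProxyElem.session(): only build a connector when a proxy is configured.
    connector = None
    if proxy is not None:
        connector = aiohttp_socks.ProxyConnector.from_url("socks5://" + proxy)
    timeout = aiohttp.ClientTimeout(total=2)
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        resp = await session.head(url)
        return resp.headers.get("Content-Type", "binary/octet-stream")

# Placeholder values, for illustration only:
# print(asyncio.run(head_content_type("https://example.com/stream.m3u8", "127.0.0.1:1080")))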
@@ -55,7 +82,8 @@ for key in providers:
        proxies[key].append(ProxyElem(proxy))
 
 class UpstreamHandler():
-    def __init__(self, handler):
+    def __init__(self):
+        self.provider = None
         self.render_url = None
         self.stream_url = None
         self.proxy = None
@@ -63,55 +91,65 @@
         self.upstream_safe = None
         self.render = False
         self.stream = True
-        provider = handler.get_query_argument("provider", None)
+    async def setup(self, handler):
+        self.provider = handler.get_query_argument("provider", None)
         render_str = handler.get_query_argument("render", "false")
         if render_str.lower() == "true":
             self.render = True
             self.stream = False
-        if provider in providers.keys():
+        if self.provider in providers.keys():
             path = handler.request.path
-            if provider == "youtube":
+            if self.provider == "youtube":
                 path = path.strip("/")
             if isinstance(stream_server, str):
-                self.render_url = f'{stream_server}{handler.request.path}?provider={provider}&render=true'
-                self.stream_url = f'{stream_server}{handler.request.path}?provider={provider}'
+                self.render_url = f'{stream_server}{handler.request.path}?provider={self.provider}&render=true'
+                self.stream_url = f'{stream_server}{handler.request.path}?provider={self.provider}'
             else:
-                self.render_url = f'{handler.request.path}?provider={provider}&render=true'
-                self.stream_url = f'{handler.request.path}?provider={provider}'
-            src = providers[provider] + path
+                self.render_url = f'{handler.request.path}?provider={self.provider}&render=true'
+                self.stream_url = f'{handler.request.path}?provider={self.provider}'
+            src = providers[self.provider] + path
             proxy_list = None
             proxy_iter = None
-            proxy_list_orig = proxies.get(provider)
+            proxy_list_orig = proxies.get(self.provider)
             if isinstance(proxy_list_orig, list):
                 proxy_list = proxy_list_orig.copy()
                 proxy_iter = proxy_list_orig.copy()
             if isinstance(proxy_list, list):
+                delays = []
                 for i in proxy_iter:
                     current_list = proxy_list.copy()
                     current = proxy_list.pop()
                     proxy_list = [current] + proxy_list
+                    session = current.session()
+                    delays.append(AsyncSession(session, session.head(src)))
+                for delay in delays:
                     try:
-                        resp = requests.head(src, allow_redirects=True, proxies=current.req, timeout=2)
+                        resp = await delay.future
                     except Exception as e:
                         logger.info(e)
                     else:
-                        if resp.url.lower().startswith("https://consent.youtube.com"):
+                        new_url = str(resp.url)
+                        if new_url.lower().startswith("https://consent.youtube.com"):
                             self.upstream = src
                             self.upstream_safe = urllib.parse.quote(src)
                         else:
-                            self.upstream = resp.url
-                            self.upstream_safe = urllib.parse.quote(resp.url)
+                            self.upstream = new_url
+                            self.upstream_safe = urllib.parse.quote(new_url)
                         self.proxy = current
-                        proxies[provider] = current_list
-                        return
-    def meta(self):
+                        proxies[self.provider] = current_list
+                        break
+                for delay in delays:
+                    await delay.session.close()
+    async def meta(self):
         data = []
         try:
             embed_url = f'https://noembed.com/embed?url={self.upstream_safe}'
             logger.info(embed_url)
-            resp = requests.get(embed_url)
-            data_raw = json.loads(resp.text)
+            async with self.proxy.session() as session:
+                resp = await session.get(embed_url)
+                text = await resp.text()
+                data_raw = json.loads(text)
             if isinstance(data_raw, dict):
                 data_new = {}
                 data_valid = True
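In the new setup(), each candidate proxy gets its own ClientSession, the pending head() call is parked in an AsyncSession holder, the futures are awaited in a second loop, and every session is closed at the end. A hedged sketch of the same probe written with asyncio.gather, so the HEAD requests actually run concurrently (this is an alternative shape, not the commit's code; probe and probe_all are illustrative names):

import asyncio

async def probe(proxy_elem, src):
    # One session per ProxyElem, closed by the context manager even on errors.
    async with proxy_elem.session() as session:
        try:
            resp = await session.head(src)
            return proxy_elem, str(resp.url)
        except Exception as exc:
            return proxy_elem, exc

async def probe_all(proxy_elems, src):
    # gather() runs the probes concurrently; probe() already swallows per-proxy errors.
    return await asyncio.gather(*(probe(p, src) for p in proxy_elems))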
@@ -173,27 +211,6 @@ try:
 except Exception as e:
     logger.info(e)
 
-def get_proxy_url(proxy, current, path):
-    data = {}
-    data["upstream"] = urllib.parse.urljoin(current, path)
-    data["proxied"] = True
-    ret = None
-    if proxy is None:
-        data["proxied"] = False
-    else:
-        data["proxy"] = proxy
-    if proxy_server is None:
-        return data["upstream"]
-    presp = requests.post(proxy_server, json=data)
-    return presp.text
-
-def upstream_type(current, proxy):
-    if proxy is None:
-        resp = requests.head(current)
-    else:
-        resp = requests.head(current, proxies=proxy.req)
-    return resp.headers.get("Content-Type", "binary/octet-stream")
-
 def rewrite(current, provider, proxy):
     resp = requests.get(current, proxies=proxy.req)
     ndata = None
@@ -234,23 +251,24 @@ def rewrite(current, provider, proxy):
         ndata += "\n"
     return ndata
 
 class MainHandler(tornado.web.RequestHandler):
-    def handle_any(self):
-        handler = UpstreamHandler(self)
+    async def handle_any(self):
+        handler = UpstreamHandler()
+        await handler.setup(self)
         if handler.render:
-            self.handle_render(handler)
+            await self.handle_render(handler)
         elif handler.stream:
-            self.handle_stream(handler)
+            await self.handle_stream(handler)
         else:
             logger.info(f'provider missing {self.request.uri}')
             self.set_status(404)
             self.write("Stream not found. (provider missing)")
-    def handle_render(self, handler):
+    async def handle_render(self, handler):
         if template_js is not None and template_html is not None:
             rendered_js = template_js.generate(stream=handler.stream_url);
             b64_js = str(base64.b64encode(rendered_js), "ascii")
             script = f'data:text/javascript;charset=utf-8;base64,{b64_js}'
-            meta = handler.meta()
+            meta = await handler.meta()
             data["script"] = script
             data["videojs_version"] = videojs_version
             data["chromecast_version"] = chromecast_version
@@ -263,7 +281,7 @@ class MainHandler(tornado.web.RequestHandler):
             self.set_status(404)
             self.write("HTML template missing.")
 
-    def handle_stream(self, handler):
+    async def handle_stream(self, handler):
         upstream = None
         if handler.proxy is not None:
             try:
@@ -279,16 +297,16 @@ class MainHandler(tornado.web.RequestHandler):
                     upstream = stream.url
                     break
         else:
-            logger.info(f'invalid provider ({provider})')
+            logger.info(f'invalid provider ({handler.provider})')
             self.set_status(404)
             self.write("Stream not found. (invalid provider)")
             return
         if upstream is None:
-            logger.info(f'invalid upstream ({provider})')
+            logger.info(f'invalid upstream ({handler.provider})')
            self.set_status(404)
            self.write("Stream not found. (invalid upstream)")
         else:
-            ctype = upstream_type(upstream, proxy)
+            ctype = await handler.proxy.content_type(upstream)
             data = None
             if "mpegurl" in ctype.lower():
                 data = rewrite(upstream, provider, proxy)
@@ -312,10 +330,10 @@ class MainHandler(tornado.web.RequestHandler):
         else:
             self.set_header("Content-Type", "application/vnd.apple.mpegurl")
         self.write(data)
-    def get(self):
-        self.handle_any()
-    def head(self):
-        self.handle_any()
+    async def get(self):
+        await self.handle_any()
+    async def head(self):
+        await self.handle_any()
 
 class FileHandler(tornado.web.RequestHandler):
     def get(self):
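The final hunks convert the Tornado handlers themselves to coroutines. Recent Tornado versions (5 and later) run async def handler methods natively, so the commit can await the aiohttp calls from get()/head() without any decorators. A minimal self-contained sketch of that pattern (the handler name and route are illustrative, not from the commit):

import asyncio
import tornado.web

class PingHandler(tornado.web.RequestHandler):
    async def handle_any(self):
        # Stand-in for an awaited aiohttp request.
        await asyncio.sleep(0)
        self.write("ok")

    # Plain async def get/head is enough; Tornado awaits coroutine handlers itself.
    async def get(self):
        await self.handle_any()

    async def head(self):
        await self.handle_any()

def make_app():
    return tornado.web.Application([(r"/ping", PingHandler)])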