use pydantic for proxy communication
All checks were successful
continuous-integration/drone Build is passing

This commit is contained in:
Roy Olav Purser 2022-02-12 12:23:37 +01:00
parent e2107c66eb
commit 6cf319940f
Signed by: roypur
GPG Key ID: E14D26A036F21656

View File

@ -4,7 +4,10 @@ import os
import pprint
import re
import sys
from typing import Literal, Optional, cast
import pydantic
import aiohttp
import streamlink
from tornado import template
@ -38,6 +41,22 @@ stream_server = os.environ.get("STREAM_SERVER")
proxy_server = os.environ.get("PROXY_SERVER")
class ProxyCreateLink(pydantic.BaseModel):
upstream: pydantic.HttpUrl
ctype: Optional[pydantic.StrictStr]
region: pydantic.StrictStr
class ProxyRequest(pydantic.BaseModel):
action: Literal["create-url", "read-config"]
urls: Optional[list[ProxyCreateLink]]
class ProxyResponse(pydantic.BaseModel):
action: Literal["create-url", "read-config"]
urls: Optional[list[pydantic.HttpUrl]]
class ProxyElem:
def __init__(self, proxy, region):
self.proxy = proxy
@ -52,45 +71,42 @@ class ProxyElem:
async def proxy_url(self, urls):
clean_urls = []
link_requests: list[ProxyCreateLink] = []
for url in urls:
if isinstance(url, tuple):
clean_urls.append(url[0])
link_requests.append(ProxyCreateLink.parse_obj({"upstream": url[0], "ctype": url[1], "region": self.region}))
else:
clean_urls.append(url)
link_requests.append(ProxyCreateLink.parse_obj({"upstream": url, "region": self.region}))
if not isinstance(proxy_server, str):
return clean_urls
jdata = None
data_list = []
for url in urls:
data = {}
if isinstance(url, str):
data["upstream"] = url
elif isinstance(url, tuple):
if isinstance(url[0], str):
data["upstream"] = url[0]
if isinstance(url[1], str):
data["ctype"] = url[1]
response_data: ProxyRequest
response_data: ProxyResponse
try:
request_data = ProxyRequest.parse_obj({"urls": link_requests, "action": "create-url"})
except pydantic.ValidationError as e:
logger.info(e)
return clean_urls
data["region"] = self.region
data_list.append(data)
try:
async with self.local() as session:
resp = await session.post(proxy_server, json=data_list)
text = await resp.text()
jdata = json.loads(text)
resp = await session.post(proxy_server, json=request_data)
response_data = cast(ProxyResponse, ProxyResponse.parse_obj(await resp.text()))
except Exception as e:
logger.info(e)
if isinstance(jdata, list):
ret_data = []
for src, dst in zip(clean_urls, jdata):
if isinstance(src, str):
ret_data.append(dst)
else:
ret_data.append(None)
return ret_data
else:
return clean_urls
ret_data = []
if response_data.urls is not None:
for src, dst in zip(clean_urls, response_data.urls):
if isinstance(src, str):
ret_data.append(dst)
else:
ret_data.append(None)
return ret_data
return clean_urls
proxies: dict[str, list[ProxyElem]] = {}