as_completed

This commit is contained in:
Roy Olav Purser 2021-05-14 19:57:08 +02:00
parent b0b0055d75
commit 55de3febd9
Signed by: roypur
GPG Key ID: E14D26A036F21656

View File

@ -83,12 +83,19 @@ class ProxyElem():
else: else:
return data["upstream"] return data["upstream"]
class AsyncSession(): class AsyncSessionData():
def __init__(self, session, future, current_proxies, current_proxy): def __init__(self, resp, current_proxies, current_proxy):
self.session = session self.resp = resp
self.future = future
self.current_proxies = current_proxies self.current_proxies = current_proxies
self.current_proxy = current_proxy self.current_proxy = current_proxy
class AsyncSession():
def __init__(self, session, resp, current_proxies, current_proxy):
self.session = session
self.sdata = AsyncSessionData(resp, current_proxies, current_proxy)
async def task(self):
resp = await self.sdata.resp
await self.session.close()
return AsyncSessionData(resp, self.sdata.current_proxies, self.sdata.current_proxy)
proxies = {} proxies = {}
for key in providers: for key in providers:
@ -142,21 +149,23 @@ class UpstreamHandler():
proxy_list = proxy_list_orig.copy() proxy_list = proxy_list_orig.copy()
proxy_iter = proxy_list_orig.copy() proxy_iter = proxy_list_orig.copy()
if isinstance(proxy_list, list): if isinstance(proxy_list, list):
delays = [] futures = []
for i in proxy_iter: for i in proxy_iter:
current_list = proxy_list.copy() current_list = proxy_list.copy()
current = proxy_list.pop() current = proxy_list.pop()
proxy_list = [current] + proxy_list proxy_list = [current] + proxy_list
session = current.session() session = current.session()
task = asyncio.create_task(session.head(src)) future = AsyncSession(session, session.head(src), current_list, current)
delays.append(AsyncSession(session, task, current_list, current)) task = asyncio.create_task(future.task())
futures.append(task)
done = False done = False
for delay in delays: for future in asyncio.as_completed(futures):
if done: if done:
delay.future.cancel() future.cancel()
else: else:
try: try:
resp = await delay.future result = await future
resp = result.resp
except Exception as e: except Exception as e:
logger.info(e) logger.info(e)
else: else:
@ -168,13 +177,8 @@ class UpstreamHandler():
else: else:
self.upstream = new_url self.upstream = new_url
self.upstream_safe = urllib.parse.quote(new_url) self.upstream_safe = urllib.parse.quote(new_url)
self.proxy = delay.current_proxy self.proxy = result.current_proxy
proxies[self.provider] = delay.current_proxies proxies[self.provider] = result.current_proxies
for delay in delays:
try:
await delay.session.close()
except Exception as e:
logger.info(e)
async def meta(self): async def meta(self):
data = [] data = []
try: try: