Performance and stability improvements.

This commit is contained in:
Aaron Heise 2023-02-12 22:34:36 -06:00
parent 5e755acad4
commit d76b36cd63
2 changed files with 31 additions and 30 deletions

View File

@ -264,40 +264,41 @@ class AggregateException(Exception):
def __str__(self):
return "Multiple exceptions encountered: \n\n" + "\n\n".join(map(lambda e: str(e), self.inner_exceptions))
async def event_wait_any(evts: [asyncio.Event], timeout: float = None) -> (any, any):
tasks = list(map(lambda evt: (evt, _task_from_event(evt)), evts))
# try:
finished, unfinished = await asyncio.wait(map(lambda t: t[1], tasks),
timeout=timeout,
return_when=asyncio.FIRST_COMPLETED)
try:
finished, unfinished = await asyncio.wait(map(lambda t: t[1], tasks),
timeout=timeout,
return_when=asyncio.FIRST_COMPLETED)
if len(unfinished) > 0:
for task in unfinished:
task.cancel()
await asyncio.wait(unfinished)
if len(unfinished) > 0:
for task in unfinished:
task.cancel()
await asyncio.wait(unfinished)
# exceptions = []
#
# for f in finished:
# ex = f.exception()
# if ex and not isinstance(ex, asyncio.CancelledError) and not isinstance(ex, TimeoutError):
# exceptions.append(ex)
#
# if len(exceptions) > 0:
# raise AggregateException(exceptions)
exceptions = []
return next(map(lambda t: next(map(lambda tt: tt[0], tasks)), finished), None)
# finally:
# unfinished = []
# for task in map(lambda t: t[1], tasks):
# if task.done():
# if not task.cancelled():
# task.exception()
# else:
# task.cancel()
# unfinished.append(task)
# if len(unfinished) > 0:
# await asyncio.wait(unfinished)
for f in finished:
ex = f.exception()
if ex and not isinstance(ex, asyncio.CancelledError) and not isinstance(ex, TimeoutError):
exceptions.append(ex)
if len(exceptions) > 0:
raise AggregateException(exceptions)
return next(map(lambda t: next(map(lambda tt: tt[0], tasks)), finished), None)
finally:
unfinished = []
for task in map(lambda t: t[1], tasks):
if task.done():
if not task.cancelled():
task.exception()
else:
task.cancel()
unfinished.append(task)
if len(unfinished) > 0:
await asyncio.wait(unfinished)
async def event_wait(evt: asyncio.Event, timeout: float) -> bool:

View File

@ -104,7 +104,7 @@ async def test_echo_live():
@pytest.mark.asyncio
async def test_event_wait_any():
delay = 0.1
delay = 0.5
with multiprocessing.pool.ThreadPool() as pool:
loop = asyncio.get_running_loop()
evt1 = asyncio.Event()