-- for HTTP (RESTful) API
N/A
detail: N/A
"""die_threads.py"""
from functools import wraps
from logging import getLogger
logger = getLogger(__name__)
THREAD_POISON: bool = False
def poisoning(msg):
global THREAD_POISON
logger.warning('\n%s: you got me', str(msg))
THREAD_POISON = True
logger.warning('done poisoning, THREAD_POISON = %s', str(THREAD_POISON))
class DieFromPOISON(Exception):
pass
def die_from_poison(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if THREAD_POISON:
raise DieFromPOISON(*args, **kwargs)
return fn(*args, **kwargs)
return wrapper
detail: N/A
"""concurrency.py"""
from logging import getLogger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from .die_threads import poisoning
from .die_threads import die_from_poison
logger = getLogger(__name__)
DEFAULT_MAX_CUR = 5
def concurrency(fn, *iterables, max_cur: int = DEFAULT_MAX_CUR) -> tuple:
successes, failures = list(), list()
fn = die_from_poison(fn) # TODO: check has been wrappered or not
with ThreadPoolExecutor(max_workers=max_cur) as executor:
to_do_map = {executor.submit(fn, *args): i
for i, args in enumerate(zip(*iterables))}
done_iter = as_completed(to_do_map)
# TODO: feature: processing bar.
try:
for future in done_iter:
try:
successes.append((to_do_map[future], future.result()))
except Exception as err:
failures.append((to_do_map[future], err))
except KeyboardInterrupt as e:
poisoning(e)
logger.warning('waiting started threads finish...')
return successes, failures