【Python】concurrency best practice for HTTP (RESTful) API - 草稿 10%

柴阳云
2023-12-01

【Python】concurrency best practice

-- for HTTP (RESTful) API


concurrent.furture.ThreadPoolExecutor

N/A


die thread

detail: N/A

"""die_threads.py"""

from functools import wraps
from logging import getLogger

logger = getLogger(__name__)
THREAD_POISON: bool = False


def poisoning(msg):
  global THREAD_POISON
  logger.warning('\n%s: you got me', str(msg))
  THREAD_POISON = True
  logger.warning('done poisoning, THREAD_POISON = %s', str(THREAD_POISON))


class DieFromPOISON(Exception):
  pass


def die_from_poison(fn):
  @wraps(fn)
  def wrapper(*args, **kwargs):
    if THREAD_POISON:
      raise DieFromPOISON(*args, **kwargs)
    return fn(*args, **kwargs)
  return wrapper


reusable code - map

detail: N/A

"""concurrency.py"""

from logging import getLogger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

from .die_threads import poisoning
from .die_threads import die_from_poison

logger = getLogger(__name__)
DEFAULT_MAX_CUR = 5


def concurrency(fn, *iterables, max_cur: int = DEFAULT_MAX_CUR) -> tuple:
  successes, failures = list(), list()
  fn = die_from_poison(fn)  # TODO: check has been wrappered or not
  with ThreadPoolExecutor(max_workers=max_cur) as executor:
    to_do_map = {executor.submit(fn, *args): i
                 for i, args in enumerate(zip(*iterables))}
    done_iter = as_completed(to_do_map)
    # TODO: feature: processing bar.
    try:
      for future in done_iter:
        try:
          successes.append((to_do_map[future], future.result()))
        except Exception as err:
          failures.append((to_do_map[future], err))
    except KeyboardInterrupt as e:
      poisoning(e)
      logger.warning('waiting started threads finish...')
  return successes, failures


Usage Examples - N/A


Reference

  • die thread - threado
 类似资料: