
How to add a timeout to a function in Python

佟阳飙
2023-03-14
Question

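Many attempts have been made in the past to add timeout functionality in Python, so that waiting code can move on once a specified time limit expires. Unfortunately, previous recipes either allowed the running function to keep running and consuming resources, or killed the function with a platform-specific method of thread termination. The purpose of this wiki is to develop a cross-platform answer to this problem, which many programmers have had to tackle in various programming projects.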
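
To make the first failure mode concrete, here is a minimal sketch of a thread-based timeout. It is not taken from any of the earlier recipes; `call_with_thread_timeout` and `slow_task` are hypothetical names used only for illustration. The caller stops waiting after the limit, but the abandoned thread keeps running to completion.

```python
import threading
import time


def call_with_thread_timeout(function, limit, *args, **kwargs):
    """Wait up to `limit` seconds for function; return (finished, worker)."""
    worker = threading.Thread(target=function, args=args, kwargs=kwargs,
                              daemon=True)
    worker.start()
    worker.join(limit)  # stop waiting after `limit` seconds
    return not worker.is_alive(), worker


def slow_task(duration, log):
    """Hypothetical workload: sleep, then record that it finished."""
    time.sleep(duration)
    log.append('finished')  # still executed after the caller gave up


if __name__ == '__main__':
    log = []
    finished, worker = call_with_thread_timeout(slow_task, 0.05, 0.3, log)
    print('finished in time:', finished)
    worker.join()  # the thread was never stopped, only abandoned
    print('log after waiting for the thread:', log)
```

Python threads offer no portable way to stop `slow_task` from the outside, which is why the module below moves the work into a separate process.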

#! /usr/bin/env python
"""Provide way to add timeout specifications to arbitrary functions.

There are many ways to add a timeout to a function, but no solution
is both cross-platform and capable of terminating the procedure. This
module uses the multiprocessing module to solve both of those problems."""

################################################################################

__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
__date__ = '11 February 2010'
__version__ = '$Revision: 3 $'

################################################################################

import inspect
import sys
import time
import multiprocessing

################################################################################

def add_timeout(function, limit=60):
    """Add a timeout parameter to a function and return it.

    It is illegal to pass anything other than a function as the first
    parameter. If the limit is not given, it gets a default value equal
    to one minute. The function is wrapped and returned to the caller."""
    assert inspect.isfunction(function)
    if limit <= 0:
        raise ValueError()
    return _Timeout(function, limit)

class NotReadyError(Exception): pass

################################################################################

def _target(queue, function, *args, **kwargs):
    """Run a function with arguments and return output via a queue.

    This is a helper function for the Process created in _Timeout. It runs
    the function with positional arguments and keyword arguments and then
    returns the function's output by way of a queue. If an exception gets
    raised, it is returned to _Timeout to be raised by the value property."""
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, sys.exc_info()[1]))

class _Timeout:

    """Wrap a function and add a timeout (limit) attribute to it.

    Instances of this class are automatically generated by the add_timeout
    function defined above. Wrapping a function allows asynchronous calls
    to be made and termination of execution after a timeout has passed."""

    def __init__(self, function, limit):
        """Initialize instance in preparation for being called."""
        self.__limit = limit
        self.__function = function
        # time.clock() was removed in Python 3.8; time.monotonic() is a
        # portable clock suited to measuring elapsed wall time.
        self.__timeout = time.monotonic()
        self.__process = multiprocessing.Process()
        self.__queue = multiprocessing.Queue()

    def __call__(self, *args, **kwargs):
        """Execute the embedded function object asynchronously.

        The function given to the constructor is transparently called and
        requires that "ready" be intermittently polled. If and when it is
        True, the "value" property may then be checked for returned data."""
        self.cancel()
        self.__queue = multiprocessing.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = multiprocessing.Process(target=_target,
                                                 args=args,
                                                 kwargs=kwargs)
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = self.__limit + time.monotonic()

    def cancel(self):
        """Terminate any possible execution of the embedded function."""
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        """Read-only property indicating status of "value" property."""
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < time.monotonic():
            # Deadline passed: stop the child; no value will become available.
            self.cancel()
        return False

    @property
    def value(self):
        """Read-only property containing data returned from function."""
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load
        raise NotReadyError()

    def __get_limit(self):
        return self.__limit

    def __set_limit(self, value):
        if value <= 0:
            raise ValueError()
        self.__limit = value

    limit = property(__get_limit, __set_limit,
                     doc="Property for controlling the value of the timeout.")

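Edit: This code was written for Python 3.x and was not designed for decorating class methods. The multiprocessing module was not designed to modify class instances across process boundaries.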
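
The core technique used by the module above can be reduced to a few lines. What follows is a minimal blocking sketch, not the author's API: `call_with_timeout`, `_worker`, and `slow_square` are hypothetical names, and it assumes the function, its arguments, its return value, and any raised exception can be pickled. The function runs in a child process, the parent waits on a queue for at most `limit` seconds, and the child is terminated if nothing arrives.

```python
import multiprocessing
import queue
import time


def _worker(result_queue, function, args):
    """Run in the child process and send back (ok, payload)."""
    try:
        result_queue.put((True, function(*args)))
    except Exception as error:
        result_queue.put((False, error))


def call_with_timeout(function, args=(), limit=1.0):
    """Return function(*args), or raise TimeoutError after `limit` seconds."""
    result_queue = multiprocessing.Queue(1)
    process = multiprocessing.Process(
        target=_worker, args=(result_queue, function, args), daemon=True)
    process.start()
    try:
        ok, payload = result_queue.get(timeout=limit)
    except queue.Empty:
        process.terminate()  # unlike a thread, the child really stops here
        process.join()
        raise TimeoutError(f'no result within {limit} seconds') from None
    process.join()
    if ok:
        return payload
    raise payload  # re-raise the exception captured in the child


def slow_square(number, delay):
    """Hypothetical workload used only for the demonstration."""
    time.sleep(delay)
    return number * number


if __name__ == '__main__':
    print(call_with_timeout(slow_square, (7, 0.0), limit=5.0))
    try:
        call_with_timeout(slow_square, (7, 30.0), limit=0.5)
    except TimeoutError as error:
        print('timed out:', error)
```

The `if __name__ == '__main__':` guard matters on platforms where multiprocessing starts children by re-importing the main module. Because the work happens in another process, changes made there to objects such as class instances are not visible in the parent, which is the limitation the edit note describes.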


Answer:

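This question was asked more than 9 years ago, and both Python and my repertoire of experience have changed a great deal since then. After reviewing other APIs in the standard library and wanting to partially replicate one of them, I wrote the following module to serve a purpose similar to that of the one posted in the question.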

asynchronous

#! /usr/bin/env python3
import _thread
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time

__all__ = (
    'Executor',
    'get_timeout',
    'set_timeout',
    'submit',
    'map_',
    'shutdown'
)


class _Base(metaclass=_abc.ABCMeta):
    __slots__ = (
        '__timeout',
    )

    @_abc.abstractmethod
    def __init__(self, timeout):
        self.timeout = _math.inf if timeout is None else timeout

    def get_timeout(self):
        return self.__timeout

    def set_timeout(self, value):
        if not isinstance(value, (float, int)):
            raise TypeError('value must be of type float or int')
        if value <= 0:
            raise ValueError('value must be greater than zero')
        self.__timeout = value

    timeout = property(get_timeout, set_timeout)


def _run_and_catch(fn, args, kwargs):
    # noinspection PyPep8,PyBroadException
    try:
        return False, fn(*args, **kwargs)
    except:
        return True, _sys.exc_info()[1]


def _run(fn, args, kwargs, queue):
    queue.put_nowait(_run_and_catch(fn, args, kwargs))


class _State(_enum.IntEnum):
    PENDING = _enum.auto()
    RUNNING = _enum.auto()
    CANCELLED = _enum.auto()
    FINISHED = _enum.auto()
    ERROR = _enum.auto()


def _run_and_catch_loop(iterable, *args, **kwargs):
    exception = None
    for fn in iterable:
        error, value = _run_and_catch(fn, args, kwargs)
        if error:
            exception = value
    if exception:
        raise exception


class _Future(_Base):
    __slots__ = (
        '__queue',
        '__process',
        '__start_time',
        '__callbacks',
        '__result',
        '__mutex'
    )

    def __init__(self, timeout, fn, args, kwargs):
        super().__init__(timeout)
        self.__queue = _multiprocessing.Queue(1)
        self.__process = _multiprocessing.Process(
            target=_run,
            args=(fn, args, kwargs, self.__queue),
            daemon=True
        )
        self.__start_time = _math.inf
        self.__callbacks = _collections.deque()
        self.__result = True, TimeoutError()
        self.__mutex = _thread.allocate_lock()

    @property
    def __state(self):
        pid, exitcode = self.__process.pid, self.__process.exitcode
        return (_State.PENDING if pid is None else
                _State.RUNNING if exitcode is None else
                _State.CANCELLED if exitcode == -_signal.SIGTERM else
                _State.FINISHED if exitcode == 0 else
                _State.ERROR)

    def __repr__(self):
        root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
        if self.__state < _State.CANCELLED:
            return f'<{root}>'
        error, value = self.__result
        suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
        return f'<{root} {suffix}>'

    def __consume_callbacks(self):
        while self.__callbacks:
            yield self.__callbacks.popleft()

    def __invoke_callbacks(self):
        self.__process.join()
        _run_and_catch_loop(self.__consume_callbacks(), self)

    def cancel(self):
        self.__process.terminate()
        self.__invoke_callbacks()

    def __auto_cancel(self):
        elapsed_time = _time.perf_counter() - self.__start_time
        if elapsed_time > self.timeout:
            self.cancel()
        return elapsed_time

    def cancelled(self):
        self.__auto_cancel()
        return self.__state is _State.CANCELLED

    def running(self):
        self.__auto_cancel()
        return self.__state is _State.RUNNING

    def done(self):
        self.__auto_cancel()
        return self.__state > _State.RUNNING

    def __handle_result(self, error, value):
        self.__result = error, value
        self.__invoke_callbacks()

    def __ensure_termination(self):
        with self.__mutex:
            elapsed_time = self.__auto_cancel()
            if not self.__queue.empty():
                self.__handle_result(*self.__queue.get_nowait())
            elif self.__state < _State.CANCELLED:
                remaining_time = self.timeout - elapsed_time
                if remaining_time == _math.inf:
                    remaining_time = None
                try:
                    result = self.__queue.get(True, remaining_time)
                except _queue.Empty:
                    self.cancel()
                else:
                    self.__handle_result(*result)

    def result(self):
        self.__ensure_termination()
        error, value = self.__result
        if error:
            raise value
        return value

    def exception(self):
        self.__ensure_termination()
        error, value = self.__result
        if error:
            return value

    def add_done_callback(self, fn):
        if self.done():
            fn(self)
        else:
            self.__callbacks.append(fn)

    def _set_running_or_notify_cancel(self):
        if self.__state is _State.PENDING:
            self.__process.start()
            self.__start_time = _time.perf_counter()
        else:
            self.cancel()


class Executor(_Base):
    __slots__ = (
        '__futures',
    )

    def __init__(self, timeout=None):
        super().__init__(timeout)
        self.__futures = set()

    def submit(self, fn, *args, **kwargs):
        future = _Future(self.timeout, fn, args, kwargs)
        self.__futures.add(future)
        future.add_done_callback(self.__futures.remove)
        # noinspection PyProtectedMember
        future._set_running_or_notify_cancel()
        return future

    @staticmethod
    def __cancel_futures(iterable):
        _run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))

    def map(self, fn, *iterables):
        futures = tuple(self.submit(fn, *args) for args in zip(*iterables))

        def result_iterator():
            future_iterator = iter(futures)
            try:
                for future in future_iterator:
                    yield future.result()
            finally:
                self.__cancel_futures(future_iterator)

        return result_iterator()

    def shutdown(self):
        self.__cancel_futures(frozenset(self.__futures))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()
        return False


_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor
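
The public names of this module (`Executor`, `submit`, `map`, `shutdown`, and futures offering `result`, `exception`, `done`, `cancel`, and `add_done_callback`) appear to follow `concurrent.futures` from the standard library, although the answer does not name the API it replicates. For comparison, here is a short sketch of the same calling pattern using the standard `ThreadPoolExecutor`; `slow_double` and `demo` are hypothetical names.

```python
import concurrent.futures
import time


def slow_double(number, delay=0.0):
    """Hypothetical workload used only for the demonstration."""
    time.sleep(delay)
    return number * 2


def demo():
    """Exercise submit, result, add_done_callback, and map."""
    finished = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        future = executor.submit(slow_double, 21, delay=0.05)
        # The callback receives the future once it has completed.
        future.add_done_callback(lambda done: finished.append(done.result()))
        answer = future.result()  # blocks until the result is available
        doubled = list(executor.map(slow_double, [1, 2, 3]))
    # Leaving the with block waits for the workers, so callbacks have run.
    return answer, doubled, finished


if __name__ == '__main__':
    print(demo())
```

Two differences from the module above are worth noting. In the standard library the timeout is passed per call, as in `future.result(timeout=...)`, and only bounds the wait. In the module the timeout is set once through `Executor(timeout)` or `set_timeout`, and a future that exceeds it has its process terminated.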

