当前位置: 首页 > 工具软件 > Tenacity > 使用案例 >

python之Tenacity重试库的使用

曹高阳
2023-12-01

Tenacity 是一个用于 Python 的重试库,它提供了一种简单的方法来在出现错误时自动重试代码,以便增加代码的健壮性和可靠性。下面是使用 Tenacity 的一些基本步骤:

安装 Tenacity

pip install tenacity

导入 Tenacity 模块

from tenacity import retry, stop_after_attempt, wait_fixed

为需要重试的函数添加装饰器

@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def my_function():
    # 函数体

在上面的代码中,retry装饰器告诉 Tenacity 在出现异常时重试该函数。stop_after_attempt(3) 意味着在第 3 次尝试后停止重试,wait_fixed(1) 表示在每次重试之间等待 1 秒钟。

调用重试函数

my_function()

当 my_function 函数在前三次尝试都失败后,Tenacity 将抛出 tenacity.RetryError 异常。

常用的重试方法

在 Tenacity 中,有几种不同的重试策略可以使用,以下是常用的几种重试方法:

  1. stop_after_attempt(attempts)
    在给定的尝试次数之后停止重试。

  2. stop_after_delay(delay)
    在给定的时间后停止重试。

  3. stop_never()
    从不停止重试,这是默认的策略。

  4. wait_fixed(interval)
    在每次重试之间等待固定的时间间隔。

  5. wait_random(min_wait, max_wait)
    在每次重试之间等待介于最小等待时间和最大等待时间之间的随机时间。

  6. wait_exponential(multiplier, max_wait=None)
    在每次重试之间等待指数级增长的时间间隔。重试间隔将为 multiplier ** n,其中 n 是重试次数。

  7. retry_if_exception_type(*exception_types)
    当捕获指定的异常类型时,才进行重试。

  8. retry_if_result(predicate)
    当指定的函数谓词返回 True 时,才进行重试。predicate 应该是一个接受一个参数的函数,该参数为函数的返回值。

这些重试策略可以组合在一起使用,以实现更复杂的重试逻辑。例如,可以使用wait_randomretry_if_exception_type来创建一个随机化的重试机制,以避免在重试时出现时间同步问题。

Tenacity 示例

下面是使用 Tenacity 的几个示例,涵盖了不同的重试策略:

  1. 使用stop_after_attempt策略,在尝试 3 次之后停止重试:
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def my_function():
    # 函数体

  1. 使用wait_fixed策略,在每次重试之间等待 1 秒钟:
from tenacity import retry, stop_after_attempt, wait_fixed

@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def my_function():
    # 函数体

  1. 使用wait_random策略,在每次重试之间等待介于 1 秒钟和 3 秒钟之间的随机时间:
from tenacity import retry, stop_after_attempt, wait_random

@retry(stop=stop_after_attempt(3), wait=wait_random(1, 3))
def my_function():
    # 函数体

  1. 使用retry_if_exception_type策略,在出现 IOError 或 OSError 异常时重试:
from tenacity import retry, retry_if_exception_type

@retry(retry=retry_if_exception_type((IOError, OSError)))
def my_function():
    # 函数体

  1. 使用wait_exponential策略,在每次重试之间等待指数级增长的时间间隔:
from tenacity import retry, wait_exponential

@retry(wait=wait_exponential(multiplier=1, max_wait=60))
def my_function():
    # 函数体

在这个示例中,my_function将会在第一次重试时等待 1 秒钟,然后在第二次重试时等待 2 秒钟,第三次重试时等待 4 秒钟,以此类推,直到最大等待时间为 60 秒。可以通过调整 multiplier 参数来控制每次等待的增长速度。通常,建议将 multiplier 设置为1,这将使每次等待时间加倍。如果您想使等待时间增长得更快,可以将 multiplier 设置为一个大于 1 的值。

  1. 使用stop_after_delay策略的示例,该策略会在指定的时间内停止重试:
from tenacity import retry, stop_after_delay

@retry(stop=stop_after_delay(30)) # 在 30 秒之后停止重试
def my_function():
    # 函数体

  1. 如果您不指定任何 retry 的参数,则 Tenacity 将使用默认的策略,该策略永远不会停止重试:
from tenacity import retry, stop_never

@retry # 永远不会停止重试
def my_function():
    # 函数体

在这个示例中,my_function 将会无限重试,直到成功。这通常不是一个好的策略,因为如果函数出现无法恢复的错误,它将永远重试。但是,在某些情况下,这可能是需要的,例如当函数执行时间非常短且您想最大化成功的机会时。请注意,当您使用 stop_never 策略时,Tenacity 将无限重试,直到您在程序中使用 KeyboardInterrupt 或其他方法明确停止它。因此,在使用此策略时,请确保在代码中包含一个明确的停止条件。

KeyboardInterrupt 是 Python 中的一个异常类型,当用户按下 Ctrl+C 键时,Python 解释器会引发此异常。它通常用于中断正在运行的程序,以便用户可以更改程序的行为或停止程序。

在使用 Tenacity 时,您可以捕获 KeyboardInterrupt 异常以便在需要时优雅地停止重试操作。例如,下面的代码示例演示了如何捕获 KeyboardInterrupt 异常并在重试之间等待 1 秒钟:

from tenacity import retry, wait_fixed
import time

@retry(wait=wait_fixed(1))
def my_function():
    try:
        # 函数体
        pass
    except KeyboardInterrupt:
        print("用户中断时停止...")

在这个示例中,my_function将捕获 KeyboardInterrupt 异常,并在用户按下 Ctrl+C 时打印一条消息。捕获异常可以确保程序在退出时优雅地处理资源并停止重试操作。

这些示例只是 Tenacity 可以实现的不同重试策略的一部分,可以根据具体的需求进行组合使用。


  • 博客主页:https://blog.csdn.net/qq233325332
  • 欢迎点赞  收藏 ⭐留言  如有错误敬请指正!
  • 本文由 陌北V1 原创,首发于 CSDN博客
  • 停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
 类似资料: