当前位置: 首页 > 工具软件 > Tenacity > 使用案例 >

Python重试机制模块:tenacity

阎京
2023-12-01

在日常的工作中,可能由于网络波动原因,导致请求失败,下载超时,又或者是IO阻塞,导致不能及时响应。这时候就需要对功能添加重试代码,这里介绍一个python的第三方库:tenacity

安装方法:

pip3 install tenacity

官方详细文档:https://pypi.org/project/retry/

示例:

1.  最简单的重试(无限重试)

from tenacity import retry

@retry
def test_retry():
    print("重试无间隔执行...")
    raise Exception

test_retry()

装饰器@retry的常用参数说明:

@retry(
    wait=wait_fixed(3),  # 等待3秒后重试
    stop=stop_after_attempt(3),  # 只重试3次
    stop=stop_after_delay(3),  # 重试3秒后不再重试
    retry=retry_if_exception_type(exceptions_object),  # 捕获到指定异常后才重试
    retry=retry_if_result(func),  # 满足自定义条件时,进行重试,func为函数名,返回true时重试
    reraise=True,  # 重试后抛出原来的错误
    retry_error_callback=callback_func  # 最后一次重试失败,则调用回调函数    

)

2. 等待3s后进行重试

from tenacity import retry, wait_fixed

@retry(wait=wait_fixed(3))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

3.  设置重试停止的条件

3.1 只重试3次就停止重试

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

3.2 重试3秒后不再重试

from tenacity import retry, stop_after_delay

@retry(stop=stop_after_delay(3))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

3.3 满足3.1或3.2其中一个条件时,不再重试

from tenacity import retry, stop_after_delay, stop_after_attempt

@retry(stop=(stop_after_delay(3) | stop_after_attempt(3)))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

4. 满足条件时,进行重试。

4.1 捕获到指定的异常时,进行重试

from requests import exceptions
from tenacity import retry, retry_if_exception_type

@retry(retry=retry_if_exception_type(exceptions.Timeout))
def test_retry():
    print("等待重试...")
    raise exceptions.Timeout

test_retry()

4.2 满足自定义的条件时,进行重试。当is_false函数返回true,则进行重试,参数value为函数test_retry的返回值。

from tenacity import retry, stop_after_attempt, retry_if_result

def is_false(value):
    return value is False

@retry(stop=stop_after_attempt(3),
       retry=retry_if_result(is_false))
def test_retry():
    return False

test_retry()

5. 重试后抛出准确的异常

最后一次重试抛出的异常是RetryError,不会是代码逻辑真正发生的异常,想要抛出真正的异常,则设置reraise=True

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(7), reraise=True)
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

6. 最后一个重试失败后调用回调函数。

from tenacity import *

def return_last_value(retry_state):
    print("执行回调函数")
    return retry_state.outcome.result()  # 表示返回原函数的返回值

def is_false(value):
    return value is False

@retry(stop=stop_after_attempt(3),
       retry_error_callback=return_last_value,
       retry=retry_if_result(is_false))
def test_retry():
    print("等待重试中...")
    return False

print(test_retry())

参考:https://zhuanlan.zhihu.com/p/281555097

 类似资料: