pip3 install tenacity
官方详细文档:https://pypi.org/project/retry/
示例:
from tenacity import retry
@retry
def test_retry():
print("重试无间隔执行...")
raise Exception
test_retry()
装饰器@retry的常用参数说明:
@retry(
wait=wait_fixed(3), # 等待3秒后重试
stop=stop_after_attempt(3), # 只重试3次
stop=stop_after_delay(3), # 重试3秒后不再重试
retry=retry_if_exception_type(exceptions_object), # 捕获到指定异常后才重试
retry=retry_if_result(func), # 满足自定义条件时,进行重试,func为函数名,返回true时重试
reraise=True, # 重试后抛出原来的错误
retry_error_callback=callback_func # 最后一次重试失败,则调用回调函数
)
from tenacity import retry, wait_fixed
@retry(wait=wait_fixed(3))
def test_retry():
print("等待重试...")
raise Exception
test_retry()
3.1 只重试3次就停止重试
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))
def test_retry():
print("等待重试...")
raise Exception
test_retry()
3.2 重试3秒后不再重试
from tenacity import retry, stop_after_delay
@retry(stop=stop_after_delay(3))
def test_retry():
print("等待重试...")
raise Exception
test_retry()
3.3 满足3.1或3.2其中一个条件时,不再重试
from tenacity import retry, stop_after_delay, stop_after_attempt
@retry(stop=(stop_after_delay(3) | stop_after_attempt(3)))
def test_retry():
print("等待重试...")
raise Exception
test_retry()
4.1 捕获到指定的异常时,进行重试
from requests import exceptions
from tenacity import retry, retry_if_exception_type
@retry(retry=retry_if_exception_type(exceptions.Timeout))
def test_retry():
print("等待重试...")
raise exceptions.Timeout
test_retry()
4.2 满足自定义的条件时,进行重试。当is_false函数返回true,则进行重试,参数value为函数test_retry的返回值。
from tenacity import retry, stop_after_attempt, retry_if_result
def is_false(value):
return value is False
@retry(stop=stop_after_attempt(3),
retry=retry_if_result(is_false))
def test_retry():
return False
test_retry()
最后一次重试抛出的异常是RetryError,不会是代码逻辑真正发生的异常,想要抛出真正的异常,则设置reraise=True
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(7), reraise=True)
def test_retry():
print("等待重试...")
raise Exception
test_retry()
from tenacity import *
def return_last_value(retry_state):
print("执行回调函数")
return retry_state.outcome.result() # 表示返回原函数的返回值
def is_false(value):
return value is False
@retry(stop=stop_after_attempt(3),
retry_error_callback=return_last_value,
retry=retry_if_result(is_false))
def test_retry():
print("等待重试中...")
return False
print(test_retry())