当前位置: 首页 > 工具软件 > Tenacity > 使用案例 >

tenacity.retry Python中一个专门用来重试的库

曹鹏海
2023-12-01

tenacity.retry Python中一个专门用来重试的库

一、背景:

很多时候,我们都喜欢为代码加入retry功能。比如oauth验证,有时候网络不太灵,我们希望多试几次。

这些retry应用的场景看起来不同,其实又很类似。都是判断代码是否正常运行,如果不是则重新开始。

那么,有没有一种通用的办法来实现呢?

二、简介:

Tenacity1是一个通用的retry库,简化为任何任务加入重试的功能。

它还包含如下特性:

  • 通用的装饰器API
  • 可以设定重试停止的条件(比如设定尝试次数)
  • 可以设定重试间的等待时间(比如在尝试之间使用幂数级增长的wait等待)
  • 自定义在哪些Exception进行重试
  • 自定义在哪些返回值的情况进行重试
  • 协程的重试

三、用法

1、参数讲解

① 不要参数会一直重试,也没有时间等待
② stop:重试停止方式。
③ wait:每次重试间隔时间。
④ retry:什么样的情况下才重试(可以指定特定的错误)
⑤ before:重试前记日志
⑥ after:重试后记日志

2、实列

① 重试次数与等待时间
from tenacity import *

#基础的用法,会一直重试下去,直到函数没有抛出异常,正常返回值
@retry
def never_give_up_never_surrender():
    print("一直重试,忽略exceptions,重试间没有等待时间")
    raise Exception
    
#何时停止
#例如,在达到尝试次数后停下来:
@retry(stop=stop_after_attempt(7))
def stop_after_7_attempts():
    print("尝试7次后停下")
    raise Exception
    
#在10秒后,如果仍然没有成功,则停下:
@retry(stop=stop_after_delay(10))
def stop_after_10_s():
    print("10秒后停止")
    raise Exception
    
#可以使用|操作符,来组合多种条件:
@retry(stop=(stop_after_delay(10) | stop_after_attempt(5)))
def stop_after_10_s_or_5_retries():
    print("10秒后,或者尝试5次后,停下来")
    raise Exception
    
#尝试间的等待
@retry(wait=wait_fixed(2))
def wait_2_s():
    print("每次重试间都有2秒间隔")
    raise Exception

#间隔可以是随机的
@retry(wait=wait_random(min=1, max=2))
def wait_random_1_to_2_s():
    print("重试间隔1-2秒")
    raise Exception
    
#还可以加入指数曲线形式的间隔:
@retry(wait=wait_exponential(multiplier=1, min=4, max=10))
def wait_exponential_1():
    print("开始的时候等待 2^x * 1 秒,最少等待4秒,最多10秒,之后都是等待10秒")
    raise Exception

#多核在竞争一个共享的资源,使用指数间隔可以将冲突最小化
@retry(wait=wait_random_exponential(multiplier=1, max=60))
def wait_exponential_jitter():
    print("随机等待 2^x * 1 秒,最多60秒,之后都是等待60秒")
    raise Exception
    
#可以自定义每次等待时长:
@retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] +
                           [wait_fixed(7) for i in range(2)] +
                           [wait_fixed(9)]))
def wait_fixed_chained():
    print("前三次等待3秒,后两次等待7秒,最后一次等待9秒")
    raise Exception

② 特定情况重试与日志记录

#何时retry
#默认情况下,只有函数抛出异常时才会retry。
#你可以设置在制定的异常才进行retry

@retry(retry=retry_if_exception_type(IOError))
def might_io_error():
   print("只有在IOError的时候进行retry,其它时候照常抛出错误")
   raise Exception
#可以在判断返回值是否是需要的情况下进行retry:

def is_none_p(value):
       return value is None

@retry(retry=retry_if_result(is_none_p))
def might_return_none():
   print("因为返回值是None,所以这个函数会一直retry")
       
#这样写也是可以的,不用修改原来的代码
retry_version_func = retry(retry=retry_if_result(is_none_p))(might_return_none)    

#当然,这里也可以组合多个条件:
def is_none_p(value):
   return value is None

@retry(retry=(retry_if_result(is_none_p) | retry_if_exception_type()))
def might_return_none():
   print("在抛出任何异常,或者返回值是None的情况下,进行retry")
#其它
#在函数体内,你可以手动抛出TryAgain错误,进行重试:
@retry
def do_something():
  result = something_else()
  if result == 23:
     raise TryAgain

#通过参数reraise=True,可以抛出函数最后一次抛出的异常。如果没有设定,会抛出RetryError:

@retry(reraise=True, stop=stop_after_attempt(3))
def raise_my_exception():
   raise MyException("Fail")

try:
   raise_my_exception()
except MyException:
   print('MyException会被抛出')

#在重试的前后,记录日志
import logging
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)

#重试前记录
@retry(stop=stop_after_attempt(3), before=before_log(logger, logging.DEBUG))
def raise_my_exception():
   raise MyException("Fail")
   
#重试后记录
@retry(stop=stop_after_attempt(3), after=after_log(logger, logging.DEBUG))
def raise_my_exception():
   raise MyException("Fail")

#你可以获取retry的相关统计数据:
@retry(stop=stop_after_attempt(3))
def raise_my_exception():
   raise MyException("Fail")

try:
   raise_my_exception()
except Exception:
   pass

我自己代码中用到了: OperationalError\InternalError这两种错误下才会重试,重试3次,每集间隔20s

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(20), retry=(retry_if_exception_type(OperationalError)|retry_if_exception_type(InternalError)),reraise=True)
    def sql_to_many(self, *args):
        """
        批量修改和存储 支支持update\insert
        :param args: args[0]:数据库, args[1]:sql, args[2]:data [tuple,tuple]
        :return:
        """
        self.connect_(args[0])
        self.executemany_sql(args[1], args[2])
        self.close_()

以上就是 Python retry库的介绍了,用起来真的很方便哦~

 类似资料: