很多时候,我们都喜欢为代码加入retry功能。比如oauth验证,有时候网络不太灵,我们希望多试几次。
这些retry应用的场景看起来不同,其实又很类似。都是判断代码是否正常运行,如果不是则重新开始。
那么,有没有一种通用的办法来实现呢?
Tenacity1是一个通用的retry库,简化为任何任务加入重试的功能。
它还包含如下特性:
① 不要参数会一直重试,也没有时间等待
② stop:重试停止方式。
③ wait:每次重试间隔时间。
④ retry:什么样的情况下才重试(可以指定特定的错误)
⑤ before:重试前记日志
⑥ after:重试后记日志
from tenacity import *
#基础的用法,会一直重试下去,直到函数没有抛出异常,正常返回值
@retry
def never_give_up_never_surrender():
print("一直重试,忽略exceptions,重试间没有等待时间")
raise Exception
#何时停止
#例如,在达到尝试次数后停下来:
@retry(stop=stop_after_attempt(7))
def stop_after_7_attempts():
print("尝试7次后停下")
raise Exception
#在10秒后,如果仍然没有成功,则停下:
@retry(stop=stop_after_delay(10))
def stop_after_10_s():
print("10秒后停止")
raise Exception
#可以使用|操作符,来组合多种条件:
@retry(stop=(stop_after_delay(10) | stop_after_attempt(5)))
def stop_after_10_s_or_5_retries():
print("10秒后,或者尝试5次后,停下来")
raise Exception
#尝试间的等待
@retry(wait=wait_fixed(2))
def wait_2_s():
print("每次重试间都有2秒间隔")
raise Exception
#间隔可以是随机的
@retry(wait=wait_random(min=1, max=2))
def wait_random_1_to_2_s():
print("重试间隔1-2秒")
raise Exception
#还可以加入指数曲线形式的间隔:
@retry(wait=wait_exponential(multiplier=1, min=4, max=10))
def wait_exponential_1():
print("开始的时候等待 2^x * 1 秒,最少等待4秒,最多10秒,之后都是等待10秒")
raise Exception
#多核在竞争一个共享的资源,使用指数间隔可以将冲突最小化
@retry(wait=wait_random_exponential(multiplier=1, max=60))
def wait_exponential_jitter():
print("随机等待 2^x * 1 秒,最多60秒,之后都是等待60秒")
raise Exception
#可以自定义每次等待时长:
@retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] +
[wait_fixed(7) for i in range(2)] +
[wait_fixed(9)]))
def wait_fixed_chained():
print("前三次等待3秒,后两次等待7秒,最后一次等待9秒")
raise Exception
#何时retry
#默认情况下,只有函数抛出异常时才会retry。
#你可以设置在制定的异常才进行retry
@retry(retry=retry_if_exception_type(IOError))
def might_io_error():
print("只有在IOError的时候进行retry,其它时候照常抛出错误")
raise Exception
#可以在判断返回值是否是需要的情况下进行retry:
def is_none_p(value):
return value is None
@retry(retry=retry_if_result(is_none_p))
def might_return_none():
print("因为返回值是None,所以这个函数会一直retry")
#这样写也是可以的,不用修改原来的代码
retry_version_func = retry(retry=retry_if_result(is_none_p))(might_return_none)
#当然,这里也可以组合多个条件:
def is_none_p(value):
return value is None
@retry(retry=(retry_if_result(is_none_p) | retry_if_exception_type()))
def might_return_none():
print("在抛出任何异常,或者返回值是None的情况下,进行retry")
#其它
#在函数体内,你可以手动抛出TryAgain错误,进行重试:
@retry
def do_something():
result = something_else()
if result == 23:
raise TryAgain
#通过参数reraise=True,可以抛出函数最后一次抛出的异常。如果没有设定,会抛出RetryError:
@retry(reraise=True, stop=stop_after_attempt(3))
def raise_my_exception():
raise MyException("Fail")
try:
raise_my_exception()
except MyException:
print('MyException会被抛出')
#在重试的前后,记录日志
import logging
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)
#重试前记录
@retry(stop=stop_after_attempt(3), before=before_log(logger, logging.DEBUG))
def raise_my_exception():
raise MyException("Fail")
#重试后记录
@retry(stop=stop_after_attempt(3), after=after_log(logger, logging.DEBUG))
def raise_my_exception():
raise MyException("Fail")
#你可以获取retry的相关统计数据:
@retry(stop=stop_after_attempt(3))
def raise_my_exception():
raise MyException("Fail")
try:
raise_my_exception()
except Exception:
pass
我自己代码中用到了: OperationalError\InternalError这两种错误下才会重试,重试3次,每集间隔20s
@retry(stop=stop_after_attempt(3), wait=wait_fixed(20), retry=(retry_if_exception_type(OperationalError)|retry_if_exception_type(InternalError)),reraise=True)
def sql_to_many(self, *args):
"""
批量修改和存储 支支持update\insert
:param args: args[0]:数据库, args[1]:sql, args[2]:data [tuple,tuple]
:return:
"""
self.connect_(args[0])
self.executemany_sql(args[1], args[2])
self.close_()