Python中使用threading.Timer
执行定时任务时,执行任务是一次性的,类似于JS中的setTimeout
方法。我们对其在封装,改造成可循环的定时器,类似于JS中setInterval
方法的效果。
值得注意的是,threading.Timer
是非阻塞的,不同于使用time.sleep
实现的简单定时任务,而且重要的一点是threading.Timer
可以取消定时任务的执行。
本循环定时器只是简单的实现了单任务定时器,并提供了三个基本功能:执行任务、判断任务是否执行中、取消任务,对于多任务可以通过队列、字典等数据结构存放任务实现多任务的功能。
计时器通过函数递归调用?实测不会造成栈溢出,但随着任务循环执行,线程计数器不断递增。
废话不多说,直接上代码:
# -*-coding:utf-8-*-
from threading import Timer
class SimpleIntervalTaskTimer(object):
"""
简单的循环单任务定时器,非阻塞当前线程
"""
def __init__(self):
self.__timer: Timer = None
self.__seconds = 0
self.__action = None
self.__args = None
self.__kwargs = None
def run(self, seconds: int, action, args=None, kwargs=None):
"""
执行循环定时任务
:param seconds: 任务执行间隔,单位秒
:param action: 任务函数
:param args: 函数参数
"""
if not callable(action):
raise AttributeError("参数action非法,请传入函数变量")
if self.is_running():
print("已有任务在执行,请取消后再操作")
return
self.__action = action
self.__seconds = seconds
self.__args = args if args is not None else []
self.__kwargs = kwargs if kwargs is not None else {}
self.__run_action()
def __run_action(self):
self.__timer = Timer(self.__seconds, self.__hook, self.__args, self.__kwargs)
self.__timer.start()
def __hook(self, *args, **kwargs):
self.__action(*args, **kwargs)
self.__run_action()
def is_running(self):
"""
判断任务是否在执行
"""
return self.__timer and self.__timer.is_alive()
def cancel(self):
"""
取消循环定时任务
"""
if self.is_running():
self.__timer.cancel()
self.__timer = None
# -*-coding:utf-8-*-
import threading
import time
import SimpleIntervalTaskTimer
class TestIntervalTaskTimer(object):
def __run1(self, param):
print(param)
def __run2(self, param=None):
print(param)
def __run3(self):
timer = SimpleTaskTimer()
timer.run(1, self.__run1, args=['111'])
time.sleep(5)
timer.cancel()
timer.run(1, self.__run2, kwargs={'param': '222'})
time.sleep(5)
timer.cancel()
def runTest1(self):
self.__run3()
def runTest2(self):
threading.Thread(target=lambda: self.__run3()).start()
if __name__ == '__main__':
t = TestIntervalTaskTimer()
t.runTest1()
t.runTest2()