当前位置: 首页 > 工具软件 > APScheduler > 使用案例 >

Flask_APScheduler集成Flask项目系列入门篇(一)

楚茂实
2023-12-01

Flask_APScheduler集成Flask项目系列入门篇(一)

1.概述

APScheduler集成Flask项目是一个系列文章,它不是一篇介绍工具的文章,他是在介绍开发API接口项目工程中使用APScheduler解决实际定时执行任务需求。

为什么用一个系列博客介绍APScheduler模块?

  1. 一篇文章介绍完APScheduler模块,可以了解该模块的结构和使用方法。如果到这里就结束了,那么只能是用这个模块做一些小Demo示例,就好比纸上谈兵。
  2. 将它放到项目中使用时候,会发现还需要对它做一些封装,才能与项目完美结合把这个模块的作用发挥出来。
    Flask_APScheduler插件封装业务逻辑持久化任务(二)
    https://blog.csdn.net/m0_38039437/article/details/130196860

Flask_APScheduler与APScheduler区别
Flask将APScheduler模块作为一个扩展库添加到了他的框架中,本身还是使用APScheduler完成任务调度。Flask对APScheduler的使用提供了更加完善的接口,符合Flask使用习惯。

2.APScheduler模块介绍

2.1.APScheduler结构

APScheduler模块由四个部分组成,他们各尽其职,通过调度器协调工作。

  • 任务存储(job store):存储被调度的任务,默认的任务存储是保存在内存中。同时支持任务存储在数据库中,一个任务的数据将在保存到持久化作业存储时被序列化,并在加载时被反序列化。
  • 触发器(trigger):包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个任务会运行。除了他们自己初始配置以外,触发器完全是无状态的。
  • 执行器(executor):任务的运行,通过在执行器中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
  • 调度器(scheduler):调度器相当于总指挥,协调其他组件工作。通常在应用只有一个调度器,应用的开发者不会直接调用任务的存储器、执行器和触发器,开发者通过调度器提供的接口来操作他们。

2.2.组件详细介绍

1.任务

任务是被执行对象,通过调用器提供的接口可以添加、修改、删除任务。
创建任务信息如下

  • id:指定任务的唯一ID
  • name:指定任务的名字
  • trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的
    时间, 满足时将会执行
  • executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此
    job的 执行器,执行job指定的函数
  • max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数
    来确定是否可执行
  • next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触
    发时间
  • misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致
    21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
  • coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重新启动,而job的触发器设置为5s执行
    一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
  • func:Job执行的函数
  • args:Job执行函数需要的位置参数
  • kwargs:Job执行函数需要的关键字参数

2.Trigger 触发器

决定任务运行条件,每个任务都可以配置自己的触发器,用于决定接下来哪一个作业会运行。除了它们自己初始配置以外,触发器完全是无状态的。

APScheduler 有三种内建的 Trigger:

  • date: 特定的时间点只触发一次,例如为发送右键设置一个定时发送任务
  • interval: 每次间隔一段实际周期性的触发,例如每隔1小时提醒喝水。
  • cron: 在特定时间周期性地触发

3.Jobstore 任务存储

APScheduler任务默认存储在内存中,如果应用重启已创建的任务就会消失。如果你的应用在每次启动的时候都会重新创建任务,那么使用默认的任务存储器(MemoryJobStore)即可,但是如果你需要应用重启后任务存储器中任然保留任务,你应该根据你的应用环境来选择具体的任务存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)。

4.Executor 执行器

Executor执行器在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor执行器。
每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor。找到实际的执行器对象,然后根据执行器对象执行Job。
Executor执行器有七种可根据业务需要选择

  • BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
  • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
  • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
  • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使
  • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
  • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

5.scheduler 调度器

Scheduler是APScheduler的核心,任务调度器是属于整个调度的总指挥官。它会合理安排任务存储器、执行器、触发器进行工作,并进行添加、删除、暂停任务等。

3.APScheduler使用

下面通过一些示例介绍下不同类型的执行器和触发器使用,在以后开发项目环境中根据需求选择适合的执行器和触发器完成定时任务调度。

1.安装Flask_APScheduler

pip install flask_apscheduler
# 如果使用pipenv管理环境,使用pipenv安装
pipenv install flask_apscheduler

3.1.选择不同类型执行器执行任务

1.BlockingScheduler执行器

BlockingScheduler : Blocking解释为阻塞,该调度器在当前进程的主线程中运行,调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时使用。

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def main_thread():
    # 主线程
    while (True):
        print('主线程每隔1s运行一次')
        time.sleep(1)

def job():
    print('定时任务每隔3s运行一次')


if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BlockingScheduler()
    # 添加任务
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    # 启动调度器
    sched.start()
    # 启动主线程
    main_thread()

BlockingScheduler调用start函数后会阻塞应用的当前线程,导致main_thread()函数不会运行。这里用main_thread()函数表示我们应用主线程。

定时任务每隔3s运行一次
定时任务每隔3s运行一次
定时任务每隔3s运行一次

2.BackgroundScheduler执行器

BackgroundScheduler : Background解释为后台,该调度器在后台线程中运行,不会阻塞当前线程。

from apscheduler.schedulers.background import BackgroundScheduler
import time

def main_thread():
    # 主线程
    while (True):
        print('主线程每隔1s运行一次')
        time.sleep(1)

def job():
    print('定时任务每隔3s运行一次')


if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BackgroundScheduler()
    # 添加任务
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    # 启动调度器
    sched.start()
    # 启动主线程
    main_thread()

BackgroundScheduler调用start函数后并不会阻塞当前线程,所以可以继续执行主程序中main_thread()函数

主线程每隔1s运行一次
主线程每隔1s运行一次
主线程每隔1s运行一次
定时任务每隔3s运行一次
主线程每隔1s运行一次
主线程每隔1s运行一次
主线程每隔1s运行一次
定时任务每隔3s运行一次

3.2.选择不同类型的触发器

1.date触发器

date: 特定的时间点只触发一次,例如为发送右键设置一个定时发送任务

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def job():
    print('date触发器运行任务')


if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BlockingScheduler()
    # 添加任务,到达指定时间才会运行且只运行1次
    sched.add_job(func=job, trigger='date', run_date=datetime.datetime(2023 ,2 , 21, 10, 43, 30), id='3_second_job')
    # 或者用next_run_time指定运行时间
    # sched.add_job(func=job, trigger='date', next_run_time=datetime.datetime(2023, 2, 21, 10, 43, 30), id='3_second_job')
    
    # 启动调度器
    sched.start()

运行代码后,没有立即执行等到达设置时间后才会运行,且运行了一次。

date触发器运行任务

2.interval触发器

interval: 每次间隔一段实际周期性的触发,例如每隔1小时提醒喝水。
interval可以设置的参数:

  • weeks 周
  • days 一个月的第几天
  • hours 小时
  • minutes 分钟
  • seconds 秒
  • start_date 间隔触发的开始时间
  • end_date 间隔触发的结束时间
  • jitter 触发的时间误差
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print('interval间隔性循环触发器运行任务')


if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BlockingScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
    # 启动调度器
    sched.start()

设置触发器每隔3秒运行一次任务

interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务

3.cron触发器

crontab触发器:在特定时间周期性地触发,如每天,周循环等场景,它是功能最强大的触发器。
参数范围只适用于int类型,str类型有无限可能

  • year: 年,4位数字。int 或 str
  • month: 月 (范围1-12)。int 或 str
  • day: 日 (范围1-31)。int 或 str
  • week:周 (范围1-53)。int 或 str
  • day_of_week: 周内第几天或者星期几 (范围0-6,0是周一,6是周天 或者 mon,tue,wed,thu,fri,sat,sun)。int 或 str
  • hour: 时 (范围0-23)。(int 或 str)
  • minute: 分 (范围0-59)。(int 或 str)
  • second: 秒 (范围0-59)。(int 或 str)
  • start_date: 最早开始日期(包含)。(datetime 或 str)
  • end_date: 最晚结束时间(包含)。(datetime 或 str)
  • timezone: 指定时区。(datetime 或 str)
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print('cron特定时间周期性运行任务')


# 将cron表达式转换为 秒、分、周、月、年格式,传给cron触发器
def change_cron(expression):
    args = {}
    # 以空格为分隔符拆分字符串输出列表,拆分结果 ['0/2', '*', '*', '*', '*', '?']
    expression = expression.split(' ')
    if expression[0] != '?':
        args['second'] = expression[0]
    if expression[1] != '?':
        args['minute'] = expression[1]
    if expression[2] != '?':
        args['hour'] = expression[2]
    if expression[3] != '?':
        args['day'] = expression[3]
    if expression[4] != '?':
        args['month'] = expression[4]
    if expression[5] != '?':
        args['day_of_week'] = expression[5]
    return args

if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BlockingScheduler()
    # 添加任务,循环运行
    # 每周一早晨9点30分执行func任务
    sched.add_job(func=job, trigger="cron", day_of_week=0, hour=9, minute=30)
    # 间隔1分钟执行一次,与interval触发器使用功能相同
    sched.add_job(func=job, trigger="cron", minute="*/1")
    # 如果要接收cron表达式,需要对格式做个转换。通过change_cron函数转换格式。
    sched.add_job(func=job, trigger="cron", **change_cron('0/2 * * * * ?'))
    # 启动调度器
    sched.start()

使用cron触发器,设置运行时间默认接收关键字类型参数,例如day_of_week=0, hour=9, minute=30。如果要接收一个cron表达式,需要通过一个函数将它转为关键字参数类型。上面的示例中change_cron() 函数将corn表达式0/2 * * * * ? 转为关键字参数。

cron特定时间周期性运行任务
cron特定时间周期性运行任务
cron特定时间周期性运行任务

3.3.任务存储job store

存储被调度的任务,默认的任务存储是保存在内存中。同时支持任务存储在数据库中,一个任务的数据将在保存到持久化作业存储时被序列化,并在加载时被反序列化。

1.添加任务add_job

使用调度器调用add_job()方法添加任务

from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print('interval间隔性循环触发器运行任务')


if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BlockingScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
    # 启动调度器
    sched.start()

2.删除任务remove_job

通过主线程运行edit_job()函数删除job任务,是任务停止运行。

import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print('interval间隔性循环触发器运行任务')

def edit_job():
    time.sleep(7)
    sched.remove_job(job_id='3_second_job')
    print('删除job任务')

if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BackgroundScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
    # 启动调度器
    sched.start()
    # 启动主线程
    edit_job()

job循环运行了两次,运行删除任务后,任务不再运行。

interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
删除job任务

3.暂停与恢复任务

import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print('interval间隔性循环触发器运行任务')

def my_pause_job():
    time.sleep(7)
    sched.pause_job(job_id='3_second_job')
    print('暂停job任务')

def my_resume_job():
    time.sleep(1)
    sched.resume_job(job_id='3_second_job')
    print('恢复job任务')
    time.sleep(4)

if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BackgroundScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
    # 启动调度器
    sched.start()
    # 启动主线程
    my_pause_job()
    my_resume_job()

恢复任务后又重新开始运行

interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
暂停job任务
恢复job任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务

4.修改任务modify_job和reschedule_job区别

修改任务可以有两个函数,modify_job()reschedule_job() ,他们区别如下

  • 1.调用modify_job() 函数修改任务时需要我们自己通过调度器创建一个触发器sched._create_trigger() 然后用这个触发器替换任务中原有触发器信息。
    1. reschedule_job()函数帮我们做了创建触发器的工作,然后调用的modify_job() 函数修改任务,因此使用这个函数修改任务操作更加方便。

modify_job()函数修改任务示例

import time
from apscheduler.schedulers.background import BackgroundScheduler

# 定时器运行的任务
def job():
    print('interval间隔性循环触发器运行任务')

# 修改任务信息
def edit_job():
    temp_dict = {"seconds": 1}
    # 创建一个触发器
    temp_trigger = sched._create_trigger(trigger='interval', trigger_args=temp_dict)
    # 修改job信息
    sched.modify_job(job_id='3_second_job', trigger=temp_trigger)
    print('修改任务每隔1s运行一次')
    time.sleep(9)


if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BackgroundScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3, id='3_second_job')
    # 启动调度器
    sched.start()
    # 启动主线程
    edit_job()

通过创建的触发器修改任务后由3秒运行一次变为1秒运行一次

修改任务每隔1s运行一次
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务

reschedule_job函数修改任务示例

import time
from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print('interval间隔性循环触发器运行任务')

def edit_reschedule_job():
    param = {"trigger": "interval", "seconds": 1}
    sched.reschedule_job(job_id='3_second_job', **param)
    print('修改任务每隔1s运行一次')
    time.sleep(9)

if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BackgroundScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3, id='3_second_job')
    # 启动调度器
    sched.start()
    # 启动主线程
    edit_reschedule_job()

reschedule_job函数修改任务更加简单,不需要单独创建触发器,直接传入修改的参数即可。

修改任务每隔1s运行一次
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务

reschedule_job函数源码

def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
    """
    Constructs a new trigger for a job and updates its next run time.

    Extra keyword arguments are passed directly to the trigger's constructor.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that contains the job
    :param trigger: alias of the trigger type or a trigger instance
    :return Job: the relevant job instance

    """
    # 创建一个触发器
    trigger = self._create_trigger(trigger, trigger_args)
    # 获取当前时间
    now = datetime.now(self.timezone)
    # 获取下次运行时间
    next_run_time = trigger.get_next_fire_time(None, now)
    # 调用modify_job函数修改任务
    return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time)

5.关闭调度器

默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果你就直接关闭,你可以添加参数:
scheduler.shutdown(wait=False)不管有没有任务在执行,会强制关闭调度器。

6.查询任务信息get_job

import time
from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print('interval间隔性循环触发器运行任务')

def my_get_job():
    jobinfo = sched.get_job(job_id='3_second_job')
    jobs = sched.get_jobs()
    print(f'获取指定任务信息: {jobinfo}')
    print(f'获取所有任务信息: {jobs}')
    time.sleep(4)

if __name__ == '__main__':
    # 创建后台执行器类型调度器
    sched = BackgroundScheduler()
    # 添加任务,循环运行
    sched.add_job(func=job, trigger='interval', seconds=3, id='3_second_job')
    # 启动调度器
    sched.start()
    # 启动主线程
    my_get_job()

8.数据库获取任务

APScheduler默认将任务存储在了内存中,我们应用重启后任务就会失效。将任务存入数据库可以实现任务持久化

配置Mongodb数据库为存储任务源

from datetime import datetime
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# MongoDB 参数
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
# 输出时间
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 存储方式
jobstores = {
    'mongo': MongoDBJobStore(collection='job', database='test', client=client),
    'default': MemoryJobStore()
}
executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
# 初始化调度器
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo')
scheduler.start()

3.4.事件监听

可以为 scheduler 绑定事件监听器(event listen)。Scheduler 事件在某些情况下会被触发,而且它可能携带有关特定事件的细节信息。为add_listener()函数提供适当的掩码参数(mask argument)或者是将不同的常数组合到一起,可以监听特定类型的事件。可调用的listener可以通过event object作为参数而被调用。

事件对应枚举值描述归属类
EVENT_SCHEDULER_STARTED1调度程序启动SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN2调度程序关闭SchedulerEvent
EVENT_SCHEDULER_PAUSED4调度程序中任务处理暂停SchedulerEvent
EVENT_SCHEDULER_RESUMED8调度程序中任务处理恢复SchedulerEvent
EVENT_EXECUTOR_ADDED16将执行器添加到调度程序中SchedulerEvent
EVENT_EXECUTOR_REMOVED32执行器从调度程序中删除SchedulerEvent
EVENT_JOBSTORE_ADDED64将任务存储添加到调度程序中SchedulerEvent
EVENT_JOBSTORE_REMOVED128任务存储从调度程序中删除SchedulerEvent
EVENT_ALL_JOBS_REMOVED256所有任务从所有任务存储中删除或从一个特定的任务存储中删除SchedulerEvent
EVENT_JOB_ADDED512任务添加到任务存储中JobEvent
EVENT_JOB_REMOVED1024从任务存储中删除了任务JobEvent
EVENT_JOB_MODIFIED2048从调度程序外部修改了任务JobEvent
EVENT_JOB_EXECUTED4096任务被成功执行JobExecutionEvent
EVENT_JOB_ERROR8192任务在执行期间引发异常JobExecutionEvent
EVENT_JOB_MISSED16384错过了任务执行JobExecutionEvent
EVENT_JOB_SUBMITTED32768任务已经提交到执行器中执行JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES65536任务因为达到最大并发执行时,触发的事件JobSubmissionEvent
EVENT_ALL包含以上的所有事件
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入事件类
from apscheduler.events import EVENT_ALL
# 导入日志记录器
from loguru import logger
import time


# 定时任务执行函数
def my_task():
    logger.info("执行task任务")


# 事件监听函数
def my_listener(event):
    match event.code:
        case 4096:
            logger.info("任务被成功执行")
        case 32768:
            logger.info("任务已经提交到执行器中执行")
        case _:
            logger.info(event.code)


if __name__ == '__main__':
    try:
        # 实例化调度器对象
        scheduler = BackgroundScheduler()
        # 添加定时任务,指定任务函数和触发器
        my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2))
        logger.error("开始定时任务")
        # 开始执行定时任务调度器
        scheduler.start()
        # 添加监听器
        scheduler.add_listener(my_listener, mask=EVENT_ALL)
        time.sleep(4)
    except (KeyboardInterrupt, SystemExit):
        logger.error("进程已结束运行")

该示例通过监听事件ID号,做出对应的操作。

2023-02-21 17:25:48.561 | ERROR    | __main__:<module>:41 - 开始定时任务
2023-02-21 17:25:50.565 | INFO     | __main__:my_task:21 - 执行task任务
2023-02-21 17:25:50.566 | INFO     | __main__:my_listener:28 - 任务被成功执行
2023-02-21 17:25:50.566 | INFO     | __main__:my_listener:30 - 任务已经提交到执行器中执行
 类似资料: