python apscheduler 动态_基于Flask-APScheduler实现添加动态定时任务

劳亦
2023-12-01

阅读目录

一、apSheduler

二、Flask-APScheduler

三、动态定时任务

四、uwsgi部署注意事项

一、apSheduler

第一部分内容限于apSheduler3.0以下版本,以上版本可移步至 FastAPI+apSheduler动态定时任务

1. 引子(Introduction)

Advanced Python Scheduler (APScheduler) 是一个轻量级但功能强大的进程内任务调度器,允许您调度函数(或任何其他python可调用文件)在您选择的时间执行。

2. 特性(Features)

没有(硬)外部依赖性

api线程安全

支持CPython、Jython、PyPy

可配置的调度机制(触发器):

类似cron调度

单次运行延迟调度(如UNIX“at”命令)

基于时间间隔(以指定的时间间隔运行)

支持多种存储空间

RAM

基于文件的简单数据库

SQLAlchem

MongoDB

Redis

3. 使用(Usage)

3.1 安装

pip install apscheduler

3.2 启动调度程序

from apscheduler.scheduler import Scheduler

sched = Scheduler()

sched.start()

3.3 调度job

3.3.1 简单日期调度job

在指定时间执行一次job。这是相当于UNIX“at”命令的进程内命令

from datetime import date

from apscheduler.scheduler import Scheduler

# Start the scheduler

sched = Scheduler()

sched.start()

# Define the function that is to be executed

def my_job(text):

print text

# The job will be executed on November 6th, 2009

exec_date = date(2009, 11, 6)

# 添加一个job

job = sched.add_date_job(my_job, exec_date, ['text'])

更具体地安排时间

from datetime import datetime

# The job will be executed on November 6th, 2009 at 16:30:05

job = sched.add_date_job(my_job, datetime(2009, 11, 6, 16, 30, 5), ['text'])

甚至可以将日期指定为字符串文本

job = sched.add_date_job(my_job, '2009-11-06 16:30:05', ['text'])

# 支持微秒级别

job = sched.add_date_job(my_job, '2009-11-06 16:30:05.720400', ['text'])

3.3.2 基于时间间隔的调度job

job的执行在给定延迟后开始,或者在start_date(如果指定)开始,start_date参数可以作为date/datetime对象或字符串文本给出。

from datetime import datetime

from apscheduler.scheduler import Scheduler

# Start the scheduler

sched = Scheduler()

sched.start()

def job_function():

print "Hello World"

# Schedule job_function to be called every two hours

sched.add_interval_job(job_function, hours=2)

# The same as before, but start after a certain time point

sched.add_interval_job(job_function, hours=2, start_date='2010-10-10 09:30')

装饰语法

from apscheduler.scheduler import Scheduler

# Start the scheduler

sched = Scheduler()

sched.start()

# Schedule job_function to be called every two hours

@sched.interval_schedule(hours=2)

def job_function():

print "Hello World"

如果需要取消对装饰功能的job,可以这样做

scheduler.unschedule_job(job_function.job)

3.3.3 cron调度job

与crontab表达式不同,您可以省略不需要的字段。大于最低有效明确定义字段的字段默认为,而较小的字段默认为其最小值,除了默认为。例如,如果仅指定day=1,minute=20,则作业将在每年每月的第一天以每小时20分钟的速度执行。下面的代码示例应该进一步说明这种行为。

省略字段默认为*

from apscheduler.scheduler import Scheduler

# Start the scheduler

sched = Scheduler()

sched.start()

def job_function():

print "Hello World"

# Schedules job_function to be run on the third Friday

# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00

sched.add_cron_job(job_function, month='6-8,11-12', day='3rd fri', hour='0-3')

# Schedule a backup to run once from Monday to Friday at 5:30 (am)

sched.add_cron_job(job_function, day_of_week='mon-fri', hour=5, minute=30)

装饰语法

@sched.cron_schedule(day='last sun')

def some_decorated_task():

print "I am printed at 00:00:00 on the last Sunday of every month!"

如果需要取消对装饰功能的job,可以这样做

scheduler.unschedule_job(job_function.job)

3.3.4 使用自定义触发器调度

以上事例基于内置触发器调度job,如果需要使用自定义触发器调度需要使用add_job()方法

from apscheduler.schedulers.blocking import BlockingScheduler

import datetime

def aps_test(x):

print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()

scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')

scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))

scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3)

scheduler.start()

3.4 关闭调度器

sched.shutdown()

# 默认情况下,调度程序关闭其线程池,并等待直到所有当前正在执行的job完成。为了更快地退出,可以:

sched.shutdown(wait=False)

# 这仍然会关闭线程池,但不会等待任何正在运行的任务完成。此外,如果您给调度程序一个要在其他地方管理的线程池,您可能希望完全跳过线程池关闭:

sched.shutdown(shutdown_threadpool=False)

# 自动关闭调度程序的一个巧妙方法是为此使用atexit挂钩:

import atexit

sched = Scheduler(daemon=True)

atexit.register(lambda: sched.shutdown(wait=False))

# Proceed with starting the actual application

3.5 Job stores

如果没有指定stores存储位置,则将转到默认job存储 -> ramjobstore不提供持久化保存

其它存储stores:

ShelveJobStore

SQLAlchemyJobStore

MongoDBJobStore

RedisJobStore

通过配置选项或add_jobstore()方法添加作业存储。因此,以下是相等的:

config = {'apscheduler.jobstores.file.class': 'apscheduler.jobstores.shelve_store:ShelveJobStore',

'apscheduler.jobstores.file.path': '/tmp/dbfile'}

sched = Scheduler(config)

3.6 获取调度器列表

sched.print_jobs()

二、Flask-APScheduler

1. 引子(Introduction)

Flask-APScheduler 是Flask框架的一个扩展库,增加了Flask对apScheduler的支持

2. 特性(Features)

根据Flask配置加载调度器配置

根据Flask配置加载调度器job

允许指定调度程序将运行的主机名

提供REST API来管理调度job

为REST API提供认证

3. 安装(Installation)

pip install Flask-APScheduler

4. 使用(Usage)

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from flask import Flask

from flask_apscheduler import APScheduler

class Config(object):

# 配置执行job

JOBS = [

{

'id': 'job1',

'func': 'advanced:job1',

'args': (1, 2),

'trigger': 'interval',

'seconds': 10

}

]

# 存储位置

SCHEDULER_JOBSTORES = {

'default': SQLAlchemyJobStore(url='sqlite://')

}

# 线程池配置

SCHEDULER_EXECUTORS = {

'default': {'type': 'threadpool', 'max_workers': 20}

}

SCHEDULER_JOB_DEFAULTS = {

'coalesce': False,

'max_instances': 3

}

# 调度器开关

SCHEDULER_API_ENABLED = True

def job1(a, b):

print(str(a) + ' ' + str(b))

if __name__ == '__main__':

app = Flask(__name__)

app.config.from_object(Config())

scheduler = APScheduler()

# 注册app

scheduler.init_app(app)

scheduler.start()

app.run()

三、动态定时任务

Flask + flask_apscheduler实现一个类似Jenkins的定时任务的功能,前端设置crontab,后端可以创建,修改,暂停,移除,恢复一个执行任务

文件目录

|--app

|----config.py 配置文件

|----run_tasks.py 开启任务

|----tasks.py 任务job

|----apSheduler.py 提供接口函数

|----extensions.py flask扩展

|----__init__.py 初始化文件

|----views.py 业务代码

|--manage.py 项目启动文件

1. config.py配置flask_apscheduler

class Config(object):

# 开关

SCHEDULER_API_ENABLED = True

# 持久化配置

SCHEDULER_JOBSTORES = {

'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')

}

SCHEDULER_EXECUTORS = {

'default': {'type': 'threadpool', 'max_workers': 20}

}

2. init.py创建app

from app.config import Config

from app.extensions import scheduler

# 创建app

def create_app(config=None, app_name=None, blueprints=None):

app = Flask(app_name, static_folder='thanos/static',

template_folder='thanos/resource/report')

# 导入flask配置 -> 这里根据自己的项目导入配置就好哇

# config = Config.get_config_from_host(app.name)

app.config.from_object(config)

# 初始化调度器配置

configure_scheduler(app)

def configure_scheduler(app):

"""Configure Scheduler"""

scheduler.init_app(app)

scheduler.start()

# 加载任务,选择了第一次请求flask后端时加载,可以选择别的方式...

@app.before_first_request

def load_tasks():

# 开启任务

from app import run_tasks

3. extensions.py实例化scheduler

from flask_apscheduler import APScheduler

scheduler = APScheduler()

4. apSheduler.py提供调度器接口

"""此文件可以根据具体业务复杂化选择写或者直接调用原apscheduler接口"""

from flask import current_app

# from .extensions import scheduler 直接导入单例对象操作也行

class APScheduler(object):

"""调度器控制方法"""

def add_job(self, jobid, func, args, **kwargs):

"""

添加任务

:param args: 元祖 -> (1,2)

:param jobstore: 存储位置

:param trigger:

data -> run_date datetime表达式

cron -> second/minute/day_of_week

interval -> seconds 延迟时间

next_run_time -> datetime.datetime.now() + datetime.timedelta(seconds=12))

:return:

"""

job_def = dict(kwargs)

job_def['id'] = jobid

job_def['func'] = func

job_def['args'] = args

job_def = self.fix_job_def(job_def)

self.remove_job(jobid) # 删除原job

current_app.apscheduler.scheduler.add_job(**job_def)

def remove_job(self, jobid, jobstore=None):

"""删除任务"""

current_app.apscheduler.remove_job(jobid, jobstore=jobstore)

def resume_job(self, jobid, jobstore=None):

"""恢复任务"""

current_app.apscheduler.resume_job(jobid, jobstore=jobstore)

def pause_job(self, jobid, jobstore=None):

"""恢复任务"""

current_app.apscheduler.pause_job(jobid, jobstore=jobstore)

def fix_job_def(self, job_def):

"""维修job工程"""

if job_def.get('trigger') == 'date':

job_def['run_date'] = job_def.get('run_date') or None

elif job_def.get('trigger') == 'cron':

job_def['hour'] = job_def.get('hour') or "*"

job_def['minute'] = job_def.get('minute') or "*"

job_def['week'] = job_def.get('week') or "*"

job_def['day'] = job_def.get('day') or "*"

job_def['month'] = job_def.get('month') or "*"

elif job_def.get('trigger') == 'interval':

job_def['seconds'] = job_def.get('seconds') or "*"

else:

if job_def.get("andTri"):

job_def['trigger'] = AndTrigger([job_def.pop("andTri", None), ])

# job_def['next_run_time'] = job_def.get('next_run_time') or None

return job_def

5. views.py 实现调度器接口

from app.apSheduler import APScheduler

# croniter库解析Linux cron格式的计划

# 以添加为例子 暂停 删除 恢复可以根据业务场景自己写接口

def add_crontab_task(self, params):

"""添加一个crontab任务"""

try:

self.crontab = params.get("crontab")

self.id = params.get("id")

self.task_id = params.get("task_id")

except Exception as e:

return False, str(e)

# 记录数据库

res = addSql()

# 更新任务信息

APScheduler().add_job(jobid=self.id, func=task_func,

args=(self.task_id,), andTri=CronTrigger.from_crontab(self.crontab))

if res is False:

return False, "数据库操作异常"

return True, croniter(self.crontab, datetime.now()).get_next(datetime)

def get_next_execute_time(self, params):

"""获取下一次执行时间"""

try:

self.crontab = params.get("crontab")

except Exception as e:

return False, str(e)

return True, str(croniter(self.crontab, datetime.now()).get_next(datetime))

6. tasks.py 任务job

def task_func(task_id):

"""业务逻辑"""

# 发邮件、写诗、画画 -> 爱干啥干啥

7. run_tasks.py 开启任务调度大门

from .task import task_func

from apscheduler.triggers.cron import CronTrigger # 可以很友好的支持添加一个crontab表达式

def run_task():

# 查询数据库的crontab信息 -> 定时任务信息

res = fetall("select * from crontab_table")

# 遍历添加任务

shche = APScheduler()

for rs in res:

shche.add_job(jobid=rs.get(id), func=task_func,

args=(rs.get(task_id)), andTri=CronTrigger.from_crontab(rs.get(crontab)))

# 最重要的

run_task() # 这样当__init__.py创建app时加载这个文件,就会执行添加历史任务啦!

8. manage.py 启动项目

from app import create_app

app = create_app()

app.run()

四、uwsgi部署注意事项

1. 常见问题及解决方案

1.1 线上部署uWSGI+APScheduler执行定时任务卡死

1.1.1问题分析:

APScheduler运行环境需要为多线程,uwsgi默认是one thread ,one process,需要在配置文件里面加上一条 enable-thread = true,也就是允许程序内部启动多线程。

1.1.2解决方案:

# uwsgi.ini文件追加以下配置

enable-threads = true

preload=True #用--preload启动uWSGI,确保scheduler只在loader的时候创建一次

lazy-apps=true

1.2 定时任务多次执行的问题

1.2.1问题分析:

1.本地原因,错过了上次执行时间,下次会多次执行

2.线上部署的,如uWSGI部署,配置了processes>1导致加载了多此apscheduler(apscheduler当前没有任何进程间同步和信令方案)

1.2.3解决方案:

1. 本地多次执行可以在Flask启动方法中加use_reloader=False

app.run(host="0.0.0.0", port=8888, use_reloader=False)

2.线上linux可以借鉴下面的方法,网上借鉴的

在__init__.py文件中修改中configure_scheduler(),用全局锁确保scheduler只运行一次, 代码如下:

import atexit

import fcntl # 只能用于linux

from .extensions import scheduler

def configure_scheduler(app):

"""Configure Scheduler"""

f = open("scheduler.lock", "wb")

try:

fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)

scheduler.init_app(app)

scheduler.start()

# 加载任务

@app.before_first_request

def load_tasks():

from thanos import run_tasks

except:

pass

def unlock():

fcntl.flock(f, fcntl.LOCK_UN)

f.close()

atexit.register(unlock)

init函数为flask项目初始化所调用,这里为scheduler模块的初始化部分。首先打开(或创建)一个scheduler.lock文件,并加上非阻塞互斥锁。成功后创建scheduler并启动。如果加文件锁失败,说明scheduler已经创建,就略过创建scheduler的部分。

最后注册一个退出事件,如果这个flask项目退出,则解锁并关闭scheduler.lock文件的锁。

 类似资料: