1、安装:
pip install Flask-APScheduler
2、使用示例代码:
import json
import pickle
from flask import Flask, request
from flask_apscheduler import APScheduler # 主要插件
import datetime
import datetime as dt
import datetime
from datetime import datetime, timedelta
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import time
import shanyouAPI
import logging
app = Flask(__name__)
scheduler = APScheduler()
def task1(a, b):
print('mission_1_',a,b)
print(datetime.datetime.now())
def task2(a, b):
print('mission_2_',a, b)
print(datetime.datetime.now())
# 暂停任务
# scheduler.pause_job('third')
# 恢复任务
# time.sleep(10)
# scheduler.resume_job('third')
# 删除任务
# scheduler.remove_job('first')
@app.route('/pause', methods=['GET'])
def pause_job(): # 暂停
job_id = request.args.get('id')
scheduler.pause_job(str(job_id))
return "pause success!"
@app.route('/resume', methods=['GET'])
def resume_job(): # 恢复
job_id = request.args.get('id')
scheduler.resume_job(str(job_id))
return "Success!"
@app.route('/get_jobs', methods=['GET'])
def get_task(): # 获取
# job_id = request.args.get('id')
jobs = scheduler.get_jobs()
print(jobs)
return 'jobs:'+str(pickle.dumps(jobs))
@app.route('/remove_job', methods=['GET'])
def remove_job(): # 移除
job_id = request.args.get('id')
scheduler.remove_job(str(job_id))
return 'remove success'
# /add_job?id=2
@app.route('/add_job', methods=['GET'])
def add_task():
data = request.args.get('id')
if data == '1':
# trigger='cron' 表示是一个定时任务
scheduler.add_job(func=task1, id='1', args=(1, 1), trigger='cron', day_of_week='0-6', hour=18, minute=24,
second=10, replace_existing=True)
else:
# trigger='interval' 表示是一个循环任务,每隔多久执行一次
scheduler.add_job(func=task2, id='2', args=(2, 2), trigger='interval', seconds=10,
replace_existing=True)
return 'sucess'
# mission_2_ 2 2
# 2019-07-17 18:19:04.663956
# mission_1_ 1 1
# 2019-07-17 18:19:10.010007
# mission_2_ 2 2
# 2019-07-17 18:19:14.664401
def test_add():
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.txt',
filemode='a')
# import shanyouAPI
REDIS = {
'host': '127.0.0.1',
'port': '6379',
'db': 0,
'password': '****'
}
jobstores = {
'redis': RedisJobStore(**REDIS)
}
executors = {
'default': ThreadPoolExecutor(10), # 默认线程数
'processpool': ProcessPoolExecutor(3) # 默认进程
}
def alarm():
print('Alarm! This alarm was scheduled at %s.')
mcuid = "***"
pos_id = 2
oper_type = 8
# def control_center(mcuid, pos_id, oper_type):
# print("获取参数:", mcuid, pos_id, oper_type)
# alarm_time = datetime.now() + timedelta(seconds=15)
init_time = '2022-02-27 02:41:00'
alarm_time = dt.datetime.strptime(init_time, "%Y-%m-%d %H:%M:%S")
sched_1 = BackgroundScheduler(jobstores=jobstores, executors=executors)
# sched_1 = BlockingScheduler(jobstores=jobstores, executors=executors)
job = sched_1.add_job(func=shanyouAPI.control_center, id="shanyou1234", trigger='date', jobstore='redis',
run_date=alarm_time, args=[mcuid, pos_id, oper_type], replace_existing=True)
sched_1._logger = logging
sched_1.start()
print(sched_1.get_jobs())
print(datetime.now())
if __name__ == '__main__':
scheduler.init_app(app=app)
test_add()
scheduler.start()
# debug=True会导致重复执行两次函数
app.run(debug=True, port=8080)
优质文章: