当前位置: 首页 > 工具软件 > MRQ > 使用案例 >

使用MRQ实现多机并发执行任务

郑嘉年
2023-12-01

MRQ:https://github.com/pricingassistant/mrq

服务依赖

Python 2.7+
MongoDB >= 2.4
Redis >= 2.6

MRQ依赖redis和mongodb,需要分别安装mongodb,redis
安装mrq直接使用:pip install mrq

服务配置

参考:https://github.com/pricingassistant/mrq/blob/master/docs/configuration.md

mrq-config.py:

#!/usr/bin/env python
# coding=utf-8
"""mongodb 配置"""
MONGODB_JOBS = "mongodb://127.0.0.1:27017/mrq"
"""Redis配置"""
REDIS = "redis://127.0.0.1:6379"
"""mrq-worker配置"""
QUEUES = ("test1", "default") # The queues to listen on.Defaults to default , which will listen on all queues.
GREENLETS = 1 #Max number of greenlets to use.Defaults to 1.
NAME = "test1" #Specify a different name.
DEFAULT_JOB_TIMEOUT = 3600 #In seconds, delay before interrupting the job.Defaults to 3600 seconds which is 1 hour.
REPORT_INTERVAL = 5 * 60 #Seconds between worker reports to MongoDB.Defaults to 10 seconds, floats are acceptable too.

""" 
mrq-dashboard settings
"""
DASHBOARD_HTTPAUTH = "" #HTTP Auth for the Dashboard. Format is user
DASHBOARD_QUEUE = "default" #Default queue for dashboard actions.
DASHBOARD_PORT = 5555 #Use this port for mrq-dashboard.Defaults to port 5555.
DASHBOARD_IP = "0.0.0.0" #Bind the dashboard to this IP. Default is 0.0.0.0, use 127.0.0.1 to restrict access.

#定时任务配置 https://github.com/pricingassistant/mrq/blob/master/docs/jobs-maintenance.md
SCHEDULER = True #Run the scheduler.Defaults to False.
SCHEDULER_TASKS = [
    dict(path="test.py",
         params=dict(),
         interval=300,
         queue="test1"),
    # 增加其他的定时任务
]

Task API

https://mrq.readthedocs.io/en/latest/jobs/#jobs-tasks

Task.run(self, params) :
每个任务都必须继承mrq.task.task类,该类提供Task.run(self, params) 接口,所有任务的主要运行入口。

Job API

在mrq中使用该接口创建任务并放入执行队列的方法:queue_job(main_task_path, params, queue=None)

如果未提供队列,则将使用配置中定义的该任务的默认队列。如果没有,则使用队列默认值,返回作业的ID。

例子

from mrq import context, job, task
from subprocess import Popen, PIPE, DEVNULL

class TestQueue(task.Task):
    """任务入队列"""
    def run(self, params):
        field = ''
        job.queue_job("test_run", field, queue="test1")

class TestRun(task.Task):
    """任务执行"""
    def run(self, params):
        cmd = 'cat test.txt'
        #执行某个shell 任务
        shell = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
 类似资料: