当前位置: 首页 > 工具软件 > Dramatiq > 使用案例 >

dramatiq分布式任务队列处理-快速上手

甘骞尧
2023-12-01

简介

类似于Celery,整体代码结构清新明朗,看看源码就知道咋回事了,性能也比celery要好。详情请移步官网

基础

安装

pip install 'dramatiq[redis, watch]' gevent

watch是用于监控的;生产环境可以不用安装。
这里使用redis作为消息队列,其他的请看官网。

使用

任务处理程序core.py

import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import Prometheus, default_middleware, CurrentMessage


CONF_REDIS = {
    'host': 'redis',
    'port': 6379,
    'db': 5,
    'password': 'yuqing_sandbox:iwP5Zh1J75E5yj',
    'decode_responses': False,
    'encoding': 'utf-8',
    'max_connections': 10,  # 注意:需要大于开启的线程数量-t
}

default_middleware.remove(Prometheus)  # 移除了默认的统计中间件
default_middleware.append(CurrentMessage)  # 加入了可在actor中获取当前消息的中间件
cus_middleware = [m() for m in default_middleware]
redis_broker = RedisBroker(**CONF_REDIS, middleware=cus_middleware)

dramatiq.set_broker(redis_broker)


@dramatiq.actor(
    queue_name="test",
    on_success="success_handle", on_failure="failure_handle",
    pipe_ignore=False,
)
def count_words(text: str):
	print(CurrentMessage().get_current_message()) 
    count = len(text.split(" "))
    print(f"There are {count} words at <{text}>.")
    return count


@dramatiq.actor(queue_name="other")
def success_handle(message: dict, result):
    # message: {'queue_name': 'test', 'actor_name': 'count_words', 'args': [], 'kwargs': {'text': 'Assuming you want to
    # enqueue a message on a queue named default, publish a persistent message to that queue in RabbitMQ.'}, 'options': {'redis_message_id': 'b466c984-89e5-40a7-a3f9-2cfa76219f32'}, 'message_id': '76635de0-b582-45fd-ba74-ae464b5003cd', 'message_timestamp': 1617513422067}
    print(f"成功处理消息:{message}")
	# 开启了结果存储中间件才会有result
    print(f"成功获取结果: {result}")


@dramatiq.actor
def failure_handle(message: dict, exception_info: dict):
    exception_type = exception_info["type"]
    exception_msg = exception_info["message"]


@dramatiq.actor(queue_name="pipe")
def test_pipe(*args, **kwargs):  # 如果是作为Pipelines的接受actor任务,函数参数不能拆解成test_pipe(content),会报错,看源码就知道为啥了。
    print(f"---{args} {kwargs}---")

推送程序:

dramatiq_data = {
    "text": "Assuming you want to enqueue a message on a queue named default, publish a persistent message to that queue in RabbitMQ."
}

def push_dramatiq():
	def get_msg():
        message = {
            "queue_name": "pipe",
            "actor_name": "test_pipe",
            "args": [],
            "kwargs": {
                "content": "test pipe"
            },
            "options": {"redis_message_id": str(uuid.uuid4())},
            "message_id": str(uuid.uuid4()),  # UUID4
            "message_timestamp": int(time.time() * 1000)
        }
        return message

    queue_name = "test"
    redis_message_id = str(uuid.uuid4())
    message = {
        "queue_name": queue_name,
        "actor_name": "count_words",
        "args": [],  # 使用位置参数
        "kwargs": dramatiq_data,  # 使用命名参数
        "options": {   # 供中间件使用
            "redis_message_id": redis_message_id,  # 使用redis要加上redis_message_id
            "pipe_target": get_msg()
        },
        "message_id": str(uuid.uuid4()),  # UUID4
        "message_timestamp": int(time.time() * 1000)
    }

    message_str = json.dumps(message, ensure_ascii=False).encode("utf-8")
	
	# 顺序不能错。dramatiq就是namespace
    redis_client.hset(f"dramatiq:{queue_name}.msgs", redis_message_id, message_str)
    redis_client.rpush(f"dramatiq:{queue_name}", redis_message_id)

默认的queue_name=‘default’,通常情况下队列名称用默认的就可以了,即使有多个任务,指定不同的actor_name即可。

  • 一旦消息达到过期时间或者达到重试次数,消息将被移到另一个死信队列,并保存7天。如有需要可以查询死信队列的数据并判断是否重新塞到任务队列。
  • 重试时间间隔以指数级进行增长

运行

dramatiq core -p 1 -t 3
-p 开启的进程数,默认是系统CPU核数
-t 开启的线程数,注意redis的最大连接数要大于等于此设定值

如果你的任务大部分是IO操作,最好使用gevent来提高性能:
dramatiq-gevent core -p 1 -t 3

中间件

dramatiq默认开启了这些中间件,中间件的参数可在actor的可选参数options中以命名参数传入。以下是针对默认开启的中间件,可在actor的可选参数options中使用的部分命名参数:

actor可选参数默认值隶属中间件说明
max_retries20Retries重试最大次数
min_backoff15sRetries重试最短时间间隔,必须大于100ms
max_backoff7 daysRetries重试最长时间间隔,不能大于7天
retry_whenNoneRetries重试自定义函数
throwsNoneRetries重试排除的异常或异常元组,即这些指定的异常抛出时将不会重试
max_ageNoneAgeLimit消息的过期时间
time_limit10minTimeLimit消息处理超时时间:程序处理时间超过设置值将抛出TimeLimitExceeded异常
notify_shutdownFalseShutdownNotifications任务在接收到终端信号时是否进行中断处理
pipe_targetNonePipelines目前版本1.9.0来看,这个作为actor参数没有任何作用,放在message.options字典中才能用,是一个初始化message类的字典,可用于将一个actor的输出作为另一个actor的输入。
pipe_ignoreTruePipelines是否将上一个actor任务的结果作为下一个actor任务的args参数
on_successNoneCallbacks消息处理成功后,将消息和结果作为args参数塞回队列,交由on_success指定的actor任务去处理
on_failureNoneCallbacks消息处理失败后,将消息和异常信息作为args参数塞回队列,交由on_failure指定的actor任务去处理

默认未开启的中间件:

  • Results 处理结果的
  • Rate Limiters 跨服务器多进程速率限制
  • Limiters 速率限制
  • Barriers 一个分布式的barrier
 类似资料: