类似于Celery,整体代码结构清新明朗,看看源码就知道咋回事了,性能也比celery要好。详情请移步官网。
pip install 'dramatiq[redis, watch]' gevent
watch是用于监控的;生产环境可以不用安装。
这里使用redis作为消息队列,其他的请看官网。
任务处理程序core.py
:
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import Prometheus, default_middleware, CurrentMessage
CONF_REDIS = {
'host': 'redis',
'port': 6379,
'db': 5,
'password': 'yuqing_sandbox:iwP5Zh1J75E5yj',
'decode_responses': False,
'encoding': 'utf-8',
'max_connections': 10, # 注意:需要大于开启的线程数量-t
}
default_middleware.remove(Prometheus) # 移除了默认的统计中间件
default_middleware.append(CurrentMessage) # 加入了可在actor中获取当前消息的中间件
cus_middleware = [m() for m in default_middleware]
redis_broker = RedisBroker(**CONF_REDIS, middleware=cus_middleware)
dramatiq.set_broker(redis_broker)
@dramatiq.actor(
queue_name="test",
on_success="success_handle", on_failure="failure_handle",
pipe_ignore=False,
)
def count_words(text: str):
print(CurrentMessage().get_current_message())
count = len(text.split(" "))
print(f"There are {count} words at <{text}>.")
return count
@dramatiq.actor(queue_name="other")
def success_handle(message: dict, result):
# message: {'queue_name': 'test', 'actor_name': 'count_words', 'args': [], 'kwargs': {'text': 'Assuming you want to
# enqueue a message on a queue named default, publish a persistent message to that queue in RabbitMQ.'}, 'options': {'redis_message_id': 'b466c984-89e5-40a7-a3f9-2cfa76219f32'}, 'message_id': '76635de0-b582-45fd-ba74-ae464b5003cd', 'message_timestamp': 1617513422067}
print(f"成功处理消息:{message}")
# 开启了结果存储中间件才会有result
print(f"成功获取结果: {result}")
@dramatiq.actor
def failure_handle(message: dict, exception_info: dict):
exception_type = exception_info["type"]
exception_msg = exception_info["message"]
@dramatiq.actor(queue_name="pipe")
def test_pipe(*args, **kwargs): # 如果是作为Pipelines的接受actor任务,函数参数不能拆解成test_pipe(content),会报错,看源码就知道为啥了。
print(f"---{args} {kwargs}---")
推送程序:
dramatiq_data = {
"text": "Assuming you want to enqueue a message on a queue named default, publish a persistent message to that queue in RabbitMQ."
}
def push_dramatiq():
def get_msg():
message = {
"queue_name": "pipe",
"actor_name": "test_pipe",
"args": [],
"kwargs": {
"content": "test pipe"
},
"options": {"redis_message_id": str(uuid.uuid4())},
"message_id": str(uuid.uuid4()), # UUID4
"message_timestamp": int(time.time() * 1000)
}
return message
queue_name = "test"
redis_message_id = str(uuid.uuid4())
message = {
"queue_name": queue_name,
"actor_name": "count_words",
"args": [], # 使用位置参数
"kwargs": dramatiq_data, # 使用命名参数
"options": { # 供中间件使用
"redis_message_id": redis_message_id, # 使用redis要加上redis_message_id
"pipe_target": get_msg()
},
"message_id": str(uuid.uuid4()), # UUID4
"message_timestamp": int(time.time() * 1000)
}
message_str = json.dumps(message, ensure_ascii=False).encode("utf-8")
# 顺序不能错。dramatiq就是namespace
redis_client.hset(f"dramatiq:{queue_name}.msgs", redis_message_id, message_str)
redis_client.rpush(f"dramatiq:{queue_name}", redis_message_id)
默认的queue_name=‘default’,通常情况下队列名称用默认的就可以了,即使有多个任务,指定不同的actor_name即可。
dramatiq core -p 1 -t 3
-p 开启的进程数,默认是系统CPU核数
-t 开启的线程数,注意redis的最大连接数要大于等于此设定值
如果你的任务大部分是IO操作,最好使用gevent来提高性能:
dramatiq-gevent core -p 1 -t 3
dramatiq默认开启了这些中间件,中间件的参数可在actor的可选参数options中以命名参数传入。以下是针对默认开启的中间件,可在actor的可选参数options中使用的部分命名参数:
actor可选参数 | 默认值 | 隶属中间件 | 说明 |
---|---|---|---|
max_retries | 20 | Retries | 重试最大次数 |
min_backoff | 15s | Retries | 重试最短时间间隔,必须大于100ms |
max_backoff | 7 days | Retries | 重试最长时间间隔,不能大于7天 |
retry_when | None | Retries | 重试自定义函数 |
throws | None | Retries | 重试排除的异常或异常元组,即这些指定的异常抛出时将不会重试 |
max_age | None | AgeLimit | 消息的过期时间 |
time_limit | 10min | TimeLimit | 消息处理超时时间:程序处理时间超过设置值将抛出TimeLimitExceeded 异常 |
notify_shutdown | False | ShutdownNotifications | 任务在接收到终端信号时是否进行中断处理 |
pipe_target | None | Pipelines | 目前版本1.9.0来看,这个作为actor参数没有任何作用,放在message.options字典中才能用,是一个初始化message类的字典,可用于将一个actor的输出作为另一个actor的输入。 |
pipe_ignore | True | Pipelines | 是否将上一个actor任务的结果作为下一个actor任务的args参数 |
on_success | None | Callbacks | 消息处理成功后,将消息和结果作为args参数塞回队列,交由on_success指定的actor任务去处理 |
on_failure | None | Callbacks | 消息处理失败后,将消息和异常信息作为args参数塞回队列,交由on_failure指定的actor任务去处理 |
默认未开启的中间件: