工作队列
优质
小牛编辑
137浏览
2023-12-01
rabbitMQ 工作队列 轮询分发
>[danger] 简单队列是一对一的关系,一个生成者对应一个消费者,实际开发中,一般消费者是以业务相结合的,需要时间去处理业务,如果只有一个消费者,那么生产者就会积压很多消息,消费不出去
代码演示:
'use strict';
const Controller = require('egg').Controller;
/**
* 队列一对多演示
* 生产者 ----> 队列 ----> 消费者
* ----> 消费者
----> 消费者
*/
// 频道名称
const queueName = 'hasMany'
class UserController extends Controller {
// 生成者
async send() {
const { msg } = this.ctx.query;
//1. 创建频道
const ch = await this.app.amqplib.createChannel();
// 2. 创建队列 开启持久化存储
await ch.assertQueue(queueName, { durable: true });
// 3. 发送消息
let ok = null;
for(let i=0; i<50; i++) {
// 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。
ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });
}
//4. 关闭连接
await ch.close();
this.ctx.body = ok;
this.ctx.status = 200;
}
// 消费者
async work1() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
//2. 选择队列
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 关闭消息自动确认模式,需要手动 ack
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 500)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
//消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work1: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消费者1号失败'
this.ctx.status = 500
}
}
async work2() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
//2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 开启自动确认模式
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 1000)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work2: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消费者2号失败'
this.ctx.status = 500
}
}
async work3() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
//2. 选择队列
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 开启自动确认模式
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 1500)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
//消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work3: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消费者3号失败'
this.ctx.status = 500
}
}
}
module.exports = UserController;