工作队列

优质
小牛编辑
137浏览
2023-12-01

rabbitMQ 工作队列 轮询分发

>[danger] 简单队列是一对一的关系,一个生成者对应一个消费者,实际开发中,一般消费者是以业务相结合的,需要时间去处理业务,如果只有一个消费者,那么生产者就会积压很多消息,消费不出去


代码演示:

'use strict';
const Controller = require('egg').Controller;
/**
 * 队列一对多演示
 * 生产者 ---->  队列 ----> 消费者
*                    ----> 消费者
                     ----> 消费者
 */

// 频道名称
const queueName = 'hasMany'

class UserController extends Controller {

  // 生成者
  async send() {
    const { msg } = this.ctx.query;
    //1. 创建频道
    const ch = await this.app.amqplib.createChannel();
    // 2. 创建队列 开启持久化存储
    await ch.assertQueue(queueName, { durable: true });
    // 3. 发送消息
    let ok = null;
    for(let i=0; i<50; i++) {
      // 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。
      ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });
    }
    //4. 关闭连接
    await ch.close();

    this.ctx.body = ok;
    this.ctx.status = 200;
  }

  // 消费者
  async work1() {
   // 1. 创建频道
   const ch = await this.app.amqplib.createChannel();
   //2. 选择队列
   await ch.assertQueue(queueName, { durable: true });
   // 3. 接收消息 noAck 关闭消息自动确认模式,需要手动 ack
   const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {

    setTimeout(() => {
      resolve(msg)
     }, 500)

   }, { noAck: false }) );

   if (resultMsg !== null) {
    const { content } = resultMsg;
     //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
    ch.ack(resultMsg);
    await ch.close();

    this.ctx.body = { work1: content.toString() };
    this.ctx.status = 200;
   } else {
     this.ctx.body = '消费者1号失败'
     this.ctx.status = 500
   }

  }

  async work2() {
     // 1. 创建频道
   const ch = await this.app.amqplib.createChannel();
   //2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的
   await ch.assertQueue(queueName, { durable: true });
   // 3. 接收消息 noAck 开启自动确认模式
   const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
    setTimeout(() => {
      resolve(msg)
     }, 1000)

   }, { noAck: false }) );

   if (resultMsg !== null) {
    const { content } = resultMsg;
    ch.ack(resultMsg);
    await ch.close();

    this.ctx.body = { work2: content.toString() };
    this.ctx.status = 200;
   } else {
     this.ctx.body = '消费者2号失败'
     this.ctx.status = 500
   }
  }

  async work3() {
     // 1. 创建频道
   const ch = await this.app.amqplib.createChannel();
   //2. 选择队列
   await ch.assertQueue(queueName, { durable: true });
   // 3. 接收消息 noAck 开启自动确认模式
   const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {

    setTimeout(() => {
      resolve(msg)
     }, 1500)


   }, { noAck: false }) );


   if (resultMsg !== null) {
    const { content } = resultMsg;
    //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
    ch.ack(resultMsg);
    await ch.close();

    this.ctx.body = { work3: content.toString() };
    this.ctx.status = 200;
   } else {
     this.ctx.body = '消费者3号失败'
     this.ctx.status = 500
   }

  }
}

module.exports = UserController;