简单队列

优质
小牛编辑
140浏览
2023-12-01

MQ 简单队列实战

  • [ ] 模型: >[danger] P 是我们的生产者 > 中间的框是一个队列,代表消费者保留的消息缓冲区。 > C 是我们的消费者

代码演示:

'use strict';
const Controller = require('egg').Controller;
/**
 * 一对一队列演示
 */

// 频道名称
const queueName = 'hasone'

class UserController extends Controller {

  // 生成者
  async send() {
    // 1. 获取要发送的消息
    const { msg } = this.ctx.query
    // 2. 创建频道
    const ch = await this.app.amqplib.createChannel();
    // 3. 创建队列 durable 关闭持久化存储
    await ch.assertQueue(queueName, { durable: false }  );
    // 4. 发送消息
    const ok = await ch.sendToQueue(queueName, Buffer.from(msg));
    // 5. 关闭连接
    await ch.close();

    this.ctx.body = ok;
    this.ctx.status = 200;
  }

  // 消费者
  async work() {
    // 1. 创建频道
    const ch = await this.app.amqplib.createChannel();
    // 2. 选择队列
    await ch.assertQueue(queueName, { durable: false });
    //3. 接收队列的消息
    const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg), { noAck: true }));

    // 4. 显示消息内容
    if (resultMsg !== null) {
      ch.ack(resultMsg);
      await ch.close();

      const { content } = resultMsg;
      this.status = 200;
      this.ctx.body = { msg: content.toString() }

    } else {
      this.ctx.body = '队列消费失败'
      this.ctx.status = 500;
    }
  }
}

module.exports = UserController;