简单队列
优质
小牛编辑
140浏览
2023-12-01
MQ 简单队列实战
- [ ] 模型: >[danger] P 是我们的生产者 > 中间的框是一个队列,代表消费者保留的消息缓冲区。 > C 是我们的消费者
代码演示:
'use strict';
const Controller = require('egg').Controller;
/**
* 一对一队列演示
*/
// 频道名称
const queueName = 'hasone'
class UserController extends Controller {
// 生成者
async send() {
// 1. 获取要发送的消息
const { msg } = this.ctx.query
// 2. 创建频道
const ch = await this.app.amqplib.createChannel();
// 3. 创建队列 durable 关闭持久化存储
await ch.assertQueue(queueName, { durable: false } );
// 4. 发送消息
const ok = await ch.sendToQueue(queueName, Buffer.from(msg));
// 5. 关闭连接
await ch.close();
this.ctx.body = ok;
this.ctx.status = 200;
}
// 消费者
async work() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
// 2. 选择队列
await ch.assertQueue(queueName, { durable: false });
//3. 接收队列的消息
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg), { noAck: true }));
// 4. 显示消息内容
if (resultMsg !== null) {
ch.ack(resultMsg);
await ch.close();
const { content } = resultMsg;
this.status = 200;
this.ctx.body = { msg: content.toString() }
} else {
this.ctx.body = '队列消费失败'
this.ctx.status = 500;
}
}
}
module.exports = UserController;