当前位置: 首页 > 工具软件 > let-us-koa > 使用案例 >

nodejs koa2整合rabbitmq

萧业
2023-12-01

上篇Node.js学习笔记

rabbitmq(本地服务自行安装)

1.中间件


npm install amqplib --save

2.配置及封装

新建配置文件 /config/rabbitmqConfig.js
配置内容根据自行修改

const rabbitmq = require('amqplib')

const config = {
	// protocol: 'amqp',//协议 默认值amqp
	hostname: 'localhost', //默认值为localhost
	port: 5672, //默认值为5672
	username: 'guest', //默认值为guest
	password: 'guest', //默认值为guest
	// locale: 'en_US',//默认值en_US
	// channelMax: 0,//设置每个连接的最大允许通道数量 默认值0 设置为0表示“没有限制”
	// frameMax: 0x1000,//amqp协议最大允许的字节数 默认值 0x1000 
	// heartbeat: 0,//心跳检测时间间隔 单位秒 默认值0 设置为0表示不使用该功能
	// vhost: '/',//默认值为/
}
//队列配置
const options = {
	durable: false//关闭持久化  默认为开启
}
//生产者
let send = function(queueName, msg) {
	return new Promise((resolve, reject) => {
		rabbitmq.connect(config)
			.then((conn) => {
				return conn.createChannel()
			})
			.then((channel) => {
				return channel.assertQueue(queueName, options)
					.then(function(ok) {
						return channel.sendToQueue(queueName, new Buffer(msg), {
							persistent: true
						});
					})
					.then(function(data) {
						if (data) {
							resolve('success')
							channel.close();
						} else{
							reject('fail')
						}
					})
					.catch(function() {
						setTimeout(() => {
							if (channel) {
								channel.close();
							}
						}, 500)
					});
			})
			.catch(function(e) {
				console.log('mq连接失败' + e)
			});
	})
}
//消费者
let customer = function(queueName, receiveCallBack) {
	rabbitmq.connect(config)
		.then((conn) => {
			return conn.createChannel()
		})
		.then((channel) => {
			return channel.assertQueue(queueName, options)
				.then(function(ok) {
					return channel.consume(queueName, function(msg) {
						if (msg !== null) {
							let data = msg.content.toString();
							channel.ack(msg);
							receiveCallBack(data);
						}
					})
				})
		})
		.catch(function() {
			console.log('mq连接失败')
		});
}
//注册消费者
customer('myqueue', (data) => {
	console.log('消费者接收消息:' + data)
})
module.exports = {
	send,
	customer
}

3.接口

const mq = require('../config/rabbitmqConfig')

router.get('/send', async (ctx, next) => {
  await mq.send('myqueue',ctx.query.text).then((res) => {
	  console.log('生产者发送结果:' + res)
  })
})

接上篇 写在test.js内 完整test.js

const router = require('koa-router')()
const redis = require('../config/redisConfig')
const mq = require('../config/rabbitmqConfig')

router.prefix('/test')

router.get('/set', async (ctx, next) => {
  await redis.set('key','123123').then( (res) => {
	  ctx.body = '保存成功'
  })
})
  

router.get('/get', async (ctx, next) => {
  await redis.get('key').then((res) => {
	  console.log(res)
	  ctx.body = res
  })
})

router.get('/send', async (ctx, next) => {
  await mq.send('myqueue',ctx.query.text).then((res) => {
	  console.log('生产者发送结果:' + res)
  })
})

module.exports = router

4.测试

浏览器访问 http://localhost:3000/test/send?text=abc12
生产者发送结果:success
消费者接收消息:abc12

 类似资料: