npm install amqplib --save
新建配置文件 /config/rabbitmqConfig.js
配置内容根据自行修改
const rabbitmq = require('amqplib')
const config = {
// protocol: 'amqp',//协议 默认值amqp
hostname: 'localhost', //默认值为localhost
port: 5672, //默认值为5672
username: 'guest', //默认值为guest
password: 'guest', //默认值为guest
// locale: 'en_US',//默认值en_US
// channelMax: 0,//设置每个连接的最大允许通道数量 默认值0 设置为0表示“没有限制”
// frameMax: 0x1000,//amqp协议最大允许的字节数 默认值 0x1000
// heartbeat: 0,//心跳检测时间间隔 单位秒 默认值0 设置为0表示不使用该功能
// vhost: '/',//默认值为/
}
//队列配置
const options = {
durable: false//关闭持久化 默认为开启
}
//生产者
let send = function(queueName, msg) {
return new Promise((resolve, reject) => {
rabbitmq.connect(config)
.then((conn) => {
return conn.createChannel()
})
.then((channel) => {
return channel.assertQueue(queueName, options)
.then(function(ok) {
return channel.sendToQueue(queueName, new Buffer(msg), {
persistent: true
});
})
.then(function(data) {
if (data) {
resolve('success')
channel.close();
} else{
reject('fail')
}
})
.catch(function() {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
.catch(function(e) {
console.log('mq连接失败' + e)
});
})
}
//消费者
let customer = function(queueName, receiveCallBack) {
rabbitmq.connect(config)
.then((conn) => {
return conn.createChannel()
})
.then((channel) => {
return channel.assertQueue(queueName, options)
.then(function(ok) {
return channel.consume(queueName, function(msg) {
if (msg !== null) {
let data = msg.content.toString();
channel.ack(msg);
receiveCallBack(data);
}
})
})
})
.catch(function() {
console.log('mq连接失败')
});
}
//注册消费者
customer('myqueue', (data) => {
console.log('消费者接收消息:' + data)
})
module.exports = {
send,
customer
}
const mq = require('../config/rabbitmqConfig')
router.get('/send', async (ctx, next) => {
await mq.send('myqueue',ctx.query.text).then((res) => {
console.log('生产者发送结果:' + res)
})
})
接上篇 写在test.js内 完整test.js
const router = require('koa-router')()
const redis = require('../config/redisConfig')
const mq = require('../config/rabbitmqConfig')
router.prefix('/test')
router.get('/set', async (ctx, next) => {
await redis.set('key','123123').then( (res) => {
ctx.body = '保存成功'
})
})
router.get('/get', async (ctx, next) => {
await redis.get('key').then((res) => {
console.log(res)
ctx.body = res
})
})
router.get('/send', async (ctx, next) => {
await mq.send('myqueue',ctx.query.text).then((res) => {
console.log('生产者发送结果:' + res)
})
})
module.exports = router
浏览器访问 http://localhost:3000/test/send?text=abc12
生产者发送结果:success
消费者接收消息:abc12