当前位置: 首页 > 知识库问答 >
问题:

节点RabbitMQ使用消息并对每条消息执行操作

东门越
2023-03-14

我想使用来自rabbitmq服务的消息,对于收到的每条消息,我想对每条消息做一些事情(例如:将该消息放入数据库,处理该消息并通过另一个队列通过rabbitmq发送回复)。

目前我的RabbitMq消费者代码如下:

const all = require('bluebird').all;
const basename = require('path').basename;


function receive() {
    const severities = process.argv.slice(2);
    if (severities.length < 1) {
        console.warn('Usage: %s [info] [warning] [error]',
            basename(process.argv[1]));
        process.exit(1);
    }
    let config = {
        protocol: 'amqp',
        hostname: 'localhost',
        port: 5672,
        username: 'rumesh',
        password: 'password',
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 0,
        vhost: '/',
    };
    amqp.connect(config).then(function (conn) {
        process.once('SIGINT', function () {
            conn.close();
        });
        return conn.createChannel().then(function (ch) {
            let queue = 'test';
            let exchange = 'test-exchange';
            let key = 'python-key';
            let exchange_type = 'direct';

            let ok = ch.assertExchange(exchange, exchange_type, {durable: true});

            ok = ok.then(function () {
                return ch.assertQueue(queue, { durable: true});
            });

            ok = ok.then(function (qok) {
                const queue = qok.queue;
                return all(severities.map(function (sev) {
                    ch.bindQueue(queue, exchange, sev,{durable: true});
                })).then(function () {
                    return queue;
                });
            });

            ok = ok.then(function (queue) {
                return ch.consume(queue, logMessage, {noAck: true});
            });
            return ok.then(function () {
                console.log(' [*] Waiting for logs. To exit press CTRL+C.');
            });

            function logMessage(msg) {
                console.log(" [x] %s:'%s'",
                    msg.fields.routingKey,
                    msg.content.toString());
            }
        });
    }).catch(console.warn);
}


module.exports = receive;```

共有1个答案

宋涵衍
2023-03-14

我建议您创建一个类似onNewMessage的处理函数,每当您在队列中收到新消息时,它都会被调用。

您可以以多种方式编码消息,因为您可以通过AMQP发送二进制数据。

JSON绝对是发送消息的一种方式,这在节点中处理起来非常方便。js。

下面是一些连接到服务器,然后发送和接收消息的示例代码

const amqp = require('amqplib');

const queue = 'test';

// Set your config here...
let config = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'rumesh',
    password: 'password',
    locale: 'en_US',
    frameMax: 0,
    heartbeat: 0,
    vhost: '/',
};


async function start() {
    try {
        const conn = await createConnection(config);
        console.log("Connected to AMQP server.");
        let channel = await conn.createChannel();
        await channel.assertQueue(queue, { durable: true});

        startPollingForMessages(channel);
        startSendingMessages(channel);
    } catch (err) {
        console.error("start: Connection error:",err.message);
    }
}

async function createConnection(config) {
    const conn = await amqp.connect(config);

    conn.on("error", function(err) {
        console.error("Connection error:",err.message);
    });

    conn.on("close", function() {
        console.error("Connection closed:", err.message);
    });

    return conn;
}

function startSendingMessages(channel) {
    const SEND_INTERVAL = 5000;
    setInterval(() => { 
        sendMessage(channel, queue, JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" })); 
    }, SEND_INTERVAL);
}

async function sendMessage(channel, queue, messageContent) {
    console.log(`sendMessage: sending message: ${messageContent}...`);
    return channel.sendToQueue(queue, Buffer.from(messageContent))
}

function startPollingForMessages(ch) {
    ch.consume(queue, (msg) => {
        onNewMessage(msg);
        ch.ack(msg);
    });
}

function onNewMessage(msg) {
    // Do your database stuff or whatever here....
    console.log("On new message:", msg.content.toString())
}

start();
 类似资料:
  • 在队列选项卡的rabbitMQ web界面上,我看到了“概述”面板,我在其中找到了以下内容: 排队消息: 准备好了 未确认 总数 我猜“总数”是多少。但什么是“准备就绪”和“未确认”?“准备好了”——传递给消费者的信息?“未确认”-? 消息费率: 发表 交付 重新交付 承认 这些信息是什么?尤其是“重新交付”和“确认”?这是什么意思?

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 保存/记录在AWS SNS主题上发布的每条消息的最简单方法是什么?我想可能有一个神奇的设置可以自动将它们推送到S3或数据库,或者可能是一个自动支持HTTP目标的数据库服务,但似乎并非如此。也许需要通过Lambda函数来完成? 目的只是为了在设置一些SNS发布时进行基本的诊断和调试。我并不真正关心大规模或快速查询,只想一次记录和执行几分钟对所有活动的基本查询。

  • 我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码

  • 本文向大家介绍Python如何使用RabbitMQ的消息,包括了Python如何使用RabbitMQ的消息的使用技巧和注意事项,需要的朋友参考一下 示例 从导入库开始。 使用消息时,我们首先需要定义一个函数来处理传入的消息。这可以是任何可调用的函数,并且必须采用一个消息对象或一个消息元组(取决于中to_tuple定义的参数start_consuming)。 除了处理传入消息中的数据外,我们还必须确

  • null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。