当前位置: 首页 > 知识库问答 >
问题:

Node-amqp和socket.io奇怪的行为

狄宾实
2023-03-14

我有一些问题要执行。

我需要的:

  • 发布来自用户的消息
  • 将其广播给其他用户
  • 将消息发送给将在下次连接时使用该消息的脱机用户

我实际上:

(function () {

    var amqp = require('amqp');

    var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' });
    var app = require('express')();
    var server = require('http').Server(app);
    var io = require('socket.io')(server);

    app.get('/', function (req, res) {
        res.sendfile(__dirname + '/index.html');
    });

    server.listen(8888);

// Wait for connection to become established.


    connection.on('ready', function () {

        var sendMessage = function (queue, msg) {
            connection.publish(queue, JSON.stringify(msg));
        }


        io.sockets.on('connection', function (socket) {

            socket.on('message', function (msg) {
                sendMessage('my-queue', msg);
            });

            connection.queue('my-queue', {autoDelete: false}, function (q) {
                q.bind('#');

                q.subscribe(function (message) {
                    socket.broadcast.emit('news',message);
                });
            });

        });
    });
})()
  • 在index.html页上,我连接到套接字服务器
  • 我有一个发送消息的按钮
  • 我在索引页上打开两个不同的浏览器,我的用户都已连接起来
  • 如果我向服务器发送消息,服务器就会将消息发送给其他用户
  • 如果我向服务器发送第二条消息,它就会将该消息发送给发送该消息的用户。

它在切换,每对消息(因为我有两个用户),其他用户得到消息,如果是受损消息,当前发送消息的用户接收消息。这是什么行为?

编辑:我制定了一个解决方案,每个消费者都可以通过以下方式获得信息:

(function () {

    var amqp = require('amqp');

    var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' });
    var app = require('express')();
    var server = require('http').Server(app);
    var io = require('socket.io')(server);

    app.get('/', function (req, res) {
        res.sendfile(__dirname + '/index.html');
    });

    server.listen(8888);

// Wait for connection to become established.


    connection.on('ready', function () {


        connection.exchange('logs', {type: 'fanout', autoDelete: false}, function (exchange) {

            var sendMessage = function (queue, msg) {
                exchange.publish(queue, JSON.stringify(msg));
            }

            io.sockets.on('connection', function (socket) {

                socket.on('message', function (msg) {
                    sendMessage('', msg);
                });

                connection.queue(socket.id, {exclusive: true}, function (q) {
                    q.bind('logs', '');

                    q.subscribe(function (message) {
                        socket.emit('news', message);
                    });
                });

            });
        });
    });
})()

我的最后一个问题是我现在无法管理脱机消息...有什么解决办法吗?(明天赏金结束:-/)

共有1个答案

乌靖
2023-03-14

问题是RabbitMQ会故意将每个消息发送给单个用户。该用户确认它收到了消息(amqp自动为您执行此操作),然后就RabbitMQ而言完成了工作,因此它删除了消息。

用户轮流接收消息的原因是RabbitMQ试图将传入消息的负载均匀地分散到用户身上。

你的问题在这里之前已经回答过了。查看它来解决您的问题!

 类似资料:
  • 我有以下代码来解析一个JSON文件: 要处理以下JSON文件: 如果我执行此代码,我将收到以下错误: 所以我开始一步一步地调试应用程序,看看part processing()中的哪个代码部分抛出了这个异常。令人惊讶的是,那里的所有代码都正常执行:没有抛出异常,也没有返回结果I except。 更让我惊讶的是,当我稍微改变第一种方法的代码时,它可以在不产生异常的情况下工作。 我不知道println方

  • 运行Spring Boot应用程序时,我遇到了一些奇怪的问题。它已配置为使用Log4J2作为其记录器(Logback记录器已被禁用)。 log4j2。xml: 主要问题 我在log4j2中有一些变量替换。xml文件,以允许变化,特别是日志文件名。当我运行应用程序时,日志文件被创建在正确的目录中,并且看起来确实有实际的日志内容。问题是它们的名字不正确。例如,而不是,文件名为${sys:service

  • 问题内容: 我在GregorianCalendar类中遇到一个奇怪的行为,我想知道我是否真的做得不好。 仅当初始化日期的月份的实际Maximum大于我将日历设置为的月份时,才追加此值。 这是示例代码: 我知道问题是由于日历初始化日期是31天(可能是5月),与设置为2月(28天)的月份混淆了。修复很容易(只需在设置年和月之前将day_of_month设置为1),但是我想知道这确实是想要的行为。有什么

  • 问题内容: 我正在为一个问题而苦苦挣扎,我不明白为什么它不起作用。如何通过将变量传递并转换为? 为什么在顶部代码段中不起作用,但在行下方的底部代码段中起作用? 唯一的区别似乎是添加了一个额外的变量,该变量也被键入为? 问题答案: 该是一种原始类型,同时是一个普通的Java类。您不能在原始类型上调用方法。但是该方法在上可用,如javadoc中所示 有关这些原始类型的更多信息,请参见此处

  • 问题内容: 为什么的到哪里去了? 问题答案: 删除任何字符,并从字符串的开头和结尾。

  • 问题内容: 我认为这是一个正常程序,但这是我得到的输出: 有人可以向我解释一下吗? 问题答案: 这是有据可查的PHP行为,请参阅php.net的foreach页面上的警告。 警告 即使在 foreach 循环之后,仍保留 $ value的 引用和最后一个数组元素。建议通过unset()销毁它。 __ 编辑 尝试逐步了解此处实际发生的情况