当前位置: 首页 > 知识库问答 >
问题:

node.js Google PubSub订阅者没有收到一些消息

叶茂才
2023-03-14

对调试的任何帮助都将非常感谢。

这是我的堆栈:

  • node.js
  • socket.io
  • express.js
  • passport.js
  • MongoDb
  • react.js

流程

  • Anna在聊天中发送一条消息(这条消息写入数据库并发布到PubSubtopic“messages”)
  • Node.js express应用程序运行订阅,然后根据消息内容发送给其他应该接收消息的人。
  • 在本例中,与安娜在同一频道的鲍勃将接收消息。
const pubSubClient = require('./client');

const errorHandler = function(error) {
    console.error(`ERROR: ${error}`);
    throw error;
};

module.exports = listenForMessages =(subscriptionName="messageReceiver",io) => {
    const subscription = pubSubClient.subscription(subscriptionName);

    // Listen for new messages until timeout is hit
    subscription.on("message", (message) => {
        console.log(`Received message ${message.id}:`);
        const parsedMessage = JSON.parse(message.data)
        parsedMessage._id = message.id
        console.log(parsedMessage)

        if (parsedMessage.to === "admin") {
            io.to(`admin:${parsedMessage.from}`).emit("NewMessage", parsedMessage);
        } else {
            io.to(`${parsedMessage.to}`).emit("NewMessage", parsedMessage);
        }
        message.ack();
    });

    subscription.on('error', errorHandler);
}
...
const listenForMessages = require("./message_processing/listen");
listenForMessages("messageReceiver", io);

下面的控制台输出是通过在本地运行应用程序生成的,使用两个浏览器(其中一个匿名)互相聊天。可以看到,监听器实际上只接收到最后一条消息(并打印出来)。有趣的是,由于调用的异步性质,接收到的消息的打印输出是在消息发送的日志之前完成的(也就是说,延迟在这里肯定不是一个问题)。

[0] went into deserialize user
[0] Message 875007020424601 published.
[0] went into deserialize user
[0] Message 875006704834317 published.
[0] went into deserialize user
[0] Message 875006583857400 published.
[0] went into deserialize user
[0] Message 875006520104287 published.
[0] went into deserialize user
[0] Message 875006699141463 published.
[0] went into deserialize user
[0] Received message 875006881073134:
[0] {
[0]   from: '5e949f73aeed81beefaf6daa',
[0]   to: 'admin',
[0]   content: 'i6',
[0]   seenByUser: true,
[0]   type: 'message',
[0]   createdByUser: true,
[0]   createdAt: '2020-04-20T07:44:54.280Z',
[0]   _id: '875006881073134'
[0] }
[0] Message 875006881073134 published.

在其他一些情况下,以前的消息工作,然后监听器似乎停止。

共有1个答案

商焕
2023-03-14
    null
 类似资料:
  • 我有一个简单的python脚本,它使用Google pubsub来检测Google云存储中的新文件。该脚本只需将新消息添加到队列中,另一个线程将在队列中处理这些消息: 这里,简单地将消息添加到队列中: 我遇到的问题是,过了一段时间(可能几天),订阅服务器停止接收新文件通知。如果我停止并重新启动脚本,它会一次获得所有通知。 我想知道是否有其他人也有类似的问题和/或可以建议解决问题的方法(可能通过打印

  • 为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:

  • 我的应用程序成功发送Kafka消息,但只有在Kafka初始化后。在此之前,我得到错误“Dispatcher没有订阅者”。如何等待订阅者完成频道注册? 17.165 SenderClass已创建 17.816初始化类,@PostConstruct启动PollingTask 24.781PollingTask发送第一条Kafka消息 24.816第一个错误:“Dispatcher没有订阅服务器” 25

  • 我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 拥有发布者和N个消费者,如果消费者使用,那么他们将错过订阅主题之前发布到主题的所有消息...众所周知,使用的消费者不会重播订阅主题之前存在的消息... 所以我需要: null 我想使用者必须检查现有消息的主题,如果有消息就使用它们,然后启动使用。对我来说这是最好的方法...