当前位置: 首页 > 知识库问答 >
问题:

Google Pub/Sub订阅者在一段时间后没有收到消息

连时铭
2023-03-14

我有一个简单的python脚本,它使用Google pubsub来检测Google云存储中的新文件。该脚本只需将新消息添加到队列中,另一个线程将在队列中处理这些消息:

subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

subscriber.subscribe(subscription_path, callback=callback_fun)

while True:
    if not message_queue:
        time.sleep(60)
        continue
    else:
        process_next_message(message_queue.pop())

这里,callback_fun简单地将消息添加到队列中:

def callback_fun(message):
    message.ack()
    message_queue.append(message)

我遇到的问题是,过了一段时间(可能几天),订阅服务器停止接收新文件通知。如果我停止并重新启动脚本,它会一次获得所有通知。

我想知道是否有其他人也有类似的问题和/或可以建议解决问题的方法(可能通过打印通常看不到的调试消息)。我现在正在尝试停止/重新启动订阅服务器,但我确信这不是在生产环境中使用的最佳方法。

我使用的是google-cloud0.32.0和google-cloud-pubsub0.30.1。

共有1个答案

赵骏奇
2023-03-14

通常,订户可能停止接收消息的原因有以下几个:

  1. 如果订户没有ack或nack消息,则可以达到流控制限制,这意味着不能传递更多的消息。在您的特定实例中,如果您立即ack消息,情况似乎不是这样。顺便说一句,我建议不要在队列处理消息之前对它们进行攻击,除非您对消息不被处理的可能性感到满意。否则,如果您的应用程序在ack之后崩溃,但在消息队列处理它们之前,您将没有处理消息,并且在它被ack之后也不会重新传递它。
  2. 如果另一个订阅服务器为同一订阅启动,则它可能正在接收消息。在这种情况下,我们希望订阅服务器接收到消息的子集,而不是完全没有消息。
  3. 发布服务器只是停止发布消息,因此没有消息可接收。如果重新启动订阅服务器,它又开始接收消息,那么情况可能不是这样。您还可以通过查看subscription/backlog_bytes的Stackdriver度量来验证backlog是否正在构建

如果您的问题不属于这些类别之一,最好使用您的项目名称、主题名称和订阅名称联系Google Cloud支持部门,这样他们就可以将问题缩小到您的用户代码、客户机库或服务。

 类似资料:
  • 对调试的任何帮助都将非常感谢。 这是我的堆栈: node.js socket.io express.js passport.js MongoDb react.js 流程: Anna在聊天中发送一条消息(这条消息写入数据库并发布到PubSubtopic“messages”) Node.js express应用程序运行订阅,然后根据消息内容发送给其他应该接收消息的人。 在本例中,与安娜在同一频道的鲍勃

  • 我们有一个Storm拓扑,其中我们配置了一个喷口和两个螺栓。Spout连续地从DB查询数据,并将其发送到first bolt进行某些处理。第一个bolt进行一些处理并将元组发送给第二个bolt,第二个bolt调用第三方web服务并发送数据。所以,经过一段时间后,最后一个bolt没有得到任何元组,如果我们重新启动拓扑,它工作得很好。这里只有最后一个螺栓有问题。其他喷口和第一螺栓运行良好,我不使用顶进

  • 为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:

  • 我正在使用Cloud pub/sub构建一个作业队列,我希望按照pub/sub服务接收消息的顺序接收消息。我创建了一个主题和一个启用消息排序的订阅。我正在使用包用Python开发我的系统。正如本文档中所建议的,我必须发布带有排序键的消息。 如果邮件具有相同的排序键,并且您将邮件发布到相同的区域,则订阅服务器可以按顺序接收邮件。 在订阅方,我需要批量处理消息,所以我使用参数进行控制。但是,当我启用m

  • 我的应用程序成功发送Kafka消息,但只有在Kafka初始化后。在此之前,我得到错误“Dispatcher没有订阅者”。如何等待订阅者完成频道注册? 17.165 SenderClass已创建 17.816初始化类,@PostConstruct启动PollingTask 24.781PollingTask发送第一条Kafka消息 24.816第一个错误:“Dispatcher没有订阅服务器” 25

  • 我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于