我有一个简单的python脚本,它使用Google pubsub来检测Google云存储中的新文件。该脚本只需将新消息添加到队列中,另一个线程将在队列中处理这些消息:
subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
subscriber.subscribe(subscription_path, callback=callback_fun)
while True:
if not message_queue:
time.sleep(60)
continue
else:
process_next_message(message_queue.pop())
这里,callback_fun
简单地将消息添加到队列中:
def callback_fun(message):
message.ack()
message_queue.append(message)
我遇到的问题是,过了一段时间(可能几天),订阅服务器停止接收新文件通知。如果我停止并重新启动脚本,它会一次获得所有通知。
我想知道是否有其他人也有类似的问题和/或可以建议解决问题的方法(可能通过打印通常看不到的调试消息)。我现在正在尝试停止/重新启动订阅服务器,但我确信这不是在生产环境中使用的最佳方法。
我使用的是google-cloud0.32.0和google-cloud-pubsub0.30.1。
通常,订户可能停止接收消息的原因有以下几个:
如果您的问题不属于这些类别之一,最好使用您的项目名称、主题名称和订阅名称联系Google Cloud支持部门,这样他们就可以将问题缩小到您的用户代码、客户机库或服务。
对调试的任何帮助都将非常感谢。 这是我的堆栈: node.js socket.io express.js passport.js MongoDb react.js 流程: Anna在聊天中发送一条消息(这条消息写入数据库并发布到PubSubtopic“messages”) Node.js express应用程序运行订阅,然后根据消息内容发送给其他应该接收消息的人。 在本例中,与安娜在同一频道的鲍勃
我们有一个Storm拓扑,其中我们配置了一个喷口和两个螺栓。Spout连续地从DB查询数据,并将其发送到first bolt进行某些处理。第一个bolt进行一些处理并将元组发送给第二个bolt,第二个bolt调用第三方web服务并发送数据。所以,经过一段时间后,最后一个bolt没有得到任何元组,如果我们重新启动拓扑,它工作得很好。这里只有最后一个螺栓有问题。其他喷口和第一螺栓运行良好,我不使用顶进
为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:
我正在使用Cloud pub/sub构建一个作业队列,我希望按照pub/sub服务接收消息的顺序接收消息。我创建了一个主题和一个启用消息排序的订阅。我正在使用包用Python开发我的系统。正如本文档中所建议的,我必须发布带有排序键的消息。 如果邮件具有相同的排序键,并且您将邮件发布到相同的区域,则订阅服务器可以按顺序接收邮件。 在订阅方,我需要批量处理消息,所以我使用参数进行控制。但是,当我启用m
我的应用程序成功发送Kafka消息,但只有在Kafka初始化后。在此之前,我得到错误“Dispatcher没有订阅者”。如何等待订阅者完成频道注册? 17.165 SenderClass已创建 17.816初始化类,@PostConstruct启动PollingTask 24.781PollingTask发送第一条Kafka消息 24.816第一个错误:“Dispatcher没有订阅服务器” 25
我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于