当前位置: 首页 > 知识库问答 >
问题:

Cloud pub/sub订阅者max_messages与消息排序无关

燕意蕴
2023-03-14

我正在使用Cloud pub/sub构建一个作业队列,我希望按照pub/sub服务接收消息的顺序接收消息。我创建了一个主题和一个启用消息排序的订阅。我正在使用google-cloud-pubsub包用Python开发我的系统。正如本文档中所建议的,我必须发布带有排序键的消息。

如果邮件具有相同的排序键,并且您将邮件发布到相同的区域,则订阅服务器可以按顺序接收邮件。

在订阅方,我需要批量处理消息,所以我使用max_messages参数进行控制。但是,当我启用message ordering选项时,每次我都不能按预期提取max_messages消息,而只能从订阅中提取一条消息。奇怪的是,当我禁用消息排序时,它返回max_messages消息。

发布服务器代码:

...
topic_path = 'xxx'
ordering_key = '202011240000'
while True:
    job = {'job_id': 'xxxxxx', 'foo': 0, 'bar': 0}
    data = json.dumps(job, default=str).encode('utf-8')
    publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    time.sleep(1)
...
subscription_path = 'xxx'
subscriber.pull(request={'subscription': subscription_path, 'max_messages': 300})
...

我做错了什么还是酒吧/潜艇是这样设计的?

共有1个答案

鲜于勇
2023-03-14

max_messages属性并不意味着保证服务器返回该数量的消息,即使这些消息是可用的。使用有序传递时,返回给单个请求的消息批次不太可能包含最大数量的消息,因为必须进行更多的协调以确保消息按顺序发送,尤其是在使用单个排序键的情况下。服务器尝试不要让等待更多消息发送的请求停留太长时间,因为否则端到端的延迟会变得更加困难。

对此有两种处理方法。第一个是切换到Cloud Pub/Sub客户机库,它使用流式拉取,因此在消息可用时能够更好地传递消息,因为有一个持久的连接可以传递消息。

第二个是确保你有很多拉的同时突出。请注意,这对单个排序键的情况没有帮助,因为一次只能有一个排序键的消息列表未完成。如果您有许多排序键,这可能会有所帮助。

有关传递语义的更多信息,请参见ordering keys Medium Post中的“按顺序接收消息”部分。

 类似资料:
  • 我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于

  • 微信文档:https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.addTemplate.html 组合模板并添加至帐号下的个人模板库 $tid = 563; // 模板标题 id,可通过接口获取,也可登录小程序后台查看获取 $kidLi

  • 开普勒消息目前分为三大类:公告,告警和通知。 通知中根据不同的操作事件类型,分为十几个事件。每个事件都跟项目操作相关。便于接收项目操作变更的通知。 分类 事件 公告 Alarm 告警 Proclaim 通知 Build,Apply,Audit,Delete,Rollback,Logging,Reboot,Command,Storage,Extend... 订阅界面: 用户中心,点击头像,下拉菜单→

  • Note 本文档翻译自: http://redis.io/topics/pubsub 。 SUBSCRIBE 、 UNSUBSCRIBE 和 PUBLISH 三个命令实现了发布与订阅信息泛型(Publish/Subscribe messaging paradigm), 在这个实现中, 发送者(发送信息的客户端)不是将信息直接发送给特定的接收者(接收信息的客户端), 而是将信息发送给频道(chann

  • 我有一个简单的python脚本,它使用Google pubsub来检测Google云存储中的新文件。该脚本只需将新消息添加到队列中,另一个线程将在队列中处理这些消息: 这里,简单地将消息添加到队列中: 我遇到的问题是,过了一段时间(可能几天),订阅服务器停止接收新文件通知。如果我停止并重新启动脚本,它会一次获得所有通知。 我想知道是否有其他人也有类似的问题和/或可以建议解决问题的方法(可能通过打印

  • 我对Akka演员和演员模型有点困惑。从参与者A发送到参与者B的消息是否保持顺序?如何在本地/网络环境中实现这一点?据我所知,网络引入了可变延迟,如果M1的延迟为1秒,M2为0.5秒,那么消息M1和消息M2如何保持顺序?