如何防止重复味精在谷歌云PubSub中发生?
void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {
submitHandler.handle(toMessage(pubsubMessage))
.doOnSuccess((response) -> {
log.info("Acknowledging the successfully processed message id: {}, response {}", pubsubMessage.getMessageId(), response);
ackReply.ack(); // <---- acknowledged
})
.doOnError((e) -> {
log.error("Not acknowledging due to an exception", e);
ackReply.nack();
})
.doOnTerminate(span::finish)
.subscribe();
}
Google Cloud Pub/Sub使用“至少一次”交付。从文档中:
通常,Cloud Pub/Sub将每个消息按发布顺序发送一次。但是,消息有时可能会不按顺序传递或不止一次传递。通常,在处理消息时,支持多次传递要求订阅服务器是幂等的。
这意味着它保证它将以1:N倍的速度传递消息,因此,如果您不通过其他先删除重复的东西将消息传递出去,您可能会多次获得消息。您不能定义一个设置来保证准确地交付一次。文档确实引用了您可以使用Cloud Dataflow的Pubsubio
获得您想要的行为,但该解决方案似乎不受欢迎:
如果是这种情况,只要在最后期限内确认你的信息,你就不会经常看到这些重复的信息了。
有人能帮我弄清楚这件事吗。 谢了!
我正在开发一个使用的软件。我有一个用户订阅了多个主题,我想知道是否有一个订单接收来自这些主题的消息。我在我的电脑上尝试了一些组合,但我需要确定这一点。例 null [编辑]我想指定这两个主题各有一个分区,并且只有一个生产者和一个消费者。我需要首先阅读来自第一个主题的所有消息,然后阅读来自另一个主题的消息
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
依赖使用阿尔帕卡Kafka3.0 我们有以下消费者设置。 enable.auto.commit = true 自动偏移。reset=最早 如果我们有enable.auto.commit = true,那么是否有可能从特定的偏移量/日期消费来自Kafka主题分区的消息?
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较
2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30] 2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60] 我的测试