当前位置: 首页 > 知识库问答 >
问题:

我们正在为Kafka使用Spring Cloud Stream,我们正在寻找消费者API的精确一次语义

党浩阔
2023-03-14

我们正在为Kafka使用Spring Cloud Stream,并寻找精确的一次性语义。我们有一个解决方案,正如预期的那样运行良好1)启用幂等元

但现在我们发现有一个属性(producer.sendfoffsetstotransaction)Kafka API,它可以帮助我们修复消费者端的重复处理,而无需任何元数据存储逻辑。现在,我不确定如何使用具有此属性的spring cloud stream实现这一点。sendOffsetsToTransaction

共有1个答案

华凡
2023-03-14

如果您将KafkaTransactionManager添加到应用程序上下文,它会由框架自动处理。

您必须向配置中添加事务id前缀。

spring.kafka.producer.transaction-id前缀

引导将自动添加事务管理器。

请参见生产者属性。

spring。云流动Kafka。粘合剂交易交易IDPrefix

启用活页夹中的事务。请参阅事务。Kafka文档中的id和SpringKafka文档中的事务。启用事务后,将忽略单个生产者属性,并且所有生产者都使用spring。云流动Kafka。粘合剂交易生产商。*属性。

当侦听器正常退出时,侦听器容器在提交事务之前将偏移量发送到事务。

 类似资料:
  • /usr/local/kafka2.12-2.6.0/config/server.properties 在开始动物园管理员和Kafka之后,创建一个新的主题 检查所有三个节点上的集群状态

  • 我是一个新的Kafka。我开始做Kafka,我面临以下问题,请帮助我解决这一个,提前谢谢。首先,我正在编写生产者API,它工作良好,但在编写消费者API时,消息不会显示。 我的代码是这样的: 已订阅主题Hello-Kafka records::org.apache.kafka.clients.consumer.consumerRecords@76b0bfab org.apache.kafka.cl

  • 我对Kafka很陌生。我有一个要求,在那里我只需要阅读一些特定的消息。

  • 我试图知道我的频道是否在YouTube上流式传输。我正在将youtube api v3与php库一起使用。我能够通过liveBroadcasts.list获得最后的广播,但是当我开始使用OBS进行流式传输时,我将参数 broadcastStatus = active 放在一起,并且不返回任何内容。我也尝试过liveStreams.list,但我仍然一无所获。我不知道我做错了什么,有人可以解释一下吗

  • 我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较