当前位置: 首页 > 知识库问答 >
问题:

SpringKafka消费者确认模式

锺离德庸
2023-03-14

我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务?

或者,在使用Kafka事务(每个侦听器调用一个事务)时,是否忽略了ack模式批处理?如果是这样的话,鉴于消费者每次读取一条记录都需要与经纪人交谈,它会对消费者的表现产生怎样的负面影响?这是否也意味着我的响应生产者不能再缓冲记录,并将它们批量发送给代理,从而失去生产者异步性?

很抱歉在同一篇文章中问了多个问题(可能太多了)。

共有1个答案

易波涛
2023-03-14

使用事务时根本不使用AckMode。记录的偏移量在侦听器退出时和事务提交之前发送。

通过使用批处理侦听器,您可以在列表中获取所有记录,从而改善情况

然而,为了正确地支持生产者保护,您还应该将subBatchPerPartition(在2.3.2中添加)设置为true,这样我们就可以为poll()返回的每个主题/分区获得一个事务。

同样,子批处理的偏移量将在侦听器退出时发送。

如果您不关心击剑,您可以在一次交易中处理整个批次。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执

  • 我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯

  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,