当前位置: 首页 > 知识库问答 >
问题:

使用KafkaNativeOffsetManager时的Spring集成Kafka慢消息处理

杜俊逸
2023-03-14

我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。我们知道KafkaTopicOffsetManager并没有出现这种情况,因为在切换之前我们已经使用了几个月了。我们还进行了并行测试,验证了在使用KafkaTopicOffsetManager时,消息的消耗与消息的生成几乎是实时的,而KafkaNativeOffsetManager总是越来越落后。两个偏移管理器都使用默认配置,并且都在处理消息后提交偏移(自动确认)。

所以我真的有两个问题,第一个不是这个SO帖子的主要内容。

第一个问题是,为什么本机偏移管理比使用主题进行偏移管理慢?

第二个问题是,我们是否可以将SI kafka配置为在成功处理每条消息时不提交偏移量,而是提供不同的策略?我们的想法是,也许我们不应该如此频繁地提交偏移量,而应该将其作为批处理更新来执行。例如,在成功处理25条消息或30秒后提交偏移量。

非常感谢。

共有2个答案

萧伟兆
2023-03-14

不确定KafkaNativeOffsetManager有什么问题,如果您能分享一些关于这个问题的调查,我们的JIRA代码中的一些瓶颈,那就太好了。

对于延迟偏移量提交,我可以在KafkaMessageDrivenChannelAdapter上建议autoCONOffset=false。让发送到通道的消息将使用KafkaHeaders进行丰富。ACKNOWLEDGment头面对Default确认。它确实响应了您的请求:

/**
 * Invoked when the message for which the acknowledgment has been created has been processed.
 * Calling this method implies that all the previous messages in the partition have been processed already.
 */
void acknowledge();
勾裕
2023-03-14

当禁用自动提交并接收确认标头时,您需要做的唯一一件事是在处理完消息后调用acknowledge()。这假设即使您在不同的线程中处理消息,也会保留对确认实例的引用,或者作为原始消息的一部分,或者在进行转换时复制头。但是调用需要由您的代码进行。

其次,性能问题——它是由KafkaNativeOffsetManager实现对代理进行阻塞、相对更昂贵的调用这一事实引起的(相对于简单地向压缩的主题发送消息,如KafkaTopicOffsetManager所做的。一般来说,在每条消息之后进行更新是昂贵的,在Spring XD中,我们通过使用https://github.com/spring-projects/spring-xd/blob/master/extensions/spring-xd-extension-kafka/src/main/java/org/springframework/integrathtml" target="_blank">ion/x/kafka/WindowingOffsetManager.java来减轻这种情况,这减少了有效写入的数量。我想我们可以为Spring集成做类似的事情。

也就是说:相比之下,100000次更新使用KafkaNativeOffsetManager在9.8秒内完成,使用KafkaTopicOffsetManager在0.382秒内完成,如https://github.com/mbogoevici/spring-integration-kafka/blob/perftest/src/test/java/org/springframework/integration/kafka/performance/OffsetManagerPerformanceTests.java所示(在我自己的机器上收集的结果)。结果可能会有所偏差,但仍然表明存在很大差异。在YourKit中进行跟踪会确认结果。

 类似资料:
  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我将下面的数据发布到kafka并通过Spring集成通道接收并转换为Log对象,我如何使用Spring集成转换器将下面的数据转换为Log对象?感谢这里的任何帮助 '日志(客户端键=字符串,有效负载=字符串)” 这是通道适配器代码 当我尝试使用下面的方法在服务激活器中进行转换时 它的失败 com.fasterxml.jackson.core.JsonParseException: 无法识别的令牌“日

  • 我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创

  • 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息

  • 我能有一份只有奴隶而没有主人的工作,听rabbitmq队列吗?我想在spring boot应用程序中使用spring批处理和spring集成来侦听队列并以面向块的方式处理消息。 我想使用Michael Minella(https://www.youtube.com/watch?v=30Tdp1mfR0g)在Spring批处理的远程组块示例中解释的配置,但没有主配置。 下面是我的工作配置。 下面是我