当前位置: 首页 > 知识库问答 >
问题:

如何实现Spring Kafka消费者的高性能

西门胜涝
2023-03-14

如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学

我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息?

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic120112141");
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.setErrorHandler(errorHandler());

下面是我的Kafka消费者代码。

@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "6", groupId = "mytopic-1-groupid")
    public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c)  {

        dbservice.processInDB(message);


    }

共有2个答案

楚翰
2023-03-14

另一个不完全与spring kafka相关的建议,正如您在标签中所述,您也在探索消费者api,而不仅仅是spring kafka,因此我允许自己在这里提出建议,您可能希望测试此api

https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/

https://github.com/confluentinc/parallel-consumer

  • 其处于阿尔法阶段,因此不建议用于生产,但可能也会注意这一点

但正如我在前面的评论中所述,您可能只想创建更多的分区

尹何平
2023-03-14

使用批处理侦听器会有所帮助—您只需在侦听器中保留使用者线程,直到所有单个记录都完成处理。

在下一个版本(今天发布的2.8.0-M1里程碑)中,支持无序的手动确认,其中框架将提交推迟到“填补空白”为止https://docs.spring.io/spring-kafka/docs/2.8.0-M1/reference/html/#x28-ooo提交

 类似资料:
  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。

  • 我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答