当前位置: 首页 > 知识库问答 >
问题:

我们是否可以使用spring kafka批处理监听器对消息进行一次处理?

万俟招
2023-03-14
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);  
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);

但它不止一次地使用消息。有没有人面对过这个问题。此外,使用上述配置,使用者总是在一个批处理中只接收到一个消息。我尝试增加fetch.min.bytesfetch.max.wait.ms,但没有任何影响。

在对ConcurrentKafkaListenerContainerFactory进行如下更改后,批处理配置的问题得到了解决:

ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);

factory.getContainerProperties().SetackMode(org.springFramework.kafka.listner.ContainerProperties.ackMode.Manual);factory.SetMessageConverter(新建BatchMessagingMessageConverter(stringJsonMessageConverter()));

共有1个答案

章烨烨
2023-03-14

要精确获取一次语义,必须使用事务

然而,仅有一次语义只适用于

read from Kafka -> process -> write to Kafka

即使这样,它也只适用于整个流程(读/处理/写)。

 类似资料:
  • 我们有一些消息需要保持序列。我们已经决定将所有消息从一个特定的源发送到一个分区,这样就可以维护消息序列(多个源可以产生到同一个分区,但一个源不能产生到多个分区),并且我们将能够用它们的密钥标识每个源。 现在,我们需要使用这些消息并进行一些处理。我们对已消费的消息执行多个独立操作(例如,将它们存储在数据库中,转发它们等)。现在,我一直在考虑是使用Kafka Streams API还是消费者API来实

  • 常常它是有用的能够接受附加的回调为了切割关注点穿过一些不同的重试 为了这个目的Spring Batch 提供了RetryListene 接口,RetryTemplate 允许使用者注册RetryListene,并且他们将发送回调随从RetryContext和Throwable,在迭代期间可用。 这个接口看起来像这样: public interface RetryListener { voi

  • 问题内容: 从文档 如果遇到需要插入1000 000行/对象的情况: 为什么我们应该使用这种方法?与StatelessSession一相比,它给我们带来了什么好处: 我的意思是,这个(“替代”)最后一个示例不使用内存,不需要进行同步,清除缓存,那么对于这样的情况,这应该是最佳实践吗?那么为什么要使用前一个呢? 问题答案: 从文档中,您链接到: 特别是,无状态会话不会实现第一级缓存,也不会与任何第二

  • 我已经设置了一个Flink 1.2独立集群,其中包含2个JobManager和3个TaskManager,我正在使用JMeter通过生成Kafka消息/事件对其进行负载测试,然后处理这些消息/事件。处理作业在TaskManager上运行,通常需要大约15K个事件/秒。 作业已设置EXACTLY_ONCE检查点,并将状态和检查点持久化到Amazon S3。如果我关闭运行作业的TaskManager需

  • 我正在使用一个相当大的数据集(大约500Mio-Triples)存储在图形数据库免费并在我的本地开发人员机器上运行。 我想用RDF4J对数据集执行一些操作,并且必须或多或少地选择整个数据集。要进行测试,我只需选择所需的元组。代码在第一个一百万元组中运行良好,之后由于graphDB继续分配更多的RAM,速度变得非常慢。 是否有可能对非常大的数据集执行选择查询并批量获取它们? 基本上,我只想通过一些选

  • 现在我正在用Apache Kafka做一些测试。在Kafka生产者的配置中,参数batch.size和linger.ms控制批处理策略。是否可以在生产的同时动态地制作这些参数?例如。如果数据摄取率上升很快,我们可能希望增加batch.size以每批积累更多的消息。我没有找到任何动态批处理与Kafka生产者的例子。有没有可能实施?