当前位置: 首页 > 知识库问答 >
问题:

从Kafka主题中获取多条消息

唐泳
2023-03-14

我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。

@KafkaListener(id = "listener-batch", topics = "test", containerFactory = "concurrentKafkaListenerContainerFactory")
public void receive(@Payload List<String> messages,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

    System.out.println("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
    System.out.println("Starting the process to recieve batch messages :: " + messages);
    for (int i = 0; i < messages.size(); i++) {
        System.out.println("received message= "+ messages.get(i) +" with partition-offset= " + partitions.get(i) + "-" + offsets.get(i));
    }
    System.out.println("all the batch messages are consumed");
}

我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。

请在下面找到源代码。

@EnableKafka
@Configuration
public class KafkaConfig {

@Bean
public ConsumerFactory<String, String> consumerFactory(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}
}

使用以下命令启动生产者

/kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测试。csv——生产者道具引导。servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization。StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerial

测试.csv文件内容

Batch-1 message
Batch-2 message
Batch-3 message
Batch-4 message
Batch-5 message
Batch-6 message
Batch-7 message
Batch-8 message
Batch-9 message
Batch-10 message
Batch-11 message

输出如下所示。

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-3 message]
received message= Batch-3 message with partition-offset= 0-839501
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-7 message]
received message= Batch-7 message with partition-offset= 0-839502
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-3 message]
received message= Batch-3 message with partition-offset= 0-839503
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-1 message]
received message= Batch-1 message with partition-offset= 0-839504
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

提前感谢。

共有3个答案

竺捷
2023-03-14

文卡塔·克里希纳,这只能通过使用在生产者方面实现的键控机制来完成。从源系统生成的每行输入都必须具有与之关联的唯一键,并使用更好的分区策略将事件发布到具有唯一键的特定分区中。根据唯一键,可以使用可用的有状态完整操作之一或使用窗口聚合之一对它们进行分组。因此,如果您使用窗口,则可以实现类似的东西,对给定持续时间内每个键接收的事件进行分组,并将它们全部批量发布到中间主题,并使使用者进行轮询

李星波
2023-03-14

这里提供的要求非常高。如果您能从业务逻辑实现的角度告诉我们您的实际需求,那就太好了。可以根据您的需求对低级编码和其他配置参数进行微调。

为了给你一个建议,如果你想一个接一个地发出消息(5),那么你可以通过max.poll一次轮询5条记录。records=5,并遍历消费者记录。很简单。

阳俊德
2023-03-14

您应该配置批处理侦听器,然后可以设置<code>max.poll。记录属性以指定批大小。

请注意,将此值设置得太低可能会降低整体性能,因为您需要对代理进行更多轮询以获取相同数量的记录。

 类似资料:
  • 我需要创建一个消费者,能够从多个主题拉和订单消息相对于时间戳(Kafka消息时间戳) 在本例中,我订阅了“主题A”和“主题B”,并按照时间戳的顺序对消息进行排队 现在,只要所有主题只有一个分区,这很容易用这个伪代码来解决: 当我为每个主题引入多个分区时,问题就出现了。显然,不可能将多个主题按时间顺序排序到一个流中,因为在一个主题中,顺序不能保证,只能在一个分区中,所以新的问题是将多个主题排序到具有

  • 我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。 我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。 我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。 这是意料之中的行为还是我走错了路?

  • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

  • 我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;

  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。