当前位置: 首页 > 知识库问答 >
问题:

kafka API,用于从具有相同分区键的主题中读取一组kafka消息

马魁
2023-03-14

我在Kafka Topic内部有500万条消息。

1 million  message with Partition key  -1234-Messge1
1 million  message with Partition key  -2345-Messge2
1 million  message with Partition key  -5678-Messge3
1 million  message with Partition key  -6789-Messge4
1 million  message with Partition key  -6565-Messge5

我必须加入具有相同分区密钥的消息作为单个消息的一部分,并发送给消费者主题[例如:对于密钥1234-Messge1,消费者应该收到单个消息而不是100万消息]

Kafka端是否有可用的Kafka API,使用它我可以读取组中具有相同Partition键的所有消息,而不是像传统的spring boot Kafka Listener那样一次读取单个消息。

共有2个答案

端木朝
2023-03-14

不尽然,因为Kafka不提供键驱动的API。最后(无论您是直接使用KafkaConsumer还是使用Streams),您都将从您想要的分区中读取所有记录。

如果您知道您的记录在哪个(些)分区中,那么您可以设置消费者从这些分区中读取。

然而,请记住,您可能会受到主题中分区数量增加等情况的影响(除非您使用稳定的哈希分区器,否则什么会改变哈希函数)。

诸葛亮
2023-03-14

在Kafka Streams中,您可以对特定的记录键filter,但这将读取所有分区。例如,如果您希望所有键具有相同的逻辑,您也可以groupByKey,并且可以从KTable中查询所有值。

如果您已经知道(或可以计算)主题分区,您可以分配(或使用@KafkaListenerproperty )来使 一对多地读取 if(record.key().equals(partitionKey))

 类似资料:
  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我用下面的代码给Kafka写信: 我们使用0.8.1.1版本的Kafka。 当多个线程正在写入时,其中一些线程(具有不同的负载)是否使用相同的分区键进行写入,因此Kafka会覆盖这些消息(由于相同的分区密钥)? 让我们朝这个方向思考的文献是:http://kafka.apache.org/documentation.html#compaction

  • 我们计划编写一个Kafka消费者(java),它读取Kafka队列以执行消息中的操作。

  • 我们的生产Storm集群出现了一个我们无法解决的问题。 在某个时候,似乎kafka spout停止了从一半的主题分区中读取。有40个分区,它只读取其中的20个。在这种情况开始发生的时候,我们找不到我们对Storm星团或Kafka所做的任何改变。 我们更改了使用者组 ID,并将输出配置设置为它仍然只连接到相同的20个分区。我们已经查看了节点

  • 我对Kafka是新的,所以道歉,如果我听起来很愚蠢,但我目前所理解的是…消息流可以定义为主题,就像类别一样。并且每个主题被分成一个或多个分区(每个分区可以有多个副本)。所以它们是平行的 他们说Kafka的主要网站 生成器能够选择将哪个消息分配给主题中的哪个分区。这可以通过循环的方式简单地平衡负载,也可以根据某个语义分区函数(例如基于消息中的某个键)来完成。 在0.8 beta版中创建produce

  • 我有两个kafka consumer实例,配置了相同的消费者组,并监听相同主题中的分区0。问题是我发消息到题目的时候。消息由两个实例使用,这两个实例应该不会发生,因为它们在同一个组中。我使用Spring Boot配置类来配置它们。 以下是配置: 以下是听众: