当前位置: 首页 > 知识库问答 >
问题:

使用Kafka流对具有相同键值的Kafka消息进行计数

酆阳煦
2023-03-14

我有一个Java Spring Kafka应用程序,它将Book类型的对象发送到Kafka主题。然后我试着用Kafka流来映射每一条信息,把这本书的作者作为它的关键字。然后,我尝试将它们添加到一个KTable中,该表保存了密钥和拥有该密钥的消息的数量。然后,该表被发送到一个输出kafka主题。

书籍型号:

@Data
public class Book {
    private UUID id;
    private String name;
    private String author;
    private LocalTime date;
}

流结构:

StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String,Book> stream = streamsBuilder.stream("input_topic", Consumed.with(Serdes.String(), new BookSerde()));
        stream = stream.selectKey((key,value)->value.getAuthor());
 
        KTable<String ,Long> keyWithCount = stream.groupBy((key, value) -> value.getAuthor()).count();
        keyWithCount.toStream().to("output_topic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), config);
        kafkaStreams.start();

运行应用后,输出显示每个唯一键,但随机表情,而不是计数。

在搞砸.peek()之后,我发现了一件奇怪的事情。在发送到主题之前,我打印了密钥和值,它表明它以应有的方式工作!然而,问题在于,该主题的kafka控制台消费者不会在计数中显示数字,而是显示一些随机符号,例如钻石,心形,笑脸等。

一个例子是,在将作者为“a”的书发送到我的制作人的父主题后,我的流媒体应用程序将其视为第一本书,打印“A 1 ”,但在kafka控制台消费者中,它看起来像“☺".”

共有1个答案

华良才
2023-03-14

我用peek()的发现应该是赠品。问题不在于代码,而在于 kafka 控制台消费者的配置。默认值反序列化程序无法反序列化长整型值。为了解决这个问题,我不得不在初始化消费者时添加 --value-反序列化程序 “组织.apache.kafka.common.serialization.LongDeserializer”

 类似资料:
  • 我用下面的代码给Kafka写信: 我们使用0.8.1.1版本的Kafka。 当多个线程正在写入时,其中一些线程(具有不同的负载)是否使用相同的分区键进行写入,因此Kafka会覆盖这些消息(由于相同的分区密钥)? 让我们朝这个方向思考的文献是:http://kafka.apache.org/documentation.html#compaction

  • 我在Kafka Topic内部有500万条消息。 我必须加入具有相同分区密钥的消息作为单个消息的一部分,并发送给消费者主题[例如:对于密钥1234-Messge1,消费者应该收到单个消息而不是100万消息] Kafka端是否有可用的Kafka API,使用它我可以读取组中具有相同Partition键的所有消息,而不是像传统的spring boot Kafka Listener那样一次读取单个消息。

  • 我的Kafka publisher发送以下格式的字符串消息: 例如: 另外,我们为每个消息添加一些消息键,将它们发送到相应的分区。 我如何在1分钟窗口中重新排序消息并将它们发送到另一个主题?

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 我有两个Kafka制作人向具有多个分区的同一主题发送消息。 正如预期的那样,来自同一生产者PR1的具有相同密钥K1的消息总是转到同一分区PA1。 问题是来自另一个生产者PR2的具有相同密钥K1的消息转到另一个分区PA2,而我希望它们也转到PA1。 Kafka不是在制片人之间保留分区分配吗? 是否与两个生产者使用不同的Kafka客户端库有关? 如果我设置两个制作人使用相同的id,会有帮助吗?