当前位置: 首页 > 知识库问答 >
问题:

重新分区后Kafka流未使用serde

阎星河
2023-03-14

我的Kafka Streams应用程序正在使用以下键值布局的kafka主题:String.class-

打印当前主题时,可以确认:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 --  SUCCESS     #C:\Daten\file-service\in\crypto.p12

“flow1”是字符串键,--后面的部分是序列化值。

我的流程设置如下:

    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
            historyEventSerde));


    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey()
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));

因此,据我所知,我告诉它使用String历史事件序列来消费主题,因为这就是主题中的内容。然后,我'rekey'它使用一个组合的关键字,应该存储在本地使用提供的Serde为历史事件ey.class。据我所知,这将导致一个额外的主题被创建(可以在Kafka容器中的主题列表中看到)与新的关键字。这很好。

现在的问题是,即使在主题中只有一个文档的干净环境中,应用程序也无法启动:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

从信息中很难判断问题的确切位置。它在我的基本主题中指出,但这是不可能的,因为那里的键不是类型HistoryEventKey。由于我在reduce中为HistoryEventKey提供了serde,因此它也不能与本地存储一起使用。

对我来说唯一有意义的是,它与导致重新排列和新主题的selectKey操作有关。但是,我无法确定如何为该操作提供serde。我不想将其设置为默认值,因为它不是默认的密钥serde。


共有2个答案

堵茂勋
2023-03-14

我遇到了一条非常类似的错误消息,但我没有groupbys,而是joins。我在这里为谷歌搜索的下一个人发帖。

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic my-processor-KSTREAM-MAP-0000000023-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.mycorp.mySession). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

很明显,和原来的问题一样,我不想更改默认的serdes。

所以在我的例子中,解决方案是在连接中传递一个连接实例,这将允许在SERDE中传递。请注意,错误消息指向重新分区映射- 这有点像转移视线,因为修复程序会转移到其他地方。

我如何修复它(一个加入的例子)

//...omitted ...

    KStream<String,MySession> mySessions = myStream
    .map((k,v) ->{
      MySession s = new MySession(v);
      k = s.makeKey();
      return new KeyValue<>(k, s);
    });
// ^ the mapping causes the repartition, you can not however specify a serde in there.


// but in the join right below, we can pass a JOINED instance and fix it.
    return enrichedSessions
      .leftJoin(
        myTable,
        (session, info) -> {
          session.infos = info;
          return session; },
        Joined.as("my_enriched_session")
              .keySerde(Serdes.String())
              .valueSerde(MySessionSerde())
      );

邵伟
2023-03-14

在对执行进行更多调试后,我能够发现新主题是在group pByKey步骤中创建的。您可以提供一个分组实例,它可以指定用于键和值的Serde:

    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
                    .withKeySerde(new HistoryEventKeySerde())
                    .withValueSerde(new HistoryEventSerde())
            )
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));
 类似资料:
  • 我正在使用Kafka Producer和RoundRobin分区器来处理一个有12个分区的主题。 代码可在此处找到https://github.com/apache/kafka/blob/2.8/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java 我面临的问题是,这个分区程序让分区正确

  • 当我们基于某个键在流上应用组 by 函数时,kafka 如何计算这一点,因为相同的键可能存在于不同的分区中?我看到了()函数,它基本上对数据进行了重新分区,但我不明白它是什么意思。它会将具有相同键的所有消息移动到单个分区中吗?另外,我们可以通过()方法调用的频率如何?如果有要求,我们可以在收到每条消息后调用它吗?请建议。谢谢

  • 我创建了一个包含3个分区的主题 我使用Java制作人同步写入主题 我有一个Java的用户订阅并阅读它 我的键总是一组固定的3个不同的字符串(k1、k2、k3)。但是我的消息总是去分区1或分区2——k1和k2去分区1,k3去分区2。 为什么分区0未使用?

  • 根据Spark 1.6.3的文档,应该保留结果数据表中的分区数: 返回由给定分区表达式分区的新DataFrame,保留现有的分区数 Edit:这个问题并不涉及在Apache Spark中删除空DataFrame分区的问题(例如,如何在不产生空分区的情况下沿列重新分区),而是为什么文档所说的内容与我在示例中观察到的内容不同

  • spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。

  • 我发现分区“Tracking-3”上的消息没有被消耗!! 问题每次都会重现,在新分配的分区中有一些消息丢失,你能有什么建议吗?请帮帮我,谢谢