Question:

Error when creating a KTable with a custom key

公孙驰
2023-03-14

Use case: there is a topic whose messages are (null, metadata). I need to build a KTable from this topic, keyed by metadata.entity_id and with metadata as the value. This table will later be joined with a stream that uses the same key.

    private final static String KAFKA_BROKERS = "localhost:9092";
    private final static String APPLICATION_ID = "TestMetadataTable";
    private final static String AUTO_OFFSET_RESET_CONFIG = "earliest";
    private final static String METADATA_TOPIC = "test-metadata-topic";


    public static void main (String args[]) {

        //Setting the Stream configuration params.
        final Properties kafkaStreamConfiguration = new Properties();
        kafkaStreamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);

        kafkaStreamConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);      

      //Creating Serdes for MetricMetadata
        GenericJsonSerializer<MetricMetadata> metadataJsonSerializer = new GenericJsonSerializer<MetricMetadata>();
        GenericJsonDeserializer<MetricMetadata> metadataJsonDeserializer = new GenericJsonDeserializer<MetricMetadata>(MetricMetadata.class);
        Serde<MetricMetadata> metadataSerde = Serdes.serdeFrom(metadataJsonSerializer, metadataJsonDeserializer);


        //Creating kafka stream.
        final StreamsBuilder builder = new StreamsBuilder();

        KTable<String, MetricMetadata> metaTable = builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
                .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value))
                .aggregate(() -> null,
                        (key, value, aggValue) -> value,
                        (key, value, aggValue) -> value
                );

        final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamConfiguration);
        streams.start();        
    }

As soon as I push a message to METADATA_TOPIC, it fails with the error below. Am I missing something here? Kafka Streams 2.2.0.

Exception in thread "TestMetadataTable-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store test-metadata-topic-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:471)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:95)
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:102)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:79)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:127)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:72)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:224)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
    ... 10 more
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 28 more

1 Answer

习和通
2023-03-14

In this case you need to provide Serdes to the KTable.groupBy() operation, because calling groupBy triggers a repartition. You also need to provide the same Serdes for the state store used by the aggregation.
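
For that first part, here is a minimal sketch of the KTable variant with explicit Serdes, reusing the metadataSerde from the question (Grouped.with has been available since Kafka Streams 2.1, so it works on 2.2.0). Note that it only addresses the Serdes problem, not the null keys discussed below:

    // Sketch: same topology as in the question, but with explicit Serdes for the
    // repartition step (Grouped) and for the aggregation's state store (Materialized).
    KTable<String, MetricMetadata> metaTable = builder
            .table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
            .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value),
                    Grouped.with(Serdes.String(), metadataSerde))
            .aggregate(() -> null,
                    (key, value, aggValue) -> value,   // adder
                    (key, value, aggValue) -> value,   // subtractor
                    Materialized.with(Serdes.String(), metadataSerde));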

Also, since the key is null, I think you should start with a KStream, then call groupByKey (you'll also need to provide the Serdes via Grouped), and the aggregation will give you the KTable you want.

Off the top of my head, something like this should work:

    builder.stream(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
            .selectKey((key, value) -> value.getEntity_id())      // re-key by entity_id
            .groupByKey(Grouped.with(Serdes.String(), metadataSerde))
            .aggregate(() -> null,
                (key, value, aggValue) -> value,                  // keep the latest value per key
                Materialized.with(Serdes.String(), metadataSerde)
            );
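
Since the aggregator here just keeps the latest value per key, a reduce with the same Serdes should, as far as I can tell, build an equivalent table:

    // Equivalent sketch using reduce: each incoming record replaces the previous
    // metadata stored under its entity_id.
    builder.stream(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
            .selectKey((key, value) -> value.getEntity_id())
            .groupByKey(Grouped.with(Serdes.String(), metadataSerde))
            .reduce((oldValue, newValue) -> newValue,
                    Materialized.with(Serdes.String(), metadataSerde));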