当前位置: 首页 > 知识库问答 >
问题:

在Spring Cloud Stream上使用自定义serde序列化聚合状态存储时出错

殷德本
2023-03-14

我正在尝试用Spring Cloud Stream创建一个简单的函数bean,它处理来自KStream和GlobalKTable的消息,将它们连接起来,聚合它们,并将结果输出到一个新的流,但我在正确配置它所需的SERDE方面遇到了困难。

不用多说,以下是我的方法:

@Bean
public BiFunction<KStream<GenericRecord, GenericRecord>, GlobalKTable<Long, GenericRecord>, KStream<String, MyCustomJavaClass>> joinAndAggregate() {

    return (stream, table) -> stream
            .join(table,
                    (streamKey, streamValue) -> (Long) streamValue.get("something"),
                    (streamValue, tableValue) -> {
                        return new MyCustomJavaClass(streamValue, tableValue);
                    }).selectKey(((key, value) -> (Long) key.get("id")))
            .groupBy((key, value) -> value.getKey(), Grouped.with(Serdes.String(), new MyCustomSerde()))
            .aggregate(() -> {
                return new MyCustomJavaClass();
            }, (key, value, aggregatedValue) -> {
                // aggregation logic
                return new MyCustomJavaClass(aggregatedData);
            }).toStream()
            .peek((k, v) -> {
                if (v == null)
                    log.warn("No value for key:\n" + k.toString() + "\n");
                else
                    log.info("Aggregated result with key:\n" + k + "\nvalue:\n" + v.toString() + "\n");
            });
}

static public final class MyCustomSerde extends JsonSerde<MyCustomJavaClass> { }

这是我的属性文件中的配置:

spring.application.name: test-application
spring.cloud.stream.kafka.binder.brokers: kafka-svc:9092
spring.kafka.properties.schema.registry.url: http://schema-registry-svc:8081
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.function.definition: joinAndAggregate
spring.cloud.stream.bindings.joinAndAggregate-in-0.destination: input-stream
spring.cloud.stream.bindings.joinAndAggregate-in-1.destination: input-global-ktable
spring.cloud.stream.bindings.joinAndAggregate-out-0.destination: aggregate-output
# Serdes
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.application-id: joinAndAggregate-in-0-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.key-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.application-id: joinAndAggregate-in-1-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-out-0.producer.value-serde: com.package.MyClass$MyCustomSerde

当我运行上面的代码时,我得到以下错误:

Failed to process stream task 2_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000011, topic=joinAndAggregate-in-0-v0.1.0-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: 
A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
    ... <omitting some lines here> ...
Caused by: java.lang.ClassCastException: class com.package.model.MyCustomJavaClass cannot be cast to class [B (com.package.model.MyCustomJavaClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)

这个班的学生是com。包裹模型MyCustomJavaClass与定义函数流方法的MyClass位于不同的包中。这就是问题所在吗?

我还验证了MyCustomJavaClass可以使用上面的自定义serde(MyCustomSerde)进行正确的序列化和反序列化。这只是一个简单的serde扩展JsonSerde。我能够在输入和输出中处理带有MyCustomSerde序列化值的消息,以及我在这里省略的其他函数方法,因此序列化程序和我使用的自定义java类不是问题所在。不知何故,只有聚合状态html" target="_blank">存储流与我的自定义serde存在问题,我无法通过查看示例和文档找到解决方法。

我做错了什么?

提前谢谢!

共有1个答案

上官恩
2023-03-14

当您看到如下错误时:

 serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

这意味着Kafka Streams使用了与所提供的类型不匹配的(de)序列化程序。在这种情况下,Kafka Streams使用了Serdes. ByteArraySerde的默认序列化程序。如果您更新聚合方法并添加第三个参数Materialized.with(Serdes. String(),new MyCustomSerde()),那么您的应用程序应该会忽略此错误。


 .aggregate(() -> {
                return new MyCustomJavaClass();
            }, (key, value, aggregatedValue) -> {
                // aggregation logic
                return new MyCustomJavaClass(aggregatedData);
            }, Materialized.with(Serdes.String(), new MyCustomSerde()))

告诉我进展如何。

-比尔

 类似资料:
  • 下面是一个简单的场景: 用户单击“Create order”:将创建一个订单(首先保持其状态=NEW) 用户完成订单填写后,单击SAVE-->state is now Submited 当另一个检查订单并验证它时,必须进行一个过程。只有在调用了其他一些服务并给予许可时,才会验证该订单。 整个工作流程是: null null 谢谢

  • 我目前正在将一些代码从Jackson1.x迁移到Jackson2.5json映射器,遇到了一个1.x中没有的问题。 这是设置(参见下面的代码): 接口IPET 类Dog实现IPET IPET使用@jsonTypeInfo和@jsonSubtypes进行注释 类Human具有一个类型为IPet的属性,该属性使用@JSONSerialize(using=CustompetSerializer.clas

  • 问题内容: 我想使用自定义功能对a进行序列化和反序列化,但是Serde的书没有涵盖此功能,并且代码文档也无济于事。 我知道Serde可以很容易地反序列化,因为Chrono支持Serde, 但是 我想学习Serde,所以我想自己实现。当我运行此代码时,出现错误: 问题答案: 结构反序列化的默认行为是,当字段不以序列化形式出现时,为其分配各自的默认值。请注意,这与container 属性 不同,con

  • 问题内容: 我有两个要使用Jackson序列化为JSON的Java类: 我想将Item序列化为此JSON: 用户序列化为仅包含。我还将能够将所有用户对象序列化为JSON,例如: 所以我想我需要为此编写一个自定义的序列化程序并尝试过: 我使用来自Jackson How-to:Custom Serializers的 代码对JSON进行了序列化: 但是我得到这个错误: 如何在Jackson上使用自定义序

  • 问题内容: 我有一个需要序列化为XML的对象,其中包含以下字段: XStream可以很好地序列化它(在使用一些别名之后),如下所示: 就目前而言还可以,但是我希望能够将元素重命名为。从XStream站点上的别名文档中,我看不到一种明显的方法。我是否缺少明显的东西? 问题答案: 我建议将更改为,其中Tag是本质上仅包含字符串的域对象。然后你说: 您将得到您想要的。这样可以避免滚动自己的Convert

  • 我正在尝试使用actix-web服务器作为小型堆栈的网关,以保证堆栈内部的严格数据格式,同时为用户提供一些自由。 为此,我想将 JSON 字符串反序列化到结构中,然后对其进行验证,再次序列化它并将其发布在消息代理上。数据的主要部分是一个数组,其中包含整数,浮点数和日期时间。我使用 serde 进行反序列化,使用 chrono 来处理日期时间。 我尝试使用与枚举相结合的结构来允许不同的类型: 由于<