当前位置: 首页 > 知识库问答 >
问题:

Spring-Cloud-Stream榆树。版本无法更改Serde

钱经业
2023-03-14

我无法使用留档(https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_configuration_options_3)中指定的语法更改通道(或绑定)的序列。

假设我的通道是pcin,我知道我应该使用以下属性指示valueSerde和keySerde。云流动Kafka。流。绑定。pcin。制作人参数DE和Spring。云流动Kafka。流。绑定。pcin。制作人钥匙塞。

但是,我收到一个例外:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

我正在尝试改编Josh Long的《Spring Tips》中的例子:https://github.com/spring-tips/spring-cloud-stream-kafka-streams

我刚刚更改了类PageViewEventProcessor,如下所示:

@Component
        public static class PageViewEventProcessor {

                @StreamListener
                @SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
                public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
                        return events
                            .filter((key, value) -> value.getDuration() > 10)
                            .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
                            .groupByKey()
                            .aggregate(()-> 0L, 
                                    (cle, val, valAgregee) -> valAgregee + val, 
                                    Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))

                            .toStream();
                }
        }

我不计算事件的数量(页面访问),而是计算每次访问的持续时间之和。

下面是应用程序的摘录。属性(来自Spring提示示例):

# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.bindings.pcin.content-type=application/json
spring.cloud.stream.bindings.pcin.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde

是否有其他必要的更改?

共有1个答案

戴博
2023-03-14

pcin是否绑定在使用者(输入)上?如果是,您应该使用属性作为spring.cloud.stream.kafka.streams.bindings.pcin.consumer.valueSerde和spring.cloud.stream.kafka.streams.bindings.pcin.consumer.keySerde

您的传入值类型是PageViewEvent。但是,您正在将值设置为lonserde。

您可以完全删除此属性:spring.cloud.stream.bindings.pcin.consumer.use-native-decding=true并让框架为您进行JSON转换。这样,传入的类型会自动转换为PageViewEvent,而无需您明确提供值Serde。

如果您必须提供一个值Serde(在这种情况下,本机解码属性必须设置为true),那么您必须提供一个合适的JsonSerde作为值Serde

更新:

通过以下更改,我可以运行应用程序而不出现任何错误。

我这样更改了你的代码。

@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {

                    return events
                            .filter((key, value) -> value.getDuration() > 10)
                            .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
                            .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
                            .aggregate(()-> 0L,
                                    (cle, val, valAgregee) -> valAgregee + val,
                                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(AnalyticsBinding.PAGE_COUNT_MV)
                                    .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
                            )
                            .toStream();
                }

内部Serdes ongroupByKey聚合调用是必要的,因为它们不同于默认键/值Serde组合。

我还更改了您的配置并进行了清理:

#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.mms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
 类似资料:
  • 我试图更改IntelliJ中的Java版本为。所以我去了 ,它仍然显示17,我如何将其更改为1.8,这可能最终修复我的错误?

  • 我有两个非常简单的spring-cloud-stream应用程序。消息生产者Service3通过Binder-Kafka向消费者Service4发送消息。 我用春云侦探来追踪它们之间的跨度。但是只有Service3中的跨在zipkin服务器中可用。Service4没有显示span。

  • 本节将详细介绍如何使用Spring Cloud Stream。它涵盖了创建和运行流应用程序等主题。

  • 我使用的是Spring boot 1.1.1。我有一个H2数据库,它是在启动时创建的。当我从IntelliJ运行我的主类时,一切都很好。当我运行“gradle build test”时,我的集成测试会出错: 这是我的gradle文件: 我在src/main/资源和src/test/资源中都有一个application.properties文件,其中包含以下条目: 然后在src/main/资源/db

  • 我正在使用com的Spring Cloud Streams。蔚蓝色的spring:azure Spring-Cloud-Stream绑定器eventhubs:2.8.0,我正在使用供应商。 它适用于Spring boot 2.3.12。发布。 但是,如果我去Spring启动版本 有什么想法如何使用更高版本的Spring靴吗? 如下所述,对于当前最新的spring boot 2.4.5,org需要2

  • 我有三个应用程序作为源、处理器和接收器。Source能够将消息传递给处理器,但处理器无法将消息发送到接收器并抛出异常。尝试在本地使用Spring Cloud stream执行任务,所以我在我的接收器pom文件中有spring-cloud d-confiyer-local,并且主类也用@EnableTaskLauncher注释。如果有人可以提供samole,那也会有所帮助。 加工机 源错误 请有人调