我无法使用留档(https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_configuration_options_3)中指定的语法更改通道(或绑定)的序列。
假设我的通道是pcin,我知道我应该使用以下属性指示valueSerde和keySerde。云流动Kafka。流。绑定。pcin。制作人参数DE和Spring。云流动Kafka。流。绑定。pcin。制作人钥匙塞。
但是,我收到一个例外:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
我正在尝试改编Josh Long的《Spring Tips》中的例子:https://github.com/spring-tips/spring-cloud-stream-kafka-streams
我刚刚更改了类PageViewEventProcessor,如下所示:
@Component
public static class PageViewEventProcessor {
@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
return events
.filter((key, value) -> value.getDuration() > 10)
.map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
.groupByKey()
.aggregate(()-> 0L,
(cle, val, valAgregee) -> valAgregee + val,
Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))
.toStream();
}
}
我不计算事件的数量(页面访问),而是计算每次访问的持续时间之和。
下面是应用程序的摘录。属性(来自Spring提示示例):
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.bindings.pcin.content-type=application/json
spring.cloud.stream.bindings.pcin.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
是否有其他必要的更改?
pcin
是否绑定在使用者(输入)上?如果是,您应该使用属性作为spring.cloud.stream.kafka.streams.bindings.pcin.consumer.valueSerde和spring.cloud.stream.kafka.streams.bindings.pcin.consumer.keySerde
您的传入值类型是PageViewEvent。但是,您正在将值设置为lonserde。
您可以完全删除此属性:spring.cloud.stream.bindings.pcin.consumer.use-native-decding=true
并让框架为您进行JSON转换。这样,传入的类型会自动转换为PageViewEvent
,而无需您明确提供值Serde。
如果您必须提供一个值Serde(在这种情况下,本机解码属性必须设置为true
),那么您必须提供一个合适的JsonSerde作为值Serde
。
更新:
通过以下更改,我可以运行应用程序而不出现任何错误。
我这样更改了你的代码。
@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
return events
.filter((key, value) -> value.getDuration() > 10)
.map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.aggregate(()-> 0L,
(cle, val, valAgregee) -> valAgregee + val,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(AnalyticsBinding.PAGE_COUNT_MV)
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
)
.toStream();
}
内部Serdes ongroupByKey
和聚合
调用是必要的,因为它们不同于默认键/值Serde组合。
我还更改了您的配置并进行了清理:
#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.mms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
我试图更改IntelliJ中的Java版本为。所以我去了 ,它仍然显示17,我如何将其更改为1.8,这可能最终修复我的错误?
我有两个非常简单的spring-cloud-stream应用程序。消息生产者Service3通过Binder-Kafka向消费者Service4发送消息。 我用春云侦探来追踪它们之间的跨度。但是只有Service3中的跨在zipkin服务器中可用。Service4没有显示span。
本节将详细介绍如何使用Spring Cloud Stream。它涵盖了创建和运行流应用程序等主题。
我使用的是Spring boot 1.1.1。我有一个H2数据库,它是在启动时创建的。当我从IntelliJ运行我的主类时,一切都很好。当我运行“gradle build test”时,我的集成测试会出错: 这是我的gradle文件: 我在src/main/资源和src/test/资源中都有一个application.properties文件,其中包含以下条目: 然后在src/main/资源/db
我正在使用com的Spring Cloud Streams。蔚蓝色的spring:azure Spring-Cloud-Stream绑定器eventhubs:2.8.0,我正在使用供应商。 它适用于Spring boot 2.3.12。发布。 但是,如果我去Spring启动版本 有什么想法如何使用更高版本的Spring靴吗? 如下所述,对于当前最新的spring boot 2.4.5,org需要2
我有三个应用程序作为源、处理器和接收器。Source能够将消息传递给处理器,但处理器无法将消息发送到接收器并抛出异常。尝试在本地使用Spring Cloud stream执行任务,所以我在我的接收器pom文件中有spring-cloud d-confiyer-local,并且主类也用@EnableTaskLauncher注释。如果有人可以提供samole,那也会有所帮助。 加工机 源错误 请有人调