我有一个以Functional方法实现的Spring云流应用程序。应用程序使用来自多个Kafka主题的事件,将输入规范化为输出模式(始终是相同的模式),并发布到Kafka。我不使用Kafka流,因为不需要加入/充实/状态。
我想通过控制输入主题来允许灵活的部署:您可以从所有主题消费,也可以从单个主题消费。我的方法是为每种类型声明专用函数,并为每种函数声明专用绑定。问题是绑定器(有一个)将所有传入消息路由到所有绑定,当调用错误的函数来处理某些事件类型时,我得到ClassCastExc是。
我想到了以下解决方案,但我想知道是否有更好的方法:
我的应用程序.yaml 看起来像这样:
spring:
cloud:
function:
definition: data;more
stream:
default-binder: kafka-string-avro
bindings:
data-in-0:
binder: kafka-string-avro
destination: data.emails.events
group: communication_system_events_data_gp
data-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
more-in-0:
binder: kafka-string-avro
destination: communication.emails.send.status
group: communication_system_events_more_gp
more-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
我的职能:
@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
return new MoreFunction();
}
不确定问题在哪里,但我看到您提供的配置存在一些问题。当您复制到问题时,这可能是一个拼写错误,但下面的配置应该将两个不同的主题与其对应的功能隔离开来。
spring:
cloud:
function:
definition: dataFunction;moreFunction
stream:
default-binder: kafka-string-avro
bindings:
dataFunction-in-0:
binder: kafka-string-avro
destination: data.emails.events
group: communication_system_events_data_gp
dataFunction-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
moreFunction-in-0:
binder: kafka-string-avro
destination: communication.emails.send.status
group: communication_system_events_more_gp
moreFunction-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
return new MoreFunction();
}
如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?
我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml: