当前位置: 首页 > 知识库问答 >
问题:

Spring云流:按类型将函数附加到活页夹

亢雅懿
2023-03-14

我有一个以Functional方法实现的Spring云流应用程序。应用程序使用来自多个Kafka主题的事件,将输入规范化为输出模式(始终是相同的模式),并发布到Kafka。我不使用Kafka流,因为不需要加入/充实/状态。

我想通过控制输入主题来允许灵活的部署:您可以从所有主题消费,也可以从单个主题消费。我的方法是为每种类型声明专用函数,并为每种函数声明专用绑定。问题是绑定器(有一个)将所有传入消息路由到所有绑定,当调用错误的函数来处理某些事件类型时,我得到ClassCastExc是。

我想到了以下解决方案,但我想知道是否有更好的方法:

    < li >每次装订只有一个活页夹。我宁愿不这样做,特别是因为我正在使用一个配置良好的活页夹,我不想简单地复制它。 < li >具有单个活页夹和单个消息类型功能

我的应用程序.yaml 看起来像这样:

spring:
cloud:
    function:
        definition: data;more
    stream:
        default-binder: kafka-string-avro
        bindings:
            data-in-0:
                binder: kafka-string-avro
                destination: data.emails.events
                group: communication_system_events_data_gp
            data-out-0:
                binder: kafka-string-avro
                destination: communication.system.emails.events
                producer:
                    useNativeEncoding: true
            more-in-0:
                binder: kafka-string-avro
                destination: communication.emails.send.status
                group: communication_system_events_more_gp
            more-out-0:
                binder: kafka-string-avro
                destination: communication.system.emails.events
                producer:
                    useNativeEncoding: true

我的职能:

@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
    return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
    return new MoreFunction();
}

共有1个答案

姜经武
2023-03-14

不确定问题在哪里,但我看到您提供的配置存在一些问题。当您复制到问题时,这可能是一个拼写错误,但下面的配置应该将两个不同的主题与其对应的功能隔离开来。

spring:
cloud:
    function:
        definition: dataFunction;moreFunction
    stream:
        default-binder: kafka-string-avro
        bindings:
            dataFunction-in-0:
                binder: kafka-string-avro
                destination: data.emails.events
                group: communication_system_events_data_gp
            dataFunction-out-0:
                binder: kafka-string-avro
                destination: communication.system.emails.events
                producer:
                    useNativeEncoding: true
            moreFunction-in-0:
                binder: kafka-string-avro
                destination: communication.emails.send.status
                group: communication_system_events_more_gp
            moreFunction-out-0:
                binder: kafka-string-avro
                destination: communication.system.emails.events
                producer:
                    useNativeEncoding: true
@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
    return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
    return new MoreFunction();
}
 类似资料:
  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?

  • 我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml: