当前位置: 首页 > 知识库问答 >
问题:

基于apache-kafka-binder的springCould Steam函数模型

曾奇略
2023-03-14

这是这个问题的续篇。我可以使用“普通的”Apache Kafka绑定器与功能模型一起使用吗?到目前为止,使用基于注释的配置,我混合了这两种方法,spring-cloud-stream-binder-kafka用于简单的消费/生产,spring-cloud-stream-binder-kafka-streams用于在一个应用程序中进行高级流处理。

函数模型似乎只有streams绑定器才支持,如果我尝试混合使用这两种方法--基于简单用法的注释和基于流的函数,则不会注册流绑定。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

配置类上存在@enablebinding(SimpleBinding.class)。是否首选/支持将两者混合使用,或者我是否应该使用streams-binder,甚至用于简单的消息消费?

共有1个答案

洪博涛
2023-03-14

对于Kafka Binder,您可以并且绝对应该使用functional model,同时忘记StreamListener。这样它将与您的KStream函数模型保持一致。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            listen-in-0:
              destination: another-topic

@Component
public class SimpleListener {

    @Bean
    public Consumer<SomeDto> listen() {
        return payload -> ...
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}
 类似资料:
  • 用法 对于使用Apache Kafka绑定器,您只需要使用以下Maven坐标将其添加到您的Spring Cloud Stream应用程序: <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependen

  • 我们在Spring Boot的基础上开发了一个内部公司框架,我们希望通过Spring Cloud Stream支持Kafka Streams。我们需要自动向所有出站消息注入一些头。我们通过标准的Spring Cloud Stream Kafka Binder注册了一个定制的,实现了这一点,但这不适用于Kafka Streams,因为它们似乎遵循不同的路径。 对于Spring Cloud Strea

  • 我想做 从映射结构中包含的任何arraylist元素的CEP开始,然后继续我已经开始的其余arraylist元素 地图和图案结构: 我的目的是使用map中的数组列表元素发出警告,但由于其中的流流,数组列表元素的顺序并不重要。我想继续处理这个数组的其余元素,当我从这里的任何数组开始时,我可以返回这个数组的信息。例如:

  • 问题内容: 我是python的新手,目前正在使用它。我有一个脚本,对设备执行一些API调用。我想扩展功能并根据调用脚本时给出的参数调用不同的函数。 目前,我有以下内容: 我也有一个 和 如何基于给定的参数调用函数(并且仅此函数)?我不想跑步 因为我想在以后将主要可执行文件保持整洁的同时将不同功能移至模块。 问题答案: 由于您似乎想根据给定的参数运行一个函数,并且仅运行一个函数,因此建议您使用强制性

  • 我的理解是spring-kafka是为了与Kafka Client API交互而创建的,后来,spring-cloud-stream项目是为了“构建与共享消息系统连接的高度可扩展的事件驱动微服务”而创建的,该项目包括几个绑定器,其中一个是允许与Kafka Stream API交互的绑定器: 所以我很清楚,如果我想与Kafka流API交互,我将使用Spring-Cloud-Stream方法和适当的绑

  • 顺便说一句,我确实编译了以下内容,但是专门化在运行时没有像预期的那样工作。基类型和派生类型最终要经历的非专用版本。 正确的语法是什么?