当前位置: 首页 > 知识库问答 >
问题:

与Kafka Spring Cloud Stream的多个@EnableBinding

终睿
2023-03-14

我正在尝试设置一个听Kafka的Spring Boot应用程序。

我用的是Kafka流活页夹。

用一个简单的@EnableBind

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

应用程序中。yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

一切工作正常。

但是,如果我尝试添加第二个类与其他绑定,会发生以下错误:

以下订阅的主题不分配给任何成员:[mytopic1]

第二个绑定的示例:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

我错过了什么?我不能在同一个应用程序中使用多个输入主题和多个输出吗?这与应用有关。名称

共有2个答案

乔宏峻
2023-03-14

试试看

@EnableBinding( { StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class })
吕琪
2023-03-14

我刚刚尝试了一个应用程序,它很有效。当你在同一个应用程序中有多个处理器时,你需要确保每个处理器都有自己的应用程序标识。请参阅下面我如何在application.yml中为两个输入提供两个不同的应用程序标识。

我看到两个处理器都登录到了控制台。此外,还看到了输出主题上的消息。

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    interface StreamProcessor1 {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

}

申请的相关部分。yml

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4
 类似资料:
  • 我试图将QueryDSL与Spring Data JPA一起使用。

  • 问题内容: 是否有理由将多个断言分组: 而不是这样做: 问题答案: 有趣的是,无论有多少失败,它总是检查传递给它的所有断言。如果一切顺利,那么一切都很好- 如果至少有一个失败,您将得到所有错误的详细结果(正确的解决方法)。 最好用于断言概念上属于一起的一组属性。您的第一个直觉将是“我想断言这是一个”。 例 您的特定示例不是最佳用例,因为使用质数和非质数进行检查是彼此独立的-如此之多,因此我建议为此

  • 我使用的是Laravel 5.4,其模型和表格结构如下: 一个用户可以有多个账户 一个帐号可以被多个用户共享 每个账户有多个持有量 因此,用户通过他们的许多帐户间接拥有许多资产。 我需要帮助在用户模型上定义一个称为“持有量”的关系,以获得适用于用户的所有持有量(基于他们所链接的账户)。 我尝试了很多不同的东西,在谷歌上花了很长时间。我可以接近BelongToMany和hasManyThrough,

  • 我使用Spring Integration的SFTP和出站通道适配器将文件上传到远程位置。当它将文件发送到一个SFTP位置时,它可以正常工作。然而,在我的代码中,我试图基于不同的标准发送到多个SFTP位置。 以下是我的设置-从Spring集成文件以下 我的问题是: 有人知道如何配置多个SFTP会话吗 谢谢

  • 我已经面临这个问题很多天了,请帮我解决。我正在使用线程同步实现生产者-消费者示例。我在这个传统节目中做了一些调整。我没有只使用一个队列对象,而是使用了两个队列对象。但程序不起作用。。(PS-我知道我可以通过只使用队列的一个对象来让这个程序工作,但如果我想使用两个队列对象呢??) 类队列{ } 类生产者实现Runnable{ } 类消费者实现可运行{ } 公共类测试队列{ }

  • 我试图添加多个标记,每一个都有自己的信息,当点击时会出现。当我尝试时,我很难得到infowindows来,它要么只显示一个标记,而没有InfoWindow。 谢谢,如果你需要更多的信息,请告诉我