当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka活页夹中的1个输出绑定

钮兴安
2023-03-14

在本页中,您不能只进行一次输出

但我只需要使用Spring Cloud Stream Kafka活页夹进行一次输出

那我该怎么办?

一些文章说使用org.springframework.cloud.stream.function.StreamBridge但它不适合我

我让StreamBridge向Kafka发送主题,但Kafka不会为我的Spring Boot应用程序生成主题

这是我的application.yml和生产主题代码

// producer Springboot application  



  spring.cloud.stream:
    kafka:
        binder:
            brokers: {AWS.IP}:9092
            zkNodes: {AWS.IP}:2181
    bindings:
        deliveryIncoming:
            destination : deliveryIncomingtopic

@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
  // wanna produce deliveryIncomingtopic and send to Spring's Consumer
}
// Consumer Springboot application

spring : 
     cloud:
        stream:
            kafka:
                binder: 
                    brokers: {AWS.IP}:9092
                    zkNodes: {AWS.IP}:2181
            function:
                definition : deliveryIncoming;

            bindings:
                deliveryIncoming-in-0:
                    destination : deliveryIncomingtopic

    @Bean
    public Consumer<KStream<String, String>> deliveryIncoming() { 
        return input ->
                input.foreach((key, value) -> {
                    System.out.println("deliveryIncoming is playing");
                    System.out.println("Key: " + key + " Value: " + value);
                }); 
    }

对不起,我想我说的有点不清楚

我只想像下面这样做

生产(deliveryIncomingtopic)-

共有2个答案

满耀
2023-03-14

我只是想把事情弄清楚。您是否希望使用入站主题中的消息,然后生成/生成另一条消息到另一个输出主题?如果这是您的问题,那么我相信您在应用程序中遗漏了一些东西。yml。您需要为输出主题进行其他配置。例如:

由于您使用的是Spring Cloud Function(基于我在您的application.yml中看到的内容),因此您应该为输出主题添加更多配置,如下所示:

spring : 
     cloud:
        stream:
            kafka:
                binder: 
                    brokers: {AWS.IP}:9092
                    zkNodes: {AWS.IP}:2181
            function:
                definition: deliveryIncoming;deliveryOutput

            bindings:
                deliveryIncoming-in-0:
                    destination: deliveryIncomingtopic
                deliveryOutput-out-0:
                    destination: deliveryOutput

并为producer函数定义另一个bean:

@Bean
public Producer<KStream<String, String>> deliveryOutput() { 
   // do your necessary logic here to outbound your message
}

希望这能符合你的期望。

邓欣可
2023-03-14

如果是这种情况,那么您需要更改bean函数定义以返回java.util.Function而不是java.util.消费者

@Bean
public Function<KStream<String, String>, KStream<String, String>> deliveryIncoming() {
return input ->
                input.foreach((key, value) -> {
                    System.out.println("deliveryIncoming is playing");
                    System.out.println("Key: " + key + " Value: " + value);
                }); 
}

但是,AFAIK...您仍然需要在application.yml.中定义输出通道。您可以使用相同的名称和不同的后缀。如下所示:

deliveryIncoming-in-0:
   destination: <your_topic_name>
deliveryIncoming-out-0:
   destination: <your_topic_name>
 类似资料:
  • 我有以下处理器bean方法签名: 相关物业: Spring Cloud Kafka Stream版本Hoxton. SR4(Spring-Cloud-stream-binder-kafka-stream: jar: 3.0.4. RELEASE),嵌入式Kafka版本2.5.0。 我正在使用嵌入式Kafka测试我的拓扑: 我的测试显示,来自的消息按预期到达并到达主题“out0”中,但“out1”主

  • 我是Hadoop的新手,但这是我上个月的一个学习项目。 为了使这一点足够模糊,以便对其他人有用,让我先抛出基本目标……假设: < li >显然,您有一个大型数据集,包含数百万个基本ASCII文本文件。 < ul > < li >每个文件都是一个“记录” e. g. /user/hduser/data/customer1/YYYY-MM-DD, /user/hduser/data/customer2

  • 我正在尝试制作一个定制的spring cloud stream活页夹,但它无法自动注册: 活页夹实现: 配置类: Spring活页夹文件: application.yml 我已经按照spring cloud stream的指导方针创建了一个custome活页夹,但这不起作用。此外,使用@Configuration创建绑定bean会禁用我在类路径上添加的kafka绑定。

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 是否可以创建一个单例对象,并将其作为一个实例或集合或映射的一部分独立注入? 例如,我有两个类似的Instance: ... 我通过参数直接注入它们,在我的类中使用它们。但是,我也有一个用例,我想把它们都注入到一个集合中。 比如: 是否可以创建使用已定义实例的?如何在集合中单独使用相同的实例?

  • spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?