当前位置: 首页 > 知识库问答 >
问题:

Kafka——Spring的云流

戴正阳
2023-03-14

我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。

应用亚马尔

spring.cloud.stream:
  function:
    definition: streamSupplier
  bindings:
    streamSupplier-out-0:
      destination: numbers

我的目标就是创造价值。

@SpringBootApplication
@EnableBinding(Source.class)
public class CloudStreamDemoApplication {

    private AtomicInteger atomicInteger = new AtomicInteger();
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamDemoApplication.class, args);
    }

    @Bean
    public Supplier<Integer> streamSupplier(){
        return () -> {
            System.out.println("Publishing : " + atomicInteger.incrementAndGet());
            return atomicInteger.get();
        };
    }
}

依赖性-2.2.6。释放

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

共有2个答案

曾宏毅
2023-03-14

注释@EnableBinding导致了上述问题。

阅读以下spring Docs摘录:

与以前版本的spring cloud stream依赖@EnableBinding和@StreamListener注释不同,上面的示例看起来与任何普通的spring启动应用程序没有什么不同。它定义了一个类型为Function的bean,并且它是。那么,它是如何成为spring cloud stream应用程序的呢?它变成了spring cloud stream应用程序,只需在类路径上存在spring cloud stream和绑定器依赖项以及自动配置类,就可以有效地将引导应用程序的上下文设置为spring cloud stream应用程序。在此上下文中,类型为Supplier、Function或Consumer的bean被视为事实上的消息处理程序,触发绑定到所提供绑定器公开的目的地,遵循特定的命名约定和规则,以避免额外的配置

杜俊楚
2023-03-14

您需要从类中删除@EnableBinding(Source.class)。如果存在这种情况,功能绑定将不会发生。

 类似资料:
  • 如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?

  • 我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器。该处理器使用Kafka函数编写。我查看了Artem Bilan提供的将StreamsUncaughtExceptionHandler包括到我的服务中的建议,但我的异常从未被它捕获/处理。 配置Bean: 自定义异常处理程序: 流处理功能: 我希望UnCaughtExceptionHandler处

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?

  • 我正在开发一个路由器(事件代理)应用程序与Spring云流在Kafka,在功能范式。应用程序从不断输入的主题消耗,映射和过滤消息,然后应该根据一些输入字段将其发送到某个主题(一次只有单个消息,而不是多个结果)。 最好的方法是设置Spring。云流动发送到。输出消息的目标标题?如果是这样,我应该如何为生产者设置绑定?

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。