当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka通道在Spring Boot应用程序中不工作

狄睿
2023-03-14

我一直试图让入站SubscribableChannel和出站MessageChannel在我的spring boot应用程序中工作。

我已经成功设置了kafka频道并成功测试了它。

此外,我还创建了一个基本的spring-boot应用程序,用于测试从通道添加和接收内容。

我遇到的问题是,当我把相同的代码放在它所属的应用程序中时,消息似乎永远不会被发送或接收。通过调试很难确定发生了什么,但对我来说唯一不同的是频道名。在工作impl中,通道名称类似于非工作应用程序中的application.channel,其localhost:8080/channel。

我想知道是否有一些Spring引导配置阻塞或将通道的创建更改为不同的通道源?

有人有类似的问题吗?

应用程序.yml

spring:
  datasource:
    url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    platform: h2
    username: hello
    password: 
    driverClassName: org.h2.Driver    
  jpa:
    properties:
      hibernate:
        show_sql: true
        use_sql_comments: true
        format_sql: true

  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
          contentType: application/json
        email-out:
          destination: email
          contentType: application/json

电子邮件

public class Email {

    private long timestamp;

    private String message;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

}

绑定配置

@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {    
}

接口

public interface EmailQueues {

    String INPUT = "email-in";
    String OUTPUT = "email-out";

    @Input(INPUT)
    SubscribableChannel inboundEmails();

    @Output(OUTPUT)
    MessageChannel outboundEmails();
}

控制器

 @RestController
    @RequestMapping("/queue")
    public class EmailQueueController {

        private EmailQueues emailQueues;


        @Autowired
        public EmailQueueController(EmailQueues emailQueues) {
            this.emailQueues = emailQueues;
        }

        @RequestMapping(value = "sendEmail", method = POST)
        @ResponseStatus(ACCEPTED)
        public void sendToQueue() {
            MessageChannel messageChannel = emailQueues.outboundEmails();
            Email email = new Email();
            email.setMessage("hello world: " + System.currentTimeMillis());
            email.setTimestamp(System.currentTimeMillis());

            messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());

        }

        @StreamListener(EmailQueues.INPUT)
        public void handleEmail(@Payload Email email) {
            System.out.println("received: " + email.getMessage());
        }
    }

我不确定使用Spring-Cloud的继承配置项目之一,Spring-Cloud-Sleuth可能会阻止它工作,但即使我删除它仍然不会。但与我的应用程序使用上述代码不同,我从未看到配置过ConsumeConfig,例如:

o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = consumer-2
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true

(这个配置是我在运行上述代码时在我的基本 Spring 启动应用程序中看到的,并且代码可以从 kafka 通道写入和读取)....

我假设我正在使用的一个库中有一些超Spring引导配置,创建一个不同类型的通道,我只是找不到该配置是什么。

共有2个答案

萧无尘
2023-03-14

好的,经过大量调试…我发现有东西正在创建一个测试支持绑定器(怎么还不知道),所以很明显,这是用来不影响将消息添加到真实通道的。

添加后

@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)

kafka 通道配置已正常工作,消息正在添加。想知道到底是什么在设置这个测试支持粘合剂会很有趣。我最终会找到那个傻瓜。

公羊渝
2023-03-14

您发布的内容包含许多不相关的配置,因此很难确定是否有任何东西妨碍了您。此外,当您说“…似乎消息从未被发送或接收…”日志中是否有任何异常?此外,请说明您正在使用的Kafka版本以及Spring Cloud Stream。现在,我确实尝试根据您的代码重现它(在清理了一点以只留下相关部分之后),并且能够成功发送/接收。

我的Kafka版本是0.11,Spring Cloud Stream 2.0.0。以下是相关代码:

spring:   
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
        email-out:
          destination: email

@SpringBootApplication
@EnableBinding(KafkaQuestionSoApplication.EmailQueues.class)
public class KafkaQuestionSoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaQuestionSoApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(EmailQueues emailQueues) {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                emailQueues.outboundEmails().send(new GenericMessage<String>("Hello"));
            }
        };
    }

    @StreamListener(EmailQueues.INPUT)
    public void handleEmail(String payload) {
        System.out.println("received: " + payload);
    }


    public interface EmailQueues {
        String INPUT = "email-in";
        String OUTPUT = "email-out";

        @Input(INPUT)
        SubscribableChannel inboundEmails();

        @Output(OUTPUT)
        MessageChannel outboundEmails();
    }
}
 类似资料:
  • 我已经为Postgresql启用了复制,并且正在使用PGPool进行负载平衡。 我在使用HikariCP甚至Apache DBCP连接到Postgres时遇到了问题。 在SpringBoot应用程序中有没有使用PGPool的方法? 请查找堆栈跟踪: 2018-08-10 10:20:19.124信息37879----[main]com.zaxxer.hikari.hikaridatasource:

  • 我在WebSphere控制台上部署了WAR文件,并将其映射到数据源。我能够测试我用PostgreSQL服务器详细信息配置的数据源。但是我的应用程序没有连接到服务器。我是新来的WebSphere,谁能帮我配置基于下面context.xml文件的数据源。我的应用程序在tomcat中工作得很好,但在WebSphere中却不行。 我认为我在数据源配置中做错了什么。

  • 我试图在SpringMVC中运行SpringBoot应用程序,在SpringMVCPOM中添加SpringBoot应用程序依赖项,并扫描SpringBoot包,但我面临以下问题

  • 上面的代码片段显示了在一些Windows10系统中的Toast通知,而在其他一些Windows10系统中不起作用。请给我指点一下。提前道谢。 你好,维韦克

  • 我和Asterisk ARI一起工作。当我通过ARI创建通道时,我指定了应用程序名称,我可以毫无问题地将这些通道添加到网桥中。但是当我使用其他客户端(如Zoiper)进行调用,并尝试将这些通道添加到网桥时,我得到一个错误通道不在静态应用程序中。我不知道什么是停滞,我遵循这篇文章,并把以下扩展conf文件,但它仍然不工作: 那么,什么是静态应用程序,我如何能够桥接在ARI之外创建的频道?

  • 我是kubernetes的新手,需要在openshift平台上使用k8s confimap将springboot应用程序的属性文件外部化。我已将属性文件保存在git repo中,作为“greeter.message=Spring Bootmyapplication.properties已在库伯内特斯上挂载为卷!”并使用“oc create confimap myconfig--from-file=