当前位置: 首页 > 知识库问答 >
问题:

在同一应用程序中声明生产者和消费者时,Spring cloud stream不会向Kafka发送消息

窦成荫
2023-03-14

目前,我让生产者和消费者在同一个Spring Boot应用程序中进行配置,但触发消息的Spring云流没有通过Kafka(我正在使用Kafka控制台消费者监控消息),但消费者仍然收到消息(使用与生产者相同的线程)。

如果我在应用程序中删除consumerHandler(@StreamListener),制作人就会成功地将消息发送给Kafka。

这个有什么配置吗?默认情况下,我需要将Spring云流消息发送到Kafka。

生产者和消费者配置:

@Component
public interface NotificationProcessor {
   
    String EMAIL_NOTIFICATION = "email-notification";
    @Input(EMAIL_NOTIFICATION)
    SubscribableChannel receiveEmail();
    @Output(EMAIL_NOTIFICATION)
    MessageChannel sendEmail();
}

以下是我的一些配置:

spring:
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
          brokers: ${KAFKA_BROKERS:localhost:9092}
          auto-create-topics: true
          configuration:
            auto.offset.reset: latest
      bindings:
        email-notification:
          group: ${EMAIL_GROUP:email-group-notification}
          destination: ${EMAIL_TOPIC:email-notification}
          contentType: application/json
          producer:
            partitionCount: 9
          consumer:
            partitioned: true
            concurrency: 3
      instance-count: 1
      instance-index: 0

触发发送消息的API:

@RestController
@RequestMapping("/api")
public class TestResource {
    private final Logger log = LoggerFactory.getLogger(TestResource.class);

    private final NotificationProcessor notificationProcessor;
    public TestResource(NotificationProcessor notificationProcessor) {
        this.notificationProcessor = notificationProcessor;
    }
 
    @ApiOperation(value = "Test api")
    @GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
    public ResponseEntity<Boolean> test2() {
        EmailMessage test =  EmailMessage.builder()
                .to(Arrays.asList(Receiver.builder().email("test@nomail.com").build())
                ).type(EContentType.JSON)
                .build();
        log.info("send email message to kafka");
        notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
        return ResponseEntity.ok(Boolean.TRUE);
    }
}

消费者处理者:

@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {

    private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);

    public NotificationProducer(){}

    @StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
    public void receiveEmail(@Payload Message<EmailMessage> message)  {
        log.info("Receive email message from kafka");
        EmailMessage emailMessage = message.getPayload();
    }
}

共有1个答案

羊舌兴德
2023-03-14

从提供的信息中不清楚您要将信息发送到哪里。什么频道?默认情况下,通道是内部和直接的,因此,如果您发送到订阅的同一个通道,您将完全绕过message broker(即Kafka)。这可以解释这两种症状(没有代理和相同的html" target="_blank">线程)。

也就是说,基于注释的配置模型已被弃用。在过去的几年里,我们完全迁移到函数式编程模型,这要简单得多,并且旨在帮助您不要考虑内部实现,例如通道,因为它们实际上是内部使用的(代码和代理适配器之间的桥梁)。

还有一个新组件,它允许您向专门为您所拥有的场景设计的代理发送消息——StreamBridge。

无论如何,看看它并考虑重构你的应用程序。至少要确保发送到绑定到代理目标的通道,并订阅绑定到同一目标的另一个通道,从而确保到代理的往返。

最后但并非最不重要的一点是,我仍然很困惑,为什么你需要发送给代理,然后在同一个应用程序中订阅它?为什么要增加网络开销?

 类似资料:
  • 这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。

  • 我使用的是Spring Cloud Stream和RabbitMQ,我在同一个应用程序中配置了生产者和消费者,但使用了两个不同的通道。问题是,我的应用程序有五个副本在Kubernetes上运行,但只有发送消息的pod会使用它们,其他四个不会使用。在RabbitMQ控制台上,没有收到任何消息,消息速率图不会更改。当我注释OrdersListener时,消息会成功发送到RabbitMQ集群。有人知道可

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前