当前位置: 首页 > 知识库问答 >
问题:

当消费者在spring cloud stream应用程序中启动时接收消息

狄誉
2023-03-14

我在玩春云流和RabbitMQ。

我有一个生成消息的RESTendpoint。

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class ProducerDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerDemoApplication.class, args);
    }
}

@RestController
class ProducerController {

    @Autowired
    MyProcessor myProcessor;

    @RequestMapping(value = "sendmessage/{message}", method = RequestMethod.GET)
    public String sendMessage(@PathVariable("message") String message) {
        myProcessor.anOutput().send(MessageBuilder.withPayload(message).build());
        return "sent";  
    }

}

interface MyProcessor {
    String INPUT = "myInput";

    @Output("myOutput")
    MessageChannel anOutput();
}

通过另一个应用程序,我正在消费这些消息。

@StreamListener(MyProcessor.INPUT)
public void eventHandler(String message) {
    System.out.println("**************  Message received => "+message);
}

当两个应用程序都启动并运行时。我可以发布消息并在消费者处使用它。

我在以下场景中面临的问题:

我故意让消费者失望,并通过制作人发布消息。现在,当消费者启动时,它没有收到任何消息

我想RabbitMQ保证消息传递

Github链接https://github.com/govi20/producer-demo
https://github.com/govi20/consumer-demo

共有2个答案

徐秋月
2023-03-14

在使用者输入绑定上需要一个组。否则,它是匿名的,并绑定一个自动删除队列,该队列仅在使用者运行时存在。

花品
2023-03-14

正如我之前提到的,您在“myInput”中已经有了未命中配置,因为您没有@Input配置,这会导致组件需要一个名为“myInput”的bean,但找不到。消费者启动期间的错误。因此,在消费者端需要这样的东西

 interface MyProcessor {
    String INPUT = "myInput";

    @Input("myInput")
    MessageChannel myInput();
}

此外,如果您不定义group,它会导致Rabbit端的匿名队列(类似于这样的myInput.anonymous.pZg03h0zQ2-SHLh1_QL8DQ),这基本上会导致每次启动时队列的名称不同,因此

spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=myGroup

将导致队列名称<代码>myInput。myGroup是初创公司之间持久一致的。

此外,在生产者端myOutput会创建一个没有路由到上述(或任何其他)队列的Rabbit Exchange,因此Rabbit会丢弃消息,因此您不可能接收来自生产者的消息,直到您在myOutput交换和myInput.myGroup队列之间创建路由。但是,如果您按照我上面描述的方式配置输入,spring-cloud-stream还将创建一个名为myInput的交换,该交换将自动路由到myInput.myGroup,因此如果您将生产者更改为发送到该目的地,您将收到有关消费者的消息。

 类似资料:
  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢