我在玩春云流和RabbitMQ。
我有一个生成消息的RESTendpoint。
@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class ProducerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerDemoApplication.class, args);
}
}
@RestController
class ProducerController {
@Autowired
MyProcessor myProcessor;
@RequestMapping(value = "sendmessage/{message}", method = RequestMethod.GET)
public String sendMessage(@PathVariable("message") String message) {
myProcessor.anOutput().send(MessageBuilder.withPayload(message).build());
return "sent";
}
}
interface MyProcessor {
String INPUT = "myInput";
@Output("myOutput")
MessageChannel anOutput();
}
通过另一个应用程序,我正在消费这些消息。
@StreamListener(MyProcessor.INPUT)
public void eventHandler(String message) {
System.out.println("************** Message received => "+message);
}
当两个应用程序都启动并运行时。我可以发布消息并在消费者处使用它。
我在以下场景中面临的问题:
我故意让消费者失望,并通过制作人发布消息。现在,当消费者启动时,它没有收到任何消息
我想RabbitMQ保证消息传递
Github链接https://github.com/govi20/producer-demo
https://github.com/govi20/consumer-demo
在使用者输入绑定上需要一个组。否则,它是匿名的,并绑定一个自动删除队列,该队列仅在使用者运行时存在。
正如我之前提到的,您在“myInput”中已经有了未命中配置,因为您没有@Input
配置,这会导致组件需要一个名为“myInput”的bean,但找不到。消费者启动期间的
错误。因此,在消费者端需要这样的东西
interface MyProcessor {
String INPUT = "myInput";
@Input("myInput")
MessageChannel myInput();
}
此外,如果您不定义group
,它会导致Rabbit端的匿名队列(类似于这样的myInput.anonymous.pZg03h0zQ2-SHLh1_QL8DQ
),这基本上会导致每次启动时队列的名称不同,因此
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=myGroup
将导致队列名称<代码>myInput。myGroup是初创公司之间持久一致的。
此外,在生产者端myOutput
会创建一个没有路由到上述(或任何其他)队列的Rabbit Exchange,因此Rabbit会丢弃消息,因此您不可能接收来自生产者的消息,直到您在myOutput
交换和myInput.myGroup
队列之间创建路由。但是,如果您按照我上面描述的方式配置输入,spring-cloud-stream还将创建一个名为myInput
的交换,该交换将自动路由到myInput.myGroup
,因此如果您将生产者更改为发送到该目的地,您将收到有关消费者的消息。
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是
我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有
我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新
Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)
我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢