当前位置: 首页 > 知识库问答 >
问题:

Storm使用Spring AMQP从RabbitMq读取消息

璩慎之
2023-03-14

我想使用rabbitMq队列中Storm喷口中的消息。

现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。

Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。

问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口?

拓扑将启动一个集群,但在我的spout的nextTuple()方法中,我需要从这个队列中读取消息。这里可以使用Spring AMQP吗?

我有一个配置为从队列中读取消息的侦听器:

@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {

} 

如何将侦听器收到的上述消息发送到运行在集群上的spout。

或者,spout的nextTuple()方法中如何包含此方法?有可能吗

我在这里使用Java作为一种语言。

共有1个答案

赫连瀚
2023-03-14

您可以使用RabbitTemboard接收接收AndConversion方法之一按需读取消息(而不是消息驱动)。

默认情况下,如果队列中没有消息,它们将返回null。

编辑:

如果您设置了接收超时(在1.5或更高版本中可用),接收方法将在该时间阻塞(它在内部使用异步消费者并且不轮询)。

但它仍然没有侦听器那么有效,因为每个方法都会创建一个新的消费者;要使用侦听器,您需要使用nextTuple()中的一些内部阻塞机制(例如,阻塞队列)来等待消息。

 类似资料:
  • 如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取

  • 我有一个java客户机,它在队列中发送jms消息(“队列请求”)。该消息包含一个int属性(“id”),其中包含唯一的客户端id号。消息正在处理中,然后进入另一个队列(“队列响应”)。如何让客户机等到具有其id的消息在队列中,然后读取它。我曾尝试使用侦听器并实现onMessage,但当收到消息时,我如何停止侦听?

  • 问题内容: 我需要编写一个风暴喷口以从端口读取数据。想知道在逻辑上是否可行。 考虑到这一点,我设计了一个简单的拓扑结构,该拓扑结构设计为只有一个喷嘴和一个螺栓。喷口将收集使用wget发送的HTTP请求,螺栓将显示请求-就是那样。 我的喷口结构如下: 我也实现了其余方法。 将其转换为拓扑并运行它时,发送第一个请求时出现错误: java.lang.RuntimeException:java.io.No

  • 我正在使用Spring AMQP与RabbitMQ一起工作。以下是我的配置: 正如您所看到的,prefetchCount是1000。 我想知道预取的消息是否在消费者中并行处理;也就是说,多个线程调用onMessage(消息消息)方法。或者消息是按顺序处理的;也就是说,一个线程迭代预取的消息,并以连续的方式调用每个消息的onMessage(消息消息消息)方法。 我应该注意到,处理的顺序对我来说并不重

  • 问题内容: 我有一个简单的Java生产者,如下所示 我正在尝试读取以下数据 但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容 然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置 然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。 有人可以让我知道出了什么问

  • 我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf