当前位置: 首页 > 知识库问答 >
问题:

无法使用使用 MessageProducer.send() 创建的 JmsTemplate.receive() 检索消息

严元白
2023-03-14

我正在使用Spring Boot来启动嵌入式ActiveMQ代理,并且我正在使用spring-jms在运行时为主题动态注册JMSendpoint。我让这部分工作,当我创建一个 JMS 消息并将其发送到主题时,它最终会被我的听众使用。目前为止,一切都好。在侦听器内部,如果在消息中设置了 JMS 标头“JMSReplyTo”,我想向其他主题发送一条简单的 ACK 消息。如果是,我将创建一个消息生产者,并将目标设置为标头中设置的主题,然后发送 ACK 消息。代码似乎有效,或者至少我认为是这样因为我没有得到任何错误。但是,在我的测试用例中,在发送初始消息后,我正在尝试读取返回另一个主题的 ACK,并且调用超时。我正在使用 JmsTemplate 来发送原始消息,以及使用 ACK。我确定我错过了一些东西,但我不确定是什么。这是相关的Spring Java配置部分,显示了SessionAwareMessageListener:

@Bean(name = "sessionAwareMessageListener")
SessionAwareMessageListener<TextMessage> createSessionAwareMessageListener() {
  return new SessionAwareMessageListener<TextMessage>() {
    @Override
    public void onMessage(TextMessage message, Session session) throws JMSException {
      log.info(" Received: {} ", message.getText());

      // Prepare an ACK reply message
      final ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
      textMessage.setText("ACK");

      // Send the ACK message back to the replyTo address of the incoming
      // message. 
      if (message.getJMSReplyTo() != null) {
        log.info("Sending ACK message to {}", message.getJMSReplyTo());
        // session.createTopic(message.getJMSReplyTo().toString());
        MessageProducer producer = session.createProducer(message.getJMSReplyTo());
        producer.send(textMessage);
      }
    }
  };
}

在我的 Spock 集成测试中,我创建了一个 MessageProducer 并通过 JmsTemplate 实例发送消息:

...other stuff here...
MessageCreator messageCreator = new MessageCreator() {
  @Override
  public Message createMessage(Session session) throws JMSException {
    log.info("Sending text message")
    TextMessage message = session.createTextMessage("This is a test")
    message.setJMSReplyTo(new ActiveMQTopic("test"))
    return message
  }
}

... code that registers the JMS endpoint goes here...

JmsTemplate jmsTemplate = context.getBean(JmsTemplate)
assert jmsTemplate
jmsTemplate.send("bar", messageCreator)

消息发送成功,最终由前面所示的 SessionAwareMessageListener 处理。但是当我尝试使用 JmsTemplate.receive() 检索 ACK 消息时,它会阻塞,直到调用超时并返回 null。没有在 ACK 消息的主题上注册特定的侦听器或动态创建的endpoint,因此我认为可以使用 JmsTemplate.receive() 使用消息。显然我在这里做错了什么,但我真的想不通是什么。这是我尝试接收 ACK 的代码片段:

jmsTemplate.setReceiveTimeout(2000L)
Message received = jmsTemplate.receive("test")
assert received instanceof ActiveMQTextMessage
log.info("Received message: ${received}")
assert ((ActiveMQTextMessage) received).text == "ACK"

我将不胜感激能提供更好的方法。在这一点上,我真的只是在做实验。我知道我需要动态注册终结点,并可能将消息转发到其他主题。谢谢。

共有1个答案

向安福
2023-03-14

使用主题进行请求/回复消息传递通常不是一个好主意,尤其是在回复端。

  1. 回复可能会在 receive() 订阅主题之前到达;代理将丢弃此类消息,除非该订阅是持久的。
  2. 您通常只希望将回复发送给发起人。

使用队列而不是主题,尤其是对于回复。

考虑使用 JmsTemplate 进行回复,而不是创建自己的生产者;如果您使用的是 DefaultMessageListenerContainer 和会话事务处理(应始终使用 DMLC 的会话事务以避免丢失消息),则模板将自动使用相同的会话

如果您希望对一条消息有多个响应,则可以在请求端使用主题,但同样,除非订阅是持久的,否则回复的数量将是不确定的 - 您无法在生产者端告诉有多少消费者会收到它。

 类似资料:
  • 我使用Symfony HttpClient调用外部api。当stastusCode为200时,我可以使用方法来检索API响应。如果API响应为400,则抛出ClientExc0019,并且无法获取外部API消息。

  • 我是Flink的新手,今天我遇到了一个奇怪的情况。 我运行Kafka服务器,然后使用confluent producer发送消息。使用consumer,我得到了正确的信息,但在应用程序中,我不能。我使用此图像设置message broker 我用这个向Kafka服务器发送消息 我发送的消息是 我用这个来听Kafka的留言 这是我的密码 当我将kafka源代码更改为KafkaSource时 user

  • 我在data hub/src/main/ml config/database fields/final-database.xml中添加了几个元素范围索引 并运行./gradlew mlDeploy-PenvironmentName=local--info 它成功运行,但没有创建添加到xml中的元素范围索引。 MarkLogic版本10.0.3数据中心版本5.1 是否不支持此xml配置? 我怎样才能

  • 问题内容: 我无法在数据库(mySQL)中创建表,使用并尝试使用以下命令输入未来表的名称: 然后,在用户输入表名称之后,我尝试构造并调用该语句: 如果我尝试不输入名称就执行它(如常量字符串:“ CREATE TABLE newtable(…)”,但我需要输入名称),它将很好地工作。 问题答案: 阅读表名后,您将必须格式化字符串,例如: 然后创建像:

  • 作为一个应用程序的一部分,我正在工作,我想提供一个假期列表。我试图使用谷歌API来做到这一点。在编写代码之前,我只想看看返回的JSON是什么样子的。 我在谷歌建立了一个帐户,得到了一个API密钥,并被设置为使用谷歌日历API。我对获取假期列表所需的HTML进行了研究,但似乎无法使其工作。我浏览了所有的日历API文档,但没有找到任何关于检索假日列表的内容。以下是我尝试的一些URL和收到的回复。我在网

  • 我试图构建一个Spring Boot REST API,从MySQL DB中获取数据。这是我在Spring Boot REST API应用程序的存储库代码中定义的JPA查询方法: 这是我要查询的表的详细信息:

  • 我正在使用java高级rest客户端在我的应用程序中集成elasticsearch,但无法创建索引 在某个地方,我发现要执行请求,我们需要使用index(请求)方法(我在代码中已注释),但它表明index(请求)方法已从RestHighLevelClient类型中弃用。 这是我的代码:

  • 使用JavaAPI设置源和设置的唯一方法是使用这样的代码(这是一个只有一个@test方法的简单测试类): 当我第一次运行它时,它就起作用了。但当我第二次运行它时,我得到: JAVAlang.IllegalStateException:未能加载ApplicationContext allable.java:12DefaultCacheAware ContextLoaderDorg.springfra