当前位置: 首页 > 知识库问答 >
问题:

基于AMQP的Spring Boot JmsListener在TextMessage上失败

楚修为
2023-03-14

我有一个Spring Boot应用程序,它在从ActiveMQ代理检索类型为TextMessage的JMS消息时遇到问题。

如果消费者尝试从代理检索消息,它无法自动将消息转换为TextMessage,而是将其视为ByteMessage。有一个JmsListener应该将队列中的消息作为TextMessage读取:

...
@JmsListener(destination = "foo")
public void jmsConsumer(TextMessage message) {
...

JmsListener产生如下警告,并删除消息:

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method could not be invoked with incoming message
Endpoint handler details:
Method [public void net.aschemann.demo.springboot.jmsconsumer.JmsConsumer.jmsConsumer(javax.jms.TextMessage)]
Bean [net.aschemann.demo.springboot.jmsconsumer.JmsConsumer@4715f07]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [javax.jms.TextMessage] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298, failedMessage=org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:118) ~[spring-jms-5.1.4.RELEASE.jar:5.1.4.RELEASE]

我提取了一个小样本应用程序来调试该问题:https://github.com/ascheman/springboot-camel-jms

现实生活中的生产者是一个利用Apache Camel的商业应用程序。因此,我几乎无法更改/定制制片人。我试图建立一个样本生产者,显示出同样的行为。

我可以以某种方式调整消费者以将消息视为TextMessage吗?

此外:有没有办法直接在Spring中通过编程从消息中检索额外的AMQP属性?当然,我仍然可以将消息读取为ByteMessage,并尝试解析属性。但我正在寻找一种更干净的方式,它由任何Spring API支持。到目前为止,Spring标题注释没有起到任何作用。

共有3个答案

壤驷棋
2023-03-14

如果从字节[]到字符串的转换有问题,请使用:

.convertBodyTo(String.class)

路线示例:

from(QUEUE_URL)
                .routeId("consumer")
                .convertBodyTo(String.class)
                .log("${body}")
                .to("mock:mockRoute");
景恩
2023-03-14

我有同样的错误,这是因为LazyResolutionMessageMessagingMessageConverter调用,这是MessageConverter的默认实现,它转换您的消息(实际上它没有,因为它是默认的):

return ((org.springframework.messaging.Message) payload).getPayload();

我已经实现了你想要的,最终我的消费者的工作方式是:

@JmsListener(destination = "${someName}")
public void consumeSomeMessages(MyCustomEvent e) {
  ....
}

我要做的是:

@Bean(name = "jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory whateverNameYouWant(final ConnectionFactory genericCF) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> log.error("bad consumer, bad", t));
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    factory.setConnectionFactory(genericCF);
    factory.setMessageConverter(
            new MessageConverter() {
                @Override
                public Message toMessage(Object object, Session session) {
                    throw new UnsupportedOperationException("since it's only for consuming!");
                }

                @Override
                public MyCustomEvent fromMessage(Message m) {
                    try {
                      // whatever transformation you want here...
                      // here you could print the message, try casting,
                      // building new objects with message's attributes, so on...
                      // example:
                      return (new ObjectMapper()).readValue(((TextMessage) m).getText(), MyCustomEvent.class);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
    );
    return factory;
}

几个关键点:

>

请注意,在尝试转换/转换消息类型时,您还可以实现一个ErrorHandler来处理异常!

因为我想从SQS队列中消费,所以ConnectionFactory是一个Spring管理的bean,有Amazon的SQSConnectionFactory。请正确提供您的等价物。我的是:

@Bean("connectionFactory")
public SQSConnectionFactory someOtherNome() {
    return new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                    .withRegion(Regions.US_EAST_1)
                    .withCredentials(
                            new AWSStaticCredentialsProvider(
                                    new BasicAWSCredentials(
                                            "keyAccess",
                                            "keySecret"
                                    )
                            )
                    )
                    .build()
    );
}
倪振海
2023-03-14

在我跟随@AndyWilkinson的评论添加了transport之后,我曾与问题所有者面临过同样的问题。activemq中transportConnector上的transformer选项。如下图所示,问题得到了解决。

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
 类似资料:
  • 我想分散加工大批量。这个想法是使用Spring Batch在云中激发一堆AMQP消费者,然后加载廉价的任务(如项目ID)并将它们提交给AMQP交换。结果的书写将由消费者自己完成。 null

  • 我们在基于服务的应用程序中使用作为一种事件总线。send方法和消息处理程序基于spring-messaging的类(从Spring-Integration4.0(+)开始)。事件是对实体的更改,需要由其他服务接收。 问题是:spring-messaging类被spring-amqp视为任意对象负载,因为它不被识别为spring-amqp。这会导致以下问题: 默认消息格式是序列化的Java对象。sp

  • 问题内容: 我正在尝试使用AMQP 1.0连接到ActiveMQ代理,但是我想在我的应用程序代码中使用JMS。我对使用JMS感兴趣,主要是因为我希望开发人员能够使用他们已经熟悉的API。 我有在本地主机上运行的ActiveMQ 5.14.0和以下代码: 代码总是以相同的方式失败,并且在stacktrace中具有以下根本原因: 这发生在方法调用上。 如果我对ActiveMQ tcp端点运行相同的代码

  • 我们已经注意到,当错误消息被接收到Spring集成endpoint(从RabbitMQ)时,它们不会被重试。如果我们的业务代码(即接收消息的“服务方法”)出现了问题,导致它抛出异常,那么重试就会发生。 这是我们的配置: 方法的实现方式如下: 有人知道我如何在spring集成调用“服务方法”之前重新排队/重试失败的消息吗? 我们使用Spring Integration 5.4.6和Spring Bo

  • 我翻阅了rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。如果我要手动确认/NACK消息,我需要将重试计数保存在内存中(例如,使用correlationId作为映射中的唯一键),或者在消息中设置我自己的头并重新发送它(从而将其放在队列的末尾) 然而,这是一个Spring处理的情况。具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAtte

  • 问题内容: 我有一个网站,该网站完全基于Ajax(哈希导航)。 有没有一种方法可以使用Javascript刷新基于ajax的网站的Open Graph元标记?(当我单击一个链接时,标记和其中的值应更改) 问题答案: 否。OpenGraph标记必须出现在带有纯HTTP的GETable的HTML页面上。 这是因为当用户与OG对象进行交互(例如执行操作等)时,Facebook将在OG URL上执行HTT