当前位置: 首页 > 知识库问答 >
问题:

在AsyncRabbitTemboard中回复消息后处理

米景辉
2023-03-14

在使用AsyncRabbitTemplate时,我在GZip/GUnzip消息处理方面遇到了问题。

使用同步模板设置时一切正常,如下所示:

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jsonConverter());
    rabbitTemplate.setReplyTimeout(config.getRabbitSendAndReceiveReplyTimeout());
    rabbitTemplate.setReceiveTimeout(config.getRabbitSendAndReceiveReceiveTimeout());
    rabbitTemplate.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    rabbitTemplate.setBeforePublishPostProcessors(new GZipPostProcessor(true));
    return rabbitTemplate;
}

但是,当我设置像这样的异步模板时:

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public AsyncRabbitTemplate rabbitTemplateAsync(final ConnectionFactory connectionFactory) {
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory));
    // need to manually start the reply listener container for some reason
    asyncRabbitTemplate.start();
    return asyncRabbitTemplate;
}

回复消息没有正确解压缩,我得到了这个错误消息

Caused by: java.io.UnsupportedEncodingException: gzip:UTF-8
    at java.lang.StringCoding.decode(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:235) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:199) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate.onMessage(AsyncRabbitTemplate.java:576) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]

我已尝试为AsyncRabbitTemplate提供一个配置好的DirectReplyToMessageListenerContainer,但没有帮助

    final DirectReplyToMessageListenerContainer directReplyToMessageListenerContainer = new DirectReplyToMessageListenerContainer(
            connectionFactory);
    directReplyToMessageListenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
            directReplyToMessageListenerContainer);

这只会导致以下错误:

[错误]2019-03-06 12:18:05.192[AMQP连接172.17.3.6:5672]CachingConnectionFactory.log-通道关闭:通道错误;协议方法:#方法(回复代码=406,回复文本=PRECONDITION_FAILED-快速回复消费者不存在,类id=60,方法id=40)

请注意,我通过采用SpringRabbit项目的一个分支,并将此构造函数添加到AsyncRabbitTemplate中,使事情得以顺利进行:

public IndigoAsyncRabbitTemplate(final RabbitTemplate template,
        final DirectReplyToMessageListenerContainer directReplyToContainer) {
    Assert.notNull(template, "'template' cannot be null");
    this.template = template;
    container = null;
    replyAddress = null;
    this.directReplyToContainer = directReplyToContainer;
    directReplyToContainer.setMessageListener(this);
}

那么,这是否需要对SpringRabbit库进行增强才能开始工作呢?或者,有没有一种方法可以让GUnzip在回复侦听器上工作,而不必跳过太多的障碍?

共有1个答案

涂泰平
2023-03-14

对,这必须作为框架的改进。在AsyncRabbitTemboard的情况下,我们只是忽略了ReceivePost处理器之后的这一事实。我们可以重新配置一个内部的DirectReplyToMessageListenerContainer,以便从提供的RabbitTemboard中使用在ReceivePost处理器之后

同时,您可以坚持使用常规的SimpleMessageListenerContainer注入。或者您可以尝试使用外部DirectReplyToMessageListenerContainer注入。

见本节:

/**
 * Construct an instance using the provided arguments. The first queue the container
 * is configured to listen to will be used as the reply queue. Replies will be
 * routed using the default exchange with that queue name as the routing key.
 * @param template a {@link RabbitTemplate}
 * @param container a {@link AbstractMessageListenerContainer}.
 */
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
    this(template, container, null);
}

关于这一事项的问题:https://github.com/spring-projects/spring-amqp/issues/920

 类似资料:
  • 我只是检查应用程序,正在做自动回复WhatsApp消息在后台。我也试着这么做,但不能在其中获得成功。我试过: 但它打开了WhatsApp应用程序:(而不是发送消息。 我经历了几个环节:问题1,问题2也有关于它的文章,但没有得到满意的答案。

  • 我有以下兔子听者: 我需要将listener配置为在它处理一条消息后等待15分钟,然后再接收下一条消息。不需要在此方法中等待。我所需要的只是在处理完一条后不接收任何消息。可以通过来完成,但我不确定这是否是实现这一点的最佳方法。对于这种情况有没有rabbitmq的配置?

  • 我正在使用NSeriveBus构建一个系统,它应该只在特定的时间段将消息发送给远程处理程序。到目前为止,我设法将所有消息放在一个处理队列上,并从那里检查远程处理程序的可用性,如果处理程序不可用,我就不会通过边界发送消息。要做到这一点,我正在使用 但国家安全局会继续努力。这不像其他侦听器将在几分钟内启动并运行,但它可能有几个小时的停机窗口,因此这并不完全有效。 想知道是否有办法让总线稍后重试消息,或

  • 问题内容: 我想在后台线程中运行一些Runnable。我想使用Handler,因为它便于延迟。我的意思是 凡 可运行 应当运行 后台 线程。是否可以创建这样的处理程序?是否在某个地方有“背景” Looper,或者该如何创建? PS我知道如何使用自定义类扩展Thread,但是比处理程序方式需要更多的编码工作。因此,请不要发布其他解决方案或类似的内容 如果Handler能以“干净”的方式做到这一点,我

  • 我想在后台线程中运行一些Runnable。我想使用Handler,因为它方便延迟。我的意思是 runnable应该在后台线程中运行。有可能创造这样的处理器吗?某个地方有没有“背景”Looper或者我怎么才能创建它? 附言:我知道如何使用自定义类扩展Thread来做到这一点,但它需要更多的编码工作,而不是以处理程序的方式进行。因此,请不要发布其他解决方案或类似内容 我只是想知道汉德勒是否能以“干净”

  • 我的流使用基于cron表达式的消息,我故意添加了一个groovy代码来抛出异常以测试JMS回滚。回滚不会将消耗的消息返回队列中。我在这里错过了什么吗? 这里是mule流,它应该在遇到异常后回滚mule消息。 这是此流引发的异常-