当前位置: 首页 > 知识库问答 >
问题:

Spring集成AMQP-消息偶尔取消,需要超时吗?

解阳荣
2023-03-14

我有一个入站RabbitMQ通道适配器,每天成功处理3000条消息,但是偶尔我会在RabbitMQ管理控制台看到1条未包装的消息。这似乎仍然是这样。

我确实有一个重试建议链,可以重试3次,然后通过死信路由密钥移动到DLQ,这对大多数例外情况都很有效。

在过去的几周里,unacked已经发生了两次,有一次我能够进行线程转储,并看到int-http:outbound-gateway调用在等待http响应getStatusCode()时被卡住

我在int-amqp:inbound通道适配器上有一个receive timeout=“59000”,我希望它能在线程超过超时的任何地方超时?

我现在注意到int-http:outbound-gateway上有一个reply-timeout属性,我应该设置它吗?

有什么想法值得赞赏吗?

    <int-amqp:inbound-channel-adapter id="amqpInCdbEvents"  channel="eventsAMQPChannel" channel-transacted="true"  transaction-manager="transactionManager" 
            queue-names="internal.events.queue" connection-factory="connectionFactory" 
            receive-timeout="59000" concurrent-consumers="${eventsAMQPChannel.concurrent-consumers}" 
            advice-chain="retryChain" auto-startup="false" /> 

     <int:channel id="eventsAMQPChannel" />

     <!--  CHAIN of processing for Asynch Processing of Events from intermediate Queue   -->
    <int:chain id="routeEventChain" input-channel="eventsAMQPChannel">
            <int:json-to-object-transformer type="xx.xx.xx.json.Event" object-mapper="springJacksonObjectMapper"/>
            <int:header-enricher>
                  <int:header name="originalPayload" expression="payload" overwrite="true"/>
                  <int:header name="message_id"      expression="payload.id" overwrite="true"/>
            </int:header-enricher>  
            <int:router expression="payload.eventType">
                <int:mapping value="VALUE"              channel="valueEventChannel"/>
                <int:mapping value="SWAP"               channel="swapEventChannel"/>
            </int:router>
    </int:chain>

    <int:channel id="valueEventChannel" />
    <int:channel id="swapEventChannel" />

    <int:chain id="valueEventChain" input-channel="valueEventChannel" output-channel="nullChannel">
            <int:transformer ref="syncValuationTransformer" />
            <int:object-to-json-transformer object-mapper="springJacksonObjectMapper" />
            <int:header-enricher>
                     <int:header name="contentType" value="application/json;charset=UTF-8" overwrite="true"/>
            </int:header-enricher>                  
            <int-http:outbound-gateway id="httpOutboundGatewayValuationServiceFinalValuation"                                                           
                    expected-response-type="java.lang.String"
                    http-method="POST"      charset="UTF-8"
                    extract-request-payload="true"                                          
                    url="${value.service.uri}/value"/>
    </int:chain>

共有1个答案

谭富
2023-03-14

回复超时是将回复发送到回复通道时的超时(如果它可以阻止-例如,已满的有界队列通道)。

int-http:出站网关调用被卡住等待超文本传输协议响应getStatusCode()

您可以在可以配置到出站适配器中的ClientHttp刚需工厂上设置客户端超时(readtimeout)...

/**
 * Create a new instance of the {@link RestTemplate} based on the given {@link ClientHttpRequestFactory}.
 * @param requestFactory HTTP request factory to use
 * @see org.springframework.http.client.SimpleClientHttpRequestFactory
 * @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory
 */
public RestTemplate(ClientHttpRequestFactory requestFactory) {
    this();
    setRequestFactory(requestFactory);
}
 类似资料:
  • 我需要解决这个场景。我有两个amqp消费者设置来获取一条消息。 taskChannel是queuechannel,但一次只允许使用一条消息,因此没有并行处理。如果另一条消息花了太长时间才继续,我如何在超时后拒绝一条消息。那么这个消息将返回到队列,由另一个节点继续?我的意思是,这两个消费者预取了两条消息,但一次只能处理一条,所以如果第一条预取消息需要很长时间才能处理,那么如何释放第二条预取消息呢。

  • 我开发了spring批处理应用程序,该应用程序生成由json对象列表组成的amqp(rabbitmq)消息。消息具有包含一些元数据的标头。Spring cloud stream应用程序正在消费消息,我使用了功能性方法。如何访问标题<将消息头用于除路由之外的任何内容,这是一种糟糕的方法吗?

  • 我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。

  • 我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup

  • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

  • 在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法