当前位置: 首页 > 知识库问答 >
问题:

Spring与AMQP的集成:在错误通道上获取原始消息的自定义头

邰伟彦
2023-03-14

在我的项目中使用Spring与RabbitMQ的集成,我面临一个问题。该项目包括从队列接收消息、跟踪传入消息、使用服务激活器处理消息,以及跟踪服务激活器引发的响应或异常。以下是示例配置:

<!-- inbound-gateway -->
<int-amqp:inbound-gateway id="inboundGateway"
         request-channel="gatewayRequestChannel"
         queue-names="myQueue"
         connection-factory="rabbitMQConnectionFactory"
         reply-channel="gatewayResponseChannel"
         error-channel="gatewayErrorChannel"
         error-handler="rabbitMQErrorHandler"
         mapped-request-headers="traceId"
         mapped-reply-headers="traceId" />

<!-- section to dispatch incoming messages to trace and execute service-activator -->
<int:publish-subscribe-channel id="gatewayRequestChannel" />
<int:bridge input-channel="gatewayRequestChannel" output-channel="traceChannel"/>
<int:bridge input-channel="gatewayRequestChannel" output-channel="serviceActivatorInputChannel"/>

<!-- the trace channel-->
<int:logging-channel-adapter id="traceChannel" 
    expression="headers['traceId'] + '= [Headers=' + headers + ', Payload=' + payload+']'" logger-name="my.logger" level="DEBUG" />

<!-- service activator which may throw an exception -->
<int:service-activator ref="myBean" method="myMethod" input-channel="serviceActivatorInputChannel" output-channel="serviceActivatorOutputChannel"/>

<!-- section to dispatch output-messages from service-activator to trace them and return them to the gateway -->
<int:publish-subscribe-channel id="serviceActivatorOutputChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
    output-channel="traceChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
    output-channel="gatewayResponseChannel" />

<!-- section to dispatch exceptions from service-activator to trace them and return them to the gateway -->
<int:bridge input-channel="gatewayErrorChannel"
    output-channel="traceChannel" />
<int:bridge input-channel="gatewayErrorChannel"
    output-channel="gatewayResponseChannel" />

为了符合我的解释,我简化了代码。这个想法是跟踪进出服务激活器的输入和输出/错误消息。为了做到这一点,我使用了一个名为traeId的消息头。这个标识符用作相关性标识符,以便能够将请求消息与其响应相关联(这两个消息共享相同的traeId值)。

当服务激活器没有抛出异常时,一切都正常工作。但当抛出异常时,网关似乎生成了一条新消息,而没有我原来的traceId头。

稍微研究一下网关代码,我发现下面一段代码进入了MessagingGatewaySupportorg.springframework.integration.gateway.类:

private Object doSendAndReceive(Object object, boolean shouldConvert) {
...
    if (error != null) {
                if (this.errorChannel != null) {
                    Message<?> errorMessage = new ErrorMessage(error);
                    Message<?> errorFlowReply = null;
                    try {
                        errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage);
                    }
...
}

当异常发生时,似乎会创建一条新消息,将异常消息作为有效负载,并发送到网关的errorChannel。这里是我松开自定义标题的地方。

有没有办法在发生异常时保留我的自定义头?(也许有办法配置它,我可能错过了它...)。或者也许我没有以正确的方式实现我的流。如果是这样,欢迎任何评论或建议。

顺便说一下,我使用的是4.0.3版。spring集成核心工件的发布。

谢谢你的回答

编辑:正如Gary Russel所说,此示例缺少以下pus利/订阅队列配置

<int:publish-subscribe-channel id="gatewayErrorChannel"/>

共有1个答案

祁权
2023-03-14

错误通道上的消息是一个错误消息。它有两个属性:原异常和失败消息。错误消息没有得到失败消息的头。

你不能在不做额外工作的情况下将错误消息发送回网关。

通常,错误流会在返回响应之前对错误进行一些分析。

如果你想恢复一些自定义标题,你需要在错误流中添加一个标题充实器。

差不多

<int:header-enricher ...>
    <int:header name="traceId" expression="payload.failedMessage.headers.traceId" />
</int:header-enricher>

此外,您的配置有点奇怪,因为您在gatewayErrorChannel上有两个订户。除非是

 类似资料:
  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 我正在通过绑定到不同Webshpere MQ的入站和出站原子和JMS使用带有JTA支持的Spring集成。流程如下: JMS入站通道适配器收到消息 一些转变 输出队列的JMS出站通道适配器 发生错误时,收到消息 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将处理的错误路由到接收者列表路由器,该路由器将它们发送到2个错误队列 我的问题是,即使消息到达errorChannel下游(在已处理

  • 安全测试人员声称,我应该清理返回的JSON(即转义这些符号),因为这可能会给旧的浏览器带来一些问题(即在浏览器中执行此JS代码)。 但是生成错误消息的是SpringBoot框架, 我在这里没有太多的控制权。 当然,我可以将参数定义为String,并自己进行验证,但我怀疑这是否是正确的方法。我的参数定义为Integer,我希望它保持这种方式。 做这件事最简单的方法是什么?

  • 我有一个入站RabbitMQ通道适配器,每天成功处理3000条消息,但是偶尔我会在RabbitMQ管理控制台看到1条未包装的消息。这似乎仍然是这样。 我确实有一个重试建议链,可以重试3次,然后通过死信路由密钥移动到DLQ,这对大多数例外情况都很有效。 在过去的几周里,unacked已经发生了两次,有一次我能够进行线程转储,并看到int-http:outbound-gateway调用在等待http响

  • 我使用了spring io文档中列出的示例配置,它运行良好。 然而,当我用下游应用程序测试它时,我从Kafka那里消费并将其发布到下游。如果下游关闭,则消息仍在使用中,不会重播。 或者说,在使用kafka主题后,如果我在service activator中发现一些异常,我还想抛出一些异常,这些异常应该回滚事务,以便可以重播kafka消息。 简而言之,如果消费应用程序有一些问题,那么我想回滚事务,这

  • 场景可能是:我的期望可能是批量10个数据点,我想对{failed 5,pass 5}或其他什么给出响应。 我的逻辑是将批处理拆分为数据元素并进行验证 成功的验证将发送给aggreagtor, 失败的验证将抛出错误并通过错误通道拾取。 收件人列表路由器将错误通道作为输入通道,并连接2个过滤器,目的是过滤某些类型的错误直接发送响应(与用户输入无关的信息-服务器错误等),某些类型的客户端错误将转到聚合器