当前位置: 首页 > 知识库问答 >
问题:

是否有Spring集成dsl错误处理的代码示例?

蔚学真
2023-03-14

JmsTests。java对理解代码的结构非常有帮助。代码位于https://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.java

但是,对于错误处理,我会在进行过程中找到答案。是否有任何测试或参考代码显示了构造错误通道或使用子流委派错误处理的好方法?

为问题添加更多语境:

  1. jmsMessageDrivenFlow:从传入队列读取并打印负载。如果有错误,则将其发送到errorChannel()
  2. handleErrors():应该侦听errorChannel并将那里的任何内容发送到fatalErrorQueue

jmsMessageDrivenFlow()工作正常并打印消息。我希望只有在jmsMessageDrivenFlow期间出现错误时才会调用错误处理程序。但是,errorhandler()也会被调用,它会在致命队列中放置“Dispatcher未能传递消息”消息。

我希望在这种情况下甚至不应该调用错误通道,因为jmsMessageDrivenFlow()没有创建任何错误。显然,如果我不参加。errorChannel(errorChannel())在jmsMessageDrivenFlow()中,没有调用错误流,我只得到jmsMessageDrivenFlow()按预期执行。

@EnableIntegration
@IntegrationComponentScan
@Component
@Configuration
public class MessageReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
    String fatalErrorQueue = "fatal";
    String incomingQueue = "incoming";

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Bean
    public Queue incomingQueue() {
        return new ActiveMQQueue(incomingQueue);
    }

    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows
                .from("errorChannel")
                .handle((payload,headers) -> {
                    System.out.println("processing error: "+payload.toString());
                    return payload;
                })
                .handle(Jms.outboundAdapter(jmsMessagingTemplate.getConnectionFactory()).destination(fatalErrorQueue))
                .get();
    }

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(
                        Jms.messageDriverChannelAdapter(jmsMessagingTemplate.getConnectionFactory())
                                .destination(incomingQueue)
                                .errorChannel(errorChannel())
                )
                .handle((payload,headers) ->{
                    System.out.println("processing payload: "+payload.toString());
                    return payload;
                })
                .get();
    }
}

执行日志:

2016-01-22 10:18:20,531 DEBUG DefaultMessageListenerContainer-1 ServiceActivatingHandler.handleMessage - ServiceActivator for [org.springframework.integration.dsl.LambdaMessageProcessor@522d5d91] (org.springframework.integration.handler.ServiceActivatingHandler#1) received message: GenericMessage [payload=SAMPLE QUEUE MESSAGE, headers={jms_redelivered=false, jms_correlationId=, jms_type=, id=78d5456e-4442-0c2b-c545-870d2c177802, priority=0, jms_timestamp=1453479493323, jms_messageId=ID:crsvcdevlnx01.chec.local-47440-1453310266960-6:2:1:1:1, timestamp=1453479500531}]
processing payload: SAMPLE QUEUE MESSAGE
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 ServiceActivatingHandler.handleMessage - ServiceActivator for [org.springframework.integration.dsl.LambdaMessageProcessor@5545b00b] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
processing error: org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - preSend on channel 'handleErrors.channel#0', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 JmsSendingMessageHandler.handleMessage - org.springframework.integration.jms.JmsSendingMessageHandler#0 received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,537 DEBUG DefaultMessageListenerContainer-1 DynamicJmsTemplate.execute - Executing callback on JMS Session: ActiveMQSession {id=ID:MACD13-60edd8d-49463-1453479492653-1:1:1,started=true} java.lang.Object@858db12
2016-01-22 10:18:20,568 DEBUG DefaultMessageListenerContainer-1 DynamicJmsTemplate.doSend - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@559da78d, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1453479500535}, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2016-01-22 10:18:20,570 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - postSend (sent=true) on channel 'handleErrors.channel#0', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,571 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]

共有1个答案

万坚壁
2023-03-14

好啊谢谢调查。。。但从大高度来看,一切都如期进行:

>

.handle((payload,headers) ->{
                System.out.println("processing payload: "+payload.toString());
                return payload;
            })

由于您返回任何内容,因此流期望标头中的输出通道回复通道ErrorMessage说明了这一点。

我讲述了完全修复错误的方法。对于您的情况,您只需更改。手柄((有效负载、收割台)-

因为在回复后的下一个endpoint确实存在这个问题。handle(),messagingGateway会对此做出正确的反应:

        this.messagingTemplate.convertAndSend(requestChannel, object, this.historyWritingPostProcessor);
    }
    catch (Exception e) {
        MessageChannel errorChannel = getErrorChannel();
        if (errorChannel != null) {
            this.messagingTemplate.send(errorChannel, new ErrorMessage(e));
        }

这就是为什么你的handleErrors()能够处理这个错误。

如果Jms.messageDriverChannelAdapter()上没有errorChannel(),我们最终会进入AbstractMessageListenerContainer

protected void executeListener(Session session, Message message) {
    try {
        doExecuteListener(session, message);
    }
    catch (Throwable ex) {
        handleListenerException(ex);
    }
}

其中默认的ErrorHandler是<代码>null,我们只看到:

logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);

不是吗?

更新

你能解释为什么MessagingGateway将有效负载发送到errorChannel()只是因为我为非快乐路径scnenarios定义了一个错误通道吗?

???即使使用XML定义,您最终也会出现相同的错误。JavaDSL在后台使用相同的组件。我不会解释常规的Java代码:您可以看到try... catch围绕转换和发送,并且您在运行时的条件定义了有问题。这就是为什么您会得到DestinationResolutionException。我们无法在初始化阶段识别非快乐路径,因为您在中的返回。句柄()只能在运行时为幸运null。所以,从配置的角度来看,一切都很好。从另一方面来看,即使没有outputChannel,当我们有复制通道标题时,您仍然可以对请求/回复场景感到满意。

 类似资料:
  • 我已经使用最新的可用版本建立了一个新的Spring Boot Spring Integration Spring Integration Java DSL项目。项目构建正常,但当我运行应用程序时,我得到: 当前使用的依赖项如下: 错误可能是由于jar版本的错误组合吗?我不确定如何调试此错误。

  • 我正在尝试构建一个集成解决方案,其中 我的outboundgateway定义为 请求工厂在哪里 快乐之路运行良好,我面临的问题是不太快乐的道路。 当Api调用返回错误响应时。转换(exetrnaldto到dto)失败,客户端获得500 我想把error resposne json也翻译成我的json 我如何处理错误情况 我的问题是: 如何处理错误。 在错误条件下如何停止流不转换 如何将状态代码从出

  • 如果没有抛出异常,那么一切都按照我希望的方式工作。 当前,如果从该函数引发异常,我会得到以下异常:

  • 我有三种不同的系统。我使用Spring integration来同步所有这些系统中的数据。 系统2将调用服务方法来持久化数据,如果请求有效,则返回响应,否则抛出异常 我需要发送服务方法响应到系统1和系统3,只有当操作成功。调用服务方法后,根据服务方法响应,使用Transformer生成对系统3的请求。在transformer之后,我将请求放入mq队列。 更新的JMS出站代码 如果服务类失败,我需要

  • 场景可能是:我的期望可能是批量10个数据点,我想对{failed 5,pass 5}或其他什么给出响应。 我的逻辑是将批处理拆分为数据元素并进行验证 成功的验证将发送给aggreagtor, 失败的验证将抛出错误并通过错误通道拾取。 收件人列表路由器将错误通道作为输入通道,并连接2个过滤器,目的是过滤某些类型的错误直接发送响应(与用户输入无关的信息-服务器错误等),某些类型的客户端错误将转到聚合器

  • 在Spring integration中,我必须处理动态通道创建,但当我调试应用程序时,我看到不同通道之间的“阻塞”问题。 我知道是一个公共通道,在父上下文中共享,但如何为每个子上下文开发一个完整的独立场景?。公共网关是问题所在吗? 我在Spring integration flow async中看到了post错误处理,但对于每个子级,我都有一个完整的分离环境,我希望利用这些动态分离的优势。这可能