当前位置: 首页 > 知识库问答 >
问题:

Spring Integration DSL自定义错误通道不工作

狄赞
2023-03-14
// In Class - 1
 @Bean
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {

        MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
        wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
        wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
        wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorChannel"));
        wsInboundGateway.setMarshaller(marshaller);
        wsInboundGateway.setUnmarshaller(marshaller);
        return wsInboundGateway;
    }


// In Class - 2
@Bean
    public IntegrationFlow incomingRequest() {
        return f -> f.<Object, Class<?>>route(t -> t.getClass(),
                mapping -> mapping.subFlowMapping(payloadType1(),
                        sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
                        .subFlowMapping(payloadType2(),
                                sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
                        conf -> conf.id("router:Incoming request router"));
    }

// In Class - 3
    @Bean
    public IntegrationFlow type1() {
        IntegrationFlow integrationFlow = f -> f
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "error222", true))
                .<Type1>handle((p, h) -> authentication.authenticate(p),
                        conf -> conf.id("service-activator:Authenticate"))
                .transform(transformer::transformType1MsgToDataX,
                        conf -> conf.id("transform:Unmarshall type1 Message"))
                .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
                .handle((GenericHandler<DataX>) repository::successResponseMessage,
                        conf -> conf.id("service-activator:return success"))
                .channel("outgoingResponse.input")
                ;

        return integrationFlow;
    }

// In Class - 3
@Bean
    public IntegrationFlow error222Flow() {

        return IntegrationFlows.from("error222").handle("repository", "failureResponseMessage").get()

                ;

    }
 // In Class - 1
 @Bean
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {

        MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
        wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
        wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
        wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input"));
        wsInboundGateway.setMarshaller(marshaller);
        wsInboundGateway.setUnmarshaller(marshaller);
        return wsInboundGateway;
    }


// In Class - 2
@Bean
    public IntegrationFlow incomingRequest() {
        return f -> f.<Object, Class<?>>route(t -> t.getClass(),
                mapping -> mapping.subFlowMapping(payloadType1(),
                        sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
                        .subFlowMapping(payloadType2(),
                                sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
                        conf -> conf.id("router:Incoming request router"));
    }

// In Class - 2
@Bean 
public IntegrationFlow errorResponse(){ 
    return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"), 
                        mapping -> mapping.subFlowMapping("ABCDEF", 
                                sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)), 
                                conf -> conf.id("router:error response prepare"));
}

// In Class - 3
    @Bean
    public IntegrationFlow type1() {
        IntegrationFlow integrationFlow = f -> f
                .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
                .<Type1>handle((p, h) -> authentication.authenticate(p),
                        conf -> conf.id("service-activator:Authenticate"))
                .transform(transformer::transformType1MsgToDataX,
                        conf -> conf.id("transform:Unmarshall type1 Message"))
                .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
                .handle((GenericHandler<DataX>) repository::successResponseMessage,
                        conf -> conf.id("service-activator:return success"))
                .channel("outgoingResponse.input")
                ;

        return integrationFlow;
    }

// In Class - 3
@Bean
    public IntegrationFlow customError(){
        return f -> f.handle((GenericHandler<MessageHandlingException>)eventRepository::failureResponseMessage,
                                conf -> conf.id("service-activator:return failure"));
    }

我尝试了Artem的测试代码,它可以在这个场景中工作。如果我像下面那样将type1流转换为子流映射(我这样做,因为我怀疑我的子流代码块),错误流不能打印ABCDEF参数值。之后,我向子流映射添加另一个头(XYZTWR),但它也不能打印。

@Bean
public IntegrationFlow type1() {
    return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo",
            sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true))));
}

@Bean
public IntegrationFlow fooFlow() {
    return f -> f.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
            .handle((p, h) -> {
                throw new RuntimeException("intentional");
            });
}

我的s.out是:

GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}]

共有1个答案

涂羽
2023-03-14

当我们将消息转移到不同的线程执行器或队列通道时,errorchannel标头开始工作。否则,标准throwtry...catch在同一个调用堆栈中工作。

因此,在您的情况下,身份验证异常只是抛出给调用者-WS入站网关。这里您已经配置了全局错误通道。

我做了这个测试

@Configuration
@EnableIntegration
@IntegrationComponentScan
public static class ContextConfiguration {

    @Bean
    public IntegrationFlow errorResponse() {
        return IntegrationFlows.from(errorChannel())
                    .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
                            e -> e.poller(p -> p.fixedDelay(100)))
                    .get();
    }

    @Bean
    public IntegrationFlow type1() {
            return f -> f
                    .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
                    .handle((p, h) -> { throw new RuntimeException("intentional"); });
    }

    @Bean
    public PollableChannel errorChannel() {
        return new QueueChannel();
    }
}

@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input")
public interface TestGateway {

    Message<?> sendTest(String payload);

}

...

@Autowired
private TestGateway testGateway;

@Test
public void testErrorChannel() {
    Message<?> message = this.testGateway.sendTest("foo");
    System.out.println(message);
}
GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}]
 类似资料:
  • 我正在尝试为我的SpringCloudDataflow流创建一个自定义异常处理程序,以路由一些要重新排队的错误和其他要DLQ的错误。 为此,我使用了全局Spring集成“errorChannel”和基于异常类型的路由。 这是Spring集成错误路由器的代码: 错误路由器由每个流应用程序通过Spring Boot应用程序上的包扫描获取: 当它与本地 Spring Cloud Dataflow 服务器

  • Apache可以让网站管理员自己自定义对一些错误和问题的响应。 自定义的响应可以定义为当服务器检测到错误或问题时才被激活。 如果一个脚本崩溃并产生"500 Server Error"响应,那么这个响应可以被更友好的提示替换或者干脆用重定向语句跳到其他的URL(本地的或外部的)。 行为 老式的行为 Apache1.3 会响应一些对于用户没有任何意义的错误或问题信息,而且不会将产生这些错误的原因写入日

  • 所有的错误最终都会被 Tango.ErrHandler 进行处理。 你可以自定义你的错误处理方式来替代默认的。例如: var ( prefix = "<html><head>tango</head><body><div>" suffix = fmt.Sprintf("</div><div>version: %s</div></body></html>", tango.Version

  • Flask 自带了很顺手的 abort() 函数用于以一个 HTTP 失败代码 中断一个请求,他也会提供一个非常简单的错误页面,用于提供一些基础的描述。 这个页面太朴素了以至于缺乏一点灵气。 依赖于错误代码的不同,用户看到某个错误的可能性大小也不同。 通常的错误代码 下面列出了一些用户经常遇到的错误代码,即使在这个应用准确无误的情况下也可能发生: 404 Not Found 经典的“哎呦,您输入的

  • 404和500错误客户端和服务端都会通过error.js组件处理。如果你想改写它,则新建_error.js在文件夹中: import React from 'react' export default class Error extends React.Component { static getInitialProps({ res, err }) { const statusCod

  • 我收到一个错误 (节点:2632)未经处理的PromisejectionWarning:TypeError:无法读取模块处未定义的更新成员(C:\Users\Asus\Desktop\Section 50\commands\Main commands\Mod\member count.js:5:31)的属性“channels”。在客户端导出(C:\Users\Asus\Desktop\sectio