当前位置: 首页 > 知识库问答 >
问题:

如何处理http响应异常并继续流,直到在Spring集成中聚合

敖永丰
2023-03-14

我在我的项目中使用Spring集成,使用的模式是分散收集。这里正在执行三个并行过程。流2是一个出站网关方法,如果该服务关闭,那么我想处理Httpstatus异常并希望发送null。实际上,如果该服务关闭,那么整个流程就会停止。但我想处理那个异常并发送null,然后想继续使用聚合方法并结束流程

以下是代码--

//配置文件

 @Configuration
        public class IntegrationConfiguration {
          @Autowired LionsServiceImpl lionsService;
        
          long dbId = new SequenceGenerator().nextId();
      //   Main flow
      @Bean
  public IntegrationFlow flow() {
    return flow ->
        flow.handle(validatorService, "validateRequest")
            .split()
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .scatterGather(
                scatterer ->
                    scatterer
                        .applySequence(true)
                        .recipientFlow(flow1())
                        .recipientFlow(flow2())
                        .recipientFlow(flow3()),
                gatherer ->
                    gatherer
                        .releaseLockBeforeSend(true)
                        .releaseStrategy(group -> group.size() == 2))
            .aggregate(prepareSomeRequest())
            .to(getDec());
  }

  //   Saving the request to the database
  @Bean
  public IntegrationFlow flow1() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                (payload, header) -> {
                  ObjectMapper mapper = new ObjectMapper();
                  try {
                    String jsonString = mapper.writeValueAsString(payload);
                    JsonNode request = mapper.readTree(jsonString);
                    JsonNode csID = request.get("ApplicationDetails").get("CustomerId");
                    int customerID = mapper.treeToValue(csID, Integer.class);

                    return lionService.saveRequest(
                        payload,
                        String.valueOf(dbId),
                        customerID,
                        ((SourceSystem) Objects.requireNonNull(header.get("sourceSystem")))
                            .getSourceSystemCode());
                  } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                  }
                }
                )
            .nullChannel();
  }

  // 
  @Bean
  public IntegrationFlow flow3() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .transform(
                message ->
                    loansService.someMethod(
                        (LionRequest) message));
  }

 //Here I'm calling a service through HTTPOUTBOUNDGATEWAY and if that called service is down then it throws HTTP STAtus error so I want to handle that and want to send null from this flow.
  @Bean
  public IntegrationFlow flow2() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                (payload, header) ->
                    loansService.someMethod2(
                        (LionRequest) payload,
                        (SourceSystem) (Objects.requireNonNull(header.get("sourceSystem")))))
            .handle(
                Http.outboundGateway(someurl)
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class)
                       );
  }


  @Bean
  public IntegrationFlow getDec() {
    return flow ->
        flow.handle(
            Http.outboundGateway(ServiceURL)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(CrResponse.class));
  }


  @Bean
  public MessageChannel replyChannel() {
    return MessageChannels.executor("output-flow", outputExecutor()).get();
  }

  @Bean
  public ThreadPoolTaskExecutor outputExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(4);
    pool.setMaxPoolSize(4);
    return pool;
  }


//here I want to take out null from messages which is sent by flow2 if the called service is down and then I want to send null to someMethod2 method.

  public MessageGroupProcessor prepareSomeRequest() {
    return group -> {
      String cData;
      Object CDReq;

      List<Message<?>> messages = group.streamMessages().collect(Collectors.toList());

      ArrayList<Object> payloads = (ArrayList<Object>) messages.get(0).getPayload();

      if (payloads.get(0).toString().contains("tribv")) {
        cData= payloads.get(0).toString();
        logger.atInfo().log("Customer data from Data Sourcing Service : " + cData);
        CDReq= payloads.get(1);
      } else {
        cData= payloads.get(1).toString();
        logger.atInfo().log("Customer data from Data Sourcing Service : " + cData);
        CDReq = payloads.get(0);
      }

      Object fReq =
          lionservice.someMethod2(cData, CDReq);

      SomeRequest somreq= new SomeRequest();

      ObjectMapper mapper = new ObjectMapper();

      JsonNode req = mapper.valueToTree(freq);
      creditDecisionRequest.setfsfgg(req);
      creditDecisionRequest.setR("234565432");
      creditDecisionRequest.setD(String.valueOf(dbId));
      creditDecisionRequest.setCID("33333333");
      creditDecisionRequest.setSourceSystemCode(SourceSystem.ONE.getSourceSystemCode());

      return somreq;
    };
  }

网关

    @Gateway(requestChannel = "flow.input")
  void processLionRequest(
      @Payload Message lionRequest, @Header("sourceSystem") SourceSystem sourceSystem);

我可以使用类似于的东西吗。outboundgateway中的errorHandler()?但是我该怎么用呢?

 @Bean
  public IntegrationFlow flow2() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                (payload, header) ->
                    lionService.prepareSomeRequest(
                        (LionRequest) payload,
                        (SourceSystem) (Objects.requireNonNull(header.get("sourceSystem")))))
            .handle(
                Http.outboundGateway(someurl)
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class),
                c -> c.advice(expressionAdvice()));
  }



    @Bean
  public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice =
        new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString("Failed");
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
  }

  @Bean
  public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
  }

  @Bean
  public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
  }

  public String adviceOnFailure() {
    return "Failed";
  }

我正在做这样的事情,但得到如下错误-

[GenericMessage[payload=[org.springframework.expression.spel.SpelEvalue ationException: EL1008E:属性或字段'失败'无法在'org.springframework.messaging.support.GenericMessage'类型的对象上找到-可能不是公共的或无效的?],标头={序列号=1,序列详细信息=[[f596d446-9816-e13b-240c-f365338a5eb4,1,1]],回复通道=nullChannel, source ceSystem=ONE,序列大小=1, cor的Id=f596d446-9816-e13b-240c-f365338a5eb4, id=5592b1da-19fd-0567-c728-71b47d46b2d5,时间戳=1658382273446}]]

我希望字符串“Failed”出现在消息中,以便我可以获取该字符串并进一步处理。请帮忙。

共有1个答案

朱兴安
2023-03-14

消息传递中没有有效载荷概念,因此即使您处理错误,您也绝对不能返回null作为出站网关调用的回复。

请参阅框架中的请求处理程序建议模式,特别是一个表达式评估RequestHandlerAdvice实现。它确实为特定的消息处理程序处理错误,并可能返回补偿回复,然后您可以分别对其进行聚合和处理。

医生来了:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

 类似资料:
  • 我正在使用Spring Boot 2和带有WebFlux反应式启动器的Spring 5开发一些反应式微服务。 我面临以下问题:我想处理调用另一个REST服务时收到的所有HTTP状态,并在收到一些错误的HTTP状态时抛出异常。例如,当我调用一个endpoint并收到404 HTTP状态时,我想抛出一个异常,并在某个ExceptionHandler类中处理该异常,就像在Spring 4中使用时一样。

  • 我试图抑制从http出站网关为非2XX状态代码生成的MessageHandlingException,并将控制优雅地返回到父流,以便按照成功流中的预期在回复通道上返回具有原始有效负载的消息。 原始代码: 我尝试在Http上使用,但它为客户端响应提供了一个句柄,并且原始负载不是它的一部分。 也尝试了表达建议的途径。 如果没有成功,则通知不会返回控件 也许最简单的方法就是把纸包起来。处理并捕获Mess

  • 问题内容: Flask中是否可以将响应发送给客户端,然后继续进行某些处理?我要完成一些簿记任务,但是我不想让客户等待。 请注意,这些实际上是我想做的非常快的事情,因此在这里实际上不适合创建新线程或使用队列。(这些快速的操作之一实际上是在作业队列中添加一些内容。) 问题答案: 可悲的是,将响应返回给客户端后,拆卸回调不执行: 卷曲时,您会注意到在显示响应之前有2s的延迟,而不是卷曲立即结束,然后在2

  • 如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。 如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttri

  • 问题内容: 我已经使用jQuery和AJAX几周了,并且在调用完成后,我看到了两种“继续”脚本的不同方式:和。 从jQuery文档的提要中,我们得到: .done():说明:添加要解析Deferred对象时要调用的处理程序。 成功:(。ajax()选项):如果请求成功,则要调用的函数。 因此,在AJAX调用完成/解决之后,两者都要做一些事情。我可以随机使用其中之一吗?有什么区别?何时使用一种替代另

  • 问题内容: 我的代码中有一个try … except块,当抛出异常时。我真的只想继续编写代码,因为在这种情况下,所有内容仍然可以正常运行。问题是,如果您将except:块保留为空或不执行任何操作,则会出现语法错误。我不能使用continue,因为它不在循环中。我可以使用一个关键字来告诉代码继续运行吗? 问题答案: except Exception: pass 适用于pass语句的Python文档