当前位置: 首页 > 知识库问答 >
问题:

如何根据HttpOutBoundGateway失败的Httpstatus(401,400)重试消息

宋安晏
2023-03-14

基本上,我的用例是在HttpOutboundGateway请求中出现401时重试http请求。该请求来自jms代理,进入集成流。

 @Bean
  IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory,
      MessageHandler creditWalletHttpGateway) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
        .destination(cp.getTransactionIn()))
        .<String, CreditRequest>transform(
            request -> new Gson().fromJson(request, CreditRequest.class))

        .enrichHeaders((headerEnricherSpec -> {
          // Todo get token from cache
          headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
          headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
          headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
        }))
        .handle(creditWalletHttpGateway, (e) -> e.advice(retryAdvice()))
        .get();
  }


@Bean
  MessageHandler creditWalletHttpGateway( @Value("${api.base.uri:https:/localhost/v3/sync}") URI uri) {
    HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler(uri);
    httpHandler.setExpectedResponseType(CreditResponse.class);
    httpHandler.setHttpMethod(HttpMethod.POST);
    return httpHandler;
  }

 @Bean
  RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
    requestHandlerRetryAdvice.setRecoveryCallback(errorMessageSendingRecoverer());
    return requestHandlerRetryAdvice;
  }

  @Bean
  ErrorMessageSendingRecoverer errorMessageSendingRecoverer() {
    return new ErrorMessageSendingRecoverer(recoveryChannel());
  }

  @Bean
  MessageChannel recoveryChannel() {
    return new DirectChannel();
  }

  @Bean
  MessageChannel retryChannel() {
    return new DirectChannel();
  }

  @Bean
  IntegrationFlow handleRecovery() {
    return IntegrationFlows.from("recoveryChannel")
        .log(Level.ERROR, "error", m -> m.getPayload())
        .<RuntimeException>handle((message) -> {

          
          MessagingException exception = (MessagingException) message.getPayload();
          Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception.getFailedMessage();
          // String token = gateway.getToken(configProperties);
          String token = UUID.randomUUID().toString();
          Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
              .setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
              .copyHeaders(message.getHeaders())
              .build();

          retryChannel().send(c);

        })
        .get();
  }

  @Bean
  IntegrationFlow creditRequestFlow() {

    return IntegrationFlows.from(retryChannel())
        .log(Level.INFO, "info", m ->  m.getPayload())
        .handle(Http.outboundGateway("https://localhost/v3/sync")
        .httpMethod(HttpMethod.POST)
        .expectedResponseType(CreditResponse.class))
        .get();

  }

标头使用适当的http标头进行了丰富,然后我有一个建议,使用默认的简单策略重试请求,RequestHandlerAdvice方法的问题是,它将HandlerRecovery流中的异常消息默认为非HttpException类(MessageException),因此我无法检查HttpStatus代码以重新路由消息。所以我的问题基本上是如何设计一个基于HttpStatus 401重试HttpOutBoundRequest的流。

共有1个答案

龚沛
2023-03-14

我通过引入一个网关来进行出站http调用,并使用递归方式对其进行管理,从而解决了这个问题

@MessagingGateway
public interface B2WGateway {

    /**
     *
     * @param message
     * @return
     */
    @Gateway(requestChannel = "credit.input")
    CreditResponse bankToWallet(Message<CreditRequest> message);


}

然后隔离http出站集成流

 @Bean
  IntegrationFlow credit() {

    return f -> f.log(Level.INFO, "info", m -> m.getHeaders())
        .handle(Http.outboundGateway(configProperties.getBankToWalletUrl())
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(CreditResponse.class)
        .errorHandler(new ResponseErrorHandler() {
          @Override
          public boolean hasError(ClientHttpResponse clientHttpResponse) throws IOException {
            return clientHttpResponse.getStatusCode().equals(HttpStatus.UNAUTHORIZED);
          }

          @Override
          public void handleError(ClientHttpResponse clientHttpResponse) throws IOException {
              throw new AuthenticationRequiredException("Authentication Required");
          }
        }));
  }

然后将消息解析为handleRecovery,以便在获得令牌刷新后发送消息

@Bean
  IntegrationFlow handleRecovery() {
    return IntegrationFlows.from("recoveryChannel")
        .log(Level.ERROR, "error", m -> m.getPayload())
        .<RuntimeException>handle((p, h) -> {
          MessageHandlingExpressionEvaluatingAdviceException exception = (MessageHandlingExpressionEvaluatingAdviceException) p;
          Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception
              .getFailedMessage();
          // String token = gateway.getToken(configProperties);
          String token = UUID.randomUUID().toString();
          Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
              .setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
              .copyHeaders(h)
              .build();
          return c;
        })
        .channel("credit.input")
        .get();
  }

然后修改了流的初始值,以使用网关服务和表达式建议。

 @Bean
  IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
        .destination(cp.getTransactionIn()))
        .<String, CreditRequest>transform(
            request -> new Gson().fromJson(request, CreditRequest.class))

        .enrichHeaders((headerEnricherSpec -> {
          // Todo get token from cache
          headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
          headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
          headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
        }))
        .handle((GenericHandler<CreditRequest>) (creditRequest, headers) -> gateway
            .bankToWallet(MessageBuilder.withPayload(creditRequest)
                .copyHeaders(headers)
                .build()), (e) -> e.advice(retryAdvice()))
        .get();
  }

Spring集成的启示-管理超文本传输协议出站适配器调用中的401错误

 类似资料:
  • 我的处理来自的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题: 如何重试失败消息?我知道一些方法,但我不确定它们是否正确。 1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。 2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这

  • 我在使用新的0.3.0-beta版WebJobs SDK的WebJob中有以下逻辑。当我的代码处理消息失败时,Azure仪表板将显示聚合异常(这是有意义的,因为这是异步的)。但是,它不会重试处理消息。 我能找到的非常小的留档表明消息应该在失败后10分钟内重试。新的SDK不是这样吗? 我得到的异常源于SQL超时异常(在我的代码中是针对SQL Azure的db查询):

  • 我正在使用带有ActiveMQ的JMS。我在一台服务器上运行ActiveMQ代理,它创建一个队列和一个消息使用者,用于接收来自其他服务器的消息。如果另一个服务器试图向队列发送消息,但由于某种原因失败(比如服务器之间的网络中断),那么我不希望该服务器重试发送消息。只发送一次,如果失败,不要重试。这在ActiveMQ/JMS中可能吗?javax.JMS.session类有3种确认模式(auto_ack

  • 我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。 我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板: 让我们考虑消耗方法如下 我使用以下逻辑在失败时重试消耗方法。 如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(

  • 我正在使用此代码从网络加载图像。 我同时发出多个请求,因此出现错误,但该文件实际上存在于服务器上。 因此,我想问,如果glide失败,我如何重试glide请求? 我研究了这个讨论,如果Glide失败,如何重试图像加载?但这没有帮助。 错误日志 类com。邦普泰克。滑行负载发动机GlideException:加载资源失败原因1:java。木卫一。FileNotFoundException(无内容提供

  • 我已经看过这些帖子: 谷歌云数据流 - 从Pub到镶木地板 谷歌数据流“工作流程失败”无缘无故 它们很有帮助,我最终为发布/订阅消息创建了类似的东西,比如:<code>{“id”:“1”}</code>(仅用于测试): 我只能看到错误“工作流失败”。但仅对于DataflowRunner,对于DirectRunner,我没有问题。这里是“运行”命令: 以下是此作业的日志(前几行是最后出现的): 现在