基本上,我的用例是在HttpOutboundGateway请求中出现401时重试http请求。该请求来自jms代理,进入集成流。
@Bean
IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory,
MessageHandler creditWalletHttpGateway) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination(cp.getTransactionIn()))
.<String, CreditRequest>transform(
request -> new Gson().fromJson(request, CreditRequest.class))
.enrichHeaders((headerEnricherSpec -> {
// Todo get token from cache
headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
}))
.handle(creditWalletHttpGateway, (e) -> e.advice(retryAdvice()))
.get();
}
@Bean
MessageHandler creditWalletHttpGateway( @Value("${api.base.uri:https:/localhost/v3/sync}") URI uri) {
HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler(uri);
httpHandler.setExpectedResponseType(CreditResponse.class);
httpHandler.setHttpMethod(HttpMethod.POST);
return httpHandler;
}
@Bean
RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(errorMessageSendingRecoverer());
return requestHandlerRetryAdvice;
}
@Bean
ErrorMessageSendingRecoverer errorMessageSendingRecoverer() {
return new ErrorMessageSendingRecoverer(recoveryChannel());
}
@Bean
MessageChannel recoveryChannel() {
return new DirectChannel();
}
@Bean
MessageChannel retryChannel() {
return new DirectChannel();
}
@Bean
IntegrationFlow handleRecovery() {
return IntegrationFlows.from("recoveryChannel")
.log(Level.ERROR, "error", m -> m.getPayload())
.<RuntimeException>handle((message) -> {
MessagingException exception = (MessagingException) message.getPayload();
Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception.getFailedMessage();
// String token = gateway.getToken(configProperties);
String token = UUID.randomUUID().toString();
Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
.setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
.copyHeaders(message.getHeaders())
.build();
retryChannel().send(c);
})
.get();
}
@Bean
IntegrationFlow creditRequestFlow() {
return IntegrationFlows.from(retryChannel())
.log(Level.INFO, "info", m -> m.getPayload())
.handle(Http.outboundGateway("https://localhost/v3/sync")
.httpMethod(HttpMethod.POST)
.expectedResponseType(CreditResponse.class))
.get();
}
标头使用适当的http标头进行了丰富,然后我有一个建议,使用默认的简单策略重试请求,RequestHandlerAdvice方法的问题是,它将HandlerRecovery流中的异常消息默认为非HttpException类(MessageException),因此我无法检查HttpStatus代码以重新路由消息。所以我的问题基本上是如何设计一个基于HttpStatus 401重试HttpOutBoundRequest的流。
我通过引入一个网关来进行出站http调用,并使用递归方式对其进行管理,从而解决了这个问题
@MessagingGateway
public interface B2WGateway {
/**
*
* @param message
* @return
*/
@Gateway(requestChannel = "credit.input")
CreditResponse bankToWallet(Message<CreditRequest> message);
}
然后隔离http出站集成流
@Bean
IntegrationFlow credit() {
return f -> f.log(Level.INFO, "info", m -> m.getHeaders())
.handle(Http.outboundGateway(configProperties.getBankToWalletUrl())
.httpMethod(HttpMethod.POST)
.expectedResponseType(CreditResponse.class)
.errorHandler(new ResponseErrorHandler() {
@Override
public boolean hasError(ClientHttpResponse clientHttpResponse) throws IOException {
return clientHttpResponse.getStatusCode().equals(HttpStatus.UNAUTHORIZED);
}
@Override
public void handleError(ClientHttpResponse clientHttpResponse) throws IOException {
throw new AuthenticationRequiredException("Authentication Required");
}
}));
}
然后将消息解析为handleRecovery,以便在获得令牌刷新后发送消息
@Bean
IntegrationFlow handleRecovery() {
return IntegrationFlows.from("recoveryChannel")
.log(Level.ERROR, "error", m -> m.getPayload())
.<RuntimeException>handle((p, h) -> {
MessageHandlingExpressionEvaluatingAdviceException exception = (MessageHandlingExpressionEvaluatingAdviceException) p;
Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception
.getFailedMessage();
// String token = gateway.getToken(configProperties);
String token = UUID.randomUUID().toString();
Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
.setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
.copyHeaders(h)
.build();
return c;
})
.channel("credit.input")
.get();
}
然后修改了流的初始值,以使用网关服务和表达式建议。
@Bean
IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination(cp.getTransactionIn()))
.<String, CreditRequest>transform(
request -> new Gson().fromJson(request, CreditRequest.class))
.enrichHeaders((headerEnricherSpec -> {
// Todo get token from cache
headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
}))
.handle((GenericHandler<CreditRequest>) (creditRequest, headers) -> gateway
.bankToWallet(MessageBuilder.withPayload(creditRequest)
.copyHeaders(headers)
.build()), (e) -> e.advice(retryAdvice()))
.get();
}
Spring集成的启示-管理超文本传输协议出站适配器调用中的401错误
我的处理来自的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题: 如何重试失败消息?我知道一些方法,但我不确定它们是否正确。 1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。 2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这
我在使用新的0.3.0-beta版WebJobs SDK的WebJob中有以下逻辑。当我的代码处理消息失败时,Azure仪表板将显示聚合异常(这是有意义的,因为这是异步的)。但是,它不会重试处理消息。 我能找到的非常小的留档表明消息应该在失败后10分钟内重试。新的SDK不是这样吗? 我得到的异常源于SQL超时异常(在我的代码中是针对SQL Azure的db查询):
我正在使用带有ActiveMQ的JMS。我在一台服务器上运行ActiveMQ代理,它创建一个队列和一个消息使用者,用于接收来自其他服务器的消息。如果另一个服务器试图向队列发送消息,但由于某种原因失败(比如服务器之间的网络中断),那么我不希望该服务器重试发送消息。只发送一次,如果失败,不要重试。这在ActiveMQ/JMS中可能吗?javax.JMS.session类有3种确认模式(auto_ack
我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。 我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板: 让我们考虑消耗方法如下 我使用以下逻辑在失败时重试消耗方法。 如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(
我正在使用此代码从网络加载图像。 我同时发出多个请求,因此出现错误,但该文件实际上存在于服务器上。 因此,我想问,如果glide失败,我如何重试glide请求? 我研究了这个讨论,如果Glide失败,如何重试图像加载?但这没有帮助。 错误日志 类com。邦普泰克。滑行负载发动机GlideException:加载资源失败原因1:java。木卫一。FileNotFoundException(无内容提供
我已经看过这些帖子: 谷歌云数据流 - 从Pub到镶木地板 谷歌数据流“工作流程失败”无缘无故 它们很有帮助,我最终为发布/订阅消息创建了类似的东西,比如:<code>{“id”:“1”}</code>(仅用于测试): 我只能看到错误“工作流失败”。但仅对于DataflowRunner,对于DirectRunner,我没有问题。这里是“运行”命令: 以下是此作业的日志(前几行是最后出现的): 现在