我是Spring整合的新手。
我有一个流,根据某些条件,我需要在其上执行超文本传输协议或tcp调用。我关注的问题与超文本传输协议调用有关。调用的其余endpoint需要一个访问令牌作为身份验证的标头参数,该参数由具有2个方法getMONtAccessToken()
和刷新AccessToken()
的Spring服务提供。我想仅在电流AccessToken
过期时调用方法刷新访问令牌
。
我想做的是在执行对rest api的调用时添加以下逻辑:
如果令牌过期,restendpoint返回401,我希望在流中拦截此错误,并通过添加刷新的访问令牌重试请求。
@Bean
public IntegrationFlow clientIn(AbstractServerConnectionFactory server,
AbstractClientConnectionFactory client, LogService logService) {
return IntegrationFlows.from(Tcp.inboundAdapter(client)
.enrichHeaders(t -> t.headerFunction(IpHeaders.CONNECTION_ID, message -> this.client, true))
.log(msg -> "client: " + logService.log(msg))
.<byte[], Boolean>route(this::shouldForwardToHttp,
mapping -> mapping.subFlowMapping(true, sf -> sf
.enrichHeaders(t -> t.header("Content-Type", MimeTypeUtils.APPLICATION_JSON_VALUE))
.<byte[], RequestMessage>transform(this::buildRequestFromMessage)
.<RequestMessage, HttpEntity>transform(this::getHttpEntity)
.handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST)
.expectedResponseType(ResponseMessage.class))
.<ResponseMessage, byte[]>transform(p -> this.transformResponse(p))
.handle(Tcp.outboundAdapter(client))).subFlowMapping(false,
t -> t.handle(Tcp.outboundAdapter(server).retryInterval(1000))))
.get();
}
HttpEntity getHttpEntity(RequestMessage request) {
MultiValueMap<String, String> mv = new HttpHeaders();
mv.add("accessToken", tokenProvider.getCurrentAccessToken());
HttpEntity entity = new HttpEntity(request, mv);
return entity;
}
我尝试添加Request estHandlerRetry
建议并将其重定向到恢复通道
,但我无法将某些内容返回给调用者流以获取状态代码的响应并使用新的访问令牌
重试调用。
你知道我该怎么做吗?
我认为您不需要重试建议,因为您肯定是通过捕捉401异常并使用刷新的令牌调用服务来模拟它的。听起来更像递归。为了正确实现这一点,我建议研究一下ExpressionEvaluationRequestHandlerAdvice:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-处理程序建议链。是的,它类似于重试,也有故障通道,但没有内置的重试,因为我们将在必要时模拟它反复调用同一endpoint。
为了简化递归逻辑,我将把.handle(Http.outboundGateway(restUrl).httpMethod(httpMethod.POST).expectedResponseType(ResponseMessage.class))
提取到一个单独的流中,并使用一个带有输入通道的网关()
代替主流中的该流。
故障通道
子流应将其消息重新路由回网关流的输入。
这个逻辑中最重要的部分是进行所有原始请求消息头,其中包括网关逻辑复制通道
所需的。
查看有关网关的更多文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway.
当表达式EvaluagingRequestHandlerAdvice向故障通道发送消息时,它将作为一条错误消息,并将消息HandlingExpressionEvaluagingAdviceException作为有效负载。导致故障并具有所有必需标头的消息位于getFailedMessage()
属性中。所以,您获取该消息,请求新令牌,将其添加到基于原始令牌的新消息的头中。最后,您将这个新消息发送到HTTP请求的集成流的输入通道。当一切正常时,HTTP调用的结果将从标头转发到前面提到的replyChannel,并因此转发到下一步的主流。
我如何处理在Spring整合中未能向Kafka传达的信息? 我在“int kafka:outbound channel adapter”中没有看到“error channel”是一个选项,我想知道应该在哪里添加错误通道信息,以便我的ErrorHandler可以获得“failed to kafka”类型的错误。(包括所有类型的故障、配置、网络等) 此外,inputToKafka是排队通道,我应该在哪
基础知识: 使用带集成的Spring 4.1.1,引导和1.0.0的DSL。 多个入站SFTP适配器在不同的时间表上从不同的供应商获取文件。 每个集成流在文件下载后将标头附加到消息中,以标识供应商源。 使用MessagePublishingErrorHandler处理异常。 标准消息流在消息处理成功或消息未能完成时通知外部监控解决方案。使用消息头来识别哪个流失败。 在我们收到消息后,成功和错误流都
我正在开发一个Spring集成应用程序,我有一个地图列表,我需要将其插入到表格中。 我使用了jdbc: Outsport-网关或适配器将记录插入到表中。 但是如何使用jdbc:出站网关从我的地图列表中插入所有记录。
问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更
我有一个要求,在我必须使用不同的有效负载值对Httpendpoint进行Http出站调用的循环中,调用函数不必等待从出站调用收到的响应,因此基本上出站调用将在循环中异步发生。 有没有办法用Http.outbound网关
我的要求和帖子里描述的差不多。i、 例如,在文件生成器Spring批处理完成后触发Spring集成程序。这里我主要使用Spring集成示例:XML配置和测试。我观察到,测试程序将配置文件初始化为ClassPathXmlApplicationContext,然后根据需要设置一些值。忽略配置,程序的关键似乎是以下四行: 我想知道是否有一个选项可以将其移动到配置文件本身,这样我就不必从测试程序中创建消息