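I am using Spring Boot together with Spring Integration 4.2.5.RELEASE. I am trying to set up a retry advice correctly for one of my Spring Integration flows, which calls a remote HTTP service. A simplified outline of the flow looks like this: 1. Gateway -
What I want to achieve: if the HTTP operation hangs or the remote service is unavailable, retry the request up to 5 times, then put the message on a recovery channel where I can perform some additional actions. I also want to return a valid response (FetchNewIdsResponseDTO) to the gateway that is the entry point of my flow.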
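To make the intended behaviour concrete, here is a minimal plain-Java sketch of "retry up to N times, then recover with a fallback value". It does not use Spring Retry or Spring Integration; the class name, `callWithRetry`, and the fallback string are hypothetical and only illustrate the semantics described above.

```java
import java.util.function.Function;
import java.util.function.Supplier;

final class RetryThenRecoverSketch {

    /**
     * Calls the supplier up to maxAttempts times. If every attempt throws,
     * the last exception is handed to the recovery function, whose result
     * is returned to the caller instead of propagating the failure.
     */
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts,
                               Function<RuntimeException, T> recover) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        return recover.apply(last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = callWithRetry(
                () -> {
                    calls[0]++;
                    throw new IllegalStateException("remote service unavailable");
                },
                5,
                e -> "fallback response after " + calls[0] + " attempts");
        System.out.println(result); // prints "fallback response after 5 attempts"
    }
}
```

The point of the sketch is that the caller always gets a value back: either the real result or whatever the recovery step produces.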
The relevant code of the individual flow elements:
1. The gateway:
@MessagingGateway(errorChannel = GATEWAY_ERROR_CHANNEL_NAME,
defaultReplyTimeout = "2000")
public interface FetchNewIdsGateway {
@Gateway(requestChannel = GATEWAY_INPUT_CHANNEL_NAME)
FetchNewIdsResponseDTO fetch(Object o);
}
2. The flow that sets the required HTTP headers:
@Bean
IntegrationFlow fetchNewIdsOutboundGatewayFlow(@Value("${worker-app.hostname}") String workerAppHostname,
CommonOutboundGatewayTemplate.CommonOutboundGateway gateway) {
return IntegrationFlows.from(IntegrationFlowConstants.OUTBOUND_GATEWAY_TEMPLATE_INPUT_CHANNEL)
.enrichHeaders(h -> h
.header(CommonMessageHeaders.WORKER_APP_REQUEST_HTTP_METHOD, HttpMethod.GET.toString())
.header(CommonMessageHeaders.WORKER_APP_LOCATION, workerAppHostname)
.header(CommonMessageHeaders.WORKER_APP_REQUEST_PATH, ServiceIntegrationControllerEndpoints.FETCH_NEW_IDS)
.header(CommonMessageHeaders.FLOW_NAME, IntegrationFlowTypes.FETCH_NEW_IDS.getFlowName()))
.handle(Object.class, gateway::send)
.get();
}
3. The common outbound gateway template:
@MessagingGateway(name = COMMON_OUTBOUND_GATEWAY_NAME,
defaultReplyChannel = GATEWAY_OUTPUT_CHANNEL,
errorChannel = GATEWAY_ERROR_CHANNEL,
defaultReplyTimeout = "2000")
public interface CommonOutboundGateway {
@Gateway(requestChannel = COMMON_OUTBOUND_GATEWAY_INPUT_CHANNEL)
Object send(Object o, @Headers Map<String, Object> headers);
}
4. The HTTP outbound gateway:
@Bean
IntegrationFlow commonOutboundGatewayFlow(@Qualifier(REST_TEMPLATE_NAME) RestTemplate restTemplate,
@Qualifier(COMMON_OUTBOUND_GATEWAY_RECOVERY_CHANNEL) MessageChannel recoveryChannel) {
return IntegrationFlows.from(COMMON_OUTBOUND_GATEWAY_INPUT_CHANNEL)
.enrichHeaders(h -> h.header(CommonMessageHeaders.CONTENT_TYPE, CommonMessageHeaders.CONTENT_TYPE_JSON_VALUE))
.handle(Http.outboundGateway("{url}{path}", restTemplate)
.httpMethodFunction(m -> m.getHeaders().get(CommonMessageHeaders.WORKER_APP_REQUEST_HTTP_METHOD))
.charset("UTF-8")
.encodeUri(false)
.mappedRequestHeaders(CommonMessageHeaders.CONTENT_TYPE)
.mappedResponseHeaders(CommonMessageHeaders.CONTENT_TYPE)
.expectedResponseType(String.class)
.uriVariable("url", m -> m.getHeaders().get(CommonMessageHeaders.WORKER_APP_LOCATION))
.uriVariable("path", m -> m.getHeaders().get(CommonMessageHeaders.WORKER_APP_REQUEST_PATH))
, e -> e.advice(retryAdvice(recoveryChannel)))
.channel(COMMON_OUTBOUND_GATEWAY_OUTPUT_TRANSFORMER_CHANNEL)
.get();
}
The retry advice:
@Bean
public RequestHandlerRetryAdvice retryAdvice(@Qualifier(COMMON_OUTBOUND_GATEWAY_RECOVERY_CHANNEL) MessageChannel recoveryChannel) {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel));
advice.setRetryTemplate(retryTemplate());
return advice;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(5.0);
backOffPolicy.setMaxInterval(1000L);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
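As a sanity check on these settings, the following plain-Java sketch computes the pauses a capped exponential back-off would produce for `initialInterval = 500`, `multiplier = 5.0`, `maxInterval = 1000` and 5 attempts. It only mimics the arithmetic and is not Spring Retry's implementation; `BackoffSketch` and `pauses` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {

    /** Returns the pauses (in ms) between attempts for a capped exponential back-off. */
    static List<Long> pauses(long initialInterval, double multiplier,
                             long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        // One pause sits between each pair of consecutive attempts.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pauses(500, 5.0, 1000, 5)); // prints [500, 1000, 1000, 1000]
    }
}
```

Because `maxInterval` is only twice the initial interval, the multiplier of 5.0 has no effect after the first pause: every later pause is capped at one second, so the whole retry sequence takes roughly 3.5 seconds.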
And finally, the flow that can perform additional recovery actions and transform the message (error handling):
@Bean
public IntegrationFlow fetchNewIdsErrorRecovery() {
return IntegrationFlows.from(CommonOutboundGatewayTemplate.COMMON_OUTBOUND_GATEWAY_RECOVERY_CHANNEL)
// -->ADDITIONAL RECOVERY ACTIONS WOULD GO HERE, BEFORE TRANSFORMATION!
.<MessagingException, Message>transform(p -> {
Object failedPayload = p.getFailedMessage().getPayload();
return MessageBuilder.createMessage(failedPayload, p.getFailedMessage().getHeaders());
})
What I can see in the logs:
2017-01-11 21:56:32,318 2505 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
2017-01-11 21:56:32,831 3018 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
2017-01-11 21:56:32,831 3018 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
2017-01-11 21:56:33,833 4020 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
2017-01-11 21:56:33,833 4020 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
2017-01-11 21:56:34,835 5022 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
2017-01-11 21:56:34,835 5022 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=3
2017-01-11 21:56:35,836 6023 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=4
2017-01-11 21:56:35,836 6023 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=4
2017-01-11 21:56:35,838 6025 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=5
2017-01-11 21:56:35,838 6025 [main] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=5
2017-01-11 21:56:50,900 21087 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,trace-log-channel, id=f27f1633-9e5f-a3eb-f1b7-6bc04cecf167, timestamp=1484168210900}]
2017-01-11 21:56:50,902 21089 [main] ERROR FetchNewIdsFlowLoggers$$EnhancerBySpringCGLIB$$ccbdfda2 - [STEP 1::fetch new IDs] Error while executing the flow, DETAILS: HEADERS: {history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,com.szyszy.superscraper.integration.flows.fetchnewids.FetchNewIdsFlowLoggers.fetchNewIdsErrorLogHandler.serviceActivator.handler,fetchNewIdsErrorLogHandler, id=fe6fed47-bcca-ac61-3f09-6f573e2b0cde, timestamp=1484168210901}PAYLOAD: org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds
2017-01-11 21:56:50,903 21090 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,trace-log-channel, id=651ded47-22c4-d2a6-7326-c08960ff9247, timestamp=1484168210903}]
2017-01-11 21:56:52,484 22671 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,common-outbound-gateway-error-log-channel,trace-log-channel, id=da0fff16-9fef-69e9-ed9d-bfcde7e01e29, timestamp=1484168212484}]
2017-01-11 21:56:52,486 22673 [main] ERROR CommonOutboundGatewayTemplateLoggers$$EnhancerBySpringCGLIB$$2fe9ed5c - [OUTBOUND GATEWAY OPERATION] Failed to send message to the worker application, DETAILS: HEADERS: {replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,common-outbound-gateway-error-log-channel,com.szyszy.superscraper.integration.commonflows.outbound.CommonOutboundGatewayTemplateLoggers.commonOutboundGatewayErrorLogHandler.serviceActivator.handler,commonOutboundGatewayErrorLogHandler, id=08ca5452-68d4-4464-6fa7-961632330405, timestamp=1484168212485}PAYLOAD: org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available -- Failed payload: : org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds
2017-01-11 21:56:52,489 22676 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,trace-log-channel, id=590c49a7-3d4b-ced2-b2bc-3250508e1986, timestamp=1484168212489}]
2017-01-11 21:56:52,490 22677 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,com.szyszy.superscraper.integration.commonflows.outbound.CommonOutboundGatewayTemplate.gatewayErrorChannel.bridgeTo.handler,common-outbound-gateway-error-response-transformer-channel,trace-log-channel, id=e12b2aa5-b471-bed0-c627-7620cb9c8bcf, timestamp=1484168212490}]
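The particularly interesting part is:
payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
Because the error channel is reached, the response object (FetchNewIdsResponseDTO) is created correctly and returned to the first gateway. However, after the message reaches the recovery channel, a DestinationResolutionException is thrown, and I do not understand why. My fetchNewIdsErrorRecovery flow builds a new message that copies the headers of the original one, so I would expect the reply channel to be available.
How can I perform additional actions (via the recovery channel) and get rid of this exception at the same time? Can someone explain the purpose of this advice?
Was it created for error handling, or is it meant to be used differently: perform some extra actions and ultimately rely on the error handling of the HTTP gateway?
UPDATE:
Solved the problem by adding .bridge(null) right before the call to .get(). Here is the log output after the fix: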
2017-01-16 22:11:42,376 2260 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
2017-01-16 22:11:42,890 2774 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
2017-01-16 22:11:42,890 2774 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
2017-01-16 22:11:43,891 3775 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
2017-01-16 22:11:43,891 3775 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
2017-01-16 22:11:44,891 4775 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
2017-01-16 22:11:44,891 4775 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=3
2017-01-16 22:11:45,893 5777 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=4
2017-01-16 22:11:45,893 5777 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=4
2017-01-16 22:11:45,893 5777 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=5
2017-01-16 22:11:45,894 5778 [main] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=5
2017-01-16 22:11:45,895 5779 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,trace-log-channel, id=5872d71d-29fa-a71f-0c9f-800d4decc6cb, timestamp=1484601105895}]
2017-01-16 22:11:45,897 5781 [main] ERROR FetchNewIdsFlowLoggers$$EnhancerBySpringCGLIB$$f05c6801 - [STEP 1::fetch new IDs] Error while executing the flow, DETAILS: HEADERS: {history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,com.szyszy.superscraper.integration.flows.fetchnewids.FetchNewIdsFlowLoggers.fetchNewIdsErrorLogHandler.serviceActivator.handler,fetchNewIdsErrorLogHandler, id=e176b462-d02e-0603-b6ec-9a79588794e8, timestamp=1484601105896}PAYLOAD: org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds
2017-01-16 22:11:45,898 5782 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,trace-log-channel, id=7cc7379b-0320-661e-657c-d1d5a0c6a741, timestamp=1484601105898}]
2017-01-16 22:11:45,900 5784 [main] INFO o.s.i.handler.LoggingHandler - GenericMessage [payload=FetchNewIdsResponseDTO{links=null, ids=null, RestResponseDTO{errorMessage='Error while fetching new ids - Remote server is not available!', success=false}}, headers={workerAppLocation=http://testWorkerAppHostname, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d0402b, workerAppRequestPath=/newIds, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d0402b, workerAppRequestHTTPMethod=GET, history=fetchNewIdsFlow$FetchNewIdsGateway,fetch-new-ids-gateway-in-channel,in-child-context-individual-outbound-gateway-template-channel,fetchNewIdsOutboundGatewayFlow.channel#0,commonOutboundToWorkerGateway,common-outbound-gateway-input-channel,commonOutboundGatewayFlow.channel#0,fetchNewIdsErrorResponseTransformer2.channel#0,trace-log-channel, id=ee760170-a73f-ddcf-8a1a-c0835f773374, flowName=FETCH_NEW_IDS, Content-Type=application/json, timestamp=1484601105900
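Oh, I found the problem.
You have to put .bridge(null) at the end of the recovery flow, after the transformer.
For that purpose the framework consults the replyChannel header of the requestMessage, not of the reply object. In your recovery flow the request message of the transformer is the ErrorMessage, which carries no replyChannel header.
I'm not sure whether we will change something in the framework, but that is a fact: the reply does not affect the output resolution decision.
We could do that via the routing slip pattern, but then you have to set a header in advance.
There is a JIRA on the matter.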
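To illustrate why the extra bridge helps, here is a toy model in plain Java. It is not Spring Integration code: `Msg`, `resolveReplyTarget`, `TRANSFORM` and `BRIDGE` are hypothetical stand-ins, and the only rule it encodes is the one stated above, namely that the last endpoint of a flow looks for `replyChannel` in the headers of the message it received, not in the message it produced.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class ReplyChannelSketch {

    /** Minimal stand-in for a message: a payload plus headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Rebuilds a message from the failed message carried in the error payload. */
    static final UnaryOperator<Msg> TRANSFORM = m -> {
        Msg failed = (Msg) m.payload;
        return new Msg(failed.payload, failed.headers);
    };

    /** Passes the message through unchanged, like a bridge. */
    static final UnaryOperator<Msg> BRIDGE = m -> m;

    /** An error message whose payload wraps the failed message; it has no replyChannel header. */
    static Msg errorMessage() {
        Msg failed = new Msg("original payload",
                Collections.<String, Object>singletonMap("replyChannel", "temporaryReplyChannel"));
        return new Msg(failed, Collections.<String, Object>emptyMap());
    }

    /**
     * Runs the steps in order. The last step has no output channel, so the
     * reply target is looked up in the headers of the message that step
     * RECEIVED (its request), not in the message it produced.
     */
    static Object resolveReplyTarget(Msg input, List<UnaryOperator<Msg>> steps) {
        Msg request = input;
        Msg reply = input;
        for (UnaryOperator<Msg> step : steps) {
            request = reply;              // what this step receives
            reply = step.apply(request);  // what this step produces
        }
        Object target = request.headers.get("replyChannel");
        if (target == null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
        return target;
    }

    public static void main(String[] args) {
        // With the bridge as last step, the request of that step is the transformed message.
        System.out.println(resolveReplyTarget(errorMessage(), Arrays.asList(TRANSFORM, BRIDGE)));
        try {
            // Without it, the transformer is last and its request is the header-less error message.
            resolveReplyTarget(errorMessage(), Collections.singletonList(TRANSFORM));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the bridge does nothing to the message itself; it only shifts which message counts as "the request" when the reply target is resolved.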