当前位置: 首页 > 知识库问答 >
问题:

处理集成流中可能发生的异常

西门振
2023-03-14

我有一个REST控制器,它调用带有注释的网关(errorChannel=ERROR\u CHANNEL)

这样,无论下游发生什么错误,网关发起的集成流都将流入一个错误通道,该通道将由另一个集成流处理,这是按预期工作的。

现在,还有另一个场景,其中一个集成流从Kafka读取消息,将这些消息路由到另一个通道,另一个集成流处理这些消息,另一个流向远程服务发送HTTP请求。

public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
    return attachmentEventBaseFlow(".*")
            .filter(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null && m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY, String.class).equalsIgnoreCase(tenantId));
}

private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic)).errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
            .log(LoggingHandler.Level.DEBUG, "Inside Kafka Consumer")
            .filter(Message.class, m -> filter(m, eventRegex))
            .transform(messageToEventTransformer);
}

@Bean 
public IntegrationFlow kafkaConsumerFlow() {
    return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
            .route(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY, String.class), p -> p
                    .resolutionRequired(false)
                    .channelMapping("eventType", "transformMessagesFromKafkaAndPublishAnotherEvent")
                    .defaultOutputChannel("nullChannel"))
            .get();
}

@Bean
public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
    return flow -> flow
            .transform(transformer)
            .handle( getKafkaHandler() );
}

@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
    return flow -> flow
            .transform(transformer)
            .handle(gatewayCall, e -> e.advice(expressionAdvice()));
}

如何处理上述流中可能出现的异常?

如您所见,我正在使用ExpressionEvaluationRequestHandlerAdvice来处理handle方法,但不确定如何处理transformers中可能出现的异常?

配置了错误通道的按摩网关在rest控制器调用网关时会起作用,但当流量由Kafka消费者启动时,我不知道如何实现这一点。

谢谢

在Artem回复后编辑,以添加澄清:

这是集成流的配置,该集成流向远程服务发布请求,并且其异常似乎不会被捕获并路由到错误通道,而没有ExpressionEvalue atingRequest estHandler建议:

@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
    return flow -> flow
            .transform(transformer)
            .handle(getOAuth2Handler(HttpMethod.PUT, "remote url"), e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
}

private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod, String url) {
    return new OAuth2RequestHandler(oAuth2RestTemplate, httpMethod, url);
}

和实现MessageHandler的OAuth2Request estHandler类

@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
    String requestBody = (String) message.getPayload();
    ResponseEntity<String> response = oAuth2RestTemplate.exchange(url, httpMethod, new HttpEntity<>(requestBody), String.class);
}

共有1个答案

郦磊
2023-03-14

我看到您已经在Kafka消息驱动的通道适配器上使用了errorChannel()。那么,问题是什么?

这部分流与前面提到的带有错误通道配置的MesaagingGateway完全等效。

 类似资料:
  • 问题内容: 我有一个生成器和一个使用它的函数: 如果生成器引发异常,我想在使用者函数中处理该异常,然后继续使用迭代器,直到耗尽为止。请注意,我不想在生成器中有任何异常处理代码。 我想到了类似的东西: 但这对我来说看起来很尴尬。 问题答案: 这也是我不确定是否正确/优雅处理的事情。 我要做的是从生成器中获取一个,然后将其提升到其他位置。喜欢: 这样,我仍然继承了Exception而没有引发它,这将导

  • 问题内容: 我具有以下Spring Integration配置,该配置允许我从MVC Controller调用网关方法并让控制器返回,而集成流将在不阻塞控制器的单独线程中继续进行。 但是,我无法弄清楚如何使我的错误处理程序为该异步流工作。我的网关定义了错误通道,但是由于某种原因我的异常没有到达。相反,我看到被调用了。 网关: 为了查看我的错误处理程序正在处理的异步集成流程中发生的异常,我该怎么办?

  • 我目前有一个Spring集成流程,可以很好地工作(请参见图中的链接)。我想在当前配置的基础上添加批处理,以允许使用指数回退、断路器模式进行重试,并将作业持久化到数据库中以重新启动。 集成流由一个接收消息的网关组成 我已经搜索,阅读文档,看了大量的例子,我不知道如何将流程的最后一步封装到批处理作业中。如果上传过程中出现SFTP连接问题或其他异常,我需要重试上传字符串(作为文件的有效负载)的能力。我还

  • 如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。 如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttri

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 问题内容: 这是处理生成器中引发的异常的后续操作,并讨论了一个更一般的问题。 我有一个功能,可以读取不同格式的数据。所有格式都是面向行或记录的,每种格式都有一个专用的解析功能,可以作为生成器来实现。因此,主读取函数获得一个输入和一个生成器,该生成器从输入中读取其各自的格式并将记录传递回主函数: 哪里是这样的: 我面临的问题是,尽管可能引发异常(例如,从流中读取时),但它不知道如何处理它。负责处理异