当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud数据流错误通道不工作

能钟展
2023-03-14

我正在尝试为我的SpringCloudDataflow流创建一个自定义异常处理程序,以路由一些要重新排队的错误和其他要DLQ的错误。

为此,我使用了全局Spring集成“errorChannel”和基于异常类型的路由。

这是Spring集成错误路由器的代码:

package com.acme.error.router;

import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;


@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
   private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);

   public static final String ERROR_CHANNEL = "errorChannel";

   @Router(inputChannel = ERROR_CHANNEL)
    public String onError(Message<Object> message) {
      LOGGER.debug("ERROR ROUTER - onError");
      if(message.getPayload() instanceof MessageTransformationException) {
         MessageTransformationException exception = (MessageTransformationException) message.getPayload();
         Message<?> failedMessage = exception.getFailedMessage();
          if(exceptionChainContainsDlq(exception)) {
             return ErrorMessageChannels.DLQ_QUEUE_NAME;
          }
         return ErrorMessageChannels.REQUEUE_CHANNEL;
      }
      return ErrorMessageChannels.DLQ_QUEUE_NAME;
    }

    ...

}

错误路由器由每个流应用程序通过Spring Boot应用程序上的包扫描获取:

@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}

当它与本地 Spring Cloud Dataflow 服务器(版本 1.5.0-RELEASE)一起部署和运行时,并且抛出 DlqException,该消息将成功路由到 errorRouter 中的 onError 方法,然后放入 dlq 主题中。

但是,当它部署为带有SCDF库伯内特斯服务器(也是1.5.0-RELEASE版本)的docker容器时,永远不会命中onError方法。(路由器开头的log语句永远不会输出)

在流应用程序的启动日志中,看起来bean被正确拾取并注册为errorChannel的侦听器,但由于某些原因,当抛出异常时,它们不会被路由器中的onError方法处理。

启动日志:

o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router

我们使用 Spring 云流和 kafka binder 配置的所有默认设置:

spring.cloud:
  stream:
    binders:
      kafka:
        type: kafka
        environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
        environment.spring.cloud.stream.kafka.binder.zkNodes=zklist

编辑:添加了来自 kubectl 描述的 pod 参数

Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher

我们尝试的另一个想法是尝试使用Spring Cloud Stream-spring integration错误通道支持向代理发送关于错误的主题,但由于消息似乎根本没有到达全局Spring Integration错误通道,这也不起作用。

我们需要在SCDF Kubernetes中做什么特殊的事情来启用全局Spring Integration errorChannel吗?

我错过了什么?

根据评论更新解决方案:

在查看了您的配置后,我现在非常确定我知道问题所在。您有一个多绑定器配置场景。即使您只处理单个绑定器实例,spring.cloud.stream.binders.的存在…是框架将其视为多绑定器的原因。基本上这是一个bug-github.com/spring-cloud/spring-cloud-stream/issues/1384.正如您所看到的,它是固定的,但您需要升级到Elmhurst.SR2或获取最新的快照(我们在RC2中,2.1.0.RELEASE是在几周内)-奥列格·朱拉库斯基

这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题已经解决。


共有1个答案

卢子民
2023-03-14

根据评论更新解决方案:

在查看了您的配置后,我现在非常确定我知道问题所在。您有一个多绑定器配置场景。即使您只处理单个绑定器实例,spring.cloud.stream.binders.的存在…是框架将其视为多绑定器的原因。基本上这是一个bug-github.com/spring-cloud/spring-cloud-stream/issues/1384.正如您所看到的,它是固定的,但您需要升级到Elmhurst.SR2或获取最新的快照(我们在RC2中,2.1.0.RELEASE是在几周内)-奥列格·朱拉库斯基

这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题已经解决。

 类似资料: