我正在尝试为我的SpringCloudDataflow流创建一个自定义异常处理程序,以路由一些要重新排队的错误和其他要DLQ的错误。
为此,我使用了全局Spring集成“errorChannel”和基于异常类型的路由。
这是Spring集成错误路由器的代码:
package com.acme.error.router;
import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
public static final String ERROR_CHANNEL = "errorChannel";
@Router(inputChannel = ERROR_CHANNEL)
public String onError(Message<Object> message) {
LOGGER.debug("ERROR ROUTER - onError");
if(message.getPayload() instanceof MessageTransformationException) {
MessageTransformationException exception = (MessageTransformationException) message.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
if(exceptionChainContainsDlq(exception)) {
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
return ErrorMessageChannels.REQUEUE_CHANNEL;
}
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
...
}
错误路由器由每个流应用程序通过Spring Boot应用程序上的包扫描获取:
@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}
当它与本地 Spring Cloud Dataflow 服务器(版本 1.5.0-RELEASE)一起部署和运行时,并且抛出 DlqException,该消息将成功路由到 errorRouter 中的 onError 方法,然后放入 dlq 主题中。
但是,当它部署为带有SCDF库伯内特斯服务器(也是1.5.0-RELEASE版本)的docker容器时,永远不会命中onError方法。(路由器开头的log语句永远不会输出)
在流应用程序的启动日志中,看起来bean被正确拾取并注册为errorChannel的侦听器,但由于某些原因,当抛出异常时,它们不会被路由器中的onError方法处理。
启动日志:
o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
我们使用 Spring 云流和 kafka binder 配置的所有默认设置:
spring.cloud:
stream:
binders:
kafka:
type: kafka
environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
编辑:添加了来自 kubectl 描述的 pod 参数
Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
我们尝试的另一个想法是尝试使用Spring Cloud Stream-spring integration错误通道支持向代理发送关于错误的主题,但由于消息似乎根本没有到达全局Spring Integration错误通道,这也不起作用。
我们需要在SCDF Kubernetes中做什么特殊的事情来启用全局Spring Integration errorChannel吗?
我错过了什么?
根据评论更新解决方案:
在查看了您的配置后,我现在非常确定我知道问题所在。您有一个多绑定器配置场景。即使您只处理单个绑定器实例,spring.cloud.stream.binders.的存在…是框架将其视为多绑定器的原因。基本上这是一个bug-github.com/spring-cloud/spring-cloud-stream/issues/1384.正如您所看到的,它是固定的,但您需要升级到Elmhurst.SR2或获取最新的快照(我们在RC2中,2.1.0.RELEASE是在几周内)-奥列格·朱拉库斯基
这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题已经解决。
根据评论更新解决方案:
在查看了您的配置后,我现在非常确定我知道问题所在。您有一个多绑定器配置场景。即使您只处理单个绑定器实例,spring.cloud.stream.binders.的存在…是框架将其视为多绑定器的原因。基本上这是一个bug-github.com/spring-cloud/spring-cloud-stream/issues/1384.正如您所看到的,它是固定的,但您需要升级到Elmhurst.SR2或获取最新的快照(我们在RC2中,2.1.0.RELEASE是在几周内)-奥列格·朱拉库斯基
这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题已经解决。
我有一个数据库记录器,它在通道中不执行任何操作。命令在通道中工作。 我的数据库记录器可以在其他流中工作。 类是一个简单的Java类,它用存储库写入数据库,存储库扩展了。在方法中,不会向数据库写入任何行。 稍后,我将使用日志记录,并尝试对此进行更多研究。
我尝试了Artem的测试代码,它可以在这个场景中工作。如果我像下面那样将type1流转换为子流映射(我这样做,因为我怀疑我的子流代码块),错误流不能打印ABCDEF参数值。之后,我向子流映射添加另一个头(XYZTWR),但它也不能打印。 我的s.out是:
我正在使用spring-cloud-stream:1.3.0.发行版、spring-cloud-stream-binder-kafka:1.3.0.发行版开发spring引导应用程序。我使用spring integration dsl拆分文件中的行,使用beanio将行转换为json,要求将成功的json消息写入一个kafka主题,并将错误消息写入不同的kafka主题。下面是application
工作流失败。原因:(c70954cc80d0504c):对临时位置或暂存文件的一个或多个访问检查失败。有关详细信息,请参阅其他错误消息。有关安全性和权限的详细信息 请帮我做这个。 谢了,斯里尼。