我在使用DSL获取Spring集成流时遇到了一些问题,DSL有一个拆分器,可以在Spring云数据流上下文中正确地满足我的需要。从本质上讲,我的微服务是一个处理器,它试图做到以下几点:
我试图克服的问题是:
验证失败的事件用hasError消息头标记,路由器使用该消息头将其发送给引发MessaginException的ServiceActivator。
经过一些调查和实验,我有缺陷的实验如下所示:
IntegrationFlows.from(Processor.INPUT).
// stuff ommitted for brevity
handle(new MyEventPublisher(.........)).
// List of events produced, split them
split().
// validate each event
transform(new MyEventValidator()).
// attempt to circumvent premature stoppage
channel(MessageChannels.executor(Executors.newCachedThreadPool())).
// route events based on validation result
<String>route("headers[hasError] != null && headers[hasError] == 'true'",
spec -> {
spec.resolutionRequired(false);
spec.defaultOutputChannel(Processor.OUTPUT);
// A failed event routes to a service that throws a MessagingException
spec.subFlowMapping("true", sf -> sf.<String>handle(new ExceptionThrowingService()));
// Otherwise events flow onwards
spec.channelMapping("false", Processor.OUTPUT);
}).
get();
如果没有通道步骤和缓存的线程池,当遇到第一个事件验证失败时,处理会停止,但失败的事件是死信的,任何成功的事件都会通过。
使用线程池,所有事件都被处理。但是,在Dataflow上下文中没有事件是死记硬背的,因为异常是在执行程序线程中引发的,但成功的事件确实会引发Dataflow流。
死锁与入站消息有关。确实,每当下游发生任何异常时,接收传入消息的容器都会将其(或基于它的某些内容)发送到DLQ。这已经是您通过拆分器生成许多消息的自定义逻辑,但从代理的角度来看,这仍然是单个传入消息的问题。
为了克服这个问题,您应该考虑在每个拆分器项目验证时手动发送到DLQ。为此,您可以查看ExpressionEvalatingRequest estHandlerAdures
:http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#expression-advice.并将异常发送到某些通道进行分析。已经在那里,您可以发送到DLQ或其他东西。
在Spring Cloud Stream和Data Flow透视图中,您可以为目标添加一个更多的绑定配置:http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#__code_input_code_and_code_output_code
使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每
我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?
当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该
我正在开发一个Spring应用程序,它每分钟将接收大约500条xml消息。下面的xml配置只允许每分钟处理大约60条消息,其余消息存储在队列中(持久化在DB中),并以每分钟60条消息的速率检索。 尝试从多个来源阅读文档,但仍然不清楚轮询器和任务执行器的角色。我对当前每分钟处理60条消息的理解是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次),“每轮询最大消息数”设置为10,
我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。