当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring集成DSL处理拆分的消息集合,并将故障传递给Spring云数据流

冯沛
2023-03-14

我在使用DSL获取Spring集成流时遇到了一些问题,DSL有一个拆分器,可以在Spring云数据流上下文中正确地满足我的需要。从本质上讲,我的微服务是一个处理器,它试图做到以下几点:

  1. 一些第三方报告被检索并合并在一起。这将生成一组事件。(注意:我的微服务不是一个源,因为它需要由一个可配置的时间表驱动,所以我有一个定时触发器,将命令消息传送到此微服务。)
  2. 事件集合被拆分,因此可以验证每个事件。通过验证的事件很好,并继续进行。验证失败的事件在数据流上下文中必须是死信。因此,这里没有失败或成功的一批事件的概念

我试图克服的问题是:

  • 对验证失败的事件抛出MessaginException不会导致进一步的事件处理
  • 解决这种过早的停止会导致数据流中没有死信,因为解决方法涉及在执行器中抛出异常,当然这些异常不会发生

验证失败的事件用hasError消息头标记,路由器使用该消息头将其发送给引发MessaginException的ServiceActivator。

经过一些调查和实验,我有缺陷的实验如下所示:

IntegrationFlows.from(Processor.INPUT).

    // stuff ommitted for brevity
    handle(new MyEventPublisher(.........)).

    // List of events produced, split them
    split().

    // validate each event
    transform(new MyEventValidator()).

    // attempt to circumvent premature stoppage
    channel(MessageChannels.executor(Executors.newCachedThreadPool())).

    // route events based on validation result
    <String>route("headers[hasError] != null && headers[hasError] == 'true'",
    spec -> {
                spec.resolutionRequired(false);
                spec.defaultOutputChannel(Processor.OUTPUT);

                // A failed event routes to a service that throws a MessagingException
                spec.subFlowMapping("true", sf -> sf.<String>handle(new ExceptionThrowingService()));

                // Otherwise events flow onwards
                spec.channelMapping("false", Processor.OUTPUT);
            }).
get();

如果没有通道步骤和缓存的线程池,当遇到第一个事件验证失败时,处理会停止,但失败的事件是死信的,任何成功的事件都会通过。

使用线程池,所有事件都被处理。但是,在Dataflow上下文中没有事件是死记硬背的,因为异常是在执行程序线程中引发的,但成功的事件确实会引发Dataflow流。

我是否能够使用拆分器,处理整个输入,并将消息异常传递给数据流运行时?

共有1个答案

简俊楚
2023-03-14

死锁与入站消息有关。确实,每当下游发生任何异常时,接收传入消息的容器都会将其(或基于它的某些内容)发送到DLQ。这已经是您通过拆分器生成许多消息的自定义逻辑,但从代理的角度来看,这仍然是单个传入消息的问题。

为了克服这个问题,您应该考虑在每个拆分器项目验证时手动发送到DLQ。为此,您可以查看ExpressionEvalatingRequest estHandlerAdures:http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#expression-advice.并将异常发送到某些通道进行分析。已经在那里,您可以发送到DLQ或其他东西。

在Spring Cloud Stream和Data Flow透视图中,您可以为目标添加一个更多的绑定配置:http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#__code_input_code_and_code_output_code

 类似资料:
  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?

  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我正在开发一个Spring应用程序,它每分钟将接收大约500条xml消息。下面的xml配置只允许每分钟处理大约60条消息,其余消息存储在队列中(持久化在DB中),并以每分钟60条消息的速率检索。 尝试从多个来源阅读文档,但仍然不清楚轮询器和任务执行器的角色。我对当前每分钟处理60条消息的理解是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次),“每轮询最大消息数”设置为10,

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。