当前位置: 首页 > 知识库问答 >
问题:

并行执行和聚合器锁定

郭兴文
2023-03-14

我使用spring集成来处理文件的一些目录,每个文件都通过一个“流”。我希望以这样一种方式设置文件的整体处理,即文件轮询器监视一个目录(或多个目录)中的新文件。一旦轮询器拾取了一个新文件,就应该将其传递给流中的后续组件,在该组件中处理该新文件,而不保持轮询过程。处理的第二个方面是,所有新文件都要经过几个步骤,然后由聚合器根据文件数(标准在不同目录中变化)进行聚合。一旦积累了足够的文件,就可以从聚合中释放它们,然后在聚合器之后以一些耗时的步骤进行处理。整个过程如下所示

file-A拾取的
file-A从轮询器传递到step1
file-A从step1传递到聚合器
file-B拾取的
file-B从轮询器传递到step1
file-B从step1传递到聚合器
file-C拾取的
file-C从轮询器传递到step1
file-C从step1传递到聚合器

文件A、B和C从聚合器
中释放文件A、B和C由final-Step
处理

所以总体来说有两个要求

  1. 在单独的线程中处理每个文件,以最大限度地增加当前正在处理的文件数
  2. 从聚合器发布的文件属于一个关联id,我们只希望在最后一步处理一组使用相同关联id的消息

我如何尝试满足这两个要求是为了#1,我只是在文件轮询器之后使用一个队列,在该队列中丢弃新文件,然后步骤a从队列中拾取文件。这分离了轮询过程,其想法是在步骤中使用线程执行器—服务激活器在单个线程中处理每个文件—第二个需求是通过在聚合器所在的线程中,在聚合器之后执行最后一个步骤来自动处理的。由于聚合器在释放另一个组以获得相同的相关id时会基于相关id设置锁,因此它只需等待相同组的前一个实例被处理

我遇到的问题是#1没有完成,因为服务激活器在尝试为第二个文件创建另一个线程之前,一直等到线程完成结束。这并没有什么帮助,因为在service activator上有一个线程执行器并没有什么用处。似乎只有在完成第一个线程后才能创建第二个therad。因此,为了解决这个问题,我将排队通道替换为调度程序通道,并在调度程序通道上分配执行器。现在,每个文件都在一个单独的线程中处理,同时处理多个文件<现在是第二部分,由于聚合器之后的组件非常耗时,我想将该html" target="_blank">进程与第一部分断开连接,所以我在聚合器之后放置了一个排队通道,但现在使用这种方法,我以前在聚合器中获得的锁定行为消失了,因为在最后耗时的步骤之前,从聚合器释放消息的线程在排队通道中死亡/完成。

对整个过程的任何想法。我如何在并行运行的同时完成我的两个需求。

谢谢

共有1个答案

百里伟
2023-03-14

根本不清楚你的问题是什么。是的,为了使下游流在锁下运行,它必须在最终的“释放”线程(处理完成组的最后一条入站消息的线程)上运行,您不能在聚合器的下游使用队列或执行器通道。

但是,这对来自执行器通道的线程没有影响;其他组(具有不同相关性)将进行处理。但是,如果您对“next”组使用相同的关联id,则其线程将被阻塞。

如果您想在第一个组正在下游处理时组装下一个组(使用相同的correlationid),则必须使用其他机制来强制下游单线程,例如带有单线程执行器的执行器通道,或使用另一个锁注册表。

 类似资料:
  • 在https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-filter-aggregation.html我们了解到: 以上代码显示了我们如何为t恤添加aggs,但我们如何做到: 和

  • 我在应用程序中使用拆分器聚合器模式。我有以下配置- 我的所有通道(CH1、CH2、CH3)都是。Splitter输入通道CH1的源代码是一个文件。 在我的测试中,我观察到即使在CH1通道中添加两个文件,在给定时间也只有一个文件被处理。所以我在我的CH1通道中添加了一个轮询器,现在正在同时处理CH1通道上的多个输入消息。 在聚合器方面,我也注意到执行总是单线程的,即直到第一个线程完成执行,第二个线程

  • 本文向大家介绍在MongoDB中执行聚合排序?,包括了在MongoDB中执行聚合排序?的使用技巧和注意事项,需要的朋友参考一下 您可以将method和$sort()运算符一起使用。为了理解这个概念,让我们用文档创建一个集合。使用文档创建集合的查询如下- 在method的帮助下显示集合中的所有文档。查询如下- 以下是输出- 这是对MongoDB聚合排序的查询。 情况1-每当您希望结果按降序排列时。查

  • 1.3 新版功能. 默认情况下,Fabric 会默认 顺序 执行所有任务(详细信息参见 Execution strategy ),这篇文档将介绍 Fabric 如何在多个主机上 并行 执行任务,包括 Fabric 参数设置、任务独立的装饰器,以及命令行全局控制。 它是如何运转的 由于 Fabric 1.x 并不是完全线程安全(以及为了更加通用,任务函数之间并不会产生交互),该功能的实现是基于 Py

  • 问题内容: 我正在尝试确定一个程序/软件,该程序/软件将使我能够有效地提取大量大型CSV文件(总计40+ GB),并输出具有导入到Elasticsearch(ES)所需的特定格式的JSON文件。 jq可以有效地获取如下数据: 按ID进行汇总(这样,多个文件中CSV行中的所有JSON文档都属于一个id条目),输出如下所示: 我用Matlab编写了一个脚本,但由于担心它的执行速度慢得多。我可能需要花费

  • 我一直在尝试在聚集中添加超时,以避免等待每个流都完成。但是当我添加超时时,它不起作用,因为聚合器等待每个流完成。 E、 在我的流中,其中一个有2秒的延迟,另一个有4秒的延迟 我使用遗嘱执行人。newCachedThreadPool()以并行运行。我想释放包含的每条消息,直到超时完成 我一直在测试的另一种方法是使用默认的gatherer,并在scatterGather中设置GathereTimeou