当前位置: 首页 > 知识库问答 >
问题:

Camel拆分器并行处理阵列列表-并发访问问题

舒枫涟
2023-03-14

使用Camel拆分数组列表,并在多达10个线程中并行处理每个项目。以下是配置。html" target="_blank">线程池配置文件被设置为最大线程数=10。

<camel:route id="ReportsRoute">
        <camel:from uri="direct:processReportsChannel" />
        <camel:to uri="bean:reportRepository?method=getPendingTransactions" />
        <camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile">
            <camel:simple>${body}</camel:simple>
            <camel:doTry>
                <camel:to uri="direct:processReportChannel" />
                <camel:doCatch>
                    <camel:exception>java.lang.Exception</camel:exception>
                    <camel:handled>
                        <camel:constant>true</camel:constant>
                    </camel:handled>                        
                    <camel:to uri="bean:ReportRepository?method=markAsFailed"/>
                    <camel:wireTap uri="direct:loggingAndNotificationChannel" />
                </camel:doCatch>
            </camel:doTry>
        </camel:split>
    </camel:route>  

bean:reportRepository?method=getPendingTransactions获取ArrayList并传递给Splitter。

进程报告通道是处理项目的处理器。

问题:当作业开始时,它正在启动10个线程,但是一些线程正在拾取同一个项目。例如,如果我在数组列表中有item_no_1到10,thread_no_1和thread_no_2或者更多的线程正在获取,比如item_no_2。是因为数组列表不是线程安全的,而Splitter没有管理它吗?

我不是这方面的专家,需要帮助来指出问题在哪里。

共有1个答案

严曜文
2023-03-14

我使用以下(更简单的)设置进行了测试:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="ReportsRoute">
        <from uri="direct:start" />
         <!-- By default a pool size of 10 is used. -->
        <split parallelProcessing="true">
            <simple>${body}</simple>
            <to uri="direct:sub" />
        </split>
    </route>  
    <route>
        <from uri="direct:sub"/>
        <log message="Processing item ${body}" />
    </route>
</camelContext>

测试:

 List<Object> list = new ArrayList<>();
 for (int i = 0; i < 1000; i++) {
    list.add("And we go and go: " + (i + 1));
 }
 template.sendBody("direct:start", list);

使用此设置时,不会对任何条目进行两次处理。因此,处理器中一定存在导致问题的内容,即同一列表项被多个线程选取。

 类似资料:
  • 我们有一个服务调用,它返回一个id列表,我们用它调用另一个服务,该服务一次只接受一个id,因此我们使用的是驼峰分割器,并行处理转换为。现在,我们通过seda调用该服务,因此可以设置超时。这将导致并行处理不再是并行的问题,因为seda默认只有1个并发使用者在处理它。 选项: < li >放一个?seda上的concurrent consumers = x < li >使用direct而不是seda。

  • 问题内容: 我有一个很大的数组,我想通过将它的片段交给一些异步任务来处理。作为概念证明,我编写了以下代码: 之后,将使用0到9之间的随机整数初始化。 该函数使用它们各自的插槽作为累加器,分派10个任务,这些任务从50000个项目的不相交的块中将它们相加并相加。 该程序在行崩溃。错误是: 我可以看到,在调试器中,它在崩溃前已经进行了几次迭代,并且在崩溃时变量具有正确范围内的值。 我读过尝试访问已发布

  • 问题内容: 我正在尝试遍历Python Pandas数据框的行。在数据帧的每一行中,我试图通过其列名引用行中的每个值。 这是我所拥有的: 我使用了这种方法来进行迭代,但这只是解决方案的一部分- 在每次迭代中选择一行后,如何通过列名称访问行元素? 这是我想做的事情: 我的理解是该行是熊猫系列。但是我无法索引该系列。 在遍历行的同时可以使用列名吗? 问题答案: 我也喜欢 由于行是一个名为元组,如果你打

  • 与之间的差异: > 它们可能有不同的特点: 这里讨论的似乎是另一个毫无意义的流拆分器特性策略(并行计算似乎更好):深入理解Java8和Java9中的拆分器特性 在本例中,从禁用拆分功能的顺序流创建了一个拆分器(返回null)。当以后需要转换回一个流时,该流不会从并行处理中受益。一种耻辱。 最大的问题是:作为解决办法,在调用之前总是将流转换为并行流会有什么主要影响?

  • 如何拆分列表并并行执行 我的场景-- 我从webservices获得了1000辆使用下面列表的车辆。 我想在每个列表中分割100辆车并并行执行所有列表。 谢谢

  • 问题内容: 我有一个多线程应用程序,该程序具有仅由主线程更新(写入)的集中列表。然后,我还有其他几个线程需要定期检索当前状态的列表。有没有一种方法可以让我做到这一点? 问题答案: 这取决于您要如何限制并发。最简单的方法可能是使用。当您从中获取一个迭代器时,该迭代器将反映 该列表在创建迭代器时的外观 - 迭代器 将看不到后续修改。好处是它可以应付很多争用,缺点是添加新项相当昂贵。 另一种方法是锁定,