扩展与并行处理 - 多线程 Step

优质
小牛编辑
132浏览
2023-12-01

启动并行处理最简单的方式就是在 Step 配置中加上一个TaskExecutor , 比如,作为 tasklet 的一个属性:

  1. <step id="loading">
  2. <tasklet task-executor="taskExecutor">...</tasklet>
  3. </step>

上面的示例中, taskExecutor指向了另一个实现 TaskExecutor 接口的Bean. TaskExecutor 是一个标准的Spring接口,具体有哪些可用的实现类,请参考 Spring用户指南. 最简单的多线程 TaskExecutor 实现是 SimpleAsyncTaskExecutor.

以上配置的结果就是在 Step 在(每次提交的块)记录的读取,处理,写入时都会在单独的线程中执行。请注意,这段话的意思就是在要处理的数据项之间没有了固定的顺序, 并且一个非连续块可能包含项目相比,单线程的例子。此外executor还有一些限制(例如,如果它是由一个线程池在后台执行的),有一个tasklet的配置项可以调整,throttle-limit默认为4。你可能根据需要增加这个值以确保线程池被充分利用,如:

  1. <step id="loading"> <tasklet
  2. task-executor="taskExecutor"
  3. throttle-limit="20">...</tasklet>
  4. </step>

还需要注意在step中并发使用连接池资源时可能会有一些限制,例如数据库连接池 DataSource. 请确保连接池中的资源数量大于或等于并发线程的数量.

在一些常见的批处理情景中,对使用多线程Step有一些实际的限制。Step中的许多部分(如readers 和 writers)是有状态的,如果某些状态没有进行线程隔离,那么这些组件在多线程Step中就是不可用的。特别是大多数Spring Batch提供的readers 和 writers不是为多线程而设计的。但是,我们也可以使用无状态或线程安全的readers 和 writers,可以参考Spring Batch Samples中(parallelJob)的这个示例(点击进入Section 6.12, “Preventing State Persistence”),示例中展示了通过指示器来跟踪数据库input表中的哪些项目已经被处理过,而哪些还没有被处理。

Spring Batch 提供了ItemWriterItemReader 的一些实现. 通常在javadoc中会指明是否是线程安全的,或者指出在并发环境中需要注意哪些问题. 假若文档中没有明确说明,你只能通过查看源代码来看看是否有什么线程不安全的共享状态. 一个并非线程安全的 reader , 也可以在你自己处理了同步的代理对象中高效地使用.

如果你的step中写操作和处理操作所消耗的时间更多,那么即使你对read()操作加锁进行同步,也会比你在单线程环境中执行要快很多.