当前位置: 首页 > 知识库问答 >
问题:

Parellel Processing Spring Batch StaxEventItemReader

弓方伟
2023-03-14

我有一个如下定义的spring批处理作业。

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="contentItemReader" writer="contentItemWriter"
                             processor="processor" commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

contentItemReader如下所示。

 @Bean
 public StaxEventItemReader contentItemReader() {
        StaxEventItemReader reader = new StaxEventItemReader();
        reader.setFragmentRootElementName("ContentItem");
        reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
        reader.setUnmarshaller(contentItemUnmarshaller());
        return reader;
 }

一切都很好,除了它比我想要的要慢一点。我知道这个阅读器不是线程安全的。所以我不认为我可以向任务小工具添加一个任务执行器。ContentItems 不相互依赖,因此我想将数据并行馈送到处理器中。项目处理可能相当耗时。因此,尽管我知道我不能拥有多线程读取器,但我应该能够进行多线程项目处理。

ItemWriter也需要是单线程的,因为我使用的是平面文件ItemWriter。

实现这一目标的最佳方式是什么?

共有2个答案

宇文学博
2023-03-14

自<code>3.0.4版以来

陶智
2023-03-14

只需将您的读者包装在这样的内容中:

public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {

  private ItemReader<T> itemReader;
  private boolean isStream = false;

  public void setItemReader(ItemReader<T> itemReader) {
    this.itemReader = itemReader;
    if (itemReader instanceof ItemStream) {
      isStream = true;
    }
  }

  @Override
  public void close() {
    if (isStream) {
      ((ItemStream) itemReader).close();
    }
  }

  @Override
  public void open(ExecutionContext executionContext) {
    if (isStream) {
      ((ItemStream) itemReader).open(new ExecutionContext());
    }
  }

  @Override
  public void update(ExecutionContext executionContext) {
  }

  @Override
  public synchronized T read() throws Exception {
    return itemReader.read();
  }
}

你的作家也一样。

请注意,订单不再保证。

有关于如何在配置中使用它的评论.xml。因此,下面是一个如何将包装器与FlatFileItemReader一起使用的简单示例:

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="wrappedReader" writer="..."
                             processor="..." commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
   <property name="itemReader">
      <bean class="org.springframework.batch.item.file.FlatFileItemReader">
          <property .../>
          <property .../>
      </bean>
   </property>
</bean>
 类似资料:

相关问答

相关文章

相关阅读