我有一个如下定义的spring批处理作业。
<batch:step id="convert">
<batch:tasklet >
<batch:chunk reader="contentItemReader" writer="contentItemWriter"
processor="processor" commit-interval="10000" >
</batch:chunk>
</batch:tasklet>
</batch:step>
contentItemReader如下所示。
@Bean
public StaxEventItemReader contentItemReader() {
StaxEventItemReader reader = new StaxEventItemReader();
reader.setFragmentRootElementName("ContentItem");
reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
reader.setUnmarshaller(contentItemUnmarshaller());
return reader;
}
一切都很好,除了它比我想要的要慢一点。我知道这个阅读器不是线程安全的。所以我不认为我可以向任务小工具添加一个任务执行器。ContentItems 不相互依赖,因此我想将数据并行馈送到处理器中。项目处理可能相当耗时。因此,尽管我知道我不能拥有多线程读取器,但我应该能够进行多线程项目处理。
ItemWriter也需要是单线程的,因为我使用的是平面文件ItemWriter。
实现这一目标的最佳方式是什么?
自<code>3.0.4版以来
只需将您的读者包装在这样的内容中:
public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {
private ItemReader<T> itemReader;
private boolean isStream = false;
public void setItemReader(ItemReader<T> itemReader) {
this.itemReader = itemReader;
if (itemReader instanceof ItemStream) {
isStream = true;
}
}
@Override
public void close() {
if (isStream) {
((ItemStream) itemReader).close();
}
}
@Override
public void open(ExecutionContext executionContext) {
if (isStream) {
((ItemStream) itemReader).open(new ExecutionContext());
}
}
@Override
public void update(ExecutionContext executionContext) {
}
@Override
public synchronized T read() throws Exception {
return itemReader.read();
}
}
你的作家也一样。
请注意,订单不再保证。
有关于如何在配置中使用它的评论.xml。因此,下面是一个如何将包装器与FlatFileItemReader一起使用的简单示例:
<batch:step id="convert">
<batch:tasklet >
<batch:chunk reader="wrappedReader" writer="..."
processor="..." commit-interval="10000" >
</batch:chunk>
</batch:tasklet>
</batch:step>
<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
<property name="itemReader">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property .../>
<property .../>
</bean>
</property>
</bean>