我需要依次执行七个不同的流程(一个接一个)。数据存储在Mysql中。我正在考虑以下选项,如果我错了,或者有更好的解决方案,请纠正我。
要求:
>
需要分块处理数据。
我的解决方案和问题:数据读取:
工艺流程:
>
定义七个步骤和块,使用databean在步骤之间共享数据。但是,这不是一个好主意,因为数据在块中处理,在每个块之后,step1编写器将在数据库中创建一组新的数据。当此数据在其他步骤中共享时,数据完整性将成为一个问题。
使用StepExecutionContext在步骤之间共享数据。但这可能会影响性能,因为这涉及批处理作业存储库。
仅定义一个步骤、一个ItemReader和一系列流程(七个流程),并创建一个ItemWriter,将处理后的数据写入DB。但是,我无法管理或监控每个不同的流程,所有这些都将在一个步骤中完成。
建议:
1) 使用JdbcCursorItemReader读取数据。
所有开箱即用的组件都是一个不错的选择,因为它们已经实现了ItemStream接口,使您的步骤可以重新启动。但就像您提到的,有时,请求只是为了复杂,或者像我一样,您已经有了可以重用的服务或DAO。
我建议您使用ItemReaderAdapter。它允许您配置委托服务以调用以获取数据。
<bean id="MyReader" class="xxx.adapters.MyItemReaderAdapter">
<property name="targetObject" ref="AnExistingDao" />
<property name="targetMethod" value="next" />
</bean>
请注意,目标方法必须尊重ItemReaders的读取契约(当没有更多数据时返回null)
如果您的作业不需要重新启动,您可以简单地使用类:org。springframework。一批项目适配器。ItemReaderAdapter
但是,如果需要重新启动作业,可以创建自己的ItemReaderAdapter,如下所示:
public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T>, ItemStream {
private long currentCount = 0;
private final String CONTEXT_COUNT_KEY = "count";
/**
* @return return value of the target method.
*/
public T read() throws Exception {
super.setArguments(new Long[]{currentCount++});
return invokeDelegateMethod();
}
@Override
public void open(ExecutionContext executionContext)
throws ItemStreamException {
currentCount = executionContext.getLong(CONTEXT_COUNT_KEY,0);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
log.info("Update Stream current count : " + currentCount);
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
}
因为开箱即用的itemReaderAdapter不可重新启动,所以您只需创建自己的实现ItemStream
2) 关于7个步骤vs 1个步骤。
在这一点上,我将使用compositeProcessor的1个步骤。7步选项只会给IMO带来问题。
1) 7步数据库:因此,您的写入程序在一个数据库中提交,直到第7步。。然后,第7步编写器尝试提交到真实的数据库并弹出错误!!!全部丢失,批处理必须从步骤1重新启动!!
2) 上下文的7个步骤:可能更好,因为状态将保存在spring批处理元数据中。。但在springBatch的元数据中存储大数据并不是一个好的做法!!
3)是去IMO的方式。;-)
组织。springframework。一批项目支持CompositeItemProcessor是Spring批处理框架中的一个现成组件,它将支持类似于第二个选项的需求。这将允许您执行以下操作;-在从数据库读取数据的设计/解决方案中保持分离(itemreader)-保持每个处理器关注点和配置的分离-允许任何单个处理器通过返回null来“关闭”数据块,而不管以前的进程如何
CompositeItemProcessor迭代委托循环,因此它与动作模式“类似”。它在您描述的场景中非常有用,并且仍然允许您利用区块优势(异常、重试、提交策略等)
我正在使用STS 2.81附带的Spring Batch模板和Manning的Spring Batch in Action中的示例创建一个Spring Batch作业。我可以毫无问题地执行块读取器和写入器,但我的代码跳过了处理器。我甚至尝试过在处理器中取消所有对象,但什么也没有,对象仍然设法被写入,就像处理器被忽略一样。我尝试在处理器中调用system.out.println,但没有在终端中打印出
我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出
null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?
我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。
当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?
我对Spring批处理跳过逻辑有一些问题。我已经配置了一个作业的步骤来跳过两个异常(SQLIntegrityConstraintViolation异常和乐观锁定失败异常): 但当作业运行时,由于我将其配置为跳过的异常,作业以未知状态完成: 我做错什么了吗?我希望这一步跳过负责抛出其中一个异常的项,并继续处理,以便以完成状态结束。