通用批处理模式 - 将数据传递给Future Steps

优质
小牛编辑
132浏览
2023-12-01

11.8 将数据传递给Future Steps

将一个步骤传递给另外一个步骤,这通常很有用.这可以通过使用ExecutionContext实现.值得注意的是,有两个ExecutionContexts:一个step层面,一个在job级别.Step级别ExecutionContext 生命周期和一个step一样长,然而 job级别ExecutionContext贯穿整个job.另一方面,每次更新Step级别ExecutionContext,该Step提交一个chunk,然而Job级别ExecutionContext更新只会存在最后一个 Step.
这种分离的结果是,step的执行过程中所有的数据必须放置在step级别ExecutionContext的执行.当step是on-going状态时,将确保数据存储是正确的.如果数据存储在job级别的ExecutionContext中,那么它将不是持久化状态的,在Step执行期间如果Step失败,则数据会丢失.

  1. public class SavingItemWriter implements ItemWriter<Object> {
  2. private StepExecution stepExecution;
  3. public void write(List<? extends Object> items) throws Exception {
  4. // ...
  5. ExecutionContext stepContext = this.stepExecution.getExecutionContext();
  6. stepContext.put("someKey", someObject);
  7. }
  8. @BeforeStep
  9. public void saveStepExecution(StepExecution stepExecution) {
  10. this.stepExecution = stepExecution;
  11. }
  12. }

使数据可用于future Steps,step完成之后Job级别ExecutionContext将有’promoted’,Spring Batch为此提供了ExecutionContextPromotionListener.该监听器必须配置和数据相关的keys到ExecutionContext,它必须被提升.可选地,也可以配置的退出代码模式(“COMPLETED” 是默认的),
与所有监听一样,它必须在step中注册

  1. <job id="job1">
  2. <step id="step1">
  3. <tasklet>
  4. <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
  5. </tasklet>
  6. <listeners>
  7. <listener ref="promotionListener"/>
  8. </listeners>
  9. </step>
  10. <step id="step2">
  11. ...
  12. </step>
  13. </job>
  14. <beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
  15. <beans:property name="keys" value="someKey"/>
  16. </beans:bean>

最后,保存的值必须从job ExeuctionContext重新获得:

  1. public class RetrievingItemWriter implements ItemWriter<Object> {
  2. private Object someObject;
  3. public void write(List<? extends Object> items) throws Exception {
  4. // ...
  5. }
  6. @BeforeStep
  7. public void retrieveInterstepData(StepExecution stepExecution) {
  8. JobExecution jobExecution = stepExecution.getJobExecution();
  9. ExecutionContext jobContext = jobExecution.getExecutionContext();
  10. this.someObject = jobContext.get("someKey");
  11. }
  12. }