通用批处理模式 - 业务原因手工停止任务

优质
小牛编辑
138浏览
2023-12-01

11.2 业务原因手工停止任务

Spring Batch通过JobLauncher接口提供一个stop()方法,但是这实际上是给维护人员用的,而不是程序员.有时有更方便和更有意义的阻止任务中的业务逻辑执行.

最简单的做法是抛出一个RuntimeException(不是无限的重试或跳过).例如,可以使用一个自定义异常类型,如以下示例:
public class PoisonPillItemWriter implements ItemWriter {

  1. public void write(T item) throws Exception {
  2. if (isPoisonPill(item)) {
  3. throw new PoisonPillException("Posion pill detected: " + item);
  4. }
  5. }
  6. }

ItemReader中阻止一个步骤执行的另一种简单的方法是简单地返回null:
public class EarlyCompletionItemReader implements ItemReader {

  1. private ItemReader<T> delegate;
  2. public void setDelegate(ItemReader<T> delegate) { ... }
  3. public T read() throws Exception {
  4. T item = delegate.read();
  5. if (isEndItem(item)) {
  6. return null; // end the step here
  7. }
  8. return item;
  9. }
  10. }

前面的例子实际上依赖于这样一个事实,当item处理是null时,有一个默认的实现CompletionPolicy的策略来实现批处理,更复杂完善是策略可以通过SimpleStepFactoryBean实现和注入Step:

  1. <step id="simpleStep">
  2. <tasklet>
  3. <chunk reader="reader" writer="writer" commit-interval="10"
  4. chunk-completion-policy="completionPolicy"/>
  5. </tasklet>
  6. </step>
  7. <bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

另一种方法是在框架启动处理item的时候检查step设置一个标志StepExecution.实现这个替代方案,我们需要使用当前的StepExecution,这可以通过实现一个StepListener 和注册step:

  1. public class CustomItemWriter extends ItemListenerSupport implements StepListener {
  2. private StepExecution stepExecution;
  3. public void beforeStep(StepExecution stepExecution) {
  4. this.stepExecution = stepExecution;
  5. }
  6. public void afterRead(Object item) {
  7. if (isPoisonPill(item)) {
  8. stepExecution.setTerminateOnly(true);
  9. }
  10. }
  11. }

在这里flag设置默认的行为是step抛出一个JobInterruptedException.这可以通过StepInterruptionPolicy控制,但唯一的选择是抛出一个exception,所以这个job总是一个异常的结束.