当前位置: 首页 > 知识库问答 >
问题:

Spring批处理-在处理器和写入器之间传递数据

夏侯自珍
2023-03-14

我有一个包含Reader->Processor->Writer的spring批处理。

B/W传递的数据类型为emp:

class Emp {
    iny id;
    String name;
    EmpTypeEnum empType;    // HR, Dev, Tester, etc.

    // getters and setters
}

读取器中从CSV文件读取一个简单的批处理数据时,处理器中的一些处理&输出CSV文件由写入器编写。

public class EmpItemProcessor implements ItemProcessor<Emp, Emp> {

    int countHr;
    int countDev;
    int countTester;


        @Override
        public Person process(final Emp emp) throws Exception {

        if (item.getEmpType.equals(EmpTypeEnum.HR) {
            countHr++;
        } else if // .....

        // other processor on emp

            return emp;
        }

}

请建议。如果你认为任何其他方法会更好,请建议。

谢谢

共有1个答案

管梓
2023-03-14

为此,您可以使用ItemWriteListenerJobExecutionListenerSupport

>

  • 定义一个ItemWriteListener,每次调用您的writer后都会调用它。

    在此监听器中,每次都更新执行上下文中的计数器

    @Component
    @JobScope
    public class EmployeeWriteListener implements ItemWriteListener<Emp> {
    
      @Value("#{jobExecution.executionContext}")
      private ExecutionContext executionContext;
    
    
      @Override
      public void afterWrite(final List<? extends Emp> paramList) {
    
         final int counter =
              this.executionContext.getInt("TOTAL_EXPORTED_ITEMS", 0);
          this.executionContext.putInt("TOTAL_EXPORTED_ITEMS", counter + 1);
        }
    
      }
    }
    
    
    
    @Component
    @JobScope
    public class EmployeeNotificationListener extends JobExecutionListenerSupport  {
    
    @Override
    public void afterJob(final JobExecution jobExecution) {
    
      jobExecution.getExecutionContext()
          .getInt("TOTAL_EXPORTED_ITEMS")
      ...................
       }
     }
    
    this.jobBuilders.get("someJob").incrementer(new RunIdIncrementer()).listener(new EmployeeNotificationListener())
            .flow(this.getSomeStep()).end().build();
    //instead of new(..) you should Autowire listener
    
    public Step getSomeStep() {
    
      return stepBuilders.get("someStep").<X, Y>chunk(10)
          .reader(this.yourReader).processor(this.yourProcessor)
          .writer(this.yourProcessor).listener(this.EmployeeWriteListener)
          .build();
    }
    

  •  类似资料:
    • 我有一个要求,我需要从xls(其中存在一个名为netCreditAmount的列)中读取值并将值保存在数据库中。要求是从所有行中添加netCreditAmount的值,然后在数据库中仅为xls中的第一行设置此总和,其余行插入相应的netCreditAmounts。我应该如何在Spring Batch中进行实施。普通的阅读器、处理器和写入器工作正常,但我应该在哪里插入这个实现?谢谢!

    • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

    • (A)ItemReader[第一输入]->(A)ItemProcessor[第一输入]->(B)ItemReader[使用处理过的输入从另一个源收集第二输入]->(B)ItemProcessor[使用处理过的第一输入和第二输入]->{repeat B}->ItemWriter(最终结果) 有没有人知道如何在Spring批处理中这样做?多谢了。

    • 我有“N”没有的。客户/客户。对于每个客户/客户,我需要从数据库(读取器)中获取记录,然后我必须处理(处理器)客户/客户的所有记录,然后我必须将记录写入文件(写入器)。 如何将spring批处理作业循环N次?

    • 根据已接受的答案代码,对该代码的以下调整对我起作用: 我已经将这个问题更新到了一个可以正确循环的版本,但是由于应用程序将扩展,能够处理并行是很重要的,我仍然不知道如何在运行时用javaconfig动态地做到这一点... 基于查询列表(HQL查询),我希望每个查询都有一个读取器-处理器-写入器。我当前的配置如下所示: 工单 处理机 作家 目前,该过程对于单个查询来说工作得很好。然而,我实际上有一个查

    • 我刚开始使用Spring批处理,我有一个特殊问题。我希望使用从3个不同的jpa查询中获取结果,并分别处理它们,然后使用将它们写入一个统一的XML文件。 对于eg,生成的XML看起来像是,