当前位置: 首页 > 知识库问答 >
问题:

spring批处理程序链

璩慎之
2023-03-14

我需要依次执行七个不同的流程(一个接一个)。数据存储在Mysql中。我正在考虑以下选项,如果我错了,或者有更好的解决方案,请纠正我。

要求:

>

需要分块处理数据。

我的解决方案和问题:数据读取:

  1. 使用JdbcCursorItemReader读取数据,因为这是性能最好的db阅读器-但是,SQL非常复杂,所以我可能不得不考虑使用JdbcTemboard的自定义ItemReader?这让我在处理数据时更加灵活。

工艺流程:

>

  • 定义七个步骤和块,使用databean在步骤之间共享数据。但是,这不是一个好主意,因为数据在块中处理,在每个块之后,step1编写器将在数据库中创建一组新的数据。当此数据在其他步骤中共享时,数据完整性将成为一个问题。

    使用StepExecutionContext在步骤之间共享数据。但这可能会影响性能,因为这涉及批处理作业存储库。

    仅定义一个步骤、一个ItemReader和一系列流程(七个流程),并创建一个ItemWriter,将处理后的数据写入DB。但是,我无法管理或监控每个不同的流程,所有这些都将在一个步骤中完成。

  • 共有2个答案

    韩彬
    2023-03-14

    建议:

    1) 使用JdbcCursorItemReader读取数据。

    所有开箱即用的组件都是一个不错的选择,因为它们已经实现了ItemStream接口,使您的步骤可以重新启动。但就像您提到的,有时,请求只是为了复杂,或者像我一样,您已经有了可以重用的服务或DAO。

    我建议您使用ItemReaderAdapter。它允许您配置委托服务以调用以获取数据

        <bean id="MyReader" class="xxx.adapters.MyItemReaderAdapter">
           <property name="targetObject" ref="AnExistingDao" />
           <property name="targetMethod" value="next" />        
       </bean>
    

    请注意,目标方法必须尊重ItemReaders的读取契约(当没有更多数据时返回null)

    如果您的作业不需要重新启动,您可以简单地使用类:org。springframework。一批项目适配器。ItemReaderAdapter

    但是,如果需要重新启动作业,可以创建自己的ItemReaderAdapter,如下所示:

    public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T>, ItemStream {
    
    
    
    
    private long currentCount = 0;
    
    private final String CONTEXT_COUNT_KEY = "count"; 
    
    /**
     * @return return value of the target method.
     */
    public T read() throws Exception {
    
        super.setArguments(new Long[]{currentCount++});
        return invokeDelegateMethod();
    }
    @Override
    public void open(ExecutionContext executionContext)
            throws ItemStreamException {
        currentCount = executionContext.getLong(CONTEXT_COUNT_KEY,0);
    
    }
    
    
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
        log.info("Update Stream current count : " + currentCount);
    }
    
    
    @Override
    public void close() throws ItemStreamException {
        // TODO Auto-generated method stub
    
    }
    
    }
    

    因为开箱即用的itemReaderAdapter不可重新启动,所以您只需创建自己的实现ItemStream

    2) 关于7个步骤vs 1个步骤。

    在这一点上,我将使用compositeProcessor的1个步骤。7步选项只会给IMO带来问题。

    1) 7步数据库:因此,您的写入程序在一个数据库中提交,直到第7步。。然后,第7步编写器尝试提交到真实的数据库并弹出错误!!!全部丢失,批处理必须从步骤1重新启动!!

    2) 上下文的7个步骤:可能更好,因为状态将保存在spring批处理元数据中。。但在springBatch的元数据中存储大数据并不是一个好的做法!!

    3)是去IMO的方式。;-)

    陶睿
    2023-03-14

    组织。springframework。一批项目支持CompositeItemProcessor是Spring批处理框架中的一个现成组件,它将支持类似于第二个选项的需求。这将允许您执行以下操作;-在从数据库读取数据的设计/解决方案中保持分离(itemreader)-保持每个处理器关注点和配置的分离-允许任何单个处理器通过返回null来“关闭”数据块,而不管以前的进程如何

    CompositeItemProcessor迭代委托循环,因此它与动作模式“类似”。它在您描述的场景中非常有用,并且仍然允许您利用区块优势(异常、重试、提交策略等)

     类似资料:
    • 我正在使用STS 2.81附带的Spring Batch模板和Manning的Spring Batch in Action中的示例创建一个Spring Batch作业。我可以毫无问题地执行块读取器和写入器,但我的代码跳过了处理器。我甚至尝试过在处理器中取消所有对象,但什么也没有,对象仍然设法被写入,就像处理器被忽略一样。我尝试在处理器中调用system.out.println,但没有在终端中打印出

    • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

    • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

    • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

    • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

    • 我对Spring批处理跳过逻辑有一些问题。我已经配置了一个作业的步骤来跳过两个异常(SQLIntegrityConstraintViolation异常和乐观锁定失败异常): 但当作业运行时,由于我将其配置为跳过的异常,作业以未知状态完成: 我做错什么了吗?我希望这一步跳过负责抛出其中一个异常的项,并继续处理,以便以完成状态结束。