当前位置: 首页 > 知识库问答 >
问题:

使用spring批处理分区处理海量数据

鲁烨熠
2023-03-14

我正在实现spring批处理作业,用于使用分区方法处理一个DB表中的数百万条记录,如下所示-

>

  • 从分区器中的表中提取唯一的分区代码,并在执行上下文中设置相同的代码。

    创建一个包含读取器、处理器和写入器的块步骤,以基于特定分区代码处理记录。

    是否可以创建分区/线程来处理像thread1进程1-1000,thread2进程1001-2000等?

    如何控制创建的线程数,因为分区代码可以是100个左右,我希望在5次迭代中只创建20个线程和进程?

    如果一个分区失败,会发生什么,所有的处理会停止并恢复吗?

     <bean id="MyPartitioner" class="com.MyPartitioner" />
     <bean id="itemProcessor" class="com.MyProcessor" scope="step" />
     <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
      <property name="dataSource" ref="dataSource"/>
      <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
      <property name="rowMapper">
          <bean class="com.MyRowMapper" scope="step"/>
      </property>
    </bean>
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="20"/>
        <property name="allowCoreThreadTimeOut" value="true"/>
    </bean>
    
    <batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
        </batch:tasklet>
    </batch:step>
    <batch:job id="myjob">
        <batch:step id="mystep">
            <batch:partition step="Step1" partitioner="MyPartitioner">
                <batch:handler grid-size="20" task-executor="taskExecutor"/>
            </batch:partition>
        </batch:step>
    </batch:job>
    
    public class MyPartitioner implements Partitioner{
    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
    Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
    List<String> codes = getCodes();
    
    for (String code : codes)
    {
        ExecutionContext context = new ExecutionContext();
        context.put("code", code);
        partitionMap.put(code, context);
    }
    return partitionMap;}}
    

    谢谢

  • 共有1个答案

    通奕
    2023-03-14

    我会说这是正确的方法,我不明白为什么你需要有一个线程每1000个项目,如果你分区每个唯一的分区代码和有1000个项目块你将有1000个项目的事务每线程,这是IMO OK。

    >

  • 除了保存唯一的分区代码之外,您还可以通过为每1000个相同的分区代码创建新的子上下文来计算每个分区代码和更多的分区代码的个数(这样,对于具有2200个记录的分区代码,您将使用上下文参数调用3个线程:1=>partition_key=key1,skip=0,count=1000,2=>partition_key=key1,skip=1000,count=1000和3=>partition_key=key1,skip=2000,count=1000),如果这是您想要的,但我仍然会不使用它

    线程数是通过ThreadPoolTaskExecutor控制的,它在创建分区时传递给分区步骤。您有setCorePoolSize()方法,您可以在20个线程时设置该方法,您将最多获得20个线程。下一个细粒度配置是grid-size,它告诉将从完整分区映射中创建多少个分区。这里是网格大小的解释。所以分区就是分工。之后,线程配置将定义实际处理的并发性。

    如果一个分区失败,整个分区步骤将失败,并提供哪个分区失败的信息。成功的分区已经完成,不会再被调用,当作业重新启动时,它将通过重做失败的和未处理的分区来恢复它停止的部分。

    希望我能回答你的所有问题,因为你有很多问题。

    public class MyPartitioner implements Partitioner{
    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        Map<String, int> codesWithCounts = getCodesWithCounts();
    
        for (Entry<String, int> codeWithCount : codesWithCounts.entrySet())
        {
            for (int i = 0; i < codeWithCount.getValue(); i + 1000){
                ExecutionContext context = new ExecutionContext();
                context.put("code", code);
                context.put("skip", i);
                context.put("count", 1000);
                partitionMap.put(code, context);
            }
        }
        return partitionMap;
    }
    

  •  类似资料:
    • 我的数据库中有大约1000万个blob格式的文件,我需要转换并以pdf格式保存它们。每个文件大小约为0.5-10mb,组合文件大小约为20 TB。我正在尝试使用spring批处理实现该功能。然而,我的问题是,当我运行批处理时,服务器内存是否可以容纳那么多的数据?我正在尝试使用基于块的处理和线程池任务执行器。请建议运行作业的最佳方法是否可以在更短的时间内处理如此多的数据

    • 备忘 1 GB: 十亿个字节(Byte) 1(B) * 10*10^8 / 1024 / 1024 ≈ 953.67(MB) ≈ 1000(MB) ≈ 1(GB) 400 MB: 一亿个 4 字节(Byte) int 整型占用的内存 4(B) * 10^8 / 1024 / 1024 ≈ 381.57(MB) ≈ 382(MB) ≈ 400(MB) 10 亿个整型 -> 400(MB) * 10

    • 我正在尝试为分区配置Spring批处理步骤。这里很好的示例显示了一个关于“ID范围”的分区,但我不知道如何从“数据页”范围开始。 在我的顺序步骤中,我有: null

    • 我正在处理200万记录和网格大小为20的Spring批处理应用程序。对于100200k记录,它工作正常,但200万分区后失败。 分区信息: 它打印的最后一个日志是创建的分区,然后控件永远不会进入writer。对于20万条记录来说,它很困难,所以我增加了连接池的大小,这解决了这个问题,但在200万行中失败了。 没有错误,只是挂在那里。

    • 所谓海量数据处理,无非就是基于海量数据上的存储、处理、操作。何谓海量,就是数据量太大,所以导致要么是无法在较短时间内迅速解决,要么是数据太大,导致无法一次性装入内存。 那解决办法呢? 针对时间,我们可以采用巧妙的算法搭配合适的数据结构,如Bloom filter/Hash/bit-map/堆/trie树。 针对空间,无非就一个办法:大而化小,分而治之(hash映射)。 二、算法/数据结构基础 1.

    • 我使用的是Spring Batch 2.1.8。释放我有一个文件,它由一些头信息和一些需要处理的记录组成。 我有一个使用面向块处理的步骤。该步骤包含ItemReader和ItemWriter的实现。ItemReader实现是线程安全的,而ItemWriter不是。 我想在处理(或写入)任何记录之前使用标题信息。在继续使用面向块的处理时,如何确保这一点? 建议的解决方案:一种解决方案可以是编写一个预