当前位置: 首页 > 知识库问答 >
问题:

Spring批处理慢写慢读

锺离声
2023-03-14

我有一个批处理任务,从SQLServer读取记录并写入MARIADB。尽管我在批处理过程中实现了分区的概念,但该过程非常缓慢

下面是源系统和目标系统的数据源配置。

@Bean(name = "sourceSqlServerDataSource")
    public DataSource mysqlDataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setMaximumPoolSize(100);
        hikariDataSource.setUsername(username);
        hikariDataSource.setPassword(password);
        hikariDataSource.setJdbcUrl(jdbcUrl);
        hikariDataSource.setDriverClassName(driverClassName);
        hikariDataSource.setPoolName("Source-SQL-Server");
        return hikariDataSource;
    } 

    @Bean(name = "targetMySqlDataSource")
    @Primary
    public DataSource mysqlDataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setMaximumPoolSize(100);
        hikariDataSource.setUsername(username);
        hikariDataSource.setPassword(password);
        hikariDataSource.setJdbcUrl(jdbcUrl);
        hikariDataSource.setDriverClassName(driverClassName);
        hikariDataSource.setPoolName("Target-Myql-Server");
        return hikariDataSource;
    }
@Bean(name = "myBatchJobsThreadPollTaskExecutor")
    public ThreadPoolTaskExecutor initializeThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setThreadNamePrefix("My-Batch-Jobs-TaskExecutor ");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        threadPoolTaskExecutor.initialize();
        log.info("Thread Pool Initialized with min {} and Max {} Pool Size",threadPoolTaskExecutor.getCorePoolSize(),threadPoolTaskExecutor.getMaxPoolSize() );
        return threadPoolTaskExecutor;
    }

以下是配置的步骤和分区步骤

@Bean(name = "myMainStep")
    public Step myMainStep() throws Exception{
        return stepBuilderFactory.get("myMainStep").chunk(500)
                .reader(myJdbcReader(null,null))
                .writer(myJpaWriter()).listener(chunkListener)
                .build();
    }

    @Bean
    public Step myPartitionStep() throws Exception {
        return stepBuilderFactory.get("myPartitionStep").listener(myStepListener)
                .partitioner(myMainStep()).partitioner("myPartition",myPartition)
                .gridSize(50).taskExecutor(asyncTaskExecutor).build();
    }

用读者和作者更新帖子

@Bean(name = "myJdbcReader")
    @StepScope
    public JdbcPagingItemReader myJdbcReader(@Value("#{stepExecutionContext[parameter1]}") Integer parameter1, @Value("#{stepExecutionContext[parameter2]}") Integer parameter2) throws Exception{
        JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReader();
        jdbcPagingItemReader.setDataSource(myTargetDataSource);
        jdbcPagingItemReader.setPageSize(500);
        jdbcPagingItemReader.setRowMapper(myRowMapper());
        Map<String,Object> paramaterMap=new HashMap<>();
        paramaterMap.put("parameter1",parameter1);
        paramaterMap.put("parameter2",parameter2);
        jdbcPagingItemReader.setQueryProvider(myQueryProvider());
        jdbcPagingItemReader.setParameterValues(paramaterMap);
        return jdbcPagingItemReader;
    }

    @Bean(name = "myJpaWriter")
    public ItemWriter myJpaWriter(){
        JpaItemWriter<MyTargetTable> targetJpaWriter = new JpaItemWriter<>();
        targetJpaWriter.setEntityManagerFactory(localContainerEntityManagerFactoryBean.getObject());
        return targetJpaWriter;
    }

有人能介绍如何使用Spring Batch提高读写性能吗?

共有1个答案

黎曾笑
2023-03-14

提高这种应用程序的性能取决于多个参数(网格大小、块大小、页面大小、线程池大小、db连接池大小、db服务器和JVM之间的延迟等)。所以我不能给你一个准确的答案,但我会试着提供一些指导:

  • 在开始改进性能之前,您需要明确定义基线+目标。说“它很慢”是没有意义的。至少准备好一个JVM探查器和一个带有查询执行计划分析器的SQL客户机。这些都是查找JVM或数据库上的性能瓶颈所必需的。
  • 将网格大小设置为50并使用核心大小=100的线程池意味着将创建50个线程,但不使用这些线程。确保您使用的是.taskexecutor(asyncTaskExecutor)中的线程池任务执行器,而不是Simpleasynctaskexecutor中不重用线程的线程池任务执行器。
  • 50个分区用于250K记录对我来说似乎太多了。每个分区将有5000条记录,每个分区将产生10个事务(因为chunkSize=500)。因此,在两个数据库、服务器和JVM之间将有10个事务x50个分区=500个事务。这可能是一个性能问题。我建议从更少的分区开始,例如5个或10个。提高并发性并不一定意味着提高性能。总有一个盈亏平衡点,你的应用程序将花更多的时间在上下文切换和处理并发性上,而不是处理业务逻辑。发现这一点是一个经验过程。
  • 我将首先在任何Spring批处理作业之外运行任何sql查询,以查看查询本身是否存在性能问题(查询抓取太多列、太多记录等)或db模式是否存在性能问题(例如缺少索引)
  • 对于这样的ETL作业,我不会使用JPA/Hibernate。将数据映射到域对象可能代价很高,尤其是在O/R映射没有优化的情况下。在这些情况下,原始JDBC通常更快。

还有许多其他技巧,比如估计内存中的项大小,确保内存中的总块大小小于堆大小,以避免块中不必要的GC,为批处理应用程序选择正确的GC算法,等等,但这些都是先进的。上面列出的指导方针是一个很好的起点,海事组织。

希望这能有所帮助!

 类似资料:
  • 我有一个spring批处理应用程序,可以将5M条记录从一个文件加载到SQL Server数据库中。我有根据国家代码区分的不同数据源。当我使用带有@primary注释的单个数据源时,spring batch writer在5分钟内写入5M条记录。但是,当我使用@bean注释给出多个数据源并使用一个非主数据源将文件数据写入数据库时,perforamnce变得非常慢,对于同样的500万条记录,大约需要1

  • 我们在单个节点上有1个喷口和1个螺栓。Spout从RabbitMQ读取数据,并将其发送到唯一一个将数据写入Cassandra的bolt。 我们的数据源每秒生成10000条消息,而storm处理这条消息大约需要10秒,这对我们来说太慢了。 我们尝试增加拓扑的并行度,但没有任何区别。 } 更新:有没有可能使用shuffle分组,相同的元组将被处理多次?使用的配置(喷口=4.螺栓=4),现在的问题是,随

  • 针对Postgres数据库的某个索引SELECT查询所花费的时间非常可变--从50毫秒到多秒,有时甚至是几分钟,即使在最轻的负载下也是如此。 你能为26秒的差距提出一个解释吗? 关于并发的注意事项:即使只有一个请求也有很大的可变性:端到端50-300毫秒,但是当一个用户提交一批大约100个这样的查找时(可能有10-20个同时运行),很可能有几个查找需要5-10秒。然而C3P0的统计数据从来没有比:

  • 我正在寻找测量Spring批处理读取、处理和写入操作的执行时间的最佳方法。在元数据中,有关于整个步骤的信息,而不是关于每个动作的信息。 谢谢你所有的回答!

  • 你能告诉我如何正确地实施它吗?提前道谢。

  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。