当前位置: 首页 > 知识库问答 >
问题:

使用多个数据源的Spring批处理慢写

令狐献
2023-03-14

我有一个spring批处理应用程序,可以将5M条记录从一个文件加载到SQL Server数据库中。我有根据国家代码区分的不同数据源。当我使用带有@primary注释的单个数据源时,spring batch writer在5分钟内写入5M条记录。但是,当我使用@bean注释给出多个数据源并使用一个非主数据源将文件数据写入数据库时,perforamnce变得非常慢,对于同样的500万条记录,大约需要15分钟。谁能解释一下Spring Batch与主数据源和其他数据源的行为是否不同。

    @Repository
    @ComponentScan(basePackages = "com.abc.extract")
    @EnableBatchProcessing
    @EnableTransactionManagement
    @Slf4j
    public class DataSourceConfig {
    
        public DataSourceConfig() {
    
        }
    
        @Autowired
        private CCMConfiguration ccmConfig;
    
        @Autowired
        SpotlightConfig spotlight;
    
        private DataSource datasourcesVal = null;
        
        private DataSource datasourcesVal1 = null;
        
        private Map<String,DataSource> datasourcesMap = null;
    
        @Value("${app.country.code}")
        String countryCode;
        
        @Primary
        @Bean("batchprimary")
        public HikariDataSource hikariDataSource(@Value("${app.country.code}")
        String countryCode1) throws IllegalBlockSizeException, NoSuchPaddingException, BadPaddingException,
                NoSuchAlgorithmException, InvalidKeyException {
            System.out.println("Contrycode:"+countryCode1);
            SqlServerConfiguration dbConfiguration = ccmConfig.getSqlServerDatabaseConfig(countryCode1);
            HikariDataSource ds = null;
            try {
                final String password = dbConfiguration.getDataBasePwd();
                final String username = dbConfiguration.getDataBaseUserName();
                ds = (HikariDataSource) DataSourceBuilder.create().type(HikariDataSource.class).build();
                ds.setUsername(username);
                ds.setPassword(password);
                ds.setDriverClassName(dbConfiguration.getDataBaseDriver());
                ds.setJdbcUrl(dbConfiguration.getDataBaseUrl());
                ds.setConnectionTestQuery("SELECT 1");
                ds.setConnectionTimeout(dbConfiguration.getConnectionTimeout());
                ds.setIdleTimeout(dbConfiguration.getIdleTimeout());
                ds.setMaximumPoolSize(dbConfiguration.getHikariPoolSize());
                //ds.setMaximumPoolSize(2);
                ds.setMaxLifetime(dbConfiguration.getMaxLifetime());
                ds.setLeakDetectionThreshold(dbConfiguration.getLeakDetectionThreshold());
                ds.setPoolName(dbConfiguration.getPoolName());
                this.datasourcesVal = ds;
            } catch (Exception ex) {
                spotlight.sendNotification(ex, "Critical: Failed to establish Database connection");
                log.error(ex.getMessage());
            } finally {
                if (!Objects.nonNull(ds) || ds.isClosed()) {
                    ds.close();
                    spotlight.sendNotification(new NullPointerException("Primary data source creation failed"),
                            "Critical: Failed to establish Database connection");
                }
            }
            return ds;
    
        }
    
    
        

    @Bean("batchprimary1")
            public HikariDataSource hikariDataSource1(@Value("${app.country.code}")
            String countryCode1) throws IllegalBlockSizeException, NoSuchPaddingException, BadPaddingException,
                    NoSuchAlgorithmException, InvalidKeyException {
                System.out.pri

ntln("Contrycode:"+countryCode1);
            SqlServerConfiguration dbConfiguration = ccmConfig.getSqlServerDatabaseConfig(countryCode1);
            HikariDataSource ds = null;
            try {
                final String password = dbConfiguration.getDataBasePwd();
                final String username = dbConfiguration.getDataBaseUserName();
                ds = (HikariDataSource) DataSourceBuilder.create().type(HikariDataSource.class).build();
                ds.setUsername(username);
                ds.setPassword(password);
                ds.setDriverClassName(dbConfiguration.getDataBaseDriver());
                ds.setJdbcUrl(dbConfiguration.getDataBaseUrl());
                ds.setConnectionTestQuery("SELECT 1");
                ds.setConnectionTimeout(dbConfiguration.getConnectionTimeout());
                ds.setIdleTimeout(dbConfiguration.getIdleTimeout());
                ds.setMaximumPoolSize(dbConfiguration.getHikariPoolSize());
                //ds.setMaximumPoolSize(2);
                ds.setMaxLifetime(dbConfiguration.getMaxLifetime());
                ds.setLeakDetectionThreshold(dbConfiguration.getLeakDetectionThreshold());
                ds.setPoolName(dbConfiguration.getPoolName());
                //this.datasourcesVal = ds;
            } catch (Exception ex) {
                spotlight.sendNotification(ex, "Critical: Failed to establish Database connection");
                log.error(ex.getMessage());
            } finally {
                if (!Objects.nonNull(ds) || ds.isClosed()) {
                    ds.close();
                    spotlight.sendNotification(new NullPointerException("Primary data source creation failed"),
                            "Critical: Failed to establish Database connection");
                }
            }
            return ds;
    
        }





    @Autowired
        @Qualifier("batchprimary")
        public DataSource datasourcesVal;
        
        @Autowired
        @Qualifier("batchprimary1")
        public DataSource datasourcesVal1;
    @Bean
        @JobScope
        public Step ExtractNLoadItemOnHand(TaskExecutor taskExecutor,@Value("#{jobParameters}") Map<String, JobParameter> jobParameters)
                throws InvalidKeyException, IllegalBlockSizeException,
                NoSuchPaddingException, BadPaddingException,
                NoSuchAlgorithmException, SQLException, ValidationException,
                CalpiDataException {
            int chunkSize = ApplicationConstants.CHUNK_SIZE;
            log.info(" Extract and Load ItemOnHand Job - Started....");
            countryCode = jobParameters.get("CountryCode").toString();
            //dataSourceTemp = datasourcesMap.get(countryCode);
            return stepBuilderFactory
                    .get("READ ITEM ONHAND STEP")
                    .listener(stepListners)
                    .<AccumOnhand, AccumOnhand> chunk(chunkSize)
                    .reader(extractItemOnHand(null))
                    .processor(new ItemProcessor<AccumOnhand, AccumOnhand>() {
                        @Override
                        public AccumOnhand process(AccumOnhand accumOnhand)
                                throws Exception {
                            return accumOnhand;
                        }
                    })
                    .writer(compositeItemWriter.compositeItemWriter(Arrays
                            .asList(loadItemOnHand()))).faultTolerant()
                    .retryLimit(ApplicationConstants.RETRY_SKIP_LIMIT)
                    .retry(Exception.class).skip(Exception.class)
                    .skipLimit(ApplicationConstants.RETRY_SKIP_LIMIT)
                    .taskExecutor(taskExecutor)
                    .throttleLimit(ApplicationConstants.THROTTLE_LIMIT).build();
        }
    
        @Bean
        @StepScope
        public JdbcBatchItemWriter<AccumOnhand> loadItemOnHand()
                throws InvalidKeyException, IllegalBlockSizeException,
                NoSuchPaddingException, BadPaddingException,
                NoSuchAlgorithmException, ValidationException, CalpiDataException {
            try {
                //country = jobParameters.get("CountryCode");
                JdbcBatchItemWriter<AccumOnhand> writer;
                //datasource = dataSourceConfig.hikariDataSource(country.toString());
                if(countryCode.equals("MX")) {
                     writer = stepDBItemWriter.writeDBData(
                            MessageFormat.format(SQLConstants.INS_ACCUM_ONHAND,
                                    countryCode), datasourcesVal);
                } else {
                     writer = stepDBItemWriter.writeDBData(
                                MessageFormat.format(SQLConstants.INS_ACCUM_ONHAND,
                                        countryCode), datasourcesVal1);
                }
        
                
                log.info("Item on hand writer created successfully");
                return writer;
            } catch (Exception ex) {
                log.error("Load loadItemOnHand Failed {} ", ex);
                throw new ValidationException("Load loadItemOnHand Failed", ex);
            }
        }

共有1个答案

蒋文光
2023-03-14

以下是根据我使用spring Batch的经验提出的建议。

  • 确保每个数据库都具有相同的写入性能
  • 是否尝试使用commitInterval()?
  • 尽可能使用JdbcItemWriter或任何基于JDBC的策略编写
  • 运行时检查cpu利用率。如果超过90%,则速度将降低
  • 如果捕获错误,尝试捕获将影响性能
 类似资料:
  • 我试图在Spring批处理中配置几个数据源。启动时,Spring批处理抛出以下异常: 批处理配置的代码段 不知道为什么我会看到这个异常,因为我看到了一些基于xml的Spring批处理配置,这些配置声明了多个数据源。我使用的是Spring批处理核心版本3.0.1.发行版和Spring Boot版本1.1.5.发行版。如有任何帮助,将不胜感激。

  • 我正在编写Spring批的Spring Boot应用程序,其中ItemReader从Oracle数据库读取数据并将数据写入postgres sql,但我得到了以下错误 我不想创建spring批处理元数据表,我的应用程序不需要监视作业,请就此向我提出建议。提前谢谢!!

  • 我需要访问两个数据源: Spring批处理存储库:在内存H2中 我的步骤需要访问。 我在那里看到了几个关于如何创建自定义

  • 我使用的是Spring Batch 2.1.8。释放我有一个文件,它由一些头信息和一些需要处理的记录组成。 我有一个使用面向块处理的步骤。该步骤包含ItemReader和ItemWriter的实现。ItemReader实现是线程安全的,而ItemWriter不是。 我想在处理(或写入)任何记录之前使用标题信息。在继续使用面向块的处理时,如何确保这一点? 建议的解决方案:一种解决方案可以是编写一个预

  • 我必须在表中的一些配置数据库中爬行。其中每个记录指定要从中读取的模式。因此,我们必须对表格进行投票,并适当地运行作业。 考虑使用Spring批处理(JdbcPagingItemReader)从所有配置的模式中读取数据。如果我必须配置它,如何使用Spring批处理? 我应该使用不同的读取器为每个数据库读取多个作业,还是有什么方法可以在运行时发送数据源以供Spring Batch读取数据? 如何为单个

  • 我有一个批处理任务,从SQLServer读取记录并写入MARIADB。尽管我在批处理过程中实现了分区的概念,但该过程非常缓慢 下面是源系统和目标系统的数据源配置。 以下是配置的步骤和分区步骤 用读者和作者更新帖子 有人能介绍如何使用Spring Batch提高读写性能吗?