我有一个批处理任务,从SQLServer读取记录并写入MARIADB。尽管我在批处理过程中实现了分区的概念,但该过程非常缓慢
下面是源系统和目标系统的数据源配置。
@Bean(name = "sourceSqlServerDataSource")
public DataSource mysqlDataSource() {
HikariDataSource hikariDataSource = new HikariDataSource();
hikariDataSource.setMaximumPoolSize(100);
hikariDataSource.setUsername(username);
hikariDataSource.setPassword(password);
hikariDataSource.setJdbcUrl(jdbcUrl);
hikariDataSource.setDriverClassName(driverClassName);
hikariDataSource.setPoolName("Source-SQL-Server");
return hikariDataSource;
}
@Bean(name = "targetMySqlDataSource")
@Primary
public DataSource mysqlDataSource() {
HikariDataSource hikariDataSource = new HikariDataSource();
hikariDataSource.setMaximumPoolSize(100);
hikariDataSource.setUsername(username);
hikariDataSource.setPassword(password);
hikariDataSource.setJdbcUrl(jdbcUrl);
hikariDataSource.setDriverClassName(driverClassName);
hikariDataSource.setPoolName("Target-Myql-Server");
return hikariDataSource;
}
@Bean(name = "myBatchJobsThreadPollTaskExecutor")
public ThreadPoolTaskExecutor initializeThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(100);
threadPoolTaskExecutor.setMaxPoolSize(200);
threadPoolTaskExecutor.setThreadNamePrefix("My-Batch-Jobs-TaskExecutor ");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
threadPoolTaskExecutor.initialize();
log.info("Thread Pool Initialized with min {} and Max {} Pool Size",threadPoolTaskExecutor.getCorePoolSize(),threadPoolTaskExecutor.getMaxPoolSize() );
return threadPoolTaskExecutor;
}
以下是配置的步骤和分区步骤
@Bean(name = "myMainStep")
public Step myMainStep() throws Exception{
return stepBuilderFactory.get("myMainStep").chunk(500)
.reader(myJdbcReader(null,null))
.writer(myJpaWriter()).listener(chunkListener)
.build();
}
@Bean
public Step myPartitionStep() throws Exception {
return stepBuilderFactory.get("myPartitionStep").listener(myStepListener)
.partitioner(myMainStep()).partitioner("myPartition",myPartition)
.gridSize(50).taskExecutor(asyncTaskExecutor).build();
}
用读者和作者更新帖子
@Bean(name = "myJdbcReader")
@StepScope
public JdbcPagingItemReader myJdbcReader(@Value("#{stepExecutionContext[parameter1]}") Integer parameter1, @Value("#{stepExecutionContext[parameter2]}") Integer parameter2) throws Exception{
JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReader();
jdbcPagingItemReader.setDataSource(myTargetDataSource);
jdbcPagingItemReader.setPageSize(500);
jdbcPagingItemReader.setRowMapper(myRowMapper());
Map<String,Object> paramaterMap=new HashMap<>();
paramaterMap.put("parameter1",parameter1);
paramaterMap.put("parameter2",parameter2);
jdbcPagingItemReader.setQueryProvider(myQueryProvider());
jdbcPagingItemReader.setParameterValues(paramaterMap);
return jdbcPagingItemReader;
}
@Bean(name = "myJpaWriter")
public ItemWriter myJpaWriter(){
JpaItemWriter<MyTargetTable> targetJpaWriter = new JpaItemWriter<>();
targetJpaWriter.setEntityManagerFactory(localContainerEntityManagerFactoryBean.getObject());
return targetJpaWriter;
}
有人能介绍如何使用Spring Batch提高读写性能吗?
提高这种应用程序的性能取决于多个参数(网格大小、块大小、页面大小、线程池大小、db连接池大小、db服务器和JVM之间的延迟等)。所以我不能给你一个准确的答案,但我会试着提供一些指导:
.taskexecutor(asyncTaskExecutor)
中的线程池任务执行器,而不是Simpleasynctaskexecutor
中不重用线程的线程池任务执行器。还有许多其他技巧,比如估计内存中的项大小,确保内存中的总块大小小于堆大小,以避免块中不必要的GC,为批处理应用程序选择正确的GC算法,等等,但这些都是先进的。上面列出的指导方针是一个很好的起点,海事组织。
希望这能有所帮助!
我有一个spring批处理应用程序,可以将5M条记录从一个文件加载到SQL Server数据库中。我有根据国家代码区分的不同数据源。当我使用带有@primary注释的单个数据源时,spring batch writer在5分钟内写入5M条记录。但是,当我使用@bean注释给出多个数据源并使用一个非主数据源将文件数据写入数据库时,perforamnce变得非常慢,对于同样的500万条记录,大约需要1
我们在单个节点上有1个喷口和1个螺栓。Spout从RabbitMQ读取数据,并将其发送到唯一一个将数据写入Cassandra的bolt。 我们的数据源每秒生成10000条消息,而storm处理这条消息大约需要10秒,这对我们来说太慢了。 我们尝试增加拓扑的并行度,但没有任何区别。 } 更新:有没有可能使用shuffle分组,相同的元组将被处理多次?使用的配置(喷口=4.螺栓=4),现在的问题是,随
针对Postgres数据库的某个索引SELECT查询所花费的时间非常可变--从50毫秒到多秒,有时甚至是几分钟,即使在最轻的负载下也是如此。 你能为26秒的差距提出一个解释吗? 关于并发的注意事项:即使只有一个请求也有很大的可变性:端到端50-300毫秒,但是当一个用户提交一批大约100个这样的查找时(可能有10-20个同时运行),很可能有几个查找需要5-10秒。然而C3P0的统计数据从来没有比:
我正在寻找测量Spring批处理读取、处理和写入操作的执行时间的最佳方法。在元数据中,有关于整个步骤的信息,而不是关于每个动作的信息。 谢谢你所有的回答!
你能告诉我如何正确地实施它吗?提前道谢。
我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。