最近由于业务上的需求,一张旧表结构中的数据,需要从oracle中提取出来,写入到MySQL表中,由于数据量过大,做了以下优化。
1.在oracle数据库取数的时候时候给jdbc设置FetchSize参数,分批次从表中取数,然后将数据封装到list中。
2.使用设置FetchSize参数的方式解决了读取数据时数据量过大的问题,但是分批次读取的数据存放在list中,在写入到mysql中时,同样面临这数据量过大的问题
3.为了解决上面的问题,解决方案是:开启一个线程,监控list中数据变化,一旦list中有值了,就写入到MySQL中,也就是说主线程负责从oracle中取数存到list中,开启的另一个线程监控list中的值,写入到MySQL中,类似生产者和消费者的关系。
话不多说,直接上代码
jdbc工具类:
/**
* 分批抓取查询
* 封装结果集为:List<HashMap<String,Object>>
*
* @param list 封装结果集为:List<HashMap<String,Object>>
* @param sql
* @param params List<Object>
* @param config 数据库jdbc信息
* @param fetchSize 游标读取条数
* @return list<HashMap < String, Object>>
* @throws SQLException
*/
public class JDBCUtil {
private static Logger logger = Logger.getLogger(JDBCUtil.class);
private Connection conn = null;
private PreparedStatement pstmt = null;
private ResultSet rs = null;
private String driverClassName = "";
private String url = "";
private String userName = "";
private String password = "";
static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");//mysql驱动
Class.forName("oracle.jdbc.driver.OracleDriver");//oracle驱动
} catch (ClassNotFoundException e) {
logger.error("数据库驱动类加载异常:" + e.getMessage(), e);
}
}
private void loadConfig(DBConfig config) {
driverClassName = config.getDriverClassName();
url = config.getUrl();
userName = config.getUserName();
password = config.getPassword();
}
public Connection getConn(DBConfig config) throws SQLException {
try {
loadConfig(config);
conn = DriverManager.getConnection(url, userName, password);
} catch (RuntimeException re) {
logger.error("获取数据库连接异常:" + re.getMessage(), re);
throw re;
}
return conn;
}
/**
* 分批抓取查询
* 封装结果集为:List<HashMap<String,Object>>
*
* @param list 封装结果集为:List<HashMap<String,Object>>
* @param sql
* @param params List<Object>
* @param config 数据库jdbc信息
* @param fetchSize 游标读取条数
* @return list<HashMap < String, Object>>
* @throws SQLException
*/
public List<HashMap<String, String>> getListHashMap(String sql, List<HashMap<String, String>> list, List<Object> params, DBConfig config, int fetchSize) throws SQLException {
//List<HashMap<String,String>>list=new ArrayList<HashMap<String,String>>();
try {
HashMap<String, String> hashMap = null;
conn = getConn(config);
conn.setAutoCommit(false); //为了设置fetchSize,必须设置为false
pstmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (params != null && params.size() > 0) {
for (int i = 0; i < params.size(); i++) {
pstmt.setObject(i + 1, params.get(i));
}
}
pstmt.setFetchSize(fetchSize);
rs = pstmt.executeQuery();
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();
while (rs.next()) {
hashMap = new HashMap<String, String>();
try {
//if(list.size()==fetchSize)
while (list.size() > fetchSize) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < columnCount; i++) {
hashMap.put((rsmd.getColumnLabel(i + 1)).toLowerCase(), String.valueOf(rs.getObject(i + 1)));
}
list.add(hashMap);
}
} catch (RuntimeException re) {
logger.error("获取查询结果集异常:" + re.getMessage(), re);
throw re;
} finally {
closeAll();
}
return list;
}
public void closeAll() throws SQLException {
try {
if (rs != null) rs.close();
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (RuntimeException re) {
logger.error("数据库关闭项异常:" + re.getMessage(), re);
throw re;
}
}
}
DBConfig类
public class DBConfig {
private static Logger logger=Logger.getLogger(DBConfig.class);
private String driverClassName;
private String userName;
private String password;
private String url;
public DBConfig() {}
public DBConfig(String driverClassName, String userName, String password,
String url) {
super();
this.driverClassName = driverClassName;
this.userName = userName;
this.password = password;
this.url = url;
}
public String getDriverClassName() {
return driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
}
测试类:
public class test {
public static void main(String[] args) throws SQLException {
JDBCUtil jdbc = new JDBCUtil();
DBConfig config = new DBConfig();
String url="jdbc:oracle:thin:@<host>:1521:<sid>";
String username="username";
String password="password";
config.setUrl(url);
config.setUserName(username);
config.setPassword(password);
String sql = "select * from table_name";
//这儿的list集合直接使用new ArrayList() 会有线程安全问题,建议使用Vector集合 或是使用 Collections.synchronizedList(new ArrayList())的方式创建线程安全的集合
List<HashMap<String, String>> list = Collections.synchronizedList(new ArrayList());
//Vector<HashMap<String, String>> list = new Vector<>();
//为了判断线程中的退出while循环标识,如果isExitLoopList长度为0,结束线程
List<Boolean> isExitLoopList = new ArrayList<>(1);
isExitLoopList.add(true);
//开启一个线程
new Thread(new Runnable() {
@Override
public void run() {
while(true){
System.out.println("list的大小==="+list.size());
try {
if(list.size()==0){
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
if(list.size()>0){
//从list中取出指定索引的数据,并移除
HashMap<String, String> item = list.remove(0);
//todo:这儿是写入数据到MySQL库的代码逻辑,此处省去
}
if(isExitLoopList.size()==0) break;
}
}
}).start();
//走jdbc工具类,取数,并存放在list中
//jdbc游标读取条数
int fetchSize = 1000;
jdbc.getListHashMap(sql,list, null, config, 1000);
while (true){
if(list.size()==0){
isExitLoopList.remove(0);
break;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}