从使用商业数据库转向开源数据库是目前的潮流,我也不能免俗,于是在工作中抽出一点时间,研究了一下从Oracle迁移到PostgreSQL(下文简称pg)的步骤。
从oracle 到 pg 要解决一系列的问题,如:
除了以上的问题,相信还有好多不同的问题,但是本记录中,我会更聚焦于“如何把oracle的数据迁移到pg上?”
抽取oracle的表结构
pg上重构结构
抽取oracle的数据
转换到pg数据类型格式
插入到pg
在后面的实验中,一般的数据读取出来后基本上不需要转换,直接就可以insert或copy回去了。
为了加深理解,我觉得自己用python写一个会好一点,所以我选择了第三种方式:自己用python编写迁移程序。
使用python编写数据迁移程序,需要用到以下的包:psycopg2、cx_Oracle和csv。
psycopg2提供了很多高效的API,帮助我们完成把数据插入到pg的工作,举例如下:
execute_values(cursor,sql,values)
cursor:顾名思义就是pg connect创建的游标。
sql:顾名思义就是要执行的sql。它的特色是只需要一个占位符%s,就可以代表所有记录的参数,超级好用,举例:insert into table1 values %s
values:由多条记录组成的序列(二维数组),每个元素是一条记录,一次调用就能把多条记录批量插入;即使只插入一条记录,也要写成[[1,'a','b']]这样的二维形式。
代码示例:
# Batch-insert several rows with one statement: execute_values expands the
# single %s placeholder into one VALUES tuple per row of `values`.
# Its first argument must be a cursor, not the connection object.
sql = 'insert into ' + pgTable + ' values %s'
values = [[1, 'a', 'b'], [2, 'c', 'd']]
pgcur = pgconn.cursor()
psycopg2.extras.execute_values(pgcur, sql, values)
pgconn.commit()
copy_from(file, table, sep='\t', null='\\N', size=8192, columns=None)
Read data from the file-like object file appending them to the table named table.
Parameters:
file – file-like object to read data from. It must have both read() and readline() methods.
table – name of the table to copy data into.
sep – columns separator expected in the file. Defaults to a tab.
null – textual representation of NULL in the file. The default is the two characters string \N.
size – size of the buffer used to read from the file.
columns – iterable with name of the columns to import. The length and types should match the content of the file to read. If not specified, it is assumed that the entire table matches the file structure.
示例:
def copyDataFrom(self, tabname, filepath):
    """Load a comma-separated file into table `tabname` with copy_from, then commit.

    Empty fields are loaded as NULL (null=''). Errors are printed and the
    transaction is rolled back.

    Note: copy_from uses PostgreSQL's text format, so it does not understand
    the double quotes csv.writer puts around values containing commas,
    quotes or newlines.
    """
    try:
        # The with-block closes the file even if COPY fails.
        with open(filepath, 'r') as datafile:
            print('Start to COPY....')
            self.pgCur.copy_from(datafile, tabname, sep=',', null='')
        self.pgConn.commit()
        print('copy successful!')
    except Exception as e:
        print('copy failed, cause by %s %s' % ('\n', e))
        # An aborted transaction rejects every later statement until it is
        # rolled back, so roll back to keep the connection usable.
        self.pgConn.rollback()
特别提醒两个参数:sep分隔符,默认是tab,如果是逗号,就要改成sep=','
另一个是null:你的文件中用什么符号代表NULL。默认是两个字符的字符串\N(反斜杠加N);我的文件中NULL是空字符串(两个分隔符之间没有任何字符),对应就是null=''
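csv模块非常方便用于生成csv格式的文件。我用它把oracle的数据保存到csv中,然后使用copy命令导入到pg。注意:csv.writer会给含有逗号、双引号或换行的字段加上双引号,而copy_from使用的是pg的文本格式,不识别这种引号;如果数据中可能出现这些字符,应改用copy_expert执行COPY ... FROM STDIN WITH (FORMAT csv)。以下是示例代码: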
# Write the rows fetched from Oracle (a list of row sequences) to a CSV file.
# The with-block closes the file, and so flushes it, before COPY reads it.
with open('/work/data/tabtest.csv', 'w') as csvfile:
    csvWriter = csv.writer(csvfile, dialect='excel')
    csvWriter.writerows(rows)
注意,writerows是把二维数组(多条记录)写入csv中;
writerow是把一维数组(一条记录)写入csv中。
Oracle与pg的数据类型对应关系:
number -> numeric;
VARCHAR2,NVARCHAR2,NVARCHAR -> varchar;
date,timestamp -> timestamp without time zone;
blob -> bytea;
clob -> text
具体数据大家压测一下就知道了
因为只是个人研究和练手用到,代码质量请各位大神忽略。
# -*-coding=utf-8 -*-
import psycopg2 as pg2
import cx_Oracle as oradb
import psycopg2.extras as pg2extra
# 解决读取数据库显示不了中文的问题
import os
import datetime
import csv
# Select a Simplified-Chinese / UTF-8 Oracle client locale so that Chinese
# text read from the database displays correctly. This runs at import time,
# before any connection object below is created.
os.environ.update(NLS_LANG='SIMPLIFIED CHINESE_CHINA.UTF8')
class pg(object):
    """Wrapper around a psycopg2 connection and cursor to PostgreSQL.

    Serves as the migration target:
    - execDDL creates tables and commits.
    - fullInsert and fullInsert2 insert rows without committing; call commit().
    - copyDataFrom bulk-loads a CSV file and commits.
    """

    def __init__(self, pghost, pgport, pgdatabase, pguser, pgpassword):
        """Connect with psycopg2 and open a cursor (self.pgConn, self.pgCur).

        A connection error is printed and then re-raised. Returning a
        half-built object would only fail later with an AttributeError on
        pgConn/pgCur.
        """
        try:
            self.pgConn = pg2.connect(host=pghost, port=pgport,
                                      database=pgdatabase, user=pguser,
                                      password=pgpassword)
            print('connect %s and %s successful!' % (pghost, pgdatabase))
            self.pgCur = self.pgConn.cursor()
        except Exception as e:
            print('connect failed, cause by: %s %s' % ('\n', e))
            raise

    def readAll(self, pgTable):
        """Print every row of pgTable, ordered by its first column."""
        self.pgCur.execute('select * from ' + pgTable + ' order by 1 ')
        self.output(self.pgCur.fetchall())

    def fullInsert(self, pgTable, values):
        """Insert one row (a sequence of column values) into pgTable; no commit.

        This is the plain row-by-row path. The author measured about 78 s per
        10,000 rows, committing once at the end.

        One placeholder is generated per value, so `values` lists the
        table's columns in declaration order. If fewer values than columns
        are given, PostgreSQL fills the remaining trailing columns with their
        defaults.

        The previous version counted columns in information_schema.columns
        on every call. That cost an extra query per row, ignored the schema,
        and found no columns when the name's case differed from the stored,
        lower-cased table name.
        """
        placeholders = ','.join(['%s'] * len(values))
        sql = 'insert into ' + pgTable + ' values (' + placeholders + ')'
        self.pgCur.execute(sql, values)

    def fullInsert2(self, pgTable, values):
        """Batch-insert `values` (a sequence of rows) with execute_values; no commit.

        The author measured about 4 s per 10,000 rows. On failure the error
        is printed and the transaction is rolled back. Otherwise every later
        statement on this connection would fail until a rollback.
        """
        sql = 'insert into ' + pgTable + ' values %s'
        try:
            pg2extra.execute_values(self.pgCur, sql, values)
        except Exception as e:
            print(e)
            self.pgConn.rollback()

    def copyDataFrom(self, tabname, filepath):
        """Bulk-load the comma-separated file `filepath` into `tabname` and commit.

        The file is streamed through COPY ... FROM STDIN in CSV format. In
        that format an unquoted empty field is NULL, matching the former
        null='' setting. Unlike copy_from's text format, CSV format also
        honours the double quotes csv.writer puts around values containing
        commas, quotes or newlines. Errors are printed and the transaction is
        rolled back.
        """
        try:
            # The with-block closes the file even when COPY fails.
            with open(filepath, 'r') as csvfile:
                print('Start to COPY....')
                self.pgCur.copy_expert(
                    'COPY ' + tabname + ' FROM STDIN WITH (FORMAT csv)', csvfile)
            self.pgConn.commit()
            print('copy successful!')
        except Exception as e:
            print('copy failed, cause by %s %s' % ('\n', e))
            self.pgConn.rollback()

    def execDDL(self, sql):
        """Execute one statement and commit; on error roll back and re-raise."""
        try:
            self.pgCur.execute(sql)
        except Exception:
            self.pgConn.rollback()
            raise
        self.pgConn.commit()

    def output(self, pgset):
        """Print each row on its own line, fields separated by a single space."""
        for row in pgset:
            print(' '.join('%s' % (field,) for field in row))

    def commit(self):
        """Commit the current transaction."""
        self.pgConn.commit()
class oracle(object):
    """Wrapper around a cx_Oracle connection and cursor (the migration source)."""

    def __init__(self, orahost, oraport, oradatabase, orauser, orapassword):
        """Connect with the string user/password@host:port/oradatabase.

        The connect string is echoed with the password masked; the previous
        version printed the password in clear text. Connection errors are
        printed and re-raised, so no half-initialised object is returned.
        """
        try:
            target = orahost + ':' + str(oraport) + '/' + oradatabase
            connectString = orauser + '/' + orapassword + '@' + target
            print(orauser + '/******@' + target)
            self.oraConn = oradb.connect(connectString, threaded=True)
            print('connect %s and %s successful!' % (orahost, oradatabase))
            self.oraCur = self.oraConn.cursor()
            print('cursor open!')
        except Exception as e:
            print('connect failed, cause by: %s %s' % ('\n', e))
            raise

    def _selectSql(self, oraTable, rownum):
        """Build the SELECT shared by readAll and exportCsv.

        rownum == 'ALL' selects every row (unordered). Any other value limits
        the result to at most `rownum` rows.

        Oracle evaluates ROWNUM before ORDER BY, so the limited query sorts
        an arbitrary set of `rownum` rows by the first column. It does not
        return the first `rownum` rows in that order.
        """
        if rownum == 'ALL':
            return 'select * from ' + oraTable
        return ('select * from ' + oraTable + ' where rownum<=' + str(rownum)
                + ' order by 1 ')

    def readAll(self, oraTable, rownum):
        """Run the query from _selectSql and return cursor.fetchall()'s rows."""
        print('start read...')
        self.oraCur.execute(self._selectSql(oraTable, rownum))
        print('start fetch')
        return self.oraCur.fetchall()

    def exportCsv(self, oraTable, rownum):
        """Unfinished CSV export: for now it behaves exactly like readAll.

        TODO: write the rows to a CSV file instead of returning them.
        """
        return self.readAll(oraTable, rownum)

    def genTable(self, owner, tablename):
        """Return column metadata for owner.tablename from dba_tab_columns.

        Each row is (column_id, column_name, data_type, data_length,
        data_precision, data_scale), ordered by column_id. The connected user
        needs access to the dba_tab_columns view.
        """
        sql = '''
            SELECT COLUMN_id ,column_name,data_type,data_length,data_precision,data_scale
            from dba_tab_columns
            where owner=:1 and table_name=:2
            order BY COLUMN_ID'''
        self.oraCur.execute(sql, (owner, tablename))
        return self.oraCur.fetchall()
class oracle2pg(object):
    """Turn Oracle table metadata and rows into PostgreSQL DDL and inserts."""

    # Oracle data_type values that map to one fixed PostgreSQL type.
    _SIMPLE_TYPES = {
        'DATE': 'TIMESTAMP WITHOUT TIME ZONE',
        'BLOB': 'BYTEA',
        'CLOB': 'TEXT',
        'NCLOB': 'TEXT',
    }

    def __init__(self):
        pass

    def _pgType(self, dataType, dataLength, precision, scale):
        """Return the PostgreSQL column type for one Oracle column.

        Raises ValueError for an Oracle type that is not mapped. The previous
        version silently reused the previous column's definition, or failed
        with NameError on the first column.
        """
        if dataType == 'NUMBER':
            # NUMBER declared without precision has data_precision NULL, and
            # "NUMERIC(None,None)" is not valid SQL; use unconstrained NUMERIC.
            if precision is None:
                return 'NUMERIC'
            return 'NUMERIC(%s,%s)' % (precision, 0 if scale is None else scale)
        if dataType in ('VARCHAR2', 'NVARCHAR2', 'NVARCHAR', 'VARCHAR', 'CHAR2'):
            return 'VARCHAR(%s)' % (dataLength,)
        if dataType in ('CHAR', 'NCHAR'):
            return 'CHAR(%s)' % (dataLength,)
        if dataType.startswith('TIMESTAMP'):
            # Covers any fractional precision, e.g. TIMESTAMP(3), TIMESTAMP(6).
            if 'TIME ZONE' in dataType:
                return 'TIMESTAMP WITH TIME ZONE'
            return 'TIMESTAMP WITHOUT TIME ZONE'
        if dataType in self._SIMPLE_TYPES:
            return self._SIMPLE_TYPES[dataType]
        raise ValueError('unsupported Oracle data type %r' % (dataType,))

    def migrateSturct(self, tabstruct, targetDB, targetTable):
        """Create `targetTable` on `targetDB` from Oracle column metadata.

        tabstruct: rows shaped like oracle.genTable's result, i.e.
            (column_id, column_name, data_type, data_length,
             data_precision, data_scale).
        targetDB: an object with execDDL(sql), such as a pg instance.

        The generated CREATE TABLE is printed. DDL errors are printed, not
        raised. An unmapped Oracle type raises ValueError before any DDL is
        sent.
        """
        columns = [col[1] + ' ' + self._pgType(col[2], col[3], col[4], col[5])
                   for col in tabstruct]
        createTable = 'create table ' + targetTable + '(' + ','.join(columns) + ')'
        print(createTable)
        try:
            targetDB.execDDL(createTable)
            print('create table successful!')
        except Exception as e:
            print('Failed as %s %s' % ('\n', e))

    def migraterows(self, srows, targetDB, targetTable):
        """Insert srows one at a time via targetDB.fullInsert, then commit once."""
        for row in srows:
            targetDB.fullInsert(targetTable, row)
        targetDB.commit()

    def migraterows2(self, srows, targetDB, targetTable):
        """Batch-insert srows via targetDB.fullInsert2, then commit."""
        targetDB.fullInsert2(targetTable, srows)
        targetDB.commit()
if __name__ == '__main__':
    # Source Oracle owner and the table name used on both sides.
    oraowner = 'TEST'
    tabname = 'ORDER'

    pg1 = pg(pghost='192.168.0.1', pgport=5432, pgdatabase='test',
             pguser='pguser', pgpassword='password')
    ora1 = oracle(orahost='192.168.0.2', oraport=1521, oradatabase='test',
                  orauser='orauser', orapassword='password')

    # Earlier pipeline steps, currently disabled:
    #   rows = ora1.readAll(oraowner + '.' + tabname, '5000000')
    #   ... write `rows` to /work/data/so_master_new.csv with csv.writer ...
    #   ora2pg1 = oracle2pg()
    #   ora2pg1.migrateSturct(ora1.genTable(oraowner, tabname), pg1, tabname)

    # Time only the bulk load of the exported CSV into PostgreSQL.
    started = datetime.datetime.now()
    pg1.copyDataFrom(tabname, '/work/data/tabtest.csv')
    elapsed = datetime.datetime.now() - started
    print(elapsed)