表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。
异构数据源的迁移在此介绍3种方法
1,dataworks开发组件
2,datax开源工具
3,使用脚本自行处理
此次记录介绍方法3,方法1和2后续也会介绍。
import tablestore
from tablestore.metadata import *
class otsclient():
def __init__(self,end_point=None,access_key_id=None,access_key_secret=None,instance_name=None):
'''
连接写进初始化方法
'''
self.conn_ost = tablestore.OTSClient(
end_point=end_point
,access_key_id=access_key_id
,access_key_secret=access_key_secret
,instance_name=instance_name
)
# 目标 ots
self.conn_ost_prod = tablestore.OTSClient(
end_point="https://battle-prod.cn-hangzhou.ots.aliyuncs.com"
, access_key_id="********************"
, access_key_secret="*****************************"
# 实例名称
, instance_name="battle-prod"
)
def ots_getrow(self,table_name=None,primary_key=None,columns_to_get=None):
'''
单行取
:return: primary_key attribute_columns
'''
try:
consumed, return_row, next_token = self.conn_ost.get_row(
table_name=table_name
,primary_key=primary_key
,columns_to_get=columns_to_get
)
print(return_row.attribute_columns)
return return_row
except Exception as e:
print(e)
def ots_getrange(self,table_name=None,inclusive_start_primary_key=None,columns_to_get=None,
exclusive_end_primary_key=None,column_filter=None):
'''
范围取
:return:
'''
consumed,next_start_primary_key, row_list, next_token = self.conn_ost.get_range(
table_name=table_name
,direction="FORWARD"
,columns_to_get=columns_to_get
, inclusive_start_primary_key=inclusive_start_primary_key
, exclusive_end_primary_key=exclusive_end_primary_key
, max_version = 1
, column_filter =column_filter
)
all_rows = []
all_rows.extend(row_list)
# DML
for row in all_rows:
print(row.primary_key, row.attribute_columns)#查看打印主键和属性列
# row = Row(row.primary_key,row.attribute_columns)
# self.conn_ost.delete_row("hold_on_trail_record_test", row, None) #删除操作
'''
写入操作ots提供了put_row的方法,此方法需注意
1,在迁移同步时需要先使用ots提供的查询方法,得到的数据除了主键列,属性列数据结构是包含了当前值的version信息的,
而put_row函数在写入数据的时候,并不包含version信息,它是ots自动生成的,因此在查询过后再写入是需要做特殊处理。
2,在主键列中如果包含自增列,测试发现直接查询出来的自增列值是不行的。
自增列的值应该设置为 PK_AUTO_INCR,因此也需要注意下。
本次测试,同步过后的数据,除了自增列和version 外,其他业务数据都是正常。
'''
primary = row.primary_key
primary[1] = ('sequenceId',tablestore.metadata.PK_AUTO_INCR)
column = row.attribute_columns
column_list = []
for i in column:
column_list.append(i[0:2])
row = Row(primary,column_list)
self.conn_ost_prod.put_row('hold_on_trail_record', row, None) #插入操作
print((len(row_list))) #打印满足条件检索到的条数
return row_list
def main():
'''实例化对象'''
connect = otsclient(
#实例地址,公网私网都可
end_point="https://mini-prod.cn-hangzhou.ots.aliyuncs.com"
, access_key_id="*******************"
, access_key_secret="********************"
#实例名称
, instance_name="mini-prod"
)
connect = otsclient(
#实例地址,公网私网都可
end_point="https://battle-qa.cn-hangzhou.ots.aliyuncs.com"
, access_key_id="********************"
, access_key_secret="*********************"
#实例名称
, instance_name="battle-dev"
)
'''
单行取
'''
# primary_key = [
# ('pointCount', '1')
# , ('sequenceId', 1637649013325000)
# , ('actionTime', 6)
# , ('waitNextDropPointTime', 7)
# ]
# # 要获取的字段,不写为所有字段值
# columns_to_get = []
# connect.ots_getrow(
# table_name="hold_on_trail_record"
# , primary_key=primary_key
# , columns_to_get=columns_to_get
# )
'''
范围取
'''
# 要获取的字段,不写为所有字段值
columns_to_get = []
# 主键范围 start end 相当于大于小于 是开不是闭
inclusive_start_primary_key = [('pointCount', '0')
, ('sequenceId', 1138952797876000)
, ('actionTime', 0)
, ('waitNextDropPointTime', 0)]
exclusive_end_primary_key = [('pointCount', '10')
, ('sequenceId', 1638952797876000)
, ('actionTime', 10)
, ('waitNextDropPointTime', 10)]
# 多列条件 设置条件为(checkStatus == 2)AND(modifiedUserId == 7816)。
# column_filter = CompositeColumnCondition(LogicalOperator.AND)
# column_filter.add_sub_condition(SingleColumnCondition("checkStatus", 2, ComparatorType.EQUAL))
# column_filter.add_sub_condition(SingleColumnCondition("modifiedUserId", 33994751, ComparatorType.EQUAL))
#调用方法
connect.ots_getrange(
#表名
table_name="hold_on_trail_record"
,inclusive_start_primary_key=inclusive_start_primary_key
,exclusive_end_primary_key=exclusive_end_primary_key
,columns_to_get=columns_to_get
# 条件单列 当checkStatus的值为'2'时,返回该行。
# ,column_filter = SingleColumnCondition("correctPositionStartIndex", 0, ComparatorType.EQUAL, pass_if_missing = True)
# 条件多列
# ,column_filter = column_filter
)
if __name__ == '__main__':
main()
经测试发现阿里开源工具datax,和dataworks在使用中发现,其在处理OTS表时,处理不了主键列带自增属性的情况。具体原因,还在等阿里回复。