Python SDK介绍
安装
快速安装
$ sudo pip install pydatahub
源码安装
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install
常见问题
1.如果安装过程中出现错误信息’Python.h: No such file or directory’,常用的操作系统安装方式如下:
$ sudo apt-getinstall python-dev# for python2.x installs
$ sudo apt-getinstall python3-dev# for python3.x installs
$ sudo yum install python-devel# for python2.x installs
$ sudo yum install python34-devel# for python3.4 installs
2.如果使用windows操作系统,根据提示信息可到 此处 下载安装对应版本的 Visual C++ SDK。
Windows 10 安装cprotobuf依赖时如果报类似如下错误,也表示需要安装Visual C++ 生成工具:
bulding'cprotobuf.internal'extention
error:[WinError2]Thesystem cannot find the file specified
推荐使用python3.6或以上,会明确提示所需版本及链接信息。
3.Windows 下如果安装依赖时报类似如下错误,是环境问题所致,请搜索相关错误,根据具体情况,拷贝所需文件,或是直接使用 developer command prompt 工具进行安装:
LINK:fatal error LNK1158:cannot run'rc.exe'
4.网络环境受限无法自动使用pip安装依赖时,可以从github中下载相关依赖,并按照readme进行安装。
安装验证
$ python-c"from datahub import DataHub"
如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!
基本概念
准备工作
访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。
创建Project或使用SDK接口进行创建
初始化Datahub
importsys
importtraceback
fromdatahubimportDataHub
fromdatahub.exceptionsimportResourceExistException
fromdatahub.modelsimportFieldType,RecordSchema,TupleRecord,BlobRecord,CursorType,RecordType
access_id=***your access id***
access_key=***your access key***
endpoint=***your datahub server endpoint***
dh=DataHub(access_id,access_key,endpoint)
Project操作
创建示例
project_name='project'
comment='comment'
try:
dh.create_project(project_name,comment)
print("create project success!")
print("=======================================\n\n")
exceptResourceExistException:
print("project already exist!")
print("=======================================\n\n")
exceptExceptionase:
print(traceback.format_exc())
sys.exit(-1)
Topic操作
Tuple Topic
Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:
类型
含义
值域
Bigint
8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。
-9223372036854775807 ~ 9223372036854775807
String
字符串,只支持UTF-8编码。
单个String列最长允许1MB。
Boolean
布尔型。
可以表示为True/False,true/false, 0/1
Double
8字节双精度浮点数。
-1.0 10308 ~ 1.0 10308
TimeStamp
时间戳类型
表示到微秒的时间戳类型
创建示例
topic_name="tuple_topic"
shard_count=3
life_cycle=7
record_schema=RecordSchema.from_lists(
['bigint_field','string_field','double_field','bool_field','time_field'],
[FieldType.BIGINT,FieldType.STRING,FieldType.DOUBLE,FieldType.BOOLEAN,FieldType.TIMESTAMP])
try:
dh.create_tuple_topic(project_name,topic_name,shard_count,life_cycle,record_schema,comment)
print("create tuple topic success!")
print("=======================================\n\n")
exceptResourceExistException:
print("topic already exist!")
print("=======================================\n\n")
exceptExceptionase:
print(traceback.format_exc())
sys.exit(-1)
Blob Topic
Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。
topic_name="blob_topic"
shard_count=3
life_cycle=7
try:
dh.create_blob_topic(project_name,topic_name,shard_count,life_cycle,comment)
print("create blob topic success!")
print("=======================================\n\n")
exceptResourceExistException:
print("topic already exist!")
print("=======================================\n\n")
exceptExceptionase:
print(traceback.format_exc())
sys.exit(-1)
数据发布/订阅
获取Shard列表
list_shards接口获取topic下的所有shard
shard_result=dh.list_shard(project_name,topic_name)
shards=shard_result.shards
print(len(shards))
返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息
发布数据
put_records接口向一个topic发布数据
put_result=dh.put_records(project_name,topic_name,records)
print(put_result.failed_record_count)
print(put_result.failed_records)
其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为PutRecordsResult对象,包含failed_record_count和failed_records成员,failed_records是一个FailedRecord对象的list,FailedRecord对象包含成员index,error_code和error_message
写入Tuple类型Record示例
try:
# block等待所有shard状态ready
dh.wait_shards_ready(project_name,topic_name)
print("shards all ready!!!")
print("=======================================\n\n")
topic_result=dh.get_topic(project_name,topic_name)
print(topic_result)
iftopic_result.record_type!=RecordType.TUPLE:
print("topic type illegal!")
sys.exit(-1)
print("=======================================\n\n")
record_schema=topic_result.record_schema
records0=[]
record0=TupleRecord(schema=record_schema,values=[1,'yc1',10.01,True,1455869335000000])
record0.shard_id='0'
record0.put_attribute('AK','47')
records0.append(record0)
record1=TupleRecord(schema=record_schema)
record1.set_value('bigint_field',2)
record1.set_value('string_field','yc2')
record1.set_value('double_field',None)
record1.set_value('bool_field',False)
record1.set_value('time_field',1455869335000011)
record1.hash_key='4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
records0.append(record1)
record2=TupleRecord(schema=record_schema)
record2.set_value(0,3)
record2.set_value(1,'yc3')
record2.set_value(2,1.1)
record2.set_value(3,False)
record2.set_value(4,1455869335000011)
record2.attributes={'key':'value'}
record2.partition_key='TestPartitionKey'
records0.append(record2)
put_result=dh.put_records(project_name,topic_name,records0)
print(put_result)
print("put tuple %d records, failed count: %d"%(len(records0),put_result.failed_record_count))
# failed_record_count如果大于0最好对failed record再进行重试
print("=======================================\n\n")
exceptDatahubExceptionase:
print(e)
sys.exit(-1)
写入BLOB类型Record示例
try:
records1=[]
record3=BlobRecord(blob_data='data')
record3.shard_id='0'
record3.put_attribute('a','b')
records1.append(record3)
put_result=dh.put_records(project_name,topic_name,records1)
print(put_result)
exceptDatahubExceptionase:
print(e)
sys.exit(-1)
获取cursor
获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME
OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record
LATEST: 表示获取的cursor指向当前最新的record
SYSTEM_TIME: 表示获取的cursor指向大于等于该时间(单位毫秒)的第一条record
shard_id='0'
time_stamp=0
cursor_result0=dh.get_cursor(project_name,topic_name,shard_id,CursorType.OLDEST)
cursor_result1=dh.get_cursor(project_name,topic_name,shard_id,CursorType.LATEST)
cursor_result2=dh.get_cursor(project_name,topic_name,shard_id,CursorType.SYSTEM_TIME,time_stamp)
cursor=cursor_result0.cursor
通过get_cursor接口获取用于读取指定位置之后数据的cursor
订阅数据
从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。
project_name='project'
shard_id="0"
limit=10
# 读取blob topic的record
topic_name='blob_topic'
get_result=dh.get_blob_records(project_name,topic_name,shard_id,cursor,limit)
# 读取tuple topic的record
topic_name='tuple_topic'
get_result=dh.get_tuple_records(project_name,topic_name,shard_id,record_schema,cursor,limit)
消费Tuple类型Record示例
try:
# block等待所有shard状态ready
dh.wait_shards_ready(project_name,topic_name)
print("shards all ready!!!")
print("=======================================\n\n")
topic_result=dh.get_topic(project_name,topic_name)
print(topic_result)
iftopic_result.record_type!=RecordType.TUPLE:
print("topic type illegal!")
sys.exit(-1)
print("=======================================\n\n")
shard_id='0'
limit=10
cursor_result=dh.get_cursor(project_name,topic_name,shard_id,CursorType.OLDEST)
cursor=cursor_result.cursor
whileTrue:
get_result=dh.get_tuple_records(project_name,topic_name,shard_id,record_schema,cursor,limit)
forrecordinget_result.records:
print(record)
if0==get_result.record_count:
time.sleep(1)
cursor=get_result.next_cursor
exceptDatahubExceptionase:
print(e)
sys.exit(-1)
结尾