七牛---Python_SDK_Demos

洪光霁
2023-12-01

七牛云存储_Python_SDK_Demos

参考七牛云存储官方Python SDK 源码及使用指南:
https://github.com/qiniu/python-sdk
http://developer.qiniu.com/docs/v6/sdk/python-sdk.html

资源操作:

批量操作

# -*- coding: utf-8 -*-
# flake8: noqa

#批量操作
from qiniu import Auth
from qiniu import BucketManager

access_key = '...'
secret_key = '...'

q = Auth(access_key, secret_key)
bucket = BucketManager(q)


########## 批量获取文件信息 ##########

from qiniu import build_batch_stat

bucket_name = 'demo'
key_list = [ 'demo1.jpg','demo2.jpg' ]

ops = build_batch_stat(bucket_name, key_list)
ret, info = bucket.batch(ops)
print(info)
assert ret[0]['code'] == 200

######################################



############ 批量复制文件 ############

from qiniu import build_batch_copy

bucket_name_from = 'demo'#源bucket
bucket_name_to = 'test' #目标bucket
key_from = 'demo1.jpg' #源key
key_to = 'copy_test.jpg' #目标key
key_dic = {key_from: key_to} #需要复制的key对

ops = build_batch_copy(bucket_name_from, key_dic, bucket_name_to)
ret, info = bucket.batch(ops)
print(info)
assert ret[0]['code'] == 200

######################################



############ 批量移动文件 ############

from qiniu import build_batch_move

bucket_name_from = 'demo'#源bucket
bucket_name_to = 'test' #目标bucket
key_from = 'demo1.jpg' #源key
key_to = 'move_test.jpg' #目标key
key_dic = {key_from : key_to } #需要移动的key对

ops = build_batch_move(bucket_name_from, key_dic, bucket_name_to)
ret, info = bucket.batch(ops)
print(info)
assert ret[0]['code'] == 200

######################################


############ 批量删除文件 ############

from qiniu import build_batch_delete

bucket_name = 'demo'
key_list = [ 'demo1.jpg','demo2.jpg' ]

ops = build_batch_delete(bucket_name, key_list)
ret, info = bucket.batch(ops)
print(info)
assert ret[0]['code'] == 612

######################################

高级管理操作:

列出文件_遍历bucket

# -*- coding: utf-8 -*-
# flake8: noqa

#对整个bucket遍历的操作

from qiniu import Auth
from qiniu import BucketManager

access_key = '...'
secret_key = '...'
bucket_name = 'demo'

q = Auth(access_key, secret_key)

def list_all(bucket_name, bucket=None, prefix=None, limit=None):
    if bucket is None:
        bucket = BucketManager(q)
    marker = None
    eof = False
    while eof is False:
        ret, eof, info = bucket.list(bucket_name, prefix=prefix, marker=marker, limit=limit)
        marker = ret.get('marker', None)
        for item in ret['items']:
            print(item['key'])
            pass
    if eof is not True:
        # 错误处理
        pass

list_all(bucket_name)

云处理:

持久化处理_bucket内的已有文件

# -*- coding: utf-8 -*-
# flake8: noqa

from qiniu import Auth, PersistentFop, build_op, op_save

access_key = '...'
secret_key = '...'
bucket_src = 'demo'
key_src = 'video_demo.flv'
saved_bucket = 'test'
saved_key = 'video.avi'
pipeline = 'my_pipeline'

q = Auth(access_key, secret_key)

pfop = PersistentFop(q, bucket_src, pipeline)

#要进行的转码操作
fops = 'avthumb/avi' 

op = op_save(fops, saved_bucket, saved_key)
ops = []
ops.append(op)
ret, info = pfop.execute(key_src, ops, 1)
print(info)
assert ret['persistentId'] is not None

上传&触发持久化处理

# -*- coding: utf-8 -*-
# flake8: noqa

from qiniu import Auth, put_file, etag, urlsafe_base64_encode
import qiniu.config

access_key = '...'
secret_key = '...'

q = Auth(access_key, secret_key)

#目标buckct
bucket_name = 'demo'
#目标key
key = 'demo_save.jpg'

#指定转码使用的队列名称
pipeline = 'my_pipeline'

#设置转码参数1|这里为图片水印操作
fops1 = 'watermark/3/image/aHR0cDovL2RldmVsb3Blci5xaW5pdS5jb20vcmVzb3VyY2UvbG9nby0yLmpwZw==/text/5LiD54mb5LqR5a2Y5YKo'
#设置转码参数2|这里为图片裁剪操作
fops2 = 'imageView2/1/w/200/h/200'

#通过添加'|saveas'参数,指定处理后的文件保存的bucket和key,不指定默认保存在当前空间
saveas_key1 = urlsafe_base64_encode('bucket_saved:key_saved')#bucket_saved为目标bucket,key_saved为目标key
saveas_key2 = urlsafe_base64_encode('bucket_saved:key_saved')#bucket_saved为目标bucket,key_saved为目标key

#这里分别对需要上传的文件进行水印和裁剪操作并保存在指定的两个不同的bucket中。多个数据处理命令,用“;”分隔
fops = fops1+'|saveas/'+saveas_key1+';'+fops2+'|saveas/'+saveas_key2

#在上传策略中指定fobs和pipeline
policy={
  'persistentOps':fops,
  'persistentPipeline':pipeline
 }

token = q.upload_token(bucket_name, key, 3600, policy)

localfile = '/root/cqk/demo2.jpg'

ret, info = put_file(token, key, localfile)
print(info)
assert ret['key'] == key
assert ret['hash'] == etag(localfile)
 类似资料: