当前位置: 首页 > 工具软件 > boto3 > 使用案例 >

python使用boto3 AWS S3接口,上传、下载文件

贲宜春
2023-12-01
import boto3
from botocore.exceptions import ClientError
import yaml


class My_S3():

    def __init__(self, s3_endpoint, s3_access_key, s3_secret_key):
        self.s3_endpoint = s3_endpoint
        self.s3_access_key = s3_access_key
        self.s3_secret_key = s3_secret_key

        try:
            self.s3_client = boto3.client(
                's3',
                aws_access_key_id=self.s3_access_key,
                aws_secret_access_key=self.s3_secret_key,
                endpoint_url=self.s3_endpoint,
                verify=False
            )
        except ClientError as err:
            raise err

    def get_file(self, bucket_name, filename):
        return self.s3_client.get_object(
            Bucket=bucket_name,
            Key=filename,
        )

    def list_objects(self, bucket_name, prefix):
        return self.s3_client.list_objects(
            Bucket=bucket_name,
            Prefix=prefix,
        )

    def put_file(self, bucket_name, filename, upfile):
        return self.s3_client.put_object(
            Bucket=bucket_name,
            Body=open(upfile, 'rb'),
            Key=filename,
        )

    def have_bucket(self, bucket_name):
        buckets = self.s3_client.list_buckets()['Buckets']
        for bucket in buckets:
            if bucket_name == bucket['Name']:
                return True
        return False
     
    def create_bucket(self, bucket):
        return self.s3_client.create_bucket(Bucket=bucket)

    def download_file(self, bucket, key, file_name):
        return self.s3_client.download_file(
            Bucket=bucket,
            Key=key,
            Filename=file_name)
            
    def delete_obj(self, bucket, object_name):
        return self.s3_client.delete_object(Bucket=bucket, Key=object_name)



if __name__ == "__main__":
    s3_endpoint = "http://cxxxxxxxxxxxxxx"
    s3_access_key = "xxxxxxxx="
    s3_secret_key = "xxxxxxxxxx"
    my_s3 = My_S3(s3_endpoint, s3_access_key, s3_secret_key)
    file_bucket_name = 'xxxx'
    if not my_s3.have_bucket(file_bucket_name):
        raise Exception('Error')
    file_list = my_s3.list_objects(file_bucket_name, '桶里的文件夹名称')
    # print(file_list)
    for obj in file_list['Contents']:
        version_file_name = obj['Key']
        if version_file_name.endswith(".py"):
            print(version_file_name)
            # 读取S3上的文件
            src_file = my_s3.get_file(file_bucket_name, version_file_name)['Body']
            version_data = src_file.read().decode('utf-8')
            # print(version_data)
            res = yaml.load(stream=version_data, Loader=yaml.FullLoader)
            print(res)




    # # 上传本地文件到S3
    # my_s3.put_file(file_bucket_name, 'test2.txt', 'D:\\test2.txt')

    # 下载文件到本地
	file_list = my_s3.list_objects(file_bucket_name, '')['Contents']
    for key in file_list:
        my_s3.download_file(file_bucket_name, key['Key'], key['Key'])


 类似资料: