oss2.utils 源代码

优质
小牛编辑
128浏览
2023-12-01
# -*- coding: utf-8 -*-

"""
oss2.utils
----------

Utils module
"""

from email.utils import formatdate

import os.path
import mimetypes
import socket
import hashlib
import base64
import threading
import calendar
import datetime
import time
import errno

import binascii
import crcmod
import re
import random

from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Util import Counter

from .compat import to_string, to_bytes
from .exceptions import ClientError, InconsistentError, RequestError, OpenApiFormatError


# File-extension -> MIME type overrides. content_type_by_name() looks up the
# lower-cased extension here first and only falls back to
# mimetypes.guess_type() when the extension is not listed.
_EXTRA_TYPES_MAP = {
    ".js": "application/javascript",
    ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ".xltx": "application/vnd.openxmlformats-officedocument.spreadsheetml.template",
    ".potx": "application/vnd.openxmlformats-officedocument.presentationml.template",
    ".ppsx": "application/vnd.openxmlformats-officedocument.presentationml.slideshow",
    ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    ".sldx": "application/vnd.openxmlformats-officedocument.presentationml.slide",
    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    ".dotx": "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
    ".xlam": "application/vnd.ms-excel.addin.macroEnabled.12",
    ".xlsb": "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
    ".apk": "application/vnd.android.package-archive"
}


def b64encode_as_string(data):
    """Base64-encode `data` and return the result as a string.

    `data` is passed through to_bytes() before encoding, and the encoded
    value is passed through to_string() before being returned.
    """
    return to_string(base64.b64encode(to_bytes(data)))


def b64decode_from_string(data):
    """Decode the Base64 text `data` and return the decoded data.

    :param data: Base64 text; it is passed through to_string() before decoding.

    :raises OpenApiFormatError: if base64.b64decode() rejects the input
        (TypeError or binascii.Error).
    """
    try:
        return base64.b64decode(to_string(data))
    except (TypeError, binascii.Error):
        raise OpenApiFormatError('Base64 Error: ' + to_string(data))


def content_md5(data):
    """Calculate the MD5 of `data` and return it as a Base64-encoded string.

    The return value can be used as the value of the HTTP Content-MD5 header.
    """
    digest = hashlib.md5(to_bytes(data)).digest()
    return b64encode_as_string(digest)


def md5_string(data):
    """Return the MD5 of `data` as a hexadecimal string (hexdigest())."""
    return hashlib.md5(to_bytes(data)).hexdigest()


def content_type_by_name(name):
    """Return the Content-Type for the file name `name`.

    The lower-cased extension is looked up in _EXTRA_TYPES_MAP first; if it is
    not listed there, the first element of mimetypes.guess_type(name) is
    returned (None when the type cannot be guessed).
    """
    ext = os.path.splitext(name)[1].lower()
    if ext in _EXTRA_TYPES_MAP:
        return _EXTRA_TYPES_MAP[ext]

    return mimetypes.guess_type(name)[0]


def set_content_type(headers, name):
    """Set the Content-Type in `headers` from the file name `name`.

    If 'Content-Type' is already present, `headers` is returned unchanged.
    If no type can be determined for `name`, no header is added.

    :param headers: Mapping of headers; a falsy value (None or an empty
        mapping) is replaced by a new dict.
    :param name: File name used to look up the content type.

    :return: The headers mapping (the one passed in, or the new dict).
    """
    headers = headers or {}

    if 'Content-Type' in headers:
        return headers

    content_type = content_type_by_name(name)
    if content_type:
        headers['Content-Type'] = content_type

    return headers


def is_ip_or_localhost(netloc):
    """Determine whether the network location is an IP address or localhost.

    :param str netloc: Host, optionally followed by ``:port``. IPv6 literals
        must use the bracketed URL form, e.g. ``[::1]:8080``.

    :return: True if the host is ``localhost``, an IPv4 address accepted by
        socket.inet_aton(), or a bracketed IPv6 address accepted by
        socket.inet_pton(); otherwise False.
    """
    closing = netloc.find(']')
    if netloc.startswith('[') and closing > 0:
        # Bracketed IPv6 literal: the colons are part of the address, so the
        # split(':') used below for IPv4/host names would cut it apart.
        try:
            socket.inet_pton(socket.AF_INET6, netloc[1:closing])
        except (socket.error, ValueError):
            return False
        return True

    loc = netloc.split(':')[0]
    if loc == 'localhost':
        return True

    try:
        socket.inet_aton(loc)
    except socket.error:
        return False

    return True


# Character classes used by is_valid_bucket_name(): lower-case ASCII letters
# and digits, the hyphen, and the set of all characters a name may contain.
_ALPHA_NUM = 'abcdefghijklmnopqrstuvwxyz0123456789'
_HYPHEN = '-'
_BUCKET_NAME_CHARS = set(_ALPHA_NUM + _HYPHEN)


def is_valid_bucket_name(name):
    """Check if the bucket name is valid.

    A valid name is 3 to 63 characters long, starts with a lower-case letter
    or digit, does not end with a hyphen, and contains only lower-case
    letters, digits and hyphens.
    """
    if not 3 <= len(name) <= 63:
        return False

    if name[-1] == _HYPHEN:
        return False

    if name[0] not in _ALPHA_NUM:
        return False

    return set(name) <= _BUCKET_NAME_CHARS


class SizedFileAdapter(object):
    """Adapter that reads at most `size` bytes from `file_object`.

    Reading stops after `size` bytes even if the underlying file object holds
    more data.

    :param file_object: Object providing read(n).
    :param size: Maximum number of bytes to hand out in total.
    """
    def __init__(self, file_object, size):
        self.file_object = file_object
        self.size = size
        self.offset = 0

    def read(self, amt=None):
        """Read up to `amt` bytes, never going past the configured size.

        :param amt: Number of bytes wanted; None or a negative value means
            "everything that is left within the size limit".

        :return: Data from the underlying file object, or '' once `size`
            bytes have been handed out.
        """
        if self.offset >= self.size:
            return ''

        if (amt is None or amt < 0) or (amt + self.offset >= self.size):
            # The request reaches or passes the limit: serve only the rest.
            data = self.file_object.read(self.size - self.offset)
            self.offset = self.size
            return data

        self.offset += amt
        return self.file_object.read(amt)

    @property
    def len(self):
        """The size limit given at construction."""
        return self.size


def how_many(m, n):
    """Return m divided by n, rounded up (for integers with n > 0).

    For example, how many parts of size `n` are needed to cover `m` bytes.
    """
    return (m + n - 1) // n


def file_object_remaining_bytes(fileobj):
    """Return the number of bytes between the current position and the end of `fileobj`.

    The file object must support seek() and tell(); its position is restored
    before returning.
    """
    current = fileobj.tell()

    fileobj.seek(0, os.SEEK_END)
    end = fileobj.tell()
    fileobj.seek(current, os.SEEK_SET)

    return end - current


def _has_data_size_attr(data):
    """Tell whether the size of `data` can be determined.

    True when `data` has `__len__`, a `len` attribute, or both `seek` and `tell`.
    """
    if hasattr(data, '__len__') or hasattr(data, 'len'):
        return True
    return hasattr(data, 'seek') and hasattr(data, 'tell')


def _get_data_size(data):
    """Return the size of `data`, or None if it cannot be determined.

    Tried in order: len(data), the `len` attribute, and for seekable file
    objects the bytes remaining from the current position.
    """
    if hasattr(data, '__len__'):
        size = len(data)
    elif hasattr(data, 'len'):
        size = data.len
    elif hasattr(data, 'seek') and hasattr(data, 'tell'):
        size = file_object_remaining_bytes(data)
    else:
        size = None
    return size


# Number of bytes requested per read() when _FileLikeAdapter and
# _BytesAndFileAdapter are consumed as iterators.
_CHUNK_SIZE = 8 * 1024


def make_progress_adapter(data, progress_callback, size=None):
    """Return an adapter that calls the progress callback while `data` is read.

    When `size` is not specified and cannot be determined, the total size
    passed to the callback is None.

    :param data: It can be bytes, a file object or an iterable.
    :param progress_callback: Progress callback. See :ref:`progress_callback` for more information.
    :param size: Specify the `data` size, optional.

    :return: An adapter that calls the progress callback: a
        _BytesAndFileAdapter when the size is known, otherwise a
        _FileLikeAdapter (object with read()) or an _IterableAdapter.

    :raises ClientError: if the size is unknown and `data` is neither a
        file-like object nor an iterable.
    """
    data = to_bytes(data)

    if size is None:
        size = _get_data_size(data)

    if size is not None:
        return _BytesAndFileAdapter(data, progress_callback, size)

    if hasattr(data, 'read'):
        return _FileLikeAdapter(data, progress_callback)
    elif hasattr(data, '__iter__'):
        return _IterableAdapter(data, progress_callback)
    else:
        raise ClientError('{0} is not a file object, nor an iterator'.format(data.__class__.__name__))


def make_crc_adapter(data, init_crc=0):
    """Return an adapter that calculates the CRC64 of `data` while it is read.

    :param data: It can be bytes, a file object or an iterable.
    :param init_crc: Initial CRC value, optional.

    :return: An adapter that feeds every chunk read to a Crc64 instance.

    :raises ClientError: if `data` is neither sized, file-like nor iterable.
    """
    data = to_bytes(data)

    # bytes or file object
    if _has_data_size_attr(data):
        return _BytesAndFileAdapter(data,
                                    size=_get_data_size(data),
                                    crc_callback=Crc64(init_crc))
    # file-like object
    elif hasattr(data, 'read'):
        return _FileLikeAdapter(data, crc_callback=Crc64(init_crc))
    # iterator
    elif hasattr(data, '__iter__'):
        return _IterableAdapter(data, crc_callback=Crc64(init_crc))
    else:
        raise ClientError('{0} is not a file object, nor an iterator'.format(data.__class__.__name__))


def make_cipher_adapter(data, cipher_callback):
    """Return an adapter that passes `data` through `cipher_callback` while it is read.

    :param data: It can be bytes, a file object or an iterable.
    :param cipher_callback: Callable taking a chunk of content and returning
        the transformed chunk (e.g. an encrypt or decrypt method); its return
        value replaces the chunk handed to the reader.

    :return: An adapter that calls `cipher_callback` on every chunk read.

    :raises ClientError: if `data` is neither sized, file-like nor iterable.
    """
    data = to_bytes(data)

    # bytes or file object
    if _has_data_size_attr(data):
        return _BytesAndFileAdapter(data,
                                    size=_get_data_size(data),
                                    cipher_callback=cipher_callback)
    # file-like object
    elif hasattr(data, 'read'):
        return _FileLikeAdapter(data, cipher_callback=cipher_callback)
    # iterator
    elif hasattr(data, '__iter__'):
        return _IterableAdapter(data, cipher_callback=cipher_callback)
    else:
        raise ClientError('{0} is not a file object, nor an iterator'.format(data.__class__.__name__))


def check_crc(operation, client_crc, oss_crc, request_id):
    """Raise InconsistentError when the client and OSS CRC values differ.

    The check is skipped when either value is None.

    :param operation: Name of the operation, used in the error message.
    :param client_crc: CRC calculated by the client, or None.
    :param oss_crc: CRC reported by OSS, or None.
    :param request_id: Request ID passed on to InconsistentError.

    :raises InconsistentError: if both CRC values are given and not equal.
    """
    if client_crc is None or oss_crc is None:
        return

    if client_crc != oss_crc:
        raise InconsistentError('the crc of {0} between client and oss is inconsistent'.format(operation),
                                request_id)

def _invoke_crc_callback(crc_callback, content):
    """Feed `content` to `crc_callback`; do nothing when the callback is falsy."""
    if not crc_callback:
        return
    crc_callback(content)


def _invoke_progress_callback(progress_callback, consumed_bytes, total_bytes):
    """Report progress to `progress_callback`; do nothing when the callback is falsy."""
    if not progress_callback:
        return
    progress_callback(consumed_bytes, total_bytes)


def _invoke_cipher_callback(cipher_callback, content):
    """Return `content` transformed by `cipher_callback`, or unchanged when the callback is falsy."""
    if not cipher_callback:
        return content
    return cipher_callback(content)


class _IterableAdapter(object):
    """Iterator wrapper that runs the progress, CRC and cipher callbacks per item.

    :param data: Iterable yielding chunks of content.
    :param progress_callback: Called before each item is fetched with the
        number of bytes consumed so far and None as the total.
    :param crc_callback: Called with every chunk fetched.
    :param cipher_callback: Called with every chunk; its result is what the
        iterator yields.
    """
    def __init__(self, data, progress_callback=None, crc_callback=None, cipher_callback=None):
        self.iter = iter(data)
        self.offset = 0

        self.progress_callback = progress_callback
        self.crc_callback = crc_callback
        self.cipher_callback = cipher_callback

    def __iter__(self):
        return self

    def next(self):
        # Progress is reported before the next chunk is pulled.
        _invoke_progress_callback(self.progress_callback, self.offset, None)

        chunk = next(self.iter)
        self.offset += len(chunk)

        # The CRC is fed the chunk as fetched, before the cipher callback runs.
        _invoke_crc_callback(self.crc_callback, chunk)

        return _invoke_cipher_callback(self.cipher_callback, chunk)

    def __next__(self):
        return self.next()

    @property
    def crc(self):
        """CRC from the crc callback if set, else from the wrapped iterator, else None."""
        if self.crc_callback:
            return self.crc_callback.crc
        if self.iter:
            return self.iter.crc
        return None


class _FileLikeAdapter(object):
    """Adapter monitoring reads from a `fileobj` whose content length cannot be determined.

    :param fileobj: The file-like object; only read() is required.
    :param progress_callback: Progress callback, called with the bytes read so
        far and None as the total.
    :param crc_callback: Called with every non-empty chunk read.
    :param cipher_callback: Called with every non-empty chunk; its result is
        what read() returns.
    """
    def __init__(self, fileobj, progress_callback=None, crc_callback=None, cipher_callback=None):
        self.fileobj = fileobj
        self.offset = 0

        self.progress_callback = progress_callback
        self.crc_callback = crc_callback
        self.cipher_callback = cipher_callback

    def __iter__(self):
        return self

    def next(self):
        chunk = self.read(_CHUNK_SIZE)
        if not chunk:
            raise StopIteration
        return chunk

    def __next__(self):
        return self.next()

    def read(self, amt=None):
        chunk = self.fileobj.read(amt)

        # Progress is reported with the offset before this chunk is counted,
        # for empty (end-of-data) reads as well.
        _invoke_progress_callback(self.progress_callback, self.offset, None)

        if not chunk:
            return chunk

        self.offset += len(chunk)
        _invoke_crc_callback(self.crc_callback, chunk)

        return _invoke_cipher_callback(self.cipher_callback, chunk)

    @property
    def crc(self):
        """CRC from the crc callback if set, else from the wrapped file object, else None."""
        if self.crc_callback:
            return self.crc_callback.crc
        if self.fileobj:
            return self.fileobj.crc
        return None


class _BytesAndFileAdapter(object):
    """Adapter monitoring reads from data whose size is known.

    :param data: Unicode string, bytes or file object; it is passed through
        to_bytes() on construction.
    :param progress_callback: Progress callback with the signature
        callback(bytes_read, total_bytes).
    :param int size: The size of `data`.
    :param crc_callback: Called with every chunk read.
    :param cipher_callback: Called with every chunk; its result is what
        read() returns.
    """
    def __init__(self, data, progress_callback=None, size=None, crc_callback=None, cipher_callback=None):
        self.data = to_bytes(data)
        self.size = size
        self.offset = 0

        self.progress_callback = progress_callback
        self.crc_callback = crc_callback
        self.cipher_callback = cipher_callback

    @property
    def len(self):
        return self.size

    # Truth-value hook: __bool__ is the Python 3 name ...
    def __bool__(self):
        return True
    # ... and __nonzero__ is the Python 2 name for the same hook.
    __nonzero__ = __bool__

    def __iter__(self):
        return self

    def next(self):
        chunk = self.read(_CHUNK_SIZE)
        if not chunk:
            raise StopIteration
        return chunk

    def __next__(self):
        return self.next()

    def read(self, amt=None):
        if self.offset >= self.size:
            return ''

        remaining = self.size - self.offset
        bytes_to_read = remaining if (amt is None or amt < 0) else min(amt, remaining)

        if isinstance(self.data, bytes):
            # In-memory data: slice out the next window.
            start = self.offset
            chunk = self.data[start:start + bytes_to_read]
        else:
            chunk = self.data.read(bytes_to_read)

        self.offset += bytes_to_read

        _invoke_progress_callback(self.progress_callback, min(self.offset, self.size), self.size)
        _invoke_crc_callback(self.crc_callback, chunk)

        return _invoke_cipher_callback(self.cipher_callback, chunk)

    @property
    def crc(self):
        """CRC from the crc callback if set, else from the wrapped data, else None."""
        if self.crc_callback:
            return self.crc_callback.crc
        if self.data:
            return self.data.crc
        return None


class Crc64(object):
    """Incremental CRC64 calculator built on crcmod.Crc.

    Instances are callable, so they can be used directly as a crc callback:
    calling the instance with a chunk of data is the same as update().

    :param init_crc: Initial CRC value, optional.
    """

    _POLY = 0x142F0E1EBA9EA3693
    _XOROUT = 0XFFFFFFFFFFFFFFFF

    def __init__(self, init_crc=0):
        self.crc64 = crcmod.Crc(self._POLY, initCrc=init_crc, rev=True, xorOut=self._XOROUT)

    def __call__(self, data):
        self.update(data)

    def update(self, data):
        """Feed `data` into the running CRC."""
        self.crc64.update(data)

    @property
    def crc(self):
        """The CRC value of all data fed so far."""
        return self.crc64.crcValue


def random_aes256_key():
    """Return _AES_256_KEY_SIZE random bytes read from Crypto.Random, for use as an AES-256 key."""
    return Random.new().read(_AES_256_KEY_SIZE)


def random_counter(begin=1, end=10):
    """Return a random integer N with begin <= N <= end (both ends included)."""
    return random.randint(begin, end)


# AES-256 keys are always 32 bytes long.
_AES_256_KEY_SIZE = 32

# Width in bits of the counter passed to Counter.new() by AESCipher (16 bytes).
_AES_CTR_COUNTER_BITS_LEN = 8 * 16

# Algorithm label exposed as AESCipher.ALGORITHM.
# NOTE(review): the label says GCM, but AESCipher creates its cipher with
# AES.MODE_CTR — confirm the label is intentional.
_AES_GCM = 'AES/GCM/NoPadding'


class AESCipher:
    """AES256 encryption implementation (the cipher is created in counter mode).

    :param key: Symmetric encryption key. When not given, a random one is
        generated with random_aes256_key().

    :param start: Initial counter value; it is converted with int(). When not
        given, a random one is generated with random_counter().

    .. Note::
        Users can implement a symmetric encryption algorithm of their own:
        1: Provide a symmetric encryption algorithm name, ALGORITHM.
        2: Provide static methods that return the encryption key and the initial random value
           (they must be provided even if the algorithm does not require an initial random value).
        3: Provide encryption and decryption methods.
    """
    ALGORITHM = _AES_GCM

    @staticmethod
    def get_key():
        """Return a new random AES-256 key."""
        return random_aes256_key()

    @staticmethod
    def get_start():
        """Return a new random initial counter value."""
        return random_counter()

    def __init__(self, key=None, start=None):
        self.key = key
        if not self.key:
            self.key = random_aes256_key()
        if not start:
            self.start = random_counter()
        else:
            self.start = int(start)
        ctr = Counter.new(_AES_CTR_COUNTER_BITS_LEN, initial_value=self.start)
        self.__cipher = AES.new(self.key, AES.MODE_CTR, counter=ctr)

    def encrypt(self, raw):
        """Encrypt `raw` with the underlying cipher and return the result."""
        return self.__cipher.encrypt(raw)

    def decrypt(self, enc):
        """Decrypt `enc` with the underlying cipher and return the result."""
        return self.__cipher.decrypt(enc)


# Held by to_unixtime() around its time.strptime() call.
# NOTE(review): presumably guards against strptime not being thread-safe — confirm.
_STRPTIME_LOCK = threading.Lock()

# strftime format used by date_to_iso8601(); milliseconds are fixed at .000.
_ISO8601_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"

# A regex to match HTTP Last-Modified header, whose format is 'Sat, 05 Dec 2015 11:10:29 GMT'.
# Its strftime/strptime format is '%a, %d %b %Y %H:%M:%S GMT'

# The pattern ends with `$` so trailing text after "GMT" is rejected;
# http_to_unixtime() applies it with match(), which anchors the start.
_HTTP_GMT_RE = re.compile(
    r'(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun), (?P<day>0[1-9]|([1-2]\d)|(3[0-1])) (?P<month>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (?P<year>\d+) (?P<hour>([0-1]\d)|(2[0-3])):(?P<minute>[0-5]\d):(?P<second>[0-5]\d) GMT$'
)

# Matches timestamps such as 2012-02-24T06:07:48.000Z. The pattern ends with
# `$`; iso8601_to_unixtime() applies it with match(), which anchors the start.
_ISO8601_RE = re.compile(
    r'(?P<year>\d+)-(?P<month>01|02|03|04|05|06|07|08|09|10|11|12)-(?P<day>0[1-9]|([1-2]\d)|(3[0-1]))T(?P<hour>([0-1]\d)|(2[0-3])):(?P<minute>[0-5]\d):(?P<second>[0-5]\d)\.000Z$'
)

# English month abbreviation -> month number, used by http_to_unixtime() to
# convert the `month` group matched by _HTTP_GMT_RE.
_MONTH_MAPPING = {
    'Jan': 1,
    'Feb': 2,
    'Mar': 3,
    'Apr': 4,
    'May': 5,
    'Jun': 6,
    'Jul': 7,
    'Aug': 8,
    'Sep': 9,
    'Oct': 10,
    'Nov': 11,
    'Dec': 12
}


def to_unixtime(time_string, format_string):
    """Parse `time_string` with the strptime format `format_string` and return Unix time in seconds.

    The parsed time is interpreted as UTC (calendar.timegm). The
    time.strptime() call is made while holding _STRPTIME_LOCK.
    """
    with _STRPTIME_LOCK:
        return int(calendar.timegm(time.strptime(time_string, format_string)))


def http_date(timeval=None):
    """Return the HTTP standard GMT time string for `timeval`.

    In strftime terms the format is '%a, %d %b %Y %H:%M:%S GMT', but
    strftime() cannot be used because it is locale dependent.

    :param timeval: Unix time in seconds; None means the current time.
    """
    return formatdate(timeval, usegmt=True)


def http_to_unixtime(time_string):
    """Convert an HTTP date to Unix time (seconds since 1970-01-01 00:00 UTC).

    An HTTP date looks like `Sat, 05 Dec 2015 11:10:29 GMT`.

    :raises ValueError: if `time_string` does not match the HTTP date format
        or does not denote a valid calendar date.
    """
    m = _HTTP_GMT_RE.match(time_string)

    if not m:
        raise ValueError(time_string + " is not in valid HTTP date format")

    day = int(m.group('day'))
    month = _MONTH_MAPPING[m.group('month')]
    year = int(m.group('year'))
    hour = int(m.group('hour'))
    minute = int(m.group('minute'))
    second = int(m.group('second'))

    # Going through datetime validates the date (e.g. rejects 31 Feb).
    tm = datetime.datetime(year, month, day, hour, minute, second).timetuple()

    return calendar.timegm(tm)


def iso8601_to_unixtime(time_string):
    """Convert an ISO8601 time string (e.g. 2012-02-24T06:07:48.000Z) to Unix time in seconds.

    :raises ValueError: if `time_string` does not match the expected format
        or does not denote a valid calendar date.
    """
    m = _ISO8601_RE.match(time_string)

    if not m:
        raise ValueError(time_string + " is not in valid ISO8601 format")

    day = int(m.group('day'))
    month = int(m.group('month'))
    year = int(m.group('year'))
    hour = int(m.group('hour'))
    minute = int(m.group('minute'))
    second = int(m.group('second'))

    # Going through datetime validates the date (e.g. rejects 02-31).
    tm = datetime.datetime(year, month, day, hour, minute, second).timetuple()

    return calendar.timegm(tm)


def date_to_iso8601(d):
    """Format the date or datetime `d` with _ISO8601_FORMAT and return the string."""
    # It's OK to use strftime here, since _ISO8601_FORMAT is not locale dependent.
    return d.strftime(_ISO8601_FORMAT)


def iso8601_to_date(time_string):
    """Convert an ISO8601 time string (e.g. 2012-02-24T06:07:48.000Z) to a datetime.date.

    The returned date is the calendar date in UTC, matching the `Z` suffix of
    the input.

    :raises ValueError: if `time_string` is not in the expected format.
    """
    timestamp = iso8601_to_unixtime(time_string)
    # The timestamp is UTC. date.fromtimestamp() would convert it through the
    # local time zone and return the previous day for midnight UTC in zones
    # west of UTC, so the date is derived from the epoch with pure arithmetic.
    return datetime.date(1970, 1, 1) + datetime.timedelta(seconds=timestamp)


def makedir_p(dirpath):
    """Create `dirpath` including missing parent directories.

    An "already exists" error (EEXIST) is ignored; any other OS error is
    re-raised.
    """
    try:
        os.makedirs(dirpath)
    except os.error as e:
        if e.errno != errno.EEXIST:
            raise


def silently_remove(filename):
    """Remove the file `filename`.

    If the file does not exist (ENOENT) this is a no-op; any other OS error
    is re-raised.
    """
    try:
        os.remove(filename)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise


def force_rename(src, dst):
    """Rename `src` to `dst`, replacing `dst` if it already exists.

    When os.rename() fails with EEXIST, `dst` is removed and the rename is
    retried once; any other OS error is re-raised.
    """
    try:
        os.rename(src, dst)
    except OSError as e:
        if e.errno == errno.EEXIST:
            silently_remove(dst)
            os.rename(src, dst)
        else:
            raise


def copyfileobj_and_verify(fsrc, fdst, expected_len,
                           chunk_size=16*1024,
                           request_id=''):
    """Copy data from file-like object `fsrc` to file-like object `fdst` and verify the length.

    :param fsrc: Source object providing read(n).
    :param fdst: Destination object providing write(data).
    :param expected_len: Number of bytes expected to be read from `fsrc`.
    :param chunk_size: Number of bytes requested per read.
    :param request_id: Request ID passed on to InconsistentError.

    :raises InconsistentError: if the number of bytes read differs from
        `expected_len`. Data read up to that point has already been written
        to `fdst`.
    """
    num_read = 0

    while True:
        buf = fsrc.read(chunk_size)
        if not buf:
            break

        num_read += len(buf)
        fdst.write(buf)

    if num_read != expected_len:
        raise InconsistentError("IncompleteRead from source", request_id)