# oss2.utils source code
# Featured
# Xiaoniu Editor
# 128 views
# 2023-12-01
# -*- coding: utf-8 -*-

"""
oss2.utils
----------

Utils module
"""

from email.utils import formatdate

import os.path
import mimetypes
import socket
import hashlib
import base64
import threading
import calendar
import datetime
import time
import errno
import binascii
import crcmod
import re
import random

from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Util import Counter

from .compat import to_string, to_bytes
from .exceptions import ClientError, InconsistentError, RequestError, OpenApiFormatError


# Extensions that take priority over whatever `mimetypes` would guess.
_EXTRA_TYPES_MAP = {
    ".js": "application/javascript",
    ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ".xltx": "application/vnd.openxmlformats-officedocument.spreadsheetml.template",
    ".potx": "application/vnd.openxmlformats-officedocument.presentationml.template",
    ".ppsx": "application/vnd.openxmlformats-officedocument.presentationml.slideshow",
    ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    ".sldx": "application/vnd.openxmlformats-officedocument.presentationml.slide",
    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    ".dotx": "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
    ".xlam": "application/vnd.ms-excel.addin.macroEnabled.12",
    ".xlsb": "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
    ".apk": "application/vnd.android.package-archive"
}


def b64encode_as_string(data):
    """Base64-encode `data` (converted with `to_bytes`) and return the result via `to_string`."""
    return to_string(base64.b64encode(to_bytes(data)))


def b64decode_from_string(data):
    """Base64-decode `data`.

    :raises OpenApiFormatError: if `data` is not valid base64.
    """
    try:
        return base64.b64decode(to_string(data))
    except (TypeError, binascii.Error):
        raise OpenApiFormatError('Base64 Error: ' + to_string(data))


def content_md5(data):
    """Calculate the MD5 of the data. The return value is a base64 encoded str.

    The return value can be used as the value of the HTTP Content-MD5 header.
    """
    m = hashlib.md5(to_bytes(data))
    return b64encode_as_string(m.digest())


def md5_string(data):
    """Returns MD5 value of `data` in hex string (hexdigest())."""
    return hashlib.md5(to_bytes(data)).hexdigest()


def content_type_by_name(name):
    """Return the Content-Type by file name.

    `_EXTRA_TYPES_MAP` is consulted first (case-insensitive extension match);
    otherwise the guess of `mimetypes.guess_type` is returned, which may be None.
    """
    ext = os.path.splitext(name)[1].lower()
    if ext in _EXTRA_TYPES_MAP:
        return _EXTRA_TYPES_MAP[ext]

    return mimetypes.guess_type(name)[0]


def set_content_type(headers, name):
    """Set the Content-Type in headers by the name.

    If the Content-Type has already been set, no-op and return the headers.
    A falsy `headers` (e.g. None) is replaced by a new dict.

    :return: the headers dict.
    """
    headers = headers or {}

    if 'Content-Type' in headers:
        return headers

    content_type = content_type_by_name(name)
    if content_type:
        headers['Content-Type'] = content_type

    return headers


def is_ip_or_localhost(netloc):
    """Determine whether the network address is IP or localhost."""
    # Only the part before the first ':' (i.e. without a port) is examined.
    loc = netloc.split(':')[0]
    if loc == 'localhost':
        return True

    try:
        socket.inet_aton(loc)
    except socket.error:
        return False

    return True


_ALPHA_NUM = 'abcdefghijklmnopqrstuvwxyz0123456789'
_HYPHEN = '-'
_BUCKET_NAME_CHARS = set(_ALPHA_NUM + _HYPHEN)


def is_valid_bucket_name(name):
    """Check if the bucket name is valid.

    A valid name is 3 to 63 characters long, consists only of lowercase
    letters, digits and hyphens, starts with a letter or digit and does not
    end with a hyphen.
    """
    if len(name) < 3 or len(name) > 63:
        return False

    if name[-1] == _HYPHEN:
        return False

    if name[0] not in _ALPHA_NUM:
        return False

    return set(name) <= _BUCKET_NAME_CHARS


class SizedFileAdapter(object):
    """This adapter guarantees reading of data up to only the specified size,
    even if the original file_object size is larger.

    :param file_object: object providing `read(amt)`.
    :param size: maximum number of bytes to hand out in total.
    """
    def __init__(self, file_object, size):
        self.file_object = file_object
        self.size = size
        self.offset = 0

    def read(self, amt=None):
        """Read up to `amt` bytes, never going past `size` bytes in total.

        `amt` of None or a negative value means "everything that remains".
        """
        if self.offset >= self.size:
            return ''

        if (amt is None or amt < 0) or (amt + self.offset >= self.size):
            data = self.file_object.read(self.size - self.offset)
            self.offset = self.size
            return data

        # NOTE: the offset is advanced by the requested amount, i.e. the
        # underlying read is assumed to return exactly `amt` bytes.
        self.offset += amt
        return self.file_object.read(amt)

    @property
    def len(self):
        return self.size


def how_many(m, n):
    """Return ceil(m / n) using integer arithmetic."""
    return (m + n - 1) // n


def file_object_remaining_bytes(fileobj):
    """Return the number of bytes between the current position and the end
    of `fileobj`. The current position is restored before returning."""
    current = fileobj.tell()

    fileobj.seek(0, os.SEEK_END)
    end = fileobj.tell()
    fileobj.seek(current, os.SEEK_SET)

    return end - current


def _has_data_size_attr(data):
    """Tell whether `_get_data_size` is able to determine the size of `data`."""
    return hasattr(data, '__len__') or hasattr(data, 'len') or (hasattr(data, 'seek') and hasattr(data, 'tell'))


def _get_data_size(data):
    """Return the size of `data`, or None if it cannot be determined.

    Tried in order: `len(data)`, `data.len`, remaining bytes of a seekable
    file object.
    """
    if hasattr(data, '__len__'):
        return len(data)

    if hasattr(data, 'len'):
        return data.len

    if hasattr(data, 'seek') and hasattr(data, 'tell'):
        return file_object_remaining_bytes(data)

    return None


_CHUNK_SIZE = 8 * 1024


def make_progress_adapter(data, progress_callback, size=None):
    """Return an adapter instance so that the progress callback is called when reading the data.

    When parameter `size` is not specified and cannot be determined, the total size
    passed to the callback is None.

    :param data: It can be bytes, file object or iterable.
    :param progress_callback: Progress callback. See :ref:`progress_callback` for more information.
    :param size: Specify the `data` size, optional.

    :return: The adapter that calls the progress callback.
    :raises ClientError: if `data` has no known size and is neither a file
        object nor an iterable.
    """
    data = to_bytes(data)

    if size is None:
        size = _get_data_size(data)

    if size is None:
        if hasattr(data, 'read'):
            return _FileLikeAdapter(data, progress_callback)
        elif hasattr(data, '__iter__'):
            return _IterableAdapter(data, progress_callback)
        else:
            raise ClientError('{0} is not a file object, nor an iterator'.format(data.__class__.__name__))
    else:
        return _BytesAndFileAdapter(data, progress_callback, size)


def make_crc_adapter(data, init_crc=0):
    """Return an adapter instance so that the CRC can be calculated during reading.

    :param data: It can be bytes, file object or iterable.
    :param init_crc: Init CRC value, optional.

    :return: An adapter that calls the CRC calculating function.
    :raises ClientError: if `data` is neither sized, a file object nor an iterable.
    """
    data = to_bytes(data)

    # bytes or file object
    if _has_data_size_attr(data):
        return _BytesAndFileAdapter(data,
                                    size=_get_data_size(data),
                                    crc_callback=Crc64(init_crc))
    # file-like object
    elif hasattr(data, 'read'):
        return _FileLikeAdapter(data, crc_callback=Crc64(init_crc))
    # iterator
    elif hasattr(data, '__iter__'):
        return _IterableAdapter(data, crc_callback=Crc64(init_crc))
    else:
        raise ClientError('{0} is not a file object, nor an iterator'.format(data.__class__.__name__))


def make_cipher_adapter(data, cipher_callback):
    """Return an adapter instance for encrypting/decrypting during read.

    :param data: It can be bytes, file object or iterable.
    :param cipher_callback: Callable that receives each chunk read and returns
        the transformed chunk.

    :return: Adapter that calls the cipher callback.
    :raises ClientError: if `data` is neither sized, a file object nor an iterable.
    """
    data = to_bytes(data)

    # bytes or file object
    if _has_data_size_attr(data):
        return _BytesAndFileAdapter(data,
                                    size=_get_data_size(data),
                                    cipher_callback=cipher_callback)
    # file-like object
    elif hasattr(data, 'read'):
        return _FileLikeAdapter(data, cipher_callback=cipher_callback)
    # iterator
    elif hasattr(data, '__iter__'):
        return _IterableAdapter(data, cipher_callback=cipher_callback)
    else:
        raise ClientError('{0} is not a file object, nor an iterator'.format(data.__class__.__name__))


def check_crc(operation, client_crc, oss_crc, request_id):
    """Raise InconsistentError when both CRC values are known and differ.

    If either value is None the check is skipped.
    """
    if client_crc is not None and oss_crc is not None and client_crc != oss_crc:
        raise InconsistentError('the crc of {0} between client and oss is inconsistent'.format(operation),
                                request_id)


def _invoke_crc_callback(crc_callback, content):
    """Feed `content` to `crc_callback` if a callback was supplied."""
    if crc_callback:
        crc_callback(content)


def _invoke_progress_callback(progress_callback, consumed_bytes, total_bytes):
    """Report progress through `progress_callback` if a callback was supplied."""
    if progress_callback:
        progress_callback(consumed_bytes, total_bytes)


def _invoke_cipher_callback(cipher_callback, content):
    """Return `content` transformed by `cipher_callback`, or unchanged if there is none."""
    if cipher_callback:
        content = cipher_callback(content)
    return content


class _IterableAdapter(object):
    """Adapter over an iterable of chunks that invokes the progress, CRC and
    cipher callbacks as chunks are consumed. The total size is unknown, so
    the progress callback receives None as the total."""
    def __init__(self, data, progress_callback=None, crc_callback=None, cipher_callback=None):
        self.iter = iter(data)
        self.progress_callback = progress_callback
        self.offset = 0

        self.crc_callback = crc_callback
        self.cipher_callback = cipher_callback

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        # Progress is reported before the next chunk is fetched, so the
        # callback sees the number of bytes consumed so far.
        _invoke_progress_callback(self.progress_callback, self.offset, None)

        content = next(self.iter)
        self.offset += len(content)

        # The CRC is computed on the content before the cipher is applied.
        _invoke_crc_callback(self.crc_callback, content)

        content = _invoke_cipher_callback(self.cipher_callback, content)

        return content

    @property
    def crc(self):
        if self.crc_callback:
            return self.crc_callback.crc
        elif self.iter:
            # Delegate to the wrapped object (e.g. another adapter).
            return self.iter.crc
        else:
            return None


class _FileLikeAdapter(object):
    """The adapter to monitor the progress for `fileobj` whose content length could not be determined.

    :param fileobj: The file-like object, as long as read() is supported.
    :param progress_callback: Progress callback.
    :param crc_callback: Optional callable fed with every chunk read.
    :param cipher_callback: Optional callable transforming every chunk read.
    """
    def __init__(self, fileobj, progress_callback=None, crc_callback=None, cipher_callback=None):
        self.fileobj = fileobj
        self.progress_callback = progress_callback
        self.offset = 0

        self.crc_callback = crc_callback
        self.cipher_callback = cipher_callback

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        content = self.read(_CHUNK_SIZE)

        if content:
            return content
        else:
            raise StopIteration

    def read(self, amt=None):
        content = self.fileobj.read(amt)

        # Progress is reported with the offset *before* this chunk is
        # counted, both for data and at end of stream.
        _invoke_progress_callback(self.progress_callback, self.offset, None)

        if content:
            self.offset += len(content)

            # The CRC is computed on the content before the cipher is applied.
            _invoke_crc_callback(self.crc_callback, content)

            content = _invoke_cipher_callback(self.cipher_callback, content)

        return content

    @property
    def crc(self):
        if self.crc_callback:
            return self.crc_callback.crc
        elif self.fileobj:
            # Delegate to the wrapped object (e.g. another adapter).
            return self.fileobj.crc
        else:
            return None


class _BytesAndFileAdapter(object):
    """The adapter to monitor data's progress.

    :param data: It could be unicode string (internally it's converted to UTF-8 bytes), bytes or file object.
    :param progress_callback: Progress callback. The signature is callback(bytes_read, total_bytes).
        `bytes_read` is the bytes read and `total_bytes` is the total bytes.
    :param int size: The size of the `data`.
    :param crc_callback: Optional callable fed with every chunk read.
    :param cipher_callback: Optional callable transforming every chunk read.
    """
    def __init__(self, data, progress_callback=None, size=None, crc_callback=None, cipher_callback=None):
        self.data = to_bytes(data)
        self.progress_callback = progress_callback
        self.size = size
        self.offset = 0

        self.crc_callback = crc_callback
        self.cipher_callback = cipher_callback

    @property
    def len(self):
        return self.size

    # for python 3.x
    def __bool__(self):
        # Always truthy, regardless of the reported size.
        return True
    # for python 2.x
    __nonzero__ = __bool__

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        content = self.read(_CHUNK_SIZE)

        if content:
            return content
        else:
            raise StopIteration

    def read(self, amt=None):
        if self.offset >= self.size:
            return ''

        if amt is None or amt < 0:
            bytes_to_read = self.size - self.offset
        else:
            bytes_to_read = min(amt, self.size - self.offset)

        if isinstance(self.data, bytes):
            content = self.data[self.offset:self.offset+bytes_to_read]
        else:
            content = self.data.read(bytes_to_read)

        self.offset += bytes_to_read

        _invoke_progress_callback(self.progress_callback, min(self.offset, self.size), self.size)

        # The CRC is computed on the content before the cipher is applied.
        _invoke_crc_callback(self.crc_callback, content)

        content = _invoke_cipher_callback(self.cipher_callback, content)

        return content

    @property
    def crc(self):
        if self.crc_callback:
            return self.crc_callback.crc
        elif self.data:
            # Delegate to the wrapped object (e.g. another adapter).
            return self.data.crc
        else:
            return None


class Crc64(object):
    """Incremental CRC-64 calculator built on `crcmod`; instances are
    callable so they can be used directly as a crc callback."""

    _POLY = 0x142F0E1EBA9EA3693
    _XOROUT = 0XFFFFFFFFFFFFFFFF

    def __init__(self, init_crc=0):
        self.crc64 = crcmod.Crc(self._POLY, initCrc=init_crc, rev=True, xorOut=self._XOROUT)

    def __call__(self, data):
        self.update(data)

    def update(self, data):
        self.crc64.update(data)

    @property
    def crc(self):
        return self.crc64.crcValue


def random_aes256_key():
    """Return `_AES_256_KEY_SIZE` random bytes from `Crypto.Random`."""
    return Random.new().read(_AES_256_KEY_SIZE)


def random_counter(begin=1, end=10):
    """Return a random integer N with begin <= N <= end.

    NOTE(review): this uses the non-cryptographic `random` module and a very
    small default range; confirm this is acceptable for a CTR initial value.
    """
    return random.randint(begin, end)


# aes 256, key always is 32 bytes
_AES_256_KEY_SIZE = 32

_AES_CTR_COUNTER_BITS_LEN = 8 * 16

_AES_GCM = 'AES/GCM/NoPadding'


class AESCipher:
    """AES256 encryption implementation.

    :param str key: Symmetric encrypted key.
    :type key: str

    :param str start: Symmetric encryption initial random value.
    :type start: str

    .. Note::
        Users can implement a symmetric encryption algorithm of their own:
        1: Provide a symmetric encryption algorithm name, ALGORITHM
        2: Provide a static method to return the encryption key and the initial random value
           (if the algorithm does not require an initial random value, it also needs to be provided).
        3: Provide encryption and decryption methods.
    """
    # NOTE(review): the advertised name says GCM while the cipher below is
    # created in CTR mode -- presumably kept for compatibility; confirm.
    ALGORITHM = _AES_GCM

    @staticmethod
    def get_key():
        return random_aes256_key()

    @staticmethod
    def get_start():
        return random_counter()

    def __init__(self, key=None, start=None):
        self.key = key
        if not self.key:
            self.key = random_aes256_key()
        if not start:
            self.start = random_counter()
        else:
            self.start = int(start)
        ctr = Counter.new(_AES_CTR_COUNTER_BITS_LEN, initial_value=self.start)
        self.__cipher = AES.new(self.key, AES.MODE_CTR, counter=ctr)

    def encrypt(self, raw):
        return self.__cipher.encrypt(raw)

    def decrypt(self, enc):
        return self.__cipher.decrypt(enc)


_STRPTIME_LOCK = threading.Lock()

_ISO8601_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"

# A regex to match HTTP Last-Modified header, whose format is 'Sat, 05 Dec 2015 11:10:29 GMT'.
# Its strftime/strptime format is '%a, %d %b %Y %H:%M:%S GMT'

_HTTP_GMT_RE = re.compile(
    r'(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun), (?P<day>0[1-9]|([1-2]\d)|(3[0-1])) (?P<month>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (?P<year>\d+) (?P<hour>([0-1]\d)|(2[0-3])):(?P<minute>[0-5]\d):(?P<second>[0-5]\d) GMT$'
)

_ISO8601_RE = re.compile(
    r'(?P<year>\d+)-(?P<month>01|02|03|04|05|06|07|08|09|10|11|12)-(?P<day>0[1-9]|([1-2]\d)|(3[0-1]))T(?P<hour>([0-1]\d)|(2[0-3])):(?P<minute>[0-5]\d):(?P<second>[0-5]\d)\.000Z$'
)

_MONTH_MAPPING = {
    'Jan': 1,
    'Feb': 2,
    'Mar': 3,
    'Apr': 4,
    'May': 5,
    'Jun': 6,
    'Jul': 7,
    'Aug': 8,
    'Sep': 9,
    'Oct': 10,
    'Nov': 11,
    'Dec': 12
}


def to_unixtime(time_string, format_string):
    """Parse `time_string` with `time.strptime(format_string)` and return it
    as Unix time, treating the parsed value as UTC."""
    # strptime is called under a module-level lock.
    with _STRPTIME_LOCK:
        return int(calendar.timegm(time.strptime(time_string, format_string)))


def http_date(timeval=None):
    """Return the HTTP standard GMT time string.

    If using strftime format, it would be '%a, %d %b %Y %H:%M:%S GMT'.
    But strftime() cannot be used as it is locale dependent.
    """
    return formatdate(timeval, usegmt=True)


def http_to_unixtime(time_string):
    """Convert the HTTP date to Unix time (total seconds since 1970 Jan First, 00:00).

    HTTP Date such as `Sat, 05 Dec 2015 11:10:29 GMT`.

    :raises ValueError: if `time_string` is not in HTTP date format.
    """
    m = _HTTP_GMT_RE.match(time_string)

    if not m:
        raise ValueError(time_string + " is not in valid HTTP date format")

    day = int(m.group('day'))
    month = _MONTH_MAPPING[m.group('month')]
    year = int(m.group('year'))
    hour = int(m.group('hour'))
    minute = int(m.group('minute'))
    second = int(m.group('second'))

    tm = datetime.datetime(year, month, day, hour, minute, second).timetuple()

    return calendar.timegm(tm)


def iso8601_to_unixtime(time_string):
    """Convert the ISO8601 time string (e.g. 2012-02-24T06:07:48.000Z) to Unix time in seconds.

    :raises ValueError: if `time_string` is not in the expected format.
    """
    m = _ISO8601_RE.match(time_string)

    if not m:
        raise ValueError(time_string + " is not in valid ISO8601 format")

    day = int(m.group('day'))
    month = int(m.group('month'))
    year = int(m.group('year'))
    hour = int(m.group('hour'))
    minute = int(m.group('minute'))
    second = int(m.group('second'))

    tm = datetime.datetime(year, month, day, hour, minute, second).timetuple()

    return calendar.timegm(tm)


def date_to_iso8601(d):
    """Format `d` using `_ISO8601_FORMAT`."""
    return d.strftime(_ISO8601_FORMAT)  # It's OK to use strftime, since _ISO8601_FORMAT is not locale dependent


def iso8601_to_date(time_string):
    """Convert the ISO8601 (UTC, 'Z') time string to a `datetime.date`.

    The calendar date is computed in UTC so that the result does not depend
    on the timezone of the machine running the code.

    :raises ValueError: if `time_string` is not in the expected format.
    """
    timestamp = iso8601_to_unixtime(time_string)
    # date.fromtimestamp() would interpret the timestamp in local time and
    # could yield the previous/next day; do the arithmetic in UTC instead.
    return (datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=timestamp)).date()


def makedir_p(dirpath):
    """Create `dirpath` including parents; an already existing path is not an error."""
    try:
        os.makedirs(dirpath)
    except os.error as e:
        if e.errno != errno.EEXIST:
            raise


def silently_remove(filename):
    """Silently remove the file. If the file does not exist, no-op and return without error."""
    try:
        os.remove(filename)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise


def force_rename(src, dst):
    """Rename `src` to `dst`; if the rename fails with EEXIST, remove `dst` and retry."""
    try:
        os.rename(src, dst)
    except OSError as e:
        if e.errno == errno.EEXIST:
            silently_remove(dst)
            os.rename(src, dst)
        else:
            raise


def copyfileobj_and_verify(fsrc, fdst, expected_len,
                           chunk_size=16*1024,
                           request_id=''):
    """Copy data from file-like object fsrc to file-like object fdst, and verify the length.

    :raises InconsistentError: if the number of bytes read differs from `expected_len`.
    """
    num_read = 0

    while 1:
        buf = fsrc.read(chunk_size)
        if not buf:
            break

        num_read += len(buf)
        fdst.write(buf)

    if num_read != expected_len:
        raise InconsistentError("IncompleteRead from source", request_id)