oss2.auth 源代码
优质
小牛编辑
134浏览
2023-12-01
# -*- coding: utf-8 -*-

import hmac
import hashlib
import time

from . import utils
from .compat import urlquote, to_bytes
from .defaults import get_logger
import logging

AUTH_VERSION_1 = 'v1'
AUTH_VERSION_2 = 'v2'

# Query parameters that never take part in the RTMP signature calculation.
_RTMP_UNSIGNED_PARAMS = ('OSSAccessKeyId', 'Signature', 'Expires', 'SecurityToken')

# Characters that v2_uri_encode() leaves untouched; every other byte becomes %XX.
_V2_UNRESERVED_CHARS = frozenset(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    'abcdefghijklmnopqrstuvwxyz'
    '0123456789'
    '_-~.'
)


def make_auth(access_key_id, access_key_secret, auth_version=AUTH_VERSION_1):
    """Create the signer for the requested signature version.

    :param str access_key_id: AccessKeyId; surrounding whitespace is stripped.
    :param str access_key_secret: AccessKeySecret; surrounding whitespace is stripped.
    :param str auth_version: ``AUTH_VERSION_2`` selects :class:`AuthV2`; any other
        value falls back to :class:`Auth`.
    :return: an :class:`AuthV2` or :class:`Auth` instance.
    """
    if auth_version == AUTH_VERSION_2:
        return AuthV2(access_key_id.strip(), access_key_secret.strip())
    else:
        return Auth(access_key_id.strip(), access_key_secret.strip())


class AuthBase(object):
    """Store user's AccessKeyId, AccessKeySecret information and calculate the signature."""

    def __init__(self, access_key_id, access_key_secret):
        self.id = access_key_id.strip()
        self.secret = access_key_secret.strip()

    def _sign_rtmp_url(self, url, bucket_name, channel_name, playlist_name, expires, params):
        """Return ``url`` with a signed query string appended.

        The string to sign is the expiration timestamp, the sorted ``key:value``
        lines of ``params`` (minus the reserved signature parameters) and
        ``/bucket_name/channel_name``, signed with HMAC-SHA1.

        :param str url: URL to append the query string to.
        :param str bucket_name: Bucket name.
        :param str channel_name: Channel name.
        :param playlist_name: Not used by the signature calculation itself.
        :param int expires: Validity period in seconds, counted from now.
        :param params: Extra query parameters (dict) or None. The dict is not modified.
        :return: the signed URL.
        """
        expiration_time = int(time.time()) + expires
        canonicalized_resource = "/%s/%s" % (bucket_name, channel_name)

        # Work on a copy so the caller's dict is not polluted with signature fields.
        query = dict(params) if params else {}

        signed_params = sorted(
            ((k, v) for k, v in query.items() if k not in _RTMP_UNSIGNED_PARAMS),
            key=lambda e: e[0])
        canon_params_str = ''.join('%s:%s\n' % (k, v) for k, v in signed_params)

        string_to_sign = str(expiration_time) + "\n" + canon_params_str + canonicalized_resource
        get_logger().debug('string_to_sign={0}'.format(string_to_sign))

        h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha1)
        signature = utils.b64encode_as_string(h.digest())

        query['OSSAccessKeyId'] = self.id
        query['Expires'] = str(expiration_time)
        query['Signature'] = signature

        return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in query.items())


class Auth(AuthBase):
    """The first version of the signature. Stores AccessKeyId and AccessKeySecret of the user,
    and calculates the signature.
    """

    # Only these query parameters are part of the v1 canonicalized resource.
    _subresource_key_set = frozenset(
        ['response-content-type', 'response-content-language',
         'response-cache-control', 'logging', 'response-content-encoding',
         'acl', 'uploadId', 'uploads', 'partNumber', 'group', 'link',
         'delete', 'website', 'location', 'objectInfo', 'objectMeta',
         'response-expires', 'response-content-disposition', 'cors', 'lifecycle',
         'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token',
         'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process',
         'symlink', 'callback', 'callback-var']
    )

    def _sign_request(self, req, bucket_name, key):
        """Set the ``date`` and ``authorization`` headers on ``req``."""
        req.headers['date'] = utils.http_date()

        signature = self.__make_signature(req, bucket_name, key)
        req.headers['authorization'] = "OSS {0}:{1}".format(self.id, signature)

    def _sign_url(self, req, bucket_name, key, expires):
        """Return a signed URL that expires ``expires`` seconds from now.

        The signature fields are written into ``req.params``.
        """
        expiration_time = int(time.time()) + expires

        # The expiration timestamp takes the place of the date in the string to sign.
        req.headers['date'] = str(expiration_time)
        signature = self.__make_signature(req, bucket_name, key)

        req.params['OSSAccessKeyId'] = self.id
        req.params['Expires'] = str(expiration_time)
        req.params['Signature'] = signature

        return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

    def __make_signature(self, req, bucket_name, key):
        """Return the base64 HMAC-SHA1 signature of the request's string to sign."""
        string_to_sign = self.__get_string_to_sign(req, bucket_name, key)

        get_logger().debug('string_to_sign={0}'.format(string_to_sign))

        h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha1)
        return utils.b64encode_as_string(h.digest())

    def __get_string_to_sign(self, req, bucket_name, key):
        """Join method, content-md5, content-type, date, x-oss- headers and resource."""
        resource_string = self.__get_resource_string(req, bucket_name, key)
        headers_string = self.__get_headers_string(req)

        content_md5 = req.headers.get('content-md5', '')
        content_type = req.headers.get('content-type', '')
        date = req.headers.get('date', '')
        return '\n'.join([req.method,
                          content_md5,
                          content_type,
                          date,
                          headers_string + resource_string])

    def __get_headers_string(self, req):
        """Return the ``x-oss-`` headers as sorted ``key:value`` lines, or ''."""
        canon_headers = []
        for k, v in req.headers.items():
            lower_key = k.lower()
            if lower_key.startswith('x-oss-'):
                canon_headers.append((lower_key, v))

        canon_headers.sort(key=lambda x: x[0])

        if canon_headers:
            return '\n'.join(k + ':' + v for k, v in canon_headers) + '\n'
        else:
            return ''

    def __get_resource_string(self, req, bucket_name, key):
        """Return ``/bucket/key`` plus the signed sub-resources, or '/' without a bucket."""
        if not bucket_name:
            return '/'
        else:
            return '/{0}/{1}{2}'.format(bucket_name, key, self.__get_subresource_string(req.params))

    def __get_subresource_string(self, params):
        """Return the sorted sub-resource query string (with leading '?'), or ''."""
        if not params:
            return ''

        subresource_params = []
        for key, value in params.items():
            if key in self._subresource_key_set:
                subresource_params.append((key, value))

        subresource_params.sort(key=lambda e: e[0])

        if subresource_params:
            return '?' + '&'.join(self.__param_to_query(k, v) for k, v in subresource_params)
        else:
            return ''

    def __param_to_query(self, k, v):
        """Return ``k=v``, or just ``k`` when the value is empty."""
        if v:
            return k + '=' + v
        else:
            return k


class AnonymousAuth(object):
    """Anonymous Auth

    .. Note::
        Anonymous users can only read buckets with public-read permissions, or read from or
        write to buckets with public-read-write permissions.
        They are unable to execute service or bucket related operations, such as listing
        objects under a bucket.
    """

    def _sign_request(self, req, bucket_name, key):
        """Do nothing: anonymous requests carry no signature."""
        pass

    def _sign_url(self, req, bucket_name, key, expires):
        """Return the URL with the request's query parameters, unsigned."""
        return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

    def _sign_rtmp_url(self, url, bucket_name, channel_name, playlist_name, expires, params):
        """Return the URL with ``params`` as query string, unsigned. ``params`` may be None."""
        query = params if params else {}
        return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in query.items())


class StsAuth(object):
    """Used for STS authentication. Users can get the AccessKeyID, AccessKeySecret, and
    SecurityToken from the Alibaba Cloud STS service (https://aliyuncs.com).

    .. Note::
        The AccessKeyId/Secret and SecurityToken have expiration times. When they are renewed,
        the `STSAuth` property of class :class:`Bucket <oss2.Bucket>` instance must be updated
        with the new credentials.

    :param str access_key_id: Temporary AccessKeyId
    :param str access_key_secret: Temporary AccessKeySecret
    :param str security_token: Temporary SecurityToken
    :param str auth_version: The version of the auth needs to be generated, the default value
        is AUTH_VERSION_1(v1).
    """

    def __init__(self, access_key_id, access_key_secret, security_token, auth_version=AUTH_VERSION_1):
        self.__auth = make_auth(access_key_id, access_key_secret, auth_version)
        self.__security_token = security_token

    def _sign_request(self, req, bucket_name, key):
        """Add the ``x-oss-security-token`` header, then sign with the wrapped signer."""
        req.headers['x-oss-security-token'] = self.__security_token
        self.__auth._sign_request(req, bucket_name, key)

    def _sign_url(self, req, bucket_name, key, expires):
        """Add the ``security-token`` parameter, then sign the URL with the wrapped signer."""
        req.params['security-token'] = self.__security_token
        return self.__auth._sign_url(req, bucket_name, key, expires)

    def _sign_rtmp_url(self, url, bucket_name, channel_name, playlist_name, expires, params):
        """Add the ``security-token`` parameter, then sign the RTMP URL.

        ``params`` may be None and is not modified.
        """
        query = dict(params) if params else {}
        query['security-token'] = self.__security_token
        return self.__auth._sign_rtmp_url(url, bucket_name, channel_name, playlist_name, expires, query)


def _param_to_quoted_query(k, v):
    """Return the URL-quoted ``k=v`` pair, or just the quoted key when ``v`` is empty."""
    if v:
        return urlquote(k, '') + '=' + urlquote(v, '')
    else:
        return urlquote(k, '')


def v2_uri_encode(raw_text):
    """Percent-encode ``raw_text`` for the version 2 signature.

    The text is converted with ``to_bytes``; ASCII letters, digits and ``_ - ~ .``
    are kept as is, every other byte is written as ``%XX`` (uppercase hex).
    """
    raw_text = to_bytes(raw_text)

    pieces = []
    for b in raw_text:
        # Iterating bytes yields ints on Python 3 and 1-char strings on Python 2.
        c = chr(b) if isinstance(b, int) else b

        if c in _V2_UNRESERVED_CHARS:
            pieces.append(c)
        else:
            pieces.append("%{0:02X}".format(ord(c)))

    return ''.join(pieces)


_DEFAULT_ADDITIONAL_HEADERS = set(['range',
                                   'if-modified-since'])


class AuthV2(AuthBase):
    """The signature version 2. The differences from version 1 are below :
    1. Using the SHA256 algorithm, it has higher security.
    2. Parameter calculation contains all HTTP query parameters.
    """

    def _sign_request(self, req, bucket_name, key, in_additional_headers=None):
        """Authorization is placed in the header of req

        :param req: The req whose header will be put Authorization in.
        :type req: oss2.http.Request

        :param bucket_name: Bucket name.
        :param key: The object name in OSS.
        :param in_additional_headers: Names of headers, besides the ``x-oss-`` ones, to
            include in the signature. Only those present in ``req.headers`` are used.
            Defaults to ``_DEFAULT_ADDITIONAL_HEADERS``.
        """
        if in_additional_headers is None:
            in_additional_headers = _DEFAULT_ADDITIONAL_HEADERS

        additional_headers = self.__get_additional_headers(req, in_additional_headers)

        req.headers['date'] = utils.http_date()

        signature = self.__make_signature(req, bucket_name, key, additional_headers)

        if additional_headers:
            # Sorted so the header lists the names in the same order as the string to
            # sign, instead of the arbitrary iteration order of a set.
            req.headers['authorization'] = "OSS2 AccessKeyId:{0},AdditionalHeaders:{1},Signature:{2}"\
                .format(self.id, ';'.join(sorted(additional_headers)), signature)
        else:
            req.headers['authorization'] = "OSS2 AccessKeyId:{0},Signature:{1}".format(self.id, signature)

    def _sign_url(self, req, bucket_name, key, expires, in_additional_headers=None):
        """Return a signed URL.

        :param req: A request for a signature.
        :type req: oss2.http.Request

        :param bucket_name: Bucket name.
        :param key: The object name in OSS.
        :param int expires: The returned URL will expire after `expires` seconds.
        :param in_additional_headers: Names of headers, besides the ``x-oss-`` ones, to
            include in the signature. Only those present in ``req.headers`` are used.
            Defaults to none.

        :return: a signed URL.
        """
        if in_additional_headers is None:
            in_additional_headers = set()

        additional_headers = self.__get_additional_headers(req, in_additional_headers)

        expiration_time = int(time.time()) + expires

        req.headers['date'] = str(expiration_time)  # re-use __make_signature by setting the 'date' header

        req.params['x-oss-signature-version'] = 'OSS2'
        req.params['x-oss-expires'] = str(expiration_time)
        req.params['x-oss-access-key-id'] = self.id

        signature = self.__make_signature(req, bucket_name, key, additional_headers)

        req.params['x-oss-signature'] = signature

        return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

    def __make_signature(self, req, bucket_name, key, additional_headers):
        """Return the base64 HMAC-SHA256 signature of the request's string to sign."""
        string_to_sign = self.__get_string_to_sign(req, bucket_name, key, additional_headers)

        # Same logger and level as the v1 signer; the string to sign is debug detail.
        get_logger().debug('string_to_sign={0}'.format(string_to_sign))

        h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha256)
        return utils.b64encode_as_string(h.digest())

    def __get_additional_headers(self, req, in_additional_headers):
        """Return the lowercased requested header names that ``req`` actually carries."""
        # we add a header into additional_headers only if it is already in req's headers.

        additional_headers = set(h.lower() for h in in_additional_headers)
        keys_in_header = set(k.lower() for k in req.headers.keys())

        return additional_headers & keys_in_header

    def __get_string_to_sign(self, req, bucket_name, key, additional_header_list):
        """Assemble the version 2 string to sign from the request."""
        verb = req.method
        content_md5 = req.headers.get('content-md5', '')
        content_type = req.headers.get('content-type', '')
        date = req.headers.get('date', '')

        canonicalized_oss_headers = self.__get_canonicalized_oss_headers(req, additional_header_list)
        additional_headers = ';'.join(sorted(additional_header_list))
        canonicalized_resource = self.__get_resource_string(req, bucket_name, key)

        return verb + '\n' +\
            content_md5 + '\n' +\
            content_type + '\n' +\
            date + '\n' +\
            canonicalized_oss_headers +\
            additional_headers + '\n' +\
            canonicalized_resource

    def __get_resource_string(self, req, bucket_name, key):
        """Return the encoded ``/bucket/key`` (or ``/``) plus the canonical query string."""
        if bucket_name:
            encoded_uri = v2_uri_encode('/' + bucket_name + '/' + key)
        else:
            encoded_uri = v2_uri_encode('/')

        get_logger().debug('encoded_uri={0} key={1}'.format(encoded_uri, key))

        return encoded_uri + self.__get_canonalized_query_string(req)

    def __get_canonalized_query_string(self, req):
        """Return all query parameters, encoded and sorted by key, with leading '?', or ''."""
        encoded_params = {}
        for param, value in req.params.items():
            encoded_params[v2_uri_encode(param)] = v2_uri_encode(value)

        if not encoded_params:
            return ''

        sorted_params = sorted(encoded_params.items(), key=lambda e: e[0])
        return '?' + '&'.join(self.__param_to_query(k, v) for k, v in sorted_params)

    def __param_to_query(self, k, v):
        """Return ``k=v``, or just ``k`` when the value is empty."""
        if v:
            return k + '=' + v
        else:
            return k

    def __get_canonicalized_oss_headers(self, req, additional_headers):
        """Return the signed headers as sorted ``key:value`` lines.

        :param additional_headers: Lowercase headers list, and these headers do not prefix
            with 'x-oss-'.
        """
        canon_headers = []

        for k, v in req.headers.items():
            lower_key = k.lower()
            if lower_key.startswith('x-oss-') or lower_key in additional_headers:
                canon_headers.append((lower_key, v))

        canon_headers.sort(key=lambda x: x[0])

        return ''.join(v[0] + ':' + v[1] + '\n' for v in canon_headers)