# oss2.auth source code (page metadata: rated high quality; editor: Xiaoniu Editor; 134 views; 2023-12-01)
# -*- coding: utf-8 -*-

import hmac
import hashlib
import time

from . import utils
from .compat import urlquote, to_bytes

from .defaults import get_logger
import logging

# Identifiers of the supported signature versions.
AUTH_VERSION_1 = "v1"  # signature version 1
AUTH_VERSION_2 = "v2"  # signature version 2


def make_auth(access_key_id, access_key_secret, auth_version=AUTH_VERSION_1):
    """Create the signer object matching the requested signature version.

    :param str access_key_id: AccessKeyId; surrounding whitespace is stripped.
    :param str access_key_secret: AccessKeySecret; surrounding whitespace is stripped.
    :param str auth_version: AUTH_VERSION_2 selects :class:`AuthV2`; any other value
        (including the default AUTH_VERSION_1) selects :class:`Auth`.

    :return: an :class:`AuthV2` or :class:`Auth` instance.
    """
    auth_cls = AuthV2 if auth_version == AUTH_VERSION_2 else Auth
    return auth_cls(access_key_id.strip(), access_key_secret.strip())


class AuthBase(object):
    """Store user's AccessKeyId, AccessKeySecret information and calculate the signature. """
    def __init__(self, access_key_id, access_key_secret):
        """Keep the credentials, with surrounding whitespace removed.

        :param str access_key_id: AccessKeyId of the user.
        :param str access_key_secret: AccessKeySecret of the user.
        """
        self.id = access_key_id.strip()
        self.secret = access_key_secret.strip()

    def _sign_rtmp_url(self, url, bucket_name, channel_name, playlist_name, expires, params):
        """Return `url` with a signed query string appended.

        The string to sign is the expiration timestamp, the sorted ``key:value`` lines
        of the signable parameters, and the ``/bucket/channel`` resource; it is signed
        with HMAC-SHA1 using the stored secret.

        :param str url: URL to append the signed query string to.
        :param str bucket_name: Bucket name, first segment of the signed resource.
        :param str channel_name: Channel name, second segment of the signed resource.
        :param playlist_name: Not used by this method.
        :param int expires: Lifetime of the signature in seconds, counted from now.
        :param dict params: Query parameters. May be None or empty. A non-empty dict is
            updated in place with 'OSSAccessKeyId', 'Expires' and 'Signature'.

        :return: the signed URL.
        """
        expiration_time = int(time.time()) + expires

        canonicalized_resource = "/%s/%s" % (bucket_name, channel_name)

        # Credential/signature parameters never take part in the signature itself.
        unsigned_keys = ("OSSAccessKeyId", "Signature", "Expires", "SecurityToken")
        canonicalized_params = sorted(
            ((k, v) for k, v in (params or {}).items() if k not in unsigned_keys),
            key=lambda e: e[0])
        canon_params_str = ''.join('%s:%s\n' % (k, v) for k, v in canonicalized_params)

        # A non-empty caller dict is reused (and therefore mutated) on purpose, to keep
        # the behavior existing callers may rely on.
        p = params if params else {}
        string_to_sign = str(expiration_time) + "\n" + canon_params_str + canonicalized_resource
        get_logger().debug('string_to_sign={0}'.format(string_to_sign))

        h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha1)
        signature = utils.b64encode_as_string(h.digest())

        p['OSSAccessKeyId'] = self.id
        p['Expires'] = str(expiration_time)
        p['Signature'] = signature

        return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in p.items())


class Auth(AuthBase):
    """The first version of the signature (HMAC-SHA1).

    Stores AccessKeyId and AccessKeySecret of the user, and calculates the signature.
    """
    # Query parameters that take part in the canonicalized resource; every other
    # query parameter is left out of the version 1 string to sign.
    _subresource_key_set = frozenset(
        ['response-content-type', 'response-content-language',
         'response-cache-control', 'logging', 'response-content-encoding',
         'acl', 'uploadId', 'uploads', 'partNumber', 'group', 'link',
         'delete', 'website', 'location', 'objectInfo', 'objectMeta',
         'response-expires', 'response-content-disposition', 'cors', 'lifecycle',
         'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token',
         'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process',
         'symlink', 'callback', 'callback-var']
    )

    def _sign_request(self, req, bucket_name, key):
        """Sign `req` in place by setting its 'date' and 'authorization' headers.

        :param req: Request to sign; its `method`, `headers` and `params` are read.
        :param bucket_name: Bucket name; a falsy value signs the root resource '/'.
        :param key: The object name in OSS.
        """
        req.headers['date'] = utils.http_date()

        signature = self.__make_signature(req, bucket_name, key)
        req.headers['authorization'] = "OSS {0}:{1}".format(self.id, signature)

    def _sign_url(self, req, bucket_name, key, expires):
        """Return a signed URL for `req` that expires `expires` seconds from now.

        `req.headers['date']` and `req.params` are updated in place.

        :param req: Request to sign.
        :param bucket_name: Bucket name.
        :param key: The object name in OSS.
        :param int expires: Lifetime of the URL in seconds.

        :return: a signed URL.
        """
        expiration_time = int(time.time()) + expires

        # The expiry timestamp goes into the 'date' header so that the string to sign
        # carries it in the date position.
        req.headers['date'] = str(expiration_time)
        signature = self.__make_signature(req, bucket_name, key)

        req.params['OSSAccessKeyId'] = self.id
        req.params['Expires'] = str(expiration_time)
        req.params['Signature'] = signature

        return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

    def __make_signature(self, req, bucket_name, key):
        """Return the base64 HMAC-SHA1 of the string to sign, keyed with the secret."""
        string_to_sign = self.__get_string_to_sign(req, bucket_name, key)

        get_logger().debug('string_to_sign={0}'.format(string_to_sign))

        h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha1)
        return utils.b64encode_as_string(h.digest())

    def __get_string_to_sign(self, req, bucket_name, key):
        """Assemble method, content-md5, content-type, date, x-oss-* headers and resource."""
        resource_string = self.__get_resource_string(req, bucket_name, key)
        headers_string = self.__get_headers_string(req)

        content_md5 = req.headers.get('content-md5', '')
        content_type = req.headers.get('content-type', '')
        date = req.headers.get('date', '')
        return '\n'.join([req.method,
                          content_md5,
                          content_type,
                          date,
                          headers_string + resource_string])

    def __get_headers_string(self, req):
        """Return the 'x-oss-' headers as sorted, lower-cased ``key:value\\n`` lines."""
        canon_headers = sorted(
            ((name.lower(), value) for name, value in req.headers.items()
             if name.lower().startswith('x-oss-')),
            key=lambda x: x[0])

        # One newline-terminated line per header; empty string when there are none.
        return ''.join(k + ':' + v + '\n' for k, v in canon_headers)

    def __get_resource_string(self, req, bucket_name, key):
        """Return '/' without a bucket, else '/bucket/key' plus the signed sub-resources."""
        if not bucket_name:
            return '/'
        return '/{0}/{1}{2}'.format(bucket_name, key, self.__get_subresource_string(req.params))

    def __get_subresource_string(self, params):
        """Return the sorted '?k=v&k2' string of the params listed in `_subresource_key_set`."""
        if not params:
            return ''

        subresource_params = sorted(
            ((k, v) for k, v in params.items() if k in self._subresource_key_set),
            key=lambda e: e[0])

        if not subresource_params:
            return ''
        return '?' + '&'.join(self.__param_to_query(k, v) for k, v in subresource_params)

    def __param_to_query(self, k, v):
        """Return 'k=v', or just 'k' when the value is empty."""
        if v:
            return k + '=' + v
        return k
    

class AnonymousAuth(object):
    """Anonymous Auth

    .. Note::
        Anonymous users can only read buckets with public-read permissions, or read from or write to buckets with public-read-write permissions.
        They are unable to execute service or bucket related operations, such as listing objects under a bucket.
    """
    def _sign_request(self, req, bucket_name, key):
        """Do nothing: the request is left unsigned."""
        pass

    def _sign_url(self, req, bucket_name, key, expires):
        """Return `req.url` followed by '?' and the quoted `req.params`, without a signature."""
        return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

    def _sign_rtmp_url(self, url, bucket_name, channel_name, playlist_name, expires, params):
        """Return `url` followed by '?' and the quoted `params`, without a signature.

        :param dict params: Query parameters; None is treated like an empty dict, as in
            :class:`AuthBase`.
        """
        params = params or {}
        return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in params.items())
        

[文档]class StsAuth(object):
    """Used for STS authentication. Users can get the AccessKeyID, AccessKeySecret, and SecurityToken from the Alibaba Cloud STS service (https://aliyuncs.com).

    .. Note::
		The AccessKeyId/Secret and SecurityToken have expiration times. When they are renewed, the `STSAuth` property of class :class:`Bucket <oss2.Bucket>` instance must be updated with the new credentials.  

    :param str access_key_id: Temporary AccessKeyId
    :param str access_key_secret: Temporary AccessKeySecret
    :param str security_token: Temporary SecurityToken
    :param str auth_version: The version of the auth needs to be generated, the default value is AUTH_VERSION_1(v1). 
    """
    def __init__(self, access_key_id, access_key_secret, security_token, auth_version=AUTH_VERSION_1):
        self.__auth = make_auth(access_key_id, access_key_secret, auth_version)
        self.__security_token = security_token

    def _sign_request(self, req, bucket_name, key):
        req.headers['x-oss-security-token'] = self.__security_token
        self.__auth._sign_request(req, bucket_name, key)

    def _sign_url(self, req, bucket_name, key, expires):
        req.params['security-token'] = self.__security_token
        return self.__auth._sign_url(req, bucket_name, key, expires)

    def _sign_rtmp_url(self, url, bucket_name, channel_name, playlist_name, expires, params):
        params['security-token'] = self.__security_token
        return self.__auth._sign_rtmp_url(url, bucket_name, channel_name, playlist_name, expires, params)


def _param_to_quoted_query(k, v):
    if v:
        return urlquote(k, '') + '=' + urlquote(v, '')
    else:
        return urlquote(k, '')


def v2_uri_encode(raw_text):
    """Percent-encode `raw_text` for use in a version 2 signature.

    ASCII letters, digits and ``_ - ~ .`` are kept as they are; every other byte
    becomes ``%XX`` with upper-case hexadecimal digits.

    :param raw_text: Text to encode; it is converted with `to_bytes` first.
    :return: the encoded text as a str.
    """
    raw_text = to_bytes(raw_text)

    pieces = []
    for b in raw_text:
        # Iterating bytes yields ints on Python 3 and 1-char strings on Python 2.
        c = chr(b) if isinstance(b, int) else b

        if ('A' <= c <= 'Z') or ('a' <= c <= 'z') or ('0' <= c <= '9') \
                or c in ('_', '-', '~', '.'):
            pieces.append(c)
        else:
            pieces.append("%{0:02X}".format(ord(c)))

    # Joining once avoids the quadratic cost of repeated string concatenation.
    return ''.join(pieces)


_DEFAULT_ADDITIONAL_HEADERS = set(['range',
                                   'if-modified-since'])


class AuthV2(AuthBase):
    """The signature version 2. The differences from version 1 are below :
    1. Using the SHA256 algorithm, it has higher security.
    2. Parameter calculation contains all HTTP query parameters.
    """
    def _sign_request(self, req, bucket_name, key, in_additional_headers=None):
        """Authorization is placed in the header of req

        :param req: The req whose header will be put Authorization in.
        :type req: oss2.http.Request

        :param bucket_name: Bucket name.
        :param key: The object name in OSS.
        :param in_additional_headers: Names of extra headers to include in the signature.
            Only those actually present in req's headers are used. Defaults to
            `_DEFAULT_ADDITIONAL_HEADERS`.
        """
        if in_additional_headers is None:
            in_additional_headers = _DEFAULT_ADDITIONAL_HEADERS

        additional_headers = self.__get_additional_headers(req, in_additional_headers)

        req.headers['date'] = utils.http_date()

        signature = self.__make_signature(req, bucket_name, key, additional_headers)

        if additional_headers:
            # Sorted so that the advertised list is deterministic and matches the order
            # used inside the string to sign (the set itself has no defined order).
            req.headers['authorization'] = "OSS2 AccessKeyId:{0},AdditionalHeaders:{1},Signature:{2}"\
                .format(self.id, ';'.join(sorted(additional_headers)), signature)
        else:
            req.headers['authorization'] = "OSS2 AccessKeyId:{0},Signature:{1}".format(self.id, signature)

    def _sign_url(self, req, bucket_name, key, expires, in_additional_headers=None):
        """Return a signed URL.

        :param req: A request for a signature.
        :type req: oss2.http.Request

        :param bucket_name: Bucket name.
        :param key: The object name in OSS.
        :param int expires: The returned URL will expire after `expires` seconds.
        :param in_additional_headers: Names of extra headers to include in the signature.
            Only those actually present in req's headers are used. Defaults to none.

        :return: a signed URL.
        """

        if in_additional_headers is None:
            in_additional_headers = set()

        additional_headers = self.__get_additional_headers(req, in_additional_headers)

        expiration_time = int(time.time()) + expires

        req.headers['date'] = str(expiration_time)  # re-use __make_signature by setting the 'date' header

        req.params['x-oss-signature-version'] = 'OSS2'
        req.params['x-oss-expires'] = str(expiration_time)
        req.params['x-oss-access-key-id'] = self.id

        signature = self.__make_signature(req, bucket_name, key, additional_headers)

        # Added after signing so the signature does not cover itself.
        req.params['x-oss-signature'] = signature

        return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

    def __make_signature(self, req, bucket_name, key, additional_headers):
        """Return the base64 HMAC-SHA256 of the string to sign, keyed with the secret."""
        string_to_sign = self.__get_string_to_sign(req, bucket_name, key, additional_headers)

        # Logged at debug level on the package logger, as the other signers in this module do.
        get_logger().debug('string_to_sign={0}'.format(string_to_sign))

        h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha256)
        return utils.b64encode_as_string(h.digest())

    def __get_additional_headers(self, req, in_additional_headers):
        """Return the lower-cased requested header names that req actually carries, as a set."""
        # we add a header into additional_headers only if it is already in req's headers.

        additional_headers = set(h.lower() for h in in_additional_headers)
        keys_in_header = set(k.lower() for k in req.headers.keys())

        return additional_headers & keys_in_header

    def __get_string_to_sign(self, req, bucket_name, key, additional_header_list):
        """Assemble verb, content-md5, content-type, date, canonical headers,
        the sorted additional header names and the canonical resource."""
        verb = req.method
        content_md5 = req.headers.get('content-md5', '')
        content_type = req.headers.get('content-type', '')
        date = req.headers.get('date', '')

        canonicalized_oss_headers = self.__get_canonicalized_oss_headers(req, additional_header_list)
        additional_headers = ';'.join(sorted(additional_header_list))
        canonicalized_resource = self.__get_resource_string(req, bucket_name, key)

        # canonicalized_oss_headers is either empty or already newline-terminated, so it
        # shares a slot with the additional header names.
        return '\n'.join([verb,
                          content_md5,
                          content_type,
                          date,
                          canonicalized_oss_headers + additional_headers,
                          canonicalized_resource])

    def __get_resource_string(self, req, bucket_name, key):
        """Return the encoded '/bucket/key' (or '/') followed by the canonical query string."""
        if bucket_name:
            encoded_uri = v2_uri_encode('/' + bucket_name + '/' + key)
        else:
            encoded_uri = v2_uri_encode('/')

        get_logger().debug('encoded_uri={0} key={1}'.format(encoded_uri, key))

        return encoded_uri + self.__get_canonalized_query_string(req)

    def __get_canonalized_query_string(self, req):
        """Return all of req.params, encoded and sorted by encoded name, as '?k=v&k2'."""
        encoded_params = {}
        for param, value in req.params.items():
            encoded_params[v2_uri_encode(param)] = v2_uri_encode(value)

        if not encoded_params:
            return ''

        sorted_params = sorted(encoded_params.items(), key=lambda e: e[0])
        return '?' + '&'.join(self.__param_to_query(k, v) for k, v in sorted_params)

    def __param_to_query(self, k, v):
        """Return 'k=v', or just 'k' when the value is empty."""
        if v:
            return k + '=' + v
        return k

    def __get_canonicalized_oss_headers(self, req, additional_headers):
        """Return the signed headers as sorted, lower-cased ``key:value\\n`` lines.

        :param additional_headers: Lowercase headers list, and these headers do not prefix with 'x-oss-'.
        """
        canon_headers = []

        for k, v in req.headers.items():
            lower_key = k.lower()
            if lower_key.startswith('x-oss-') or lower_key in additional_headers:
                canon_headers.append((lower_key, v))

        canon_headers.sort(key=lambda x: x[0])

        return ''.join(name + ':' + value + '\n' for name, value in canon_headers)