class KsAuth( ):

    def __init__(self, kwargs, get=True, region=None):
        assert kwargs and kwargs[\'Action\']

        self.url = cloud_account.KS_EPC_URL
        self.region = region if region else cloud_account.KS_EPC_REGION
        self.kwargs = kwargs
        self.method = \'POST\'
        self.body = {}
        self.algorithm = \'AWS4-HMAC-SHA256\'
        self.signed_headers = \'host;x-amz-date\'
        if get:
            self.body = \"\"
            self.method = \'GET\'
            self.__get_init()
        else:
            self.__post_init()

    def __get_init(self):
        qstring = \"Action={}&Version=2015-11-01\".format(self.kwargs[\"Action\"])
        del self.kwargs[\'Action\']
        for k, v in self.kwargs.items():
            qstring += \"&{}={}\".format(k, v)
        qstring = self._sort(qstring)
        self.body = qstring
        self.url += \"/?\" + self.body
        self.format_time()

    def _sort(self, qstring):
        d = {}
        for i in qstring.split(\'&\'):
            k, v = i.split(\'=\')
            d[k] = v
        iterable =  sorted(d.items(), key=lambda data: data[0])
        return \'\'.join(map(lambda x: x[0]+\'=\'+x[1]+\'&\', iterable)).rstrip(\'&\')

    def format_time(self):
        t = datetime.utcnow()
        amzdate = t.strftime(\'%Y%m%dT%H%M%SZ\')
        datestamp = t.strftime(\'%Y%m%d\')
        if not hasattr(self, \'delay_time\') and not hasattr(self, \'vague_time\'):
            setattr(self, \'vague_time\', datestamp)
            setattr(self, \'delay_time\', amzdate)

    def canonical_headers(self):
        head = \'host:\' + cloud_account.KS_EPC_HOST + \'\\n\' + \'x-amz-date:\' + self.delay_time + \'\\n\'
        return head

    def auth(self):
        signature = self.get_signature()
        _, credential_scope = self.hash_salt()
        authorization_header = self.algorithm + \' \' + \'Credential=\' + cloud_account.KS_EPC_ACCESS_KEY + \'/\' + credential_scope + \', \' + \'SignedHeaders=\' + self.signed_headers + \', \' + \'Signature=\' + signature
        headers = {\'x-amz-date\': self.delay_time, \'Authorization\': authorization_header}
        return headers

    def hash_salt(self):
        credential_scope = self.vague_time + \'/\' + self.region + \'/\' + cloud_account.KS_EPC_SERVICE + \'/\' + \'aws4_request\'
        canonical_uri = \"/\"
        headers = self.canonical_headers()
        signing_key = self.getSignatureKey()
        payload_hash = hashlib.sha256((\'\').encode(\'utf-8\')).hexdigest()
        canonical_request = self.method + \'\\n\' + canonical_uri + \'\\n\' + self.body + \'\\n\' + headers + \'\\n\' + self.signed_headers + \'\\n\' + payload_hash
        string_to_sign = self.algorithm + \'\\n\' + self.delay_time + \'\\n\' + credential_scope + \'\\n\' + hashlib.sha256(
            canonical_request.encode(\'utf-8\')).hexdigest()
        return string_to_sign, credential_scope

    def get_signature(self):
        signing_key = self.getSignatureKey()
        string_to_sign = self.hash_salt()[0]
        signature = hmac.new(signing_key, (string_to_sign).encode(\'utf-8\'), hashlib.sha256).hexdigest()
        return signature

    def sign(self, key, msg):
        return hmac.new(key, msg.encode(\'utf-8\'), hashlib.sha256).digest()

    def getSignatureKey(self):
        kDate = self.sign((\'AWS4\' + cloud_account.KS_EPC_SECURITY_KEY).encode(\'utf-8\'), self.vague_time)
        kRegion = self.sign(kDate, self.region)
        kService = self.sign(kRegion, cloud_account.KS_EPC_SERVICE)
        kSigning = self.sign(kService, \'aws4_request\')
        return kSigning

    def __post_init(self):
        pass

    def post(self):
        html = requests.get(self.url, headers=self.auth())
        print(html.text)
     
if __name__ == \'__main__\':
    s =  KsAuth({\'Action\': \"Describe\"})
    header = s.auth()
    html = requests.get(s.url, headers=header)
    print(html.text) 
收藏 打印