class KsAuth( ):
def __init__(self, kwargs, get=True, region=None):
assert kwargs and kwargs[\'Action\']
self.url = cloud_account.KS_EPC_URL
self.region = region if region else cloud_account.KS_EPC_REGION
self.kwargs = kwargs
self.method = \'POST\'
self.body = {}
self.algorithm = \'AWS4-HMAC-SHA256\'
self.signed_headers = \'host;x-amz-date\'
if get:
self.body = \"\"
self.method = \'GET\'
self.__get_init()
else:
self.__post_init()
def __get_init(self):
qstring = \"Action={}&Version=2015-11-01\".format(self.kwargs[\"Action\"])
del self.kwargs[\'Action\']
for k, v in self.kwargs.items():
qstring += \"&{}={}\".format(k, v)
qstring = self._sort(qstring)
self.body = qstring
self.url += \"/?\" + self.body
self.format_time()
def _sort(self, qstring):
d = {}
for i in qstring.split(\'&\'):
k, v = i.split(\'=\')
d[k] = v
iterable = sorted(d.items(), key=lambda data: data[0])
return \'\'.join(map(lambda x: x[0]+\'=\'+x[1]+\'&\', iterable)).rstrip(\'&\')
def format_time(self):
t = datetime.utcnow()
amzdate = t.strftime(\'%Y%m%dT%H%M%SZ\')
datestamp = t.strftime(\'%Y%m%d\')
if not hasattr(self, \'delay_time\') and not hasattr(self, \'vague_time\'):
setattr(self, \'vague_time\', datestamp)
setattr(self, \'delay_time\', amzdate)
def canonical_headers(self):
head = \'host:\' + cloud_account.KS_EPC_HOST + \'\\n\' + \'x-amz-date:\' + self.delay_time + \'\\n\'
return head
def auth(self):
signature = self.get_signature()
_, credential_scope = self.hash_salt()
authorization_header = self.algorithm + \' \' + \'Credential=\' + cloud_account.KS_EPC_ACCESS_KEY + \'/\' + credential_scope + \', \' + \'SignedHeaders=\' + self.signed_headers + \', \' + \'Signature=\' + signature
headers = {\'x-amz-date\': self.delay_time, \'Authorization\': authorization_header}
return headers
def hash_salt(self):
credential_scope = self.vague_time + \'/\' + self.region + \'/\' + cloud_account.KS_EPC_SERVICE + \'/\' + \'aws4_request\'
canonical_uri = \"/\"
headers = self.canonical_headers()
signing_key = self.getSignatureKey()
payload_hash = hashlib.sha256((\'\').encode(\'utf-8\')).hexdigest()
canonical_request = self.method + \'\\n\' + canonical_uri + \'\\n\' + self.body + \'\\n\' + headers + \'\\n\' + self.signed_headers + \'\\n\' + payload_hash
string_to_sign = self.algorithm + \'\\n\' + self.delay_time + \'\\n\' + credential_scope + \'\\n\' + hashlib.sha256(
canonical_request.encode(\'utf-8\')).hexdigest()
return string_to_sign, credential_scope
def get_signature(self):
signing_key = self.getSignatureKey()
string_to_sign = self.hash_salt()[0]
signature = hmac.new(signing_key, (string_to_sign).encode(\'utf-8\'), hashlib.sha256).hexdigest()
return signature
def sign(self, key, msg):
return hmac.new(key, msg.encode(\'utf-8\'), hashlib.sha256).digest()
def getSignatureKey(self):
kDate = self.sign((\'AWS4\' + cloud_account.KS_EPC_SECURITY_KEY).encode(\'utf-8\'), self.vague_time)
kRegion = self.sign(kDate, self.region)
kService = self.sign(kRegion, cloud_account.KS_EPC_SERVICE)
kSigning = self.sign(kService, \'aws4_request\')
return kSigning
def __post_init(self):
pass
def post(self):
html = requests.get(self.url, headers=self.auth())
print(html.text)
if __name__ == \'__main__\':
s = KsAuth({\'Action\': \"Describe\"})
header = s.auth()
html = requests.get(s.url, headers=header)
print(html.text)