Implemented alternate (SHA-1, SHA-256, SHA-256) digest algorithms/content and use proper token values.

pull/29/head
Jim Duchek 2022-01-19 19:54:45 -08:00
parent 8d615eac2a
commit 3b0266df23
1 changed files with 15 additions and 4 deletions

View File

@ -45,7 +45,6 @@ class Crypto:
key.verify(signature, string_to_sign, self.PKCS1v15(), hasher)
class HTTPSignatureAuth(requests.auth.AuthBase):
hasher_constructor = hashlib.sha256
known_algorithms = {
"rsa-sha1",
"rsa-sha256",
@ -53,23 +52,34 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
"hmac-sha256",
"ecdsa-sha256",
}
known_digests = {
"sha256": "sha-256",
"sha512": "sha-512",
"sha1": "sha"
}
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None):
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None, digest="sha256"):
"""
:param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key
:param datetime.timedelta expires_in: The time after which this signature should expire
"""
assert algorithm in self.known_algorithms
assert digest in self.known_digests
self.key = key
self.key_id = key_id
self.algorithm = algorithm
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
self.expires_in = expires_in
self.digest_alg = digest
def add_date(self, request, timestamp):
if "Date" not in request.headers:
request.headers["Date"] = email.utils.formatdate(timestamp, usegmt=True)
def digest_content(self, request):
return request.body
def add_digest(self, request):
if request.body is None and "digest" in self.headers:
@ -77,8 +87,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
if request.body is not None and "Digest" not in request.headers:
if "digest" not in self.headers:
self.headers.append("digest")
digest = self.hasher_constructor(request.body).digest()
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
hash = hashlib.new(self.digest_alg)
hash.update(self.digest_content(request))
request.headers["Digest"] = self.known_digests[self.digest_alg] + "=" + base64.b64encode(hash.digest()).decode()
@classmethod
def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp):