Implemented support for (created) and (expires) special headers (#14)

pull/21/head
Arjen Brouwer 2020-04-25 16:20:06 +02:00 committed by GitHub
parent f1ced58164
commit ed24c9a04c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 53 additions and 35 deletions

View File

@ -26,14 +26,12 @@ class Crypto:
key = self.load_pem_private_key(key, password=passphrase, backend=self.default_backend())
if self.algorithm in {"rsa-sha1", "rsa-sha256"}:
hasher = self.SHA1() if self.algorithm.endswith("sha1") else self.SHA256()
signer = key.signer(padding=self.PKCS1v15(), algorithm=hasher)
return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign)
elif self.algorithm in {"rsa-sha512"}:
hasher = self.SHA512()
signer = key.signer(padding=self.PKCS1v15(), algorithm=hasher)
return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign)
elif self.algorithm == "ecdsa-sha256":
signer = key.signer(signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()))
signer.update(string_to_sign)
return signer.finalize()
return key.sign(signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()), data=string_to_sign)
def verify(self, signature, string_to_sign, key):
if self.algorithm == "hmac-sha256":
@ -56,18 +54,21 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
"ecdsa-sha256",
}
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None):
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None):
"""
:param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key
:param datetime.timedelta expires_in: The time after which this signature should expire
"""
assert algorithm in self.known_algorithms
self.key = key
self.key_id = key_id
self.algorithm = algorithm
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
self.expires_in = expires_in
def add_date(self, request, timestamp=None):
def add_date(self, request, timestamp):
if "Date" not in request.headers:
if timestamp is None:
timestamp = time.time()
request.headers["Date"] = email.utils.formatdate(timestamp, usegmt=True)
def add_digest(self, request):
@ -80,32 +81,56 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
@classmethod
def get_string_to_sign(self, request, headers):
def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp):
sts = []
for header in headers:
if header == "(request-target)":
path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
sts.append("(request-target): {} {}".format(request.method.lower(), path_url))
sts.append("{}: {} {}".format(header, request.method.lower(), path_url))
elif header == "(created)":
sts.append("{}: {}".format(header, created_timestamp))
elif header == "(expires)":
assert (expires_timestamp is not None), \
'You should provide the "expires_in" argument when using the (expires) header'
sts.append("{}: {}".format(header, int(expires_timestamp)))
else:
if header.lower() == "host":
value = request.headers.get("host", urlparse(request.url).hostname)
url = urlparse(request.url)
value = request.headers.get("host", url.hostname)
if url.scheme == "http" and url.port not in [None, 80] or url.scheme == "https" \
and url.port not in [443, None]:
value = "{}:{}".format(value, url.port)
else:
value = request.headers[header]
sts.append("{k}: {v}".format(k=header.lower(), v=value))
return "\n".join(sts).encode()
def __call__(self, request):
self.add_date(request)
def create_signature_string(self, request):
created_timestamp = int(time.time())
expires_timestamp = None
if self.expires_in is not None:
expires_timestamp = created_timestamp + self.expires_in.total_seconds()
self.add_date(request, created_timestamp)
self.add_digest(request)
raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers),
key=self.key,
passphrase=self.passphrase)
raw_sig = Crypto(self.algorithm).sign(
string_to_sign=self.get_string_to_sign(request, self.headers, created_timestamp, expires_timestamp),
key=self.key,
passphrase=self.passphrase,
)
sig = base64.b64encode(raw_sig).decode()
sig_struct = [("keyId", self.key_id),
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig)]
request.headers["Authorization"] = "Signature " + ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
sig_struct = [
("keyId", self.key_id),
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig),
("created", int(created_timestamp)),
]
if expires_timestamp is not None:
sig_struct.append(("expires", int(expires_timestamp)))
return ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
def __call__(self, request):
request.headers["Authorization"] = "Signature " + self.create_signature_string(request)
return request
@classmethod
@ -130,29 +155,22 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
for field in "keyId", "algorithm", "signature":
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"
created_timestamp = int(sig_struct['created'])
expires_timestamp = sig_struct.get('expires')
if expires_timestamp is not None:
expires_timestamp = int(expires_timestamp)
headers = sig_struct.get("headers", "date").split(" ")
sig = base64.b64decode(sig_struct["signature"])
sts = self.get_string_to_sign(request, headers)
sts = self.get_string_to_sign(request, headers, created_timestamp, expires_timestamp=expires_timestamp)
key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"])
Crypto(sig_struct["algorithm"]).verify(sig, sts, key)
class HTTPSignatureHeaderAuth(HTTPSignatureAuth):
"""
https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4
Using "Signature" header instead of "Authorization" header.
"""
def __call__(self, request):
self.add_date(request)
self.add_digest(request)
raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers),
key=self.key,
passphrase=self.passphrase)
sig = base64.b64encode(raw_sig).decode()
sig_struct = [("keyId", self.key_id),
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig)]
request.headers["Signature"] = ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
request.headers["Signature"] = self.create_signature_string(request)
return request