From 15397ba7120fbb48df2afc63f5abec1e6d46b14f Mon Sep 17 00:00:00 2001 From: Arjen Brouwer Date: Mon, 20 Jan 2020 21:26:05 +0100 Subject: [PATCH] Added support for (created) and (expires) special headers --- requests_http_signature/__init__.py | 75 +++++++++++++++++------------ 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/requests_http_signature/__init__.py b/requests_http_signature/__init__.py index dde3d7d..e9f7e7d 100644 --- a/requests_http_signature/__init__.py +++ b/requests_http_signature/__init__.py @@ -52,18 +52,21 @@ class HTTPSignatureAuth(requests.auth.AuthBase): "ecdsa-sha256", } - def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None): + def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None): + """ + :param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key + :param datetime.timedelta expires_in: The time after which this signature should expire + """ assert algorithm in self.known_algorithms self.key = key self.key_id = key_id self.algorithm = algorithm self.headers = [h.lower() for h in headers] if headers is not None else ["date"] self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode() + self.expires_in = expires_in - def add_date(self, request, timestamp=None): + def add_date(self, request, timestamp): if "Date" not in request.headers: - if timestamp is None: - timestamp = time.time() request.headers["Date"] = email.utils.formatdate(timestamp, usegmt=True) def add_digest(self, request): @@ -74,12 +77,18 @@ class HTTPSignatureAuth(requests.auth.AuthBase): request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode() @classmethod - def get_string_to_sign(self, request, headers): + def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp): sts = [] for header in headers: if header == "(request-target)": path_url = requests.models.RequestEncodingMixin.path_url.fget(request) - sts.append("(request-target): {} {}".format(request.method.lower(), path_url)) + sts.append("{}: {} {}".format(header, request.method.lower(), path_url)) + elif header == "(created)": + sts.append("{}: {}".format(header, created_timestamp)) + elif header == "(expires)": + assert (expires_timestamp is not None), \ + 'You should provide the "expires_in" argument when using the (expires) header' + sts.append("{}: {}".format(header, int(expires_timestamp))) else: if header.lower() == "host": url = urlparse(request.url) @@ -91,24 +100,37 @@ class HTTPSignatureAuth(requests.auth.AuthBase): and url.port not in [443, None] ): value = "{}:{}".format(value, url.port) - else: value = request.headers[header] sts.append("{k}: {v}".format(k=header.lower(), v=value)) return "\n".join(sts).encode() - def __call__(self, request): - self.add_date(request) + def create_signature_string(self, request): + created_timestamp = int(time.time()) + expires_timestamp = None + if self.expires_in is not None: + expires_timestamp = created_timestamp + self.expires_in.total_seconds() + self.add_date(request, created_timestamp) self.add_digest(request) - raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers), - key=self.key, - passphrase=self.passphrase) + raw_sig = Crypto(self.algorithm).sign( + string_to_sign=self.get_string_to_sign(request, self.headers, created_timestamp, expires_timestamp), + key=self.key, + passphrase=self.passphrase, + ) sig = base64.b64encode(raw_sig).decode() - sig_struct = [("keyId", self.key_id), - ("algorithm", self.algorithm), - ("headers", " ".join(self.headers)), - ("signature", sig)] - request.headers["Authorization"] = "Signature " + ",".join('{}="{}"'.format(k, v) for k, v in sig_struct) + sig_struct = [ + ("keyId", self.key_id), + ("algorithm", self.algorithm), + ("headers", " ".join(self.headers)), + ("signature", sig), + ("created", int(created_timestamp)), + ] + if expires_timestamp is not None: + sig_struct.append(("expires", int(expires_timestamp))) + return ",".join('{}="{}"'.format(k, v) for k, v in sig_struct) + + def __call__(self, request): + request.headers["Authorization"] = "Signature " + self.create_signature_string(request) return request @classmethod @@ -125,29 +147,22 @@ class HTTPSignatureAuth(requests.auth.AuthBase): for field in "keyId", "algorithm", "signature": assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field) assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm" + created_timestamp = int(sig_struct['created']) + expires_timestamp = sig_struct.get('expires') + if expires_timestamp is not None: + expires_timestamp = int(expires_timestamp) headers = sig_struct.get("headers", "date").split(" ") sig = base64.b64decode(sig_struct["signature"]) - sts = self.get_string_to_sign(request, headers) + sts = self.get_string_to_sign(request, headers, created_timestamp, expires_timestamp=expires_timestamp) key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"]) Crypto(sig_struct["algorithm"]).verify(sig, sts, key) class HTTPSignatureHeaderAuth(HTTPSignatureAuth): """ https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4 - Using "Signature" header instead of "Authorization" header. """ def __call__(self, request): - self.add_date(request) - self.add_digest(request) - raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers), - key=self.key, - passphrase=self.passphrase) - sig = base64.b64encode(raw_sig).decode() - sig_struct = [("keyId", self.key_id), - ("algorithm", self.algorithm), - ("headers", " ".join(self.headers)), - ("signature", sig)] - request.headers["Signature"] = ",".join('{}="{}"'.format(k, v) for k, v in sig_struct) + request.headers["Signature"] = self.create_signature_string(request) return request