Added support for (created) and (expires) special headers
parent
374529ec7c
commit
15397ba712
|
@ -52,18 +52,21 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
||||||
"ecdsa-sha256",
|
"ecdsa-sha256",
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None):
|
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None):
|
||||||
|
"""
|
||||||
|
:param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key
|
||||||
|
:param datetime.timedelta expires_in: The time after which this signature should expire
|
||||||
|
"""
|
||||||
assert algorithm in self.known_algorithms
|
assert algorithm in self.known_algorithms
|
||||||
self.key = key
|
self.key = key
|
||||||
self.key_id = key_id
|
self.key_id = key_id
|
||||||
self.algorithm = algorithm
|
self.algorithm = algorithm
|
||||||
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
|
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
|
||||||
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
|
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
|
||||||
|
self.expires_in = expires_in
|
||||||
|
|
||||||
def add_date(self, request, timestamp=None):
|
def add_date(self, request, timestamp):
|
||||||
if "Date" not in request.headers:
|
if "Date" not in request.headers:
|
||||||
if timestamp is None:
|
|
||||||
timestamp = time.time()
|
|
||||||
request.headers["Date"] = email.utils.formatdate(timestamp, usegmt=True)
|
request.headers["Date"] = email.utils.formatdate(timestamp, usegmt=True)
|
||||||
|
|
||||||
def add_digest(self, request):
|
def add_digest(self, request):
|
||||||
|
@ -74,12 +77,18 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
||||||
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
|
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_string_to_sign(self, request, headers):
|
def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp):
|
||||||
sts = []
|
sts = []
|
||||||
for header in headers:
|
for header in headers:
|
||||||
if header == "(request-target)":
|
if header == "(request-target)":
|
||||||
path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
|
path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
|
||||||
sts.append("(request-target): {} {}".format(request.method.lower(), path_url))
|
sts.append("{}: {} {}".format(header, request.method.lower(), path_url))
|
||||||
|
elif header == "(created)":
|
||||||
|
sts.append("{}: {}".format(header, created_timestamp))
|
||||||
|
elif header == "(expires)":
|
||||||
|
assert (expires_timestamp is not None), \
|
||||||
|
'You should provide the "expires_in" argument when using the (expires) header'
|
||||||
|
sts.append("{}: {}".format(header, int(expires_timestamp)))
|
||||||
else:
|
else:
|
||||||
if header.lower() == "host":
|
if header.lower() == "host":
|
||||||
url = urlparse(request.url)
|
url = urlparse(request.url)
|
||||||
|
@ -91,24 +100,37 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
||||||
and url.port not in [443, None]
|
and url.port not in [443, None]
|
||||||
):
|
):
|
||||||
value = "{}:{}".format(value, url.port)
|
value = "{}:{}".format(value, url.port)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
value = request.headers[header]
|
value = request.headers[header]
|
||||||
sts.append("{k}: {v}".format(k=header.lower(), v=value))
|
sts.append("{k}: {v}".format(k=header.lower(), v=value))
|
||||||
return "\n".join(sts).encode()
|
return "\n".join(sts).encode()
|
||||||
|
|
||||||
def __call__(self, request):
|
def create_signature_string(self, request):
|
||||||
self.add_date(request)
|
created_timestamp = int(time.time())
|
||||||
|
expires_timestamp = None
|
||||||
|
if self.expires_in is not None:
|
||||||
|
expires_timestamp = created_timestamp + self.expires_in.total_seconds()
|
||||||
|
self.add_date(request, created_timestamp)
|
||||||
self.add_digest(request)
|
self.add_digest(request)
|
||||||
raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers),
|
raw_sig = Crypto(self.algorithm).sign(
|
||||||
key=self.key,
|
string_to_sign=self.get_string_to_sign(request, self.headers, created_timestamp, expires_timestamp),
|
||||||
passphrase=self.passphrase)
|
key=self.key,
|
||||||
|
passphrase=self.passphrase,
|
||||||
|
)
|
||||||
sig = base64.b64encode(raw_sig).decode()
|
sig = base64.b64encode(raw_sig).decode()
|
||||||
sig_struct = [("keyId", self.key_id),
|
sig_struct = [
|
||||||
("algorithm", self.algorithm),
|
("keyId", self.key_id),
|
||||||
("headers", " ".join(self.headers)),
|
("algorithm", self.algorithm),
|
||||||
("signature", sig)]
|
("headers", " ".join(self.headers)),
|
||||||
request.headers["Authorization"] = "Signature " + ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
|
("signature", sig),
|
||||||
|
("created", int(created_timestamp)),
|
||||||
|
]
|
||||||
|
if expires_timestamp is not None:
|
||||||
|
sig_struct.append(("expires", int(expires_timestamp)))
|
||||||
|
return ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
|
||||||
|
|
||||||
|
def __call__(self, request):
|
||||||
|
request.headers["Authorization"] = "Signature " + self.create_signature_string(request)
|
||||||
return request
|
return request
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -125,29 +147,22 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
||||||
for field in "keyId", "algorithm", "signature":
|
for field in "keyId", "algorithm", "signature":
|
||||||
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
|
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
|
||||||
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"
|
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"
|
||||||
|
created_timestamp = int(sig_struct['created'])
|
||||||
|
expires_timestamp = sig_struct.get('expires')
|
||||||
|
if expires_timestamp is not None:
|
||||||
|
expires_timestamp = int(expires_timestamp)
|
||||||
headers = sig_struct.get("headers", "date").split(" ")
|
headers = sig_struct.get("headers", "date").split(" ")
|
||||||
sig = base64.b64decode(sig_struct["signature"])
|
sig = base64.b64decode(sig_struct["signature"])
|
||||||
sts = self.get_string_to_sign(request, headers)
|
sts = self.get_string_to_sign(request, headers, created_timestamp, expires_timestamp=expires_timestamp)
|
||||||
key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"])
|
key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"])
|
||||||
Crypto(sig_struct["algorithm"]).verify(sig, sts, key)
|
Crypto(sig_struct["algorithm"]).verify(sig, sts, key)
|
||||||
|
|
||||||
class HTTPSignatureHeaderAuth(HTTPSignatureAuth):
|
class HTTPSignatureHeaderAuth(HTTPSignatureAuth):
|
||||||
"""
|
"""
|
||||||
https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4
|
https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4
|
||||||
|
|
||||||
Using "Signature" header instead of "Authorization" header.
|
Using "Signature" header instead of "Authorization" header.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __call__(self, request):
|
def __call__(self, request):
|
||||||
self.add_date(request)
|
request.headers["Signature"] = self.create_signature_string(request)
|
||||||
self.add_digest(request)
|
|
||||||
raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers),
|
|
||||||
key=self.key,
|
|
||||||
passphrase=self.passphrase)
|
|
||||||
sig = base64.b64encode(raw_sig).decode()
|
|
||||||
sig_struct = [("keyId", self.key_id),
|
|
||||||
("algorithm", self.algorithm),
|
|
||||||
("headers", " ".join(self.headers)),
|
|
||||||
("signature", sig)]
|
|
||||||
request.headers["Signature"] = ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
|
|
||||||
return request
|
return request
|
||||||
|
|
Loading…
Reference in New Issue