Merge branch 'master' of github.com:kislyuk/requests-http-signature

pull/21/head
Andrey Kislyuk 2020-04-25 14:03:39 -07:00
commit c0c8fd7255
No known key found for this signature in database
GPG Key ID: 8AFAFCD242818A52
2 changed files with 54 additions and 36 deletions

View File

@ -30,7 +30,7 @@ Usage
By default, only the ``Date`` header is signed (as per the RFC) for body-less requests such as GET. The ``Date`` header
is set if it is absent. In addition, for requests with bodies (such as POST), the ``Digest`` header is set to the SHA256
of the request body and signed (an example of this appears in the RFC). To add other headers to the signature, pass an
array of header names in the ``header`` keyword argument.
array of header names in the ``headers`` keyword argument.
In addition to signing messages in the client, the class method ``HTTPSignatureAuth.verify()`` can be used to verify
incoming requests:

View File

@ -26,14 +26,12 @@ class Crypto:
key = self.load_pem_private_key(key, password=passphrase, backend=self.default_backend())
if self.algorithm in {"rsa-sha1", "rsa-sha256"}:
hasher = self.SHA1() if self.algorithm.endswith("sha1") else self.SHA256()
signer = key.signer(padding=self.PKCS1v15(), algorithm=hasher)
return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign)
elif self.algorithm in {"rsa-sha512"}:
hasher = self.SHA512()
signer = key.signer(padding=self.PKCS1v15(), algorithm=hasher)
return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign)
elif self.algorithm == "ecdsa-sha256":
signer = key.signer(signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()))
signer.update(string_to_sign)
return signer.finalize()
return key.sign(signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()), data=string_to_sign)
def verify(self, signature, string_to_sign, key):
if self.algorithm == "hmac-sha256":
@ -56,18 +54,21 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
"ecdsa-sha256",
}
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None):
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None):
"""
:param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key
:param datetime.timedelta expires_in: The time after which this signature should expire
"""
assert algorithm in self.known_algorithms
self.key = key
self.key_id = key_id
self.algorithm = algorithm
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
self.expires_in = expires_in
def add_date(self, request, timestamp=None):
def add_date(self, request, timestamp):
if "Date" not in request.headers:
if timestamp is None:
timestamp = time.time()
request.headers["Date"] = email.utils.formatdate(timestamp, usegmt=True)
def add_digest(self, request):
@ -80,32 +81,56 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
@classmethod
def get_string_to_sign(self, request, headers):
def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp):
sts = []
for header in headers:
if header == "(request-target)":
path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
sts.append("(request-target): {} {}".format(request.method.lower(), path_url))
sts.append("{}: {} {}".format(header, request.method.lower(), path_url))
elif header == "(created)":
sts.append("{}: {}".format(header, created_timestamp))
elif header == "(expires)":
assert (expires_timestamp is not None), \
'You should provide the "expires_in" argument when using the (expires) header'
sts.append("{}: {}".format(header, int(expires_timestamp)))
else:
if header.lower() == "host":
value = request.headers.get("host", urlparse(request.url).hostname)
url = urlparse(request.url)
value = request.headers.get("host", url.hostname)
if url.scheme == "http" and url.port not in [None, 80] or url.scheme == "https" \
and url.port not in [443, None]:
value = "{}:{}".format(value, url.port)
else:
value = request.headers[header]
sts.append("{k}: {v}".format(k=header.lower(), v=value))
return "\n".join(sts).encode()
def __call__(self, request):
self.add_date(request)
def create_signature_string(self, request):
created_timestamp = int(time.time())
expires_timestamp = None
if self.expires_in is not None:
expires_timestamp = created_timestamp + self.expires_in.total_seconds()
self.add_date(request, created_timestamp)
self.add_digest(request)
raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers),
key=self.key.encode() if isinstance(self.key, str) else self.key,
passphrase=self.passphrase)
raw_sig = Crypto(self.algorithm).sign(
string_to_sign=self.get_string_to_sign(request, self.headers, created_timestamp, expires_timestamp),
key=self.key.encode() if isinstance(self.key, str) else self.key,
passphrase=self.passphrase,
)
sig = base64.b64encode(raw_sig).decode()
sig_struct = [("keyId", self.key_id),
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig)]
request.headers["Authorization"] = "Signature " + ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
sig_struct = [
("keyId", self.key_id),
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig),
("created", int(created_timestamp)),
]
if expires_timestamp is not None:
sig_struct.append(("expires", int(expires_timestamp)))
return ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
def __call__(self, request):
request.headers["Authorization"] = "Signature " + self.create_signature_string(request)
return request
@classmethod
@ -130,29 +155,22 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
for field in "keyId", "algorithm", "signature":
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"
created_timestamp = int(sig_struct['created'])
expires_timestamp = sig_struct.get('expires')
if expires_timestamp is not None:
expires_timestamp = int(expires_timestamp)
headers = sig_struct.get("headers", "date").split(" ")
sig = base64.b64decode(sig_struct["signature"])
sts = self.get_string_to_sign(request, headers)
sts = self.get_string_to_sign(request, headers, created_timestamp, expires_timestamp=expires_timestamp)
key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"])
Crypto(sig_struct["algorithm"]).verify(sig, sts, key)
class HTTPSignatureHeaderAuth(HTTPSignatureAuth):
"""
https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4
Using "Signature" header instead of "Authorization" header.
"""
def __call__(self, request):
self.add_date(request)
self.add_digest(request)
raw_sig = Crypto(self.algorithm).sign(string_to_sign=self.get_string_to_sign(request, self.headers),
key=self.key,
passphrase=self.passphrase)
sig = base64.b64encode(raw_sig).decode()
sig_struct = [("keyId", self.key_id),
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig)]
request.headers["Signature"] = ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
request.headers["Signature"] = self.create_signature_string(request)
return request