Added use_auth_header option for switching between Authorization and Signature headers

pull/4/head
Eliot Berriot 2018-03-30 21:44:08 +02:00
parent 7f8f69116d
commit 7856f6f671
No known key found for this signature in database
GPG Key ID: DD6965E2476E5C27
2 changed files with 37 additions and 9 deletions

View File

@ -48,11 +48,15 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
"ecdsa-sha256",
}
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None):
def __init__(
self, key, key_id, algorithm="hmac-sha256", headers=None,
passphrase=None, use_auth_header=True):
assert algorithm in self.known_algorithms
self.key = key
self.key_id = key_id
self.algorithm = algorithm
# Wether to use Authorization or Signature header
self.use_auth_header = use_auth_header
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
@ -95,20 +99,34 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
("algorithm", self.algorithm),
("headers", " ".join(self.headers)),
("signature", sig)]
request.headers["Authorization"] = "Signature " + ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
header_content = ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
if self.use_auth_header:
request.headers["Authorization"] = "Signature " + header_content
else:
# we use the Signature header as described in the RFC:
# https://tools.ietf.org/html/draft-cavage-http-signatures-06#section-4
request.headers["Signature"] = header_content
return request
@classmethod
def get_sig_struct(self, request):
scheme, sig_struct = request.headers["Authorization"].split(" ", 1)
def get_sig_struct(self, request, use_auth_header=True):
if use_auth_header:
scheme, sig_struct = request.headers["Authorization"].split(" ", 1)
else:
sig_struct = request.headers["Signature"]
return {i.split("=", 1)[0]: i.split("=", 1)[1].strip('"') for i in sig_struct.split(",")}
@classmethod
def verify(self, request, key_resolver):
assert "Authorization" in request.headers, "No Authorization header found"
msg = 'Unexpected scheme found in Authorization header (expected "Signature")'
assert request.headers["Authorization"].startswith("Signature "), msg
sig_struct = self.get_sig_struct(request)
def verify(self, request, key_resolver, use_auth_header=True):
if use_auth_header:
assert "Authorization" in request.headers, "No Authorization header found"
msg = 'Unexpected scheme found in Authorization header (expected "Signature")'
assert request.headers["Authorization"].startswith("Signature "), msg
else:
assert "Signature" in request.headers, "No Signature header found"
sig_struct = self.get_sig_struct(request, use_auth_header)
for field in "keyId", "algorithm", "signature":
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"

View File

@ -102,6 +102,16 @@ class TestRequestsHTTPSignature(unittest.TestCase):
auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase)
self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem)))
def test_can_use_signature_header(self):
auth = auth=HTTPSignatureAuth(
key=hmac_secret, key_id="sekret", use_auth_header=False)
r = requests.Request(
url='http://test.com', auth=auth)
prepared = r.prepare()
self.assertIn('Signature', prepared.headers)
self.assertTrue(prepared.headers['Signature'].startswith('keyId='))
if __name__ == '__main__':
unittest.main()