diff --git a/requests_http_signature/__init__.py b/requests_http_signature/__init__.py index 5d08b57..9ebb4d6 100644 --- a/requests_http_signature/__init__.py +++ b/requests_http_signature/__init__.py @@ -109,16 +109,24 @@ class HTTPSignatureAuth(requests.auth.AuthBase): return request @classmethod - def get_sig_struct(self, request): - scheme, sig_struct = request.headers["Authorization"].split(" ", 1) + def get_sig_struct(self, request, scheme="Authorization"): + sig_struct = request.headers[scheme] + if scheme == "Authorization": + sig_struct = sig_struct.split(" ", 1)[1] return {i.split("=", 1)[0]: i.split("=", 1)[1].strip('"') for i in sig_struct.split(",")} @classmethod - def verify(self, request, key_resolver): - assert "Authorization" in request.headers, "No Authorization header found" - msg = 'Unexpected scheme found in Authorization header (expected "Signature")' - assert request.headers["Authorization"].startswith("Signature "), msg - sig_struct = self.get_sig_struct(request) + def verify(self, request, key_resolver, scheme="Authorization"): + if scheme == "Authorization": + assert "Authorization" in request.headers, "No Authorization header found" + msg = 'Unexpected scheme found in Authorization header (expected "Signature")' + assert request.headers["Authorization"].startswith("Signature "), msg + elif scheme == "Signature": + assert "Signature" in request.headers, "No Signature header found" + else: + raise RequestsHttpSignatureException('Unknown signature scheme "{}"'.format(scheme)) + + sig_struct = self.get_sig_struct(request, scheme=scheme) for field in "keyId", "algorithm", "signature": assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field) assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm" diff --git a/test/test.py b/test/test.py index c5e20c9..ef48678 100755 --- a/test/test.py +++ b/test/test.py @@ -9,19 +9,22 @@ from requests.adapters import HTTPAdapter sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # noqa -from requests_http_signature import HTTPSignatureAuth, RequestsHttpSignatureException +from requests_http_signature import HTTPSignatureAuth, HTTPSignatureHeaderAuth, RequestsHttpSignatureException hmac_secret = b"monorail_cat" passphrase = b"passw0rd" class TestAdapter(HTTPAdapter): def __init__(self, testcase): self.testcase = testcase + def send(self, request, *args, **kwargs): def key_resolver(key_id, algorithm): if "pubkey" in request.headers: return base64.b64decode(request.headers["pubkey"]) return hmac_secret - HTTPSignatureAuth.verify(request, key_resolver=key_resolver) + HTTPSignatureAuth.verify(request, + key_resolver=key_resolver, + scheme=request.headers.get("sigScheme", "Authorization")) if "expectSig" in request.headers: self.testcase.assertEqual(request.headers["expectSig"], HTTPSignatureAuth.get_sig_struct(request)["signature"]) @@ -83,6 +86,12 @@ class TestRequestsHTTPSignature(unittest.TestCase): headers = {"Date": date, "pubkey": pubkey_b64, "expectSig": expect_sig, "content-type": "application/json"} self.session.post(url, json=payload, headers=headers, auth=auth) + #def test_signature_scheme(self): + auth = HTTPSignatureHeaderAuth(key=hmac_secret, + key_id="sekret", + headers=["(request-target)", "host", "date", "digest", "content-length"]) + self.session.post(url, json=payload, headers={"Date": date, "sigScheme": "Signature"}, auth=auth) + def test_rsa(self): from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa