Add test for HTTPSignatureHeaderAuth
parent
d7770e3ae9
commit
2824b9b6e5
|
@ -109,16 +109,24 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
|||
return request
|
||||
|
||||
@classmethod
|
||||
def get_sig_struct(self, request):
|
||||
scheme, sig_struct = request.headers["Authorization"].split(" ", 1)
|
||||
def get_sig_struct(self, request, scheme="Authorization"):
|
||||
sig_struct = request.headers[scheme]
|
||||
if scheme == "Authorization":
|
||||
sig_struct = sig_struct.split(" ", 1)[1]
|
||||
return {i.split("=", 1)[0]: i.split("=", 1)[1].strip('"') for i in sig_struct.split(",")}
|
||||
|
||||
@classmethod
|
||||
def verify(self, request, key_resolver):
|
||||
assert "Authorization" in request.headers, "No Authorization header found"
|
||||
msg = 'Unexpected scheme found in Authorization header (expected "Signature")'
|
||||
assert request.headers["Authorization"].startswith("Signature "), msg
|
||||
sig_struct = self.get_sig_struct(request)
|
||||
def verify(self, request, key_resolver, scheme="Authorization"):
|
||||
if scheme == "Authorization":
|
||||
assert "Authorization" in request.headers, "No Authorization header found"
|
||||
msg = 'Unexpected scheme found in Authorization header (expected "Signature")'
|
||||
assert request.headers["Authorization"].startswith("Signature "), msg
|
||||
elif scheme == "Signature":
|
||||
assert "Signature" in request.headers, "No Signature header found"
|
||||
else:
|
||||
raise RequestsHttpSignatureException('Unknown signature scheme "{}"'.format(scheme))
|
||||
|
||||
sig_struct = self.get_sig_struct(request, scheme=scheme)
|
||||
for field in "keyId", "algorithm", "signature":
|
||||
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
|
||||
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"
|
||||
|
|
13
test/test.py
13
test/test.py
|
@ -9,19 +9,22 @@ from requests.adapters import HTTPAdapter
|
|||
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # noqa
|
||||
|
||||
from requests_http_signature import HTTPSignatureAuth, RequestsHttpSignatureException
|
||||
from requests_http_signature import HTTPSignatureAuth, HTTPSignatureHeaderAuth, RequestsHttpSignatureException
|
||||
hmac_secret = b"monorail_cat"
|
||||
passphrase = b"passw0rd"
|
||||
|
||||
class TestAdapter(HTTPAdapter):
|
||||
def __init__(self, testcase):
|
||||
self.testcase = testcase
|
||||
|
||||
def send(self, request, *args, **kwargs):
|
||||
def key_resolver(key_id, algorithm):
|
||||
if "pubkey" in request.headers:
|
||||
return base64.b64decode(request.headers["pubkey"])
|
||||
return hmac_secret
|
||||
HTTPSignatureAuth.verify(request, key_resolver=key_resolver)
|
||||
HTTPSignatureAuth.verify(request,
|
||||
key_resolver=key_resolver,
|
||||
scheme=request.headers.get("sigScheme", "Authorization"))
|
||||
if "expectSig" in request.headers:
|
||||
self.testcase.assertEqual(request.headers["expectSig"],
|
||||
HTTPSignatureAuth.get_sig_struct(request)["signature"])
|
||||
|
@ -83,6 +86,12 @@ class TestRequestsHTTPSignature(unittest.TestCase):
|
|||
headers = {"Date": date, "pubkey": pubkey_b64, "expectSig": expect_sig, "content-type": "application/json"}
|
||||
self.session.post(url, json=payload, headers=headers, auth=auth)
|
||||
|
||||
#def test_signature_scheme(self):
|
||||
auth = HTTPSignatureHeaderAuth(key=hmac_secret,
|
||||
key_id="sekret",
|
||||
headers=["(request-target)", "host", "date", "digest", "content-length"])
|
||||
self.session.post(url, json=payload, headers={"Date": date, "sigScheme": "Signature"}, auth=auth)
|
||||
|
||||
def test_rsa(self):
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
|
Loading…
Reference in New Issue