Require key ID
parent
a04e79cbe8
commit
9c5a122925
|
@ -12,8 +12,8 @@ specified in the RFC), there is an optional dependency on `cryptography <https:/
|
|||
import requests
|
||||
from requests_http_signature import HTTPSignatureAuth
|
||||
preshared_secret = 'monorail_cat'
|
||||
url = 'http://httpbin.org/get'
|
||||
requests.get(url, auth=HTTPSignatureAuth(key=preshared_secret))
|
||||
url = 'http://example.com/path'
|
||||
requests.get(url, auth=HTTPSignatureAuth(key=preshared_secret, key_id='squirrel'))
|
||||
|
||||
In addition to signing messages in the client, the class method ``HTTPSignatureAuth.verify()`` can be used to verify
|
||||
incoming requests:
|
||||
|
|
|
@ -44,7 +44,7 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
|||
"ecdsa-sha256",
|
||||
}
|
||||
|
||||
def __init__(self, key, key_id="hmac-key-1", algorithm="hmac-sha256", headers=None, passphrase=None):
|
||||
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None):
|
||||
assert algorithm in self.known_algorithms
|
||||
self.key = key
|
||||
self.key_id = key_id
|
||||
|
@ -65,6 +65,7 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
|||
digest = self.hasher_constructor(request.body).digest()
|
||||
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
|
||||
|
||||
@classmethod
|
||||
def get_string_to_sign(self, request, headers):
|
||||
sts = []
|
||||
for header in headers:
|
||||
|
@ -92,6 +93,7 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
|
|||
request.headers["Authorization"] = "Signature " + ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
|
||||
return request
|
||||
|
||||
@classmethod
|
||||
def verify(self, request, key_resolver):
|
||||
assert "Authorization" in request.headers, "No Authorization header found"
|
||||
msg = 'Unexpected scheme found in Authorization header (expected "Signature")'
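Review bot: `verify()` rejects a request that has no Authorization header through a bare `assert` (`assert "Authorization" in request.headers, ...` in the last hunk), and `assert` statements are compiled out under `python -O`, so a service started that way would skip the check; `__init__` validates `algorithm` the same way. Now that `verify` is the class-level entry point for incoming requests, an explicit raise keeps the check active in every mode while preserving the exception type the tests expect: `if "Authorization" not in request.headers: raise AssertionError("No Authorization header found")`. The rest of `verify` lies outside the hunk, so whether the signature comparison itself relies on `assert` cannot be confirmed from here. As a smaller style point, both new `@classmethod`s name their first parameter `self`; `cls` is the convention and avoids suggesting that instance state such as `self.key` is available.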
|
||||
|
|
|
@ -20,7 +20,7 @@ class TestAdapter(HTTPAdapter):
|
|||
if "pubkey" in request.headers:
|
||||
return base64.b64decode(request.headers["pubkey"])
|
||||
return hmac_secret
|
||||
HTTPSignatureAuth(key=hmac_secret).verify(request, key_resolver=key_resolver)
|
||||
HTTPSignatureAuth.verify(request, key_resolver=key_resolver)
|
||||
response = requests.Response()
|
||||
response.status_code = requests.codes.ok
|
||||
response.url = request.url
|
||||
|
@ -34,15 +34,16 @@ class TestRequestsHTTPSignature(unittest.TestCase):
|
|||
|
||||
def test_basic_statements(self):
|
||||
url = 'http://example.com/path?query#fragment'
|
||||
self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret))
|
||||
self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret, key_id="sekret"))
|
||||
with self.assertRaises(AssertionError):
|
||||
self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret[::-1]))
|
||||
self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret"))
|
||||
|
||||
def test_rfc_example(self):
|
||||
url = 'http://example.org/foo'
|
||||
payload = {"hello": "world"}
|
||||
date = "Tue, 07 Jun 2014 20:51:35 GMT"
|
||||
auth = HTTPSignatureAuth(key=hmac_secret,
|
||||
key_id="sekret",
|
||||
headers=["(request-target)", "host", "date", "digest", "content-length"])
|
||||
self.session.post(url, json=payload, headers={"Date": date}, auth=auth)
|
||||
|
||||
|
@ -65,7 +66,7 @@ class TestRequestsHTTPSignature(unittest.TestCase):
|
|||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
url = 'http://example.com/path?query#fragment'
|
||||
auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, passphrase=passphrase)
|
||||
auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase)
|
||||
self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem)))
|
||||
|
||||
if __name__ == '__main__':
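Review bot: The `key_resolver` in `TestAdapter` (first hunk of this file) returns whatever key the request carries in its `pubkey` header, which is acceptable for a fixture but means the signature is checked against a key chosen by the sender. Since tests are often copied as usage examples, I would mark it with a comment such as `# Test fixture only: a real key_resolver must look the key up by key_id in trusted storage, never take it from the request`. The resolver's signature is outside the hunk, so it is not visible whether it receives the key ID that this commit makes mandatory; if it does, a test asserting that the resolver sees `"sekret"` would cover the new requirement. A test that `HTTPSignatureAuth(key=hmac_secret)` without `key_id` raises `TypeError` would also pin the change.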
|
||||
|
|