Support and document verifying responses

pull/34/head
Andrey Kislyuk 2022-04-12 12:01:18 -07:00
parent 755d9325b8
commit fc73c56301
No known key found for this signature in database
GPG Key ID: 8AFAFCD242818A52
4 changed files with 72 additions and 29 deletions

View File

@ -36,10 +36,9 @@ To add other headers to the signature, pass an array of header names in the ``co
See the `API documentation <https://pyauth.github.io/requests-http-signature/#id1>`_ for the full list of options and
details.
Verifying messages
~~~~~~~~~~~~~~~~~~
In addition to signing messages in the client, the class method ``HTTPSignatureAuth.verify()`` can be used to verify
incoming requests:
Verifying responses
~~~~~~~~~~~~~~~~~~~
The class method ``HTTPSignatureAuth.verify()`` can be used to verify responses received back from the server:
.. code-block:: python
@ -48,12 +47,24 @@ incoming requests:
assert key_id == 'squirrel'
return 'monorail_cat'
request = requests.Request(...) # Reconstruct the incoming request using the Requests API
request = request.prepare()
HTTPSignatureAuth.verify(request,
response = requests.get(url, auth=auth)
HTTPSignatureAuth.verify(response,
signature_algorithm=algorithms.HMAC_SHA256,
key_resolver=key_resolver)
More generally, you can reconstruct an arbitrary request using the
`Requests API <https://docs.python-requests.org/en/latest/api/#requests.Request>`_ and pass it to ``verify()``:
.. code-block:: python
request = requests.Request(...) # Reconstruct the incoming request using the Requests API
prepared_request = request.prepare() # Generate a PreparedRequest
HTTPSignatureAuth.verify(prepared_request, ...)
To verify incoming requests and sign responses in the context of an HTTP server, see the
`flask-http-signature <https://github.com/pyauth/flask-http-signature>`_ and
`http-message-signatures <https://github.com/pyauth/http-message-signatures>`_ packages.
.. admonition:: See what is signed
It is important to understand and follow the best practice rule of "See what is signed" when verifying HTTP message

View File

@ -2,7 +2,7 @@ import datetime
import email.utils
import hashlib
import secrets
from typing import List
from typing import List, Union
import http_sfv
import requests
@ -148,7 +148,13 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
return request
@classmethod
def verify(cls, request: requests.PreparedRequest, *,
def get_body(cls, message):
if isinstance(message, requests.Response):
return message.content
return message.body
@classmethod
def verify(cls, message: Union[requests.PreparedRequest, requests.Response], *,
require_components: List[str] = ("@method", "@authority", "@target-uri"),
signature_algorithm: HTTPSignatureAlgorithm,
key_resolver: HTTPSignatureKeyResolver):
@ -166,23 +172,23 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
You can ensure that the information signed is what you expect to be signed by only trusting the *VerifyResult*
tuple returned by ``verify()``.
:param request:
The HTTP request to verify. You can reconstruct an incoming request using the
`Requests API <https://docs.python-requests.org/en/latest/api/#requests.Request>`_ as follows::
:param message:
The HTTP response or request to verify. You can either pass a received response, or reconstruct an arbitrary
request using the `Requests API <https://docs.python-requests.org/en/latest/api/#requests.Request>`_::
request = requests.Request(...)
request = request.prepare()
HTTPSignatureAuth.verify(request, ...)
prepared_request = request.prepare()
HTTPSignatureAuth.verify(prepared_request, ...)
:param require_components:
A list of lowercased header names or derived component IDs ("@method", "@target-uri", "@authority",
"@scheme", "@request-target", "@path", "@query", "@query-params", "@status", or "@request-response" as
specified in the standard) to require to be covered by the signature. If the "content-digest" header field
is specified here (recommended for requests that have a body), it will be verified by matching it against
the digest hash computed on the body of the request (expected to be bytes).
is specified here (recommended for messages that have a body), it will be verified by matching it against
the digest hash computed on the body of the message (expected to be bytes).
If this parameter is not specified, ``verify()`` will set it to ("@method", "@authority", "@target-uri")
for requests without a body, and ("@method", "@authority", "@target-uri", "content-digest") for requests
for messages without a body, and ("@method", "@authority", "@target-uri", "content-digest") for messages
with a body.
:param signature_algorithm:
The algorithm expected to be used by the signature. Any signature not using the expected algorithm will
@ -204,19 +210,20 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
* ``algorithm``: (same as ``signature_algorithm`` above)
* ``covered_components``: A mapping of component names to their values, as covered by the signature
* ``parameters``: A mapping of signature parameters to their values, as covered by the signature
* ``body``: The message body for requests that have a body and pass validation of the covered
* ``body``: The message body for messages that have a body and pass validation of the covered
content-digest; ``None`` otherwise.
:raises: ``InvalidSignature`` - raised whenever signature validation fails for any reason.
"""
if request.body is not None:
body = cls.get_body(message)
if body is not None:
if "content-digest" not in require_components and '"content-digest"' not in require_components:
require_components = list(require_components) + ["content-digest"]
verifier = HTTPMessageVerifier(signature_algorithm=signature_algorithm,
key_resolver=key_resolver,
component_resolver_class=cls.component_resolver_class)
verify_results = verifier.verify(request)
verify_results = verifier.verify(message)
if len(verify_results) != 1:
raise InvalidSignature("Multiple signatures are not supported.")
verify_result = verify_results[0]
@ -227,8 +234,8 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
if component_key not in verify_result.covered_components:
raise InvalidSignature(f"A required component, {component_key}, was not covered by the signature.")
if component_key == '"content-digest"':
if request.body is None:
raise InvalidSignature("Found a content-digest header in a request with no body")
if body is None:
raise InvalidSignature("Found a content-digest header in a message with no body")
digest = http_sfv.Dictionary()
digest.parse(verify_result.covered_components[component_key].encode())
if len(digest) < 1:
@ -238,8 +245,8 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
raise InvalidSignature(f'Unsupported digest algorithm "{k}"')
raw_digest = v.value
hasher = cls._digest_hashers[k]
expect_digest = hasher(request.body).digest()
expect_digest = hasher(body).digest()
if raw_digest != expect_digest:
raise InvalidSignature("The content-digest header does not match the request body")
verify_result = verify_result._replace(body=request.body)
raise InvalidSignature("The content-digest header does not match the message body")
verify_result = verify_result._replace(body=body)
return verify_result

View File

@ -16,7 +16,7 @@ setup(
},
setup_requires=['setuptools_scm >= 3.4.3'],
install_requires=[
"http-message-signatures >= 0.2.2",
"http-message-signatures >= 0.2.3",
"http-sfv >= 0.9.3",
"requests >= 2.27.1"
],

View File

@ -1,14 +1,15 @@
#!/usr/bin/env python
import os, sys, unittest, logging, base64
import os, sys, unittest, logging, base64, io, json
import http_sfv
import requests
from requests.adapters import HTTPAdapter
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from requests_http_signature import algorithms, HTTPSignatureAuth # noqa: E402
from http_message_signatures import InvalidSignature # noqa: E402
from http_message_signatures import HTTPMessageSigner, InvalidSignature # noqa: E402
logging.basicConfig(level="DEBUG")
@ -34,10 +35,20 @@ class TestAdapter(HTTPAdapter):
except InvalidSignature:
pass
response = requests.Response()
response.request = request
response.status_code = requests.codes.ok
response.url = request.url
response.headers["Received-Signature-Input"] = request.headers["Signature-Input"]
response.headers["Received-Signature"] = request.headers["Signature"]
response.raw = io.BytesIO(json.dumps({}).encode())
signer = HTTPMessageSigner(signature_algorithm=self.client_auth.signer.signature_algorithm,
key_resolver=self.client_auth.signer.key_resolver)
hasher = HTTPSignatureAuth._digest_hashers["sha-256"]
digest = hasher(response.raw.getvalue()).digest()
response.headers["Content-Digest"] = str(http_sfv.Dictionary({"sha-256": digest}))
signer.sign(response,
key_id=default_keyid,
covered_component_ids=("@method", "@authority", "content-digest", "@target-uri"))
return response
@ -61,7 +72,21 @@ class TestRequestsHTTPSignature(unittest.TestCase):
self.session.get(url, auth=self.auth)
self.auth.signer.key_resolver.resolve_private_key = lambda k: b"abc"
self.session.get(url, auth=self.auth)
self.session.post(url, auth=self.auth, data=b"xyz")
res = self.session.post(url, auth=self.auth, data=b"xyz")
verify_args = dict(signature_algorithm=algorithms.HMAC_SHA256, key_resolver=self.auth.signer.key_resolver)
HTTPSignatureAuth.verify(res, **verify_args)
res.headers["Content-Digest"] = res.headers["Content-Digest"][::-1]
with self.assertRaises(InvalidSignature):
HTTPSignatureAuth.verify(res, **verify_args)
del res.headers["Content-Digest"]
with self.assertRaises(InvalidSignature):
HTTPSignatureAuth.verify(res, **verify_args)
res.headers["Signature"] = res.headers["Signature"][::-1]
with self.assertRaises(InvalidSignature):
HTTPSignatureAuth.verify(res, **verify_args)
del res.headers["Signature"]
with self.assertRaises(InvalidSignature):
HTTPSignatureAuth.verify(res, **verify_args)
def test_expired_signature(self):
"TODO"