From fc73c5630197ca9afbbd890d3b3b2e3a9dcdb6fd Mon Sep 17 00:00:00 2001 From: Andrey Kislyuk Date: Tue, 12 Apr 2022 12:01:18 -0700 Subject: [PATCH] Support and document verifying responses --- README.rst | 25 ++++++++++++----- requests_http_signature/__init__.py | 43 +++++++++++++++++------------ setup.py | 2 +- test/test.py | 31 +++++++++++++++++++-- 4 files changed, 72 insertions(+), 29 deletions(-) diff --git a/README.rst b/README.rst index 2df27b5..994a414 100644 --- a/README.rst +++ b/README.rst @@ -36,10 +36,9 @@ To add other headers to the signature, pass an array of header names in the ``co See the `API documentation `_ for the full list of options and details. -Verifying messages -~~~~~~~~~~~~~~~~~~ -In addition to signing messages in the client, the class method ``HTTPSignatureAuth.verify()`` can be used to verify -incoming requests: +Verifying responses +~~~~~~~~~~~~~~~~~~~ +The class method ``HTTPSignatureAuth.verify()`` can be used to verify responses received back from the server: .. code-block:: python @@ -48,12 +47,24 @@ incoming requests: assert key_id == 'squirrel' return 'monorail_cat' - request = requests.Request(...) # Reconstruct the incoming request using the Requests API - request = request.prepare() - HTTPSignatureAuth.verify(request, + response = requests.get(url, auth=auth) + HTTPSignatureAuth.verify(response, signature_algorithm=algorithms.HMAC_SHA256, key_resolver=key_resolver) +More generally, you can reconstruct an arbitrary request using the +`Requests API `_ and pass it to ``verify()``: + +.. code-block:: python + + request = requests.Request(...) # Reconstruct the incoming request using the Requests API + prepared_request = request.prepare() # Generate a PreparedRequest + HTTPSignatureAuth.verify(prepared_request, ...) + +To verify incoming requests and sign responses in the context of an HTTP server, see the +`flask-http-signature `_ and +`http-message-signatures `_ packages. + .. admonition:: See what is signed It is important to understand and follow the best practice rule of "See what is signed" when verifying HTTP message diff --git a/requests_http_signature/__init__.py b/requests_http_signature/__init__.py index c60da77..5f8cfc0 100644 --- a/requests_http_signature/__init__.py +++ b/requests_http_signature/__init__.py @@ -2,7 +2,7 @@ import datetime import email.utils import hashlib import secrets -from typing import List +from typing import List, Union import http_sfv import requests @@ -148,7 +148,13 @@ class HTTPSignatureAuth(requests.auth.AuthBase): return request @classmethod - def verify(cls, request: requests.PreparedRequest, *, + def get_body(cls, message): + if isinstance(message, requests.Response): + return message.content + return message.body + + @classmethod + def verify(cls, message: Union[requests.PreparedRequest, requests.Response], *, require_components: List[str] = ("@method", "@authority", "@target-uri"), signature_algorithm: HTTPSignatureAlgorithm, key_resolver: HTTPSignatureKeyResolver): @@ -166,23 +172,23 @@ class HTTPSignatureAuth(requests.auth.AuthBase): You can ensure that the information signed is what you expect to be signed by only trusting the *VerifyResult* tuple returned by ``verify()``. - :param request: - The HTTP request to verify. You can reconstruct an incoming request using the - `Requests API `_ as follows:: + :param message: + The HTTP response or request to verify. You can either pass a received response, or reconstruct an arbitrary + request using the `Requests API `_:: request = requests.Request(...) - request = request.prepare() - HTTPSignatureAuth.verify(request, ...) + prepared_request = request.prepare() + HTTPSignatureAuth.verify(prepared_request, ...) :param require_components: A list of lowercased header names or derived component IDs ("@method", "@target-uri", "@authority", "@scheme", "@request-target", "@path", "@query", "@query-params", "@status", or "@request-response" as specified in the standard) to require to be covered by the signature. If the "content-digest" header field - is specified here (recommended for requests that have a body), it will be verified by matching it against - the digest hash computed on the body of the request (expected to be bytes). + is specified here (recommended for messages that have a body), it will be verified by matching it against + the digest hash computed on the body of the message (expected to be bytes). If this parameter is not specified, ``verify()`` will set it to ("@method", "@authority", "@target-uri") - for requests without a body, and ("@method", "@authority", "@target-uri", "content-digest") for requests + for messages without a body, and ("@method", "@authority", "@target-uri", "content-digest") for messages with a body. :param signature_algorithm: The algorithm expected to be used by the signature. Any signature not using the expected algorithm will @@ -204,19 +210,20 @@ class HTTPSignatureAuth(requests.auth.AuthBase): * ``algorithm``: (same as ``signature_algorithm`` above) * ``covered_components``: A mapping of component names to their values, as covered by the signature * ``parameters``: A mapping of signature parameters to their values, as covered by the signature - * ``body``: The message body for requests that have a body and pass validation of the covered + * ``body``: The message body for messages that have a body and pass validation of the covered content-digest; ``None`` otherwise. :raises: ``InvalidSignature`` - raised whenever signature validation fails for any reason. """ - if request.body is not None: + body = cls.get_body(message) + if body is not None: if "content-digest" not in require_components and '"content-digest"' not in require_components: require_components = list(require_components) + ["content-digest"] verifier = HTTPMessageVerifier(signature_algorithm=signature_algorithm, key_resolver=key_resolver, component_resolver_class=cls.component_resolver_class) - verify_results = verifier.verify(request) + verify_results = verifier.verify(message) if len(verify_results) != 1: raise InvalidSignature("Multiple signatures are not supported.") verify_result = verify_results[0] @@ -227,8 +234,8 @@ class HTTPSignatureAuth(requests.auth.AuthBase): if component_key not in verify_result.covered_components: raise InvalidSignature(f"A required component, {component_key}, was not covered by the signature.") if component_key == '"content-digest"': - if request.body is None: - raise InvalidSignature("Found a content-digest header in a request with no body") + if body is None: + raise InvalidSignature("Found a content-digest header in a message with no body") digest = http_sfv.Dictionary() digest.parse(verify_result.covered_components[component_key].encode()) if len(digest) < 1: @@ -238,8 +245,8 @@ class HTTPSignatureAuth(requests.auth.AuthBase): raise InvalidSignature(f'Unsupported digest algorithm "{k}"') raw_digest = v.value hasher = cls._digest_hashers[k] - expect_digest = hasher(request.body).digest() + expect_digest = hasher(body).digest() if raw_digest != expect_digest: - raise InvalidSignature("The content-digest header does not match the request body") - verify_result = verify_result._replace(body=request.body) + raise InvalidSignature("The content-digest header does not match the message body") + verify_result = verify_result._replace(body=body) return verify_result diff --git a/setup.py b/setup.py index 0b92f4a..7f4ca0a 100755 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setup( }, setup_requires=['setuptools_scm >= 3.4.3'], install_requires=[ - "http-message-signatures >= 0.2.2", + "http-message-signatures >= 0.2.3", "http-sfv >= 0.9.3", "requests >= 2.27.1" ], diff --git a/test/test.py b/test/test.py index be0c4d6..c054a5a 100755 --- a/test/test.py +++ b/test/test.py @@ -1,14 +1,15 @@ #!/usr/bin/env python -import os, sys, unittest, logging, base64 +import os, sys, unittest, logging, base64, io, json +import http_sfv import requests from requests.adapters import HTTPAdapter sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from requests_http_signature import algorithms, HTTPSignatureAuth # noqa: E402 -from http_message_signatures import InvalidSignature # noqa: E402 +from http_message_signatures import HTTPMessageSigner, InvalidSignature # noqa: E402 logging.basicConfig(level="DEBUG") @@ -34,10 +35,20 @@ class TestAdapter(HTTPAdapter): except InvalidSignature: pass response = requests.Response() + response.request = request response.status_code = requests.codes.ok response.url = request.url response.headers["Received-Signature-Input"] = request.headers["Signature-Input"] response.headers["Received-Signature"] = request.headers["Signature"] + response.raw = io.BytesIO(json.dumps({}).encode()) + signer = HTTPMessageSigner(signature_algorithm=self.client_auth.signer.signature_algorithm, + key_resolver=self.client_auth.signer.key_resolver) + hasher = HTTPSignatureAuth._digest_hashers["sha-256"] + digest = hasher(response.raw.getvalue()).digest() + response.headers["Content-Digest"] = str(http_sfv.Dictionary({"sha-256": digest})) + signer.sign(response, + key_id=default_keyid, + covered_component_ids=("@method", "@authority", "content-digest", "@target-uri")) return response @@ -61,7 +72,21 @@ class TestRequestsHTTPSignature(unittest.TestCase): self.session.get(url, auth=self.auth) self.auth.signer.key_resolver.resolve_private_key = lambda k: b"abc" self.session.get(url, auth=self.auth) - self.session.post(url, auth=self.auth, data=b"xyz") + res = self.session.post(url, auth=self.auth, data=b"xyz") + verify_args = dict(signature_algorithm=algorithms.HMAC_SHA256, key_resolver=self.auth.signer.key_resolver) + HTTPSignatureAuth.verify(res, **verify_args) + res.headers["Content-Digest"] = res.headers["Content-Digest"][::-1] + with self.assertRaises(InvalidSignature): + HTTPSignatureAuth.verify(res, **verify_args) + del res.headers["Content-Digest"] + with self.assertRaises(InvalidSignature): + HTTPSignatureAuth.verify(res, **verify_args) + res.headers["Signature"] = res.headers["Signature"][::-1] + with self.assertRaises(InvalidSignature): + HTTPSignatureAuth.verify(res, **verify_args) + del res.headers["Signature"] + with self.assertRaises(InvalidSignature): + HTTPSignatureAuth.verify(res, **verify_args) def test_expired_signature(self): "TODO"