Use isort and black

pull/38/head
Andrey Kislyuk 2022-08-31 10:49:21 -07:00
parent c643c8ccd7
commit 45f540a22c
No known key found for this signature in database
GPG Key ID: 8AFAFCD242818A52
6 changed files with 116 additions and 86 deletions

View File

@ -24,3 +24,5 @@ jobs:
run: pip install .[tests] run: pip install .[tests]
- name: Test - name: Test
run: make test run: make test
- uses: isort/isort-action@v1.0.0
- uses: psf/black@stable

View File

@ -1,7 +1,9 @@
import os, sys import os
import sys
import guzzle_sphinx_theme import guzzle_sphinx_theme
sys.path.insert(0, os.path.abspath('..')) sys.path.insert(0, os.path.abspath(".."))
project = "requests-http-signature" project = "requests-http-signature"
copyright = "Andrey Kislyuk" copyright = "Andrey Kislyuk"
@ -27,6 +29,6 @@ html_sidebars = {
"logo-text.html", "logo-text.html",
# "globaltoc.html", # "globaltoc.html",
"localtoc.html", "localtoc.html",
"searchbox.html" "searchbox.html",
] ]
} }

5
pyproject.toml Normal file
View File

@ -0,0 +1,5 @@
[tool.black]
line-length = 120
exclude = ".*/version.py"
[tool.isort]
profile = "black"

View File

@ -89,16 +89,19 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
_auto_cover_header_fields = {"authorization", "content-digest", "date"} _auto_cover_header_fields = {"authorization", "content-digest", "date"}
def __init__(self, *, def __init__(
signature_algorithm: Type[HTTPSignatureAlgorithm], self,
key: bytes = None, *,
key_id: str, signature_algorithm: Type[HTTPSignatureAlgorithm],
key_resolver: HTTPSignatureKeyResolver = None, key: bytes = None,
covered_component_ids: Sequence[str] = ("@method", "@authority", "@target-uri"), key_id: str,
label: str = None, key_resolver: HTTPSignatureKeyResolver = None,
include_alg: bool = True, covered_component_ids: Sequence[str] = ("@method", "@authority", "@target-uri"),
use_nonce: bool = False, label: str = None,
expires_in: datetime.timedelta = None): include_alg: bool = True,
use_nonce: bool = False,
expires_in: datetime.timedelta = None,
):
if key_resolver is None and key is None: if key_resolver is None and key is None:
raise RequestsHttpSignatureException("Either key_resolver or key must be specified.") raise RequestsHttpSignatureException("Either key_resolver or key must be specified.")
if key_resolver is not None and key is not None: if key_resolver is not None and key is not None:
@ -112,9 +115,11 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
self.use_nonce = use_nonce self.use_nonce = use_nonce
self.covered_component_ids = covered_component_ids self.covered_component_ids = covered_component_ids
self.expires_in = expires_in self.expires_in = expires_in
self.signer = HTTPMessageSigner(signature_algorithm=signature_algorithm, self.signer = HTTPMessageSigner(
key_resolver=key_resolver, signature_algorithm=signature_algorithm,
component_resolver_class=self.component_resolver_class) key_resolver=key_resolver,
component_resolver_class=self.component_resolver_class,
)
def add_date(self, request, timestamp): def add_date(self, request, timestamp):
if "Date" not in request.headers: if "Date" not in request.headers:
@ -156,14 +161,16 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
created = self.get_created(request) created = self.get_created(request)
expires = self.get_expires(request, created=created) expires = self.get_expires(request, created=created)
covered_component_ids = self.get_covered_component_ids(request) covered_component_ids = self.get_covered_component_ids(request)
self.signer.sign(request, self.signer.sign(
key_id=self.key_id, request,
created=created, key_id=self.key_id,
expires=expires, created=created,
nonce=self.get_nonce(request), expires=expires,
label=self.label, nonce=self.get_nonce(request),
include_alg=self.include_alg, label=self.label,
covered_component_ids=covered_component_ids) include_alg=self.include_alg,
covered_component_ids=covered_component_ids,
)
return request return request
@classmethod @classmethod
@ -173,11 +180,15 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
return message.body return message.body
@classmethod @classmethod
def verify(cls, message: Union[requests.PreparedRequest, requests.Response], *, def verify(
require_components: Sequence[str] = ("@method", "@authority", "@target-uri"), cls,
signature_algorithm: Type[HTTPSignatureAlgorithm], message: Union[requests.PreparedRequest, requests.Response],
key_resolver: HTTPSignatureKeyResolver, *,
max_age: datetime.timedelta = datetime.timedelta(days=1)) -> VerifyResult: require_components: Sequence[str] = ("@method", "@authority", "@target-uri"),
signature_algorithm: Type[HTTPSignatureAlgorithm],
key_resolver: HTTPSignatureKeyResolver,
max_age: datetime.timedelta = datetime.timedelta(days=1),
) -> VerifyResult:
""" """
Verify an HTTP message signature. Verify an HTTP message signature.
@ -244,9 +255,11 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
if "content-digest" not in require_components and '"content-digest"' not in require_components: if "content-digest" not in require_components and '"content-digest"' not in require_components:
require_components = list(require_components) + ["content-digest"] require_components = list(require_components) + ["content-digest"]
verifier = HTTPMessageVerifier(signature_algorithm=signature_algorithm, verifier = HTTPMessageVerifier(
key_resolver=key_resolver, signature_algorithm=signature_algorithm,
component_resolver_class=cls.component_resolver_class) key_resolver=key_resolver,
component_resolver_class=cls.component_resolver_class,
)
verify_results = verifier.verify(message, max_age=max_age) verify_results = verifier.verify(message, max_age=max_age)
if len(verify_results) != 1: if len(verify_results) != 1:
raise InvalidSignature("Multiple signatures are not supported.") raise InvalidSignature("Multiple signatures are not supported.")

View File

@ -1,24 +1,20 @@
#!/usr/bin/env python #!/usr/bin/env python
from setuptools import setup, find_packages from setuptools import find_packages, setup
setup( setup(
name='requests-http-signature', name="requests-http-signature",
url='https://github.com/pyauth/requests-http-signature', url="https://github.com/pyauth/requests-http-signature",
license='Apache Software License', license="Apache Software License",
author='Andrey Kislyuk', author="Andrey Kislyuk",
author_email='kislyuk@gmail.com', author_email="kislyuk@gmail.com",
description="A Requests auth module for HTTP Message Signatures", description="A Requests auth module for HTTP Message Signatures",
long_description=open('README.rst').read(), long_description=open("README.rst").read(),
use_scm_version={ use_scm_version={
"write_to": "requests_http_signature/version.py", "write_to": "requests_http_signature/version.py",
}, },
setup_requires=['setuptools_scm >= 3.4.3'], setup_requires=["setuptools_scm >= 3.4.3"],
install_requires=[ install_requires=["http-message-signatures >= 0.4.3", "http-sfv >= 0.9.3", "requests >= 2.25.1"],
"http-message-signatures >= 0.4.3",
"http-sfv >= 0.9.3",
"requests >= 2.25.1"
],
extras_require={ extras_require={
"tests": [ "tests": [
"flake8", "flake8",
@ -29,23 +25,23 @@ setup(
"types-requests", "types-requests",
] ]
}, },
packages=find_packages(exclude=['test']), packages=find_packages(exclude=["test"]),
include_package_data=True, include_package_data=True,
package_data={ package_data={
"http_message_signatures": ["py.typed"], "http_message_signatures": ["py.typed"],
}, },
platforms=['MacOS X', 'Posix'], platforms=["MacOS X", "Posix"],
test_suite='test', test_suite="test",
classifiers=[ classifiers=[
'Intended Audience :: Developers', "Intended Audience :: Developers",
'License :: OSI Approved :: Apache Software License', "License :: OSI Approved :: Apache Software License",
'Operating System :: MacOS :: MacOS X', "Operating System :: MacOS :: MacOS X",
'Operating System :: POSIX', "Operating System :: POSIX",
'Programming Language :: Python', "Programming Language :: Python",
'Programming Language :: Python :: 3.7', "Programming Language :: Python :: 3.7",
'Programming Language :: Python :: 3.8', "Programming Language :: Python :: 3.8",
'Programming Language :: Python :: 3.9', "Programming Language :: Python :: 3.9",
'Programming Language :: Python :: 3.10', "Programming Language :: Python :: 3.10",
'Topic :: Software Development :: Libraries :: Python Modules' "Topic :: Software Development :: Libraries :: Python Modules",
], ],
) )

View File

@ -1,15 +1,23 @@
#!/usr/bin/env python #!/usr/bin/env python
import os, sys, unittest, logging, base64, io, json import base64
import io
import json
import logging
import os
import sys
import unittest
import http_sfv import http_sfv
import requests import requests
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from requests_http_signature import algorithms, HTTPSignatureAuth # noqa: E402 from http_message_signatures import HTTPMessageSigner # noqa: E402
from http_message_signatures import HTTPMessageSigner, InvalidSignature # noqa: E402 from http_message_signatures import InvalidSignature # noqa: E402
from requests_http_signature import HTTPSignatureAuth, algorithms # noqa: E402
logging.basicConfig(level="DEBUG") logging.basicConfig(level="DEBUG")
@ -24,8 +32,10 @@ class TestAdapter(HTTPAdapter):
self.client_auth = auth self.client_auth = auth
def send(self, request, *args, **kwargs): def send(self, request, *args, **kwargs):
verify_args = dict(signature_algorithm=self.client_auth.signer.signature_algorithm, verify_args = dict(
key_resolver=self.client_auth.signer.key_resolver) signature_algorithm=self.client_auth.signer.signature_algorithm,
key_resolver=self.client_auth.signer.key_resolver,
)
HTTPSignatureAuth.verify(request, **verify_args) HTTPSignatureAuth.verify(request, **verify_args)
if request.body is not None: if request.body is not None:
request.body = request.body[::-1] request.body = request.body[::-1]
@ -41,14 +51,18 @@ class TestAdapter(HTTPAdapter):
response.headers["Received-Signature-Input"] = request.headers["Signature-Input"] response.headers["Received-Signature-Input"] = request.headers["Signature-Input"]
response.headers["Received-Signature"] = request.headers["Signature"] response.headers["Received-Signature"] = request.headers["Signature"]
response.raw = io.BytesIO(json.dumps({}).encode()) response.raw = io.BytesIO(json.dumps({}).encode())
signer = HTTPMessageSigner(signature_algorithm=self.client_auth.signer.signature_algorithm, signer = HTTPMessageSigner(
key_resolver=self.client_auth.signer.key_resolver) signature_algorithm=self.client_auth.signer.signature_algorithm,
key_resolver=self.client_auth.signer.key_resolver,
)
hasher = HTTPSignatureAuth._content_digest_hashers["sha-256"] hasher = HTTPSignatureAuth._content_digest_hashers["sha-256"]
digest = hasher(response.raw.getvalue()).digest() digest = hasher(response.raw.getvalue()).digest()
response.headers["Content-Digest"] = str(http_sfv.Dictionary({"sha-256": digest})) response.headers["Content-Digest"] = str(http_sfv.Dictionary({"sha-256": digest}))
signer.sign(response, signer.sign(
key_id=default_keyid, response,
covered_component_ids=("@method", "@authority", "content-digest", "@target-uri")) key_id=default_keyid,
covered_component_ids=("@method", "@authority", "content-digest", "@target-uri"),
)
return response return response
@ -65,7 +79,7 @@ class TestRequestsHTTPSignature(unittest.TestCase):
self.session.mount("https://", TestAdapter(self.auth)) self.session.mount("https://", TestAdapter(self.auth))
def test_basic_statements(self): def test_basic_statements(self):
url = 'http://example.com/path?query#fragment' url = "http://example.com/path?query#fragment"
self.session.get(url, auth=self.auth) self.session.get(url, auth=self.auth)
self.auth.signer.key_resolver.resolve_public_key = lambda k: b"abc" self.auth.signer.key_resolver.resolve_public_key = lambda k: b"abc"
with self.assertRaises(InvalidSignature): with self.assertRaises(InvalidSignature):
@ -89,46 +103,44 @@ class TestRequestsHTTPSignature(unittest.TestCase):
HTTPSignatureAuth.verify(res, **verify_args) HTTPSignatureAuth.verify(res, **verify_args)
def test_auto_cover_authorization_header(self): def test_auto_cover_authorization_header(self):
url = 'http://example.com/path?query#fragment' url = "http://example.com/path?query#fragment"
res = self.session.get(url, auth=self.auth, headers={"Authorization": "Bearer 12345"}) res = self.session.get(url, auth=self.auth, headers={"Authorization": "Bearer 12345"})
self.assertIn('"authorization"', res.headers["Received-Signature-Input"]) self.assertIn('"authorization"', res.headers["Received-Signature-Input"])
def test_b21(self): def test_b21(self):
url = 'https://example.com/foo?param=Value&Pet=dog' url = "https://example.com/foo?param=Value&Pet=dog"
self.session.post( self.session.post(
url, url,
json={"hello": "world"}, json={"hello": "world"},
headers={ headers={
"Date": "Tue, 20 Apr 2021 02:07:55 GMT", "Date": "Tue, 20 Apr 2021 02:07:55 GMT",
"Content-Digest": ("sha-512=:WZDPaVn/7XgHaAy8pmojAkGWoRx2UFChF41A2svX+TaPm+" "Content-Digest": (
"AbwAgBWnrIiYllu7BNNyealdVLvRwEmTHWXvJwew==:") "sha-512=:WZDPaVn/7XgHaAy8pmojAkGWoRx2UFChF41A2svX+TaPm+"
"AbwAgBWnrIiYllu7BNNyealdVLvRwEmTHWXvJwew==:"
),
}, },
auth=self.auth auth=self.auth,
) )
@unittest.skip("TODO") @unittest.skip("TODO")
def test_rsa(self): def test_rsa(self):
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
private_key = rsa.generate_private_key( from cryptography.hazmat.primitives.asymmetric import rsa
public_exponent=65537,
key_size=2048, private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
backend=default_backend()
)
private_key_pem = private_key.private_bytes( private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(passphrase) encryption_algorithm=serialization.BestAvailableEncryption(passphrase),
) )
public_key_pem = private_key.public_key().public_bytes( public_key_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo
format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
url = 'http://example.com/path?query#fragment' url = "http://example.com/path?query#fragment"
auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase) auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase)
self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem))) self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem)))
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()