From 7f3af6a8ede45dcabc24af0903b41e582c5a5ca6 Mon Sep 17 00:00:00 2001 From: dlee Date: Fri, 3 Feb 2023 16:30:07 -0600 Subject: [PATCH 1/5] typechecks made more strict using more flags --- mypy.ini | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mypy.ini b/mypy.ini index 01cbb57a8..c6d8affe3 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,3 +1,9 @@ [mypy] ignore_missing_imports = True plugins=mypy_zope:plugin +show_column_numbers = True +pretty = True +show_error_codes = True +warn_unused_configs =True +warn_redundant_casts = True +strict_equality = True \ No newline at end of file From e2e33933a88db59e98e749ab59d7115f3e169aa6 Mon Sep 17 00:00:00 2001 From: dlee Date: Fri, 3 Feb 2023 16:48:06 -0600 Subject: [PATCH 2/5] Forgot to push newsfragment --- newsfragments/3971.minor | 1 + src/allmydata/crypto/rsa.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 newsfragments/3971.minor diff --git a/newsfragments/3971.minor b/newsfragments/3971.minor new file mode 100644 index 000000000..a6cbb6a89 --- /dev/null +++ b/newsfragments/3971.minor @@ -0,0 +1 @@ +Changes made to mypy.ini to make mypy more 'strict' and prevent future regressions. \ No newline at end of file diff --git a/src/allmydata/crypto/rsa.py b/src/allmydata/crypto/rsa.py index e579a3d2a..dca908467 100644 --- a/src/allmydata/crypto/rsa.py +++ b/src/allmydata/crypto/rsa.py @@ -127,7 +127,7 @@ def der_string_from_signing_key(private_key: PrivateKey) -> bytes: :returns: bytes representing `private_key` """ _validate_private_key(private_key) - return private_key.private_bytes( # type: ignore[attr-defined] + return private_key.private_bytes( encoding=Encoding.DER, format=PrivateFormat.PKCS8, encryption_algorithm=NoEncryption(), From 80db4a9de4b3c95ec231d1a55dc7f48f96a014f5 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 3 Feb 2023 21:25:24 -0600 Subject: [PATCH 3/5] Delete rsa.py --- src/allmydata/crypto/rsa.py | 225 ------------------------------------ 1 file changed, 225 deletions(-) delete mode 100644 src/allmydata/crypto/rsa.py diff --git a/src/allmydata/crypto/rsa.py b/src/allmydata/crypto/rsa.py deleted file mode 100644 index dca908467..000000000 --- a/src/allmydata/crypto/rsa.py +++ /dev/null @@ -1,225 +0,0 @@ -""" -Helper functions for cryptography-related operations inside Tahoe -using RSA public-key encryption and decryption. - -In cases where these functions happen to use and return objects that -are documented in the `cryptography` library, code outside this module -should only use functions from allmydata.crypto.rsa and not rely on -features of any objects that `cryptography` documents. - -That is, the public and private keys are opaque objects; DO NOT depend -on any of their methods. -""" - -from __future__ import annotations - -from typing_extensions import TypeAlias -from typing import Callable - -from functools import partial - -from cryptography.exceptions import InvalidSignature -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import rsa, padding -from cryptography.hazmat.primitives.serialization import load_der_private_key, load_der_public_key, \ - Encoding, PrivateFormat, PublicFormat, NoEncryption - -from allmydata.crypto.error import BadSignature - -PublicKey: TypeAlias = rsa.RSAPublicKey -PrivateKey: TypeAlias = rsa.RSAPrivateKey - -# This is the value that was used by `pycryptopp`, and we must continue to use it for -# both backwards compatibility and interoperability. -# -# The docs for `cryptography` suggest to use the constant defined at -# `cryptography.hazmat.primitives.asymmetric.padding.PSS.MAX_LENGTH`, but this causes old -# signatures to fail to validate. -RSA_PSS_SALT_LENGTH = 32 - -RSA_PADDING = padding.PSS( - mgf=padding.MGF1(hashes.SHA256()), - salt_length=RSA_PSS_SALT_LENGTH, -) - - - -def create_signing_keypair(key_size: int) -> tuple[PrivateKey, PublicKey]: - """ - Create a new RSA signing (private) keypair from scratch. Can be used with - `sign_data` function. - - :param key_size: length of key in bits - - :returns: 2-tuple of (private_key, public_key) - """ - priv_key = rsa.generate_private_key( - public_exponent=65537, - key_size=key_size, - backend=default_backend() - ) - return priv_key, priv_key.public_key() - - -def create_signing_keypair_from_string(private_key_der: bytes) -> tuple[PrivateKey, PublicKey]: - """ - Create an RSA signing (private) key from previously serialized - private key bytes. - - :param private_key_der: blob as returned from `der_string_from_signing_keypair` - - :returns: 2-tuple of (private_key, public_key) - """ - _load = partial( - load_der_private_key, - private_key_der, - password=None, - backend=default_backend(), - ) - - def load_with_validation() -> PrivateKey: - k = _load() - assert isinstance(k, PrivateKey) - return k - - def load_without_validation() -> PrivateKey: - k = _load(unsafe_skip_rsa_key_validation=True) - assert isinstance(k, PrivateKey) - return k - - # Load it once without the potentially expensive OpenSSL validation - # checks. These have superlinear complexity. We *will* run them just - # below - but first we'll apply our own constant-time checks. - load: Callable[[], PrivateKey] = load_without_validation - try: - unsafe_priv_key = load() - except TypeError: - # cryptography<39 does not support this parameter, so just load the - # key with validation... - unsafe_priv_key = load_with_validation() - # But avoid *reloading* it since that will run the expensive - # validation *again*. - load = lambda: unsafe_priv_key - - if not isinstance(unsafe_priv_key, rsa.RSAPrivateKey): - raise ValueError( - "Private Key did not decode to an RSA key" - ) - if unsafe_priv_key.key_size != 2048: - raise ValueError( - "Private Key must be 2048 bits" - ) - - # Now re-load it with OpenSSL's validation applied. - safe_priv_key = load() - - return safe_priv_key, safe_priv_key.public_key() - - -def der_string_from_signing_key(private_key: PrivateKey) -> bytes: - """ - Serializes a given RSA private key to a DER string - - :param private_key: a private key object as returned from - `create_signing_keypair` or `create_signing_keypair_from_string` - - :returns: bytes representing `private_key` - """ - _validate_private_key(private_key) - return private_key.private_bytes( - encoding=Encoding.DER, - format=PrivateFormat.PKCS8, - encryption_algorithm=NoEncryption(), - ) - - -def der_string_from_verifying_key(public_key: PublicKey) -> bytes: - """ - Serializes a given RSA public key to a DER string. - - :param public_key: a public key object as returned from - `create_signing_keypair` or `create_signing_keypair_from_string` - - :returns: bytes representing `public_key` - """ - _validate_public_key(public_key) - return public_key.public_bytes( - encoding=Encoding.DER, - format=PublicFormat.SubjectPublicKeyInfo, - ) - - -def create_verifying_key_from_string(public_key_der: bytes) -> PublicKey: - """ - Create an RSA verifying key from a previously serialized public key - - :param bytes public_key_der: a blob as returned by `der_string_from_verifying_key` - - :returns: a public key object suitable for use with other - functions in this module - """ - pub_key = load_der_public_key( - public_key_der, - backend=default_backend(), - ) - assert isinstance(pub_key, PublicKey) - return pub_key - - -def sign_data(private_key: PrivateKey, data: bytes) -> bytes: - """ - :param private_key: the private part of a keypair returned from - `create_signing_keypair_from_string` or `create_signing_keypair` - - :param data: the bytes to sign - - :returns: bytes which are a signature of the bytes given as `data`. - """ - _validate_private_key(private_key) - return private_key.sign( - data, - RSA_PADDING, - hashes.SHA256(), - ) - -def verify_signature(public_key: PublicKey, alleged_signature: bytes, data: bytes) -> None: - """ - :param public_key: a verifying key, returned from `create_verifying_key_from_string` or `create_verifying_key_from_private_key` - - :param bytes alleged_signature: the bytes of the alleged signature - - :param bytes data: the data which was allegedly signed - """ - _validate_public_key(public_key) - try: - public_key.verify( - alleged_signature, - data, - RSA_PADDING, - hashes.SHA256(), - ) - except InvalidSignature: - raise BadSignature() - - -def _validate_public_key(public_key: PublicKey) -> None: - """ - Internal helper. Checks that `public_key` is a valid cryptography - object - """ - if not isinstance(public_key, rsa.RSAPublicKey): - raise ValueError( - f"public_key must be an RSAPublicKey not {type(public_key)}" - ) - - -def _validate_private_key(private_key: PrivateKey) -> None: - """ - Internal helper. Checks that `public_key` is a valid cryptography - object - """ - if not isinstance(private_key, rsa.RSAPrivateKey): - raise ValueError( - f"private_key must be an RSAPrivateKey not {type(private_key)}" - ) From 31c5b78e6ab9d17efffd647767fbf8224ced77f6 Mon Sep 17 00:00:00 2001 From: dlee Date: Fri, 3 Feb 2023 21:35:55 -0600 Subject: [PATCH 4/5] Add back rsa.py accidentally removed file on website --- src/allmydata/crypto/rsa.py | 225 ++++++++++++++++++++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 src/allmydata/crypto/rsa.py diff --git a/src/allmydata/crypto/rsa.py b/src/allmydata/crypto/rsa.py new file mode 100644 index 000000000..e579a3d2a --- /dev/null +++ b/src/allmydata/crypto/rsa.py @@ -0,0 +1,225 @@ +""" +Helper functions for cryptography-related operations inside Tahoe +using RSA public-key encryption and decryption. + +In cases where these functions happen to use and return objects that +are documented in the `cryptography` library, code outside this module +should only use functions from allmydata.crypto.rsa and not rely on +features of any objects that `cryptography` documents. + +That is, the public and private keys are opaque objects; DO NOT depend +on any of their methods. +""" + +from __future__ import annotations + +from typing_extensions import TypeAlias +from typing import Callable + +from functools import partial + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives.serialization import load_der_private_key, load_der_public_key, \ + Encoding, PrivateFormat, PublicFormat, NoEncryption + +from allmydata.crypto.error import BadSignature + +PublicKey: TypeAlias = rsa.RSAPublicKey +PrivateKey: TypeAlias = rsa.RSAPrivateKey + +# This is the value that was used by `pycryptopp`, and we must continue to use it for +# both backwards compatibility and interoperability. +# +# The docs for `cryptography` suggest to use the constant defined at +# `cryptography.hazmat.primitives.asymmetric.padding.PSS.MAX_LENGTH`, but this causes old +# signatures to fail to validate. +RSA_PSS_SALT_LENGTH = 32 + +RSA_PADDING = padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), + salt_length=RSA_PSS_SALT_LENGTH, +) + + + +def create_signing_keypair(key_size: int) -> tuple[PrivateKey, PublicKey]: + """ + Create a new RSA signing (private) keypair from scratch. Can be used with + `sign_data` function. + + :param key_size: length of key in bits + + :returns: 2-tuple of (private_key, public_key) + """ + priv_key = rsa.generate_private_key( + public_exponent=65537, + key_size=key_size, + backend=default_backend() + ) + return priv_key, priv_key.public_key() + + +def create_signing_keypair_from_string(private_key_der: bytes) -> tuple[PrivateKey, PublicKey]: + """ + Create an RSA signing (private) key from previously serialized + private key bytes. + + :param private_key_der: blob as returned from `der_string_from_signing_keypair` + + :returns: 2-tuple of (private_key, public_key) + """ + _load = partial( + load_der_private_key, + private_key_der, + password=None, + backend=default_backend(), + ) + + def load_with_validation() -> PrivateKey: + k = _load() + assert isinstance(k, PrivateKey) + return k + + def load_without_validation() -> PrivateKey: + k = _load(unsafe_skip_rsa_key_validation=True) + assert isinstance(k, PrivateKey) + return k + + # Load it once without the potentially expensive OpenSSL validation + # checks. These have superlinear complexity. We *will* run them just + # below - but first we'll apply our own constant-time checks. + load: Callable[[], PrivateKey] = load_without_validation + try: + unsafe_priv_key = load() + except TypeError: + # cryptography<39 does not support this parameter, so just load the + # key with validation... + unsafe_priv_key = load_with_validation() + # But avoid *reloading* it since that will run the expensive + # validation *again*. + load = lambda: unsafe_priv_key + + if not isinstance(unsafe_priv_key, rsa.RSAPrivateKey): + raise ValueError( + "Private Key did not decode to an RSA key" + ) + if unsafe_priv_key.key_size != 2048: + raise ValueError( + "Private Key must be 2048 bits" + ) + + # Now re-load it with OpenSSL's validation applied. + safe_priv_key = load() + + return safe_priv_key, safe_priv_key.public_key() + + +def der_string_from_signing_key(private_key: PrivateKey) -> bytes: + """ + Serializes a given RSA private key to a DER string + + :param private_key: a private key object as returned from + `create_signing_keypair` or `create_signing_keypair_from_string` + + :returns: bytes representing `private_key` + """ + _validate_private_key(private_key) + return private_key.private_bytes( # type: ignore[attr-defined] + encoding=Encoding.DER, + format=PrivateFormat.PKCS8, + encryption_algorithm=NoEncryption(), + ) + + +def der_string_from_verifying_key(public_key: PublicKey) -> bytes: + """ + Serializes a given RSA public key to a DER string. + + :param public_key: a public key object as returned from + `create_signing_keypair` or `create_signing_keypair_from_string` + + :returns: bytes representing `public_key` + """ + _validate_public_key(public_key) + return public_key.public_bytes( + encoding=Encoding.DER, + format=PublicFormat.SubjectPublicKeyInfo, + ) + + +def create_verifying_key_from_string(public_key_der: bytes) -> PublicKey: + """ + Create an RSA verifying key from a previously serialized public key + + :param bytes public_key_der: a blob as returned by `der_string_from_verifying_key` + + :returns: a public key object suitable for use with other + functions in this module + """ + pub_key = load_der_public_key( + public_key_der, + backend=default_backend(), + ) + assert isinstance(pub_key, PublicKey) + return pub_key + + +def sign_data(private_key: PrivateKey, data: bytes) -> bytes: + """ + :param private_key: the private part of a keypair returned from + `create_signing_keypair_from_string` or `create_signing_keypair` + + :param data: the bytes to sign + + :returns: bytes which are a signature of the bytes given as `data`. + """ + _validate_private_key(private_key) + return private_key.sign( + data, + RSA_PADDING, + hashes.SHA256(), + ) + +def verify_signature(public_key: PublicKey, alleged_signature: bytes, data: bytes) -> None: + """ + :param public_key: a verifying key, returned from `create_verifying_key_from_string` or `create_verifying_key_from_private_key` + + :param bytes alleged_signature: the bytes of the alleged signature + + :param bytes data: the data which was allegedly signed + """ + _validate_public_key(public_key) + try: + public_key.verify( + alleged_signature, + data, + RSA_PADDING, + hashes.SHA256(), + ) + except InvalidSignature: + raise BadSignature() + + +def _validate_public_key(public_key: PublicKey) -> None: + """ + Internal helper. Checks that `public_key` is a valid cryptography + object + """ + if not isinstance(public_key, rsa.RSAPublicKey): + raise ValueError( + f"public_key must be an RSAPublicKey not {type(public_key)}" + ) + + +def _validate_private_key(private_key: PrivateKey) -> None: + """ + Internal helper. Checks that `public_key` is a valid cryptography + object + """ + if not isinstance(private_key, rsa.RSAPrivateKey): + raise ValueError( + f"private_key must be an RSAPrivateKey not {type(private_key)}" + ) From eb26c97ef70edeafe3f6777759dcca7d6a08c98c Mon Sep 17 00:00:00 2001 From: dlee Date: Mon, 6 Feb 2023 15:29:53 -0600 Subject: [PATCH 5/5] implicit_optional flag added and errors related to flag fixed --- mypy.ini | 1 + src/allmydata/web/common.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/mypy.ini b/mypy.ini index c6d8affe3..e6e7d16ff 100644 --- a/mypy.ini +++ b/mypy.ini @@ -5,5 +5,6 @@ show_column_numbers = True pretty = True show_error_codes = True warn_unused_configs =True +no_implicit_optional = True warn_redundant_casts = True strict_equality = True \ No newline at end of file diff --git a/src/allmydata/web/common.py b/src/allmydata/web/common.py index c49354217..3d85b1c4d 100644 --- a/src/allmydata/web/common.py +++ b/src/allmydata/web/common.py @@ -707,12 +707,12 @@ def url_for_string(req, url_string): T = TypeVar("T") @overload -def get_arg(req: IRequest, argname: str | bytes, default: T = None, *, multiple: Literal[False] = False) -> T | bytes: ... +def get_arg(req: IRequest, argname: str | bytes, default: Optional[T] = None, *, multiple: Literal[False] = False) -> T | bytes: ... @overload -def get_arg(req: IRequest, argname: str | bytes, default: T = None, *, multiple: Literal[True]) -> T | tuple[bytes, ...]: ... +def get_arg(req: IRequest, argname: str | bytes, default: Optional[T] = None, *, multiple: Literal[True]) -> T | tuple[bytes, ...]: ... -def get_arg(req: IRequest, argname: str | bytes, default: T = None, *, multiple: bool = False) -> None | T | bytes | tuple[bytes, ...]: +def get_arg(req: IRequest, argname: str | bytes, default: Optional[T] = None, *, multiple: bool = False) -> None | T | bytes | tuple[bytes, ...]: """Extract an argument from either the query args (req.args) or the form body fields (req.fields). If multiple=False, this returns a single value (or the default, which defaults to None), and the query args take