Skip to content

Commit

Permalink
[#566] Switch to pyOpenSSL for validating certificate chain
Browse files Browse the repository at this point in the history
  • Loading branch information
nabla-c0d3 committed Nov 6, 2022
1 parent 8ad73ec commit 11f135d
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 125 deletions.
3 changes: 3 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ black==22.3.0
pytest-cov
faker

# For mypy
types-pyOpenSSL

# For building the Windows executable
cx-freeze; sys.platform == 'win32'
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ def get_include_files() -> List[Tuple[str, str]]:
entry_points={"console_scripts": ["sslyze = sslyze.__main__:main"]},
# Dependencies
install_requires=[
"nassl>=4.0.1,<5.0.0",
"cryptography>=2.6,<39.0.0",
"tls-parser>=2.0.0,<3.0.0",
"nassl>=4.0.1,<5",
"cryptography>=2.6,<39",
"tls-parser>=2,<3",
"pydantic>=1.7,<1.11",
"pyOpenSSL>=20,<23",
],
# cx_freeze info for Windows builds with Python embedded
options={"build_exe": {"packages": ["cffi", "cryptography"], "include_files": get_include_files()}},
Expand Down
2 changes: 1 addition & 1 deletion sslyze/mozilla_tls_profile/mozilla_config_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def _check_certificates(
else:
deployed_key_algorithms.add(public_key.__class__.__name__)

deployed_signature_algorithms.add(leaf_cert.signature_algorithm_oid._name)
deployed_signature_algorithms.add(leaf_cert.signature_algorithm_oid._name) # type: ignore

# Validate the cert's lifespan
leaf_cert_lifespan = leaf_cert.not_valid_after - leaf_cert.not_valid_before
Expand Down
83 changes: 3 additions & 80 deletions sslyze/plugins/certificate_info/_cert_chain_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,19 @@
from dataclasses import dataclass
from pathlib import Path

from ssl import CertificateError, match_hostname
from typing import Optional, List, cast, Dict
from typing import Optional, List, cast

import cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import ExtensionNotFound, ExtensionOID, Certificate, load_pem_x509_certificate, TLSFeature
from cryptography.x509.ocsp import load_der_ocsp_response, OCSPResponseStatus, OCSPResponse
from nassl._nassl import X509
from nassl.cert_chain_verifier import CertificateChainVerifier, CertificateChainVerificationFailed
import nassl.ocsp_response

from sslyze.plugins.certificate_info._certificate_utils import extract_dns_subject_alternative_names, get_common_names
from sslyze.plugins.certificate_info._symantec import SymantecDistructTester
from sslyze.plugins.certificate_info.trust_stores.trust_store import TrustStore


@dataclass(frozen=True)
class PathValidationResult:
"""The result of trying to validate a server's certificate chain using a specific trust store.
Attributes:
trust_stores: The trust store used for validation.
verified_certificate_chain: The verified certificate chain returned by OpenSSL.
Index 0 is the leaf certificate and the last element is the anchor/CA certificate from the trust store.
Will be None if the validation failed or the verified chain could not be built.
Each certificate is parsed using the cryptography module; documentation is available at
https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object.
openssl_error_string: The result string returned by OpenSSL's validation function; None if validation was
successful.
was_validation_successful: Whether the certificate chain is trusted when using supplied the trust_stores.
"""

trust_store: TrustStore
verified_certificate_chain: Optional[List[Certificate]]
openssl_error_string: Optional[str]

@property
def was_validation_successful(self) -> bool:
return True if self.verified_certificate_chain else False
from sslyze.plugins.certificate_info.trust_stores.trust_store import TrustStore, PathValidationResult


@dataclass(frozen=True)
Expand Down Expand Up @@ -221,7 +193,7 @@ def perform(self) -> CertificateDeploymentAnalysisResult:
# Try to generate the verified certificate chain using each trust store
all_path_validation_results = []
for trust_store in self.trust_stores_for_validation:
path_validation_result = _verify_certificate_chain(self.server_certificate_chain_as_pem, trust_store)
path_validation_result = trust_store.verify_certificate_chain(self.server_certificate_chain_as_pem)
all_path_validation_results.append(path_validation_result)

# Keep one trust store that was able to build the verified chain to then run additional checks
Expand Down Expand Up @@ -318,52 +290,3 @@ def _certificate_matches_hostname(certificate: Certificate, server_hostname: str
return True
except CertificateError:
return False


# TODO(AD): There is probably a memory leak in nassl.X509 or nassl.X509_STORE_CTX
# https://github.com/nabla-c0d3/sslyze/issues/560
# It might be due to bad reference counting in nassl_X509_STORE_CTX_set0_trusted_stack()
# More specifically the call to X509_chain_up_ref() - is there corresponding call to decrease ref count?
# As a workaround, we cache the (huge) list of trusted certificates, for each trust store
_cache_for_trusted_certificates_per_file: Dict[Path, List[X509]] = {}


def _convert_and_cache_pem_certs_to_x509s(trusted_certificates_path: Path) -> List[X509]:
certs_as_509s = _cache_for_trusted_certificates_per_file.get(trusted_certificates_path)
if certs_as_509s:
return certs_as_509s

# Parse the PEM certificate in the file
all_certs_as_pem: List[str] = []
with trusted_certificates_path.open() as file_content:
for pem_segment in file_content.read().split("-----BEGIN CERTIFICATE-----")[1::]:
pem_content = pem_segment.split("-----END CERTIFICATE-----")[0]
pem_cert = f"-----BEGIN CERTIFICATE-----{pem_content}-----END CERTIFICATE-----"
all_certs_as_pem.append(pem_cert)

# Convert them to X509 objects and save that in the cache
all_certs_as_509s = [X509(cert_pem) for cert_pem in all_certs_as_pem]
_cache_for_trusted_certificates_per_file[trusted_certificates_path] = all_certs_as_509s
return all_certs_as_509s


def _verify_certificate_chain(server_certificate_chain: List[str], trust_store: TrustStore) -> PathValidationResult:
server_chain_as_x509s = [X509(pem_cert) for pem_cert in server_certificate_chain]
trust_store_as_x509s = _convert_and_cache_pem_certs_to_x509s(trust_store.path)
chain_verifier = CertificateChainVerifier(trust_store_as_x509s)

verified_chain: Optional[List[Certificate]]
try:
openssl_verify_str = None
verified_chain_as_509s = chain_verifier.verify(server_chain_as_x509s)
verified_chain = [
load_pem_x509_certificate(x509_cert.as_pem().encode("ascii"), backend=default_backend())
for x509_cert in verified_chain_as_509s
]
except CertificateChainVerificationFailed as e:
verified_chain = None
openssl_verify_str = e.openssl_error_string

return PathValidationResult(
trust_store=trust_store, verified_certificate_chain=verified_chain, openssl_error_string=openssl_verify_str
)
3 changes: 1 addition & 2 deletions sslyze/plugins/certificate_info/_certificate_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from cryptography import x509
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509 import ExtensionOID, DNSName, ExtensionNotFound, NameOID
from cryptography.x509.extensions import DuplicateExtension # type: ignore
from cryptography.x509 import ExtensionOID, DNSName, ExtensionNotFound, NameOID, DuplicateExtension


def extract_dns_subject_alternative_names(certificate: x509.Certificate) -> List[str]:
Expand Down
18 changes: 9 additions & 9 deletions sslyze/plugins/certificate_info/json_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@
from typing import Any, List, Optional

import pydantic
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import NameAttribute
from cryptography.x509.ocsp import OCSPResponseStatus
from cryptography.x509.oid import ObjectIdentifier # type: ignore
from cryptography.x509 import NameAttribute, ObjectIdentifier, Name, Certificate, ocsp
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey

from sslyze import (
Expand Down Expand Up @@ -78,7 +75,10 @@ class _ObjectIdentifierAsJson(_BaseModelWithOrmMode):

@classmethod
def from_orm(cls, oid: ObjectIdentifier) -> "_ObjectIdentifierAsJson":
return cls(name=oid._name, dotted_string=oid.dotted_string)
return cls(
name=oid._name, # type: ignore
dotted_string=oid.dotted_string
)


class _NameAttributeAsJson(_BaseModelWithOrmMode):
Expand All @@ -100,7 +100,7 @@ class _X509NameAsJson(_BaseModelWithOrmMode):
attributes: List[_NameAttributeAsJson]

@classmethod
def from_orm(cls, name: x509.name.Name) -> "_X509NameAsJson":
def from_orm(cls, name: Name) -> "_X509NameAsJson":
return cls(
rfc4514_string=name.rfc4514_string(), attributes=[_NameAttributeAsJson.from_orm(attr) for attr in name]
)
Expand Down Expand Up @@ -143,7 +143,7 @@ class _CertificateAsJson(_BaseModelWithOrmMode):
public_key: _PublicKeyAsJson

@classmethod
def from_orm(cls, certificate: x509.Certificate) -> "_CertificateAsJson":
def from_orm(cls, certificate: Certificate) -> "_CertificateAsJson":
signature_hash_algorithm: Optional[_HashAlgorithmAsJson]
if certificate.signature_hash_algorithm:
signature_hash_algorithm = _HashAlgorithmAsJson.from_orm(certificate.signature_hash_algorithm)
Expand Down Expand Up @@ -194,9 +194,9 @@ class _OcspResponseAsJson(_BaseModelWithOrmMode):
serial_number: Optional[int]

@classmethod
def from_orm(cls, ocsp_response: x509.ocsp.OCSPResponse) -> "_OcspResponseAsJson":
def from_orm(cls, ocsp_response: ocsp.OCSPResponse) -> "_OcspResponseAsJson":
response_status = ocsp_response.response_status.name
if ocsp_response.response_status != OCSPResponseStatus.SUCCESSFUL:
if ocsp_response.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
return cls(
response_status=response_status,
certificate_status=None,
Expand Down
69 changes: 61 additions & 8 deletions sslyze/plugins/certificate_info/trust_stores/trust_store.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
from dataclasses import dataclass
from pathlib import Path

from cryptography.x509.base import Certificate
from cryptography.x509.extensions import ExtensionNotFound, CertificatePolicies
from cryptography.x509.oid import ObjectIdentifier
from cryptography.x509.oid import ExtensionOID
from OpenSSL import crypto
from cryptography.x509 import Certificate
from cryptography.x509 import ExtensionNotFound, CertificatePolicies
from cryptography.x509 import ObjectIdentifier
from cryptography.x509 import ExtensionOID
from typing import List, cast
from typing import Optional


@dataclass(frozen=True)
class PathValidationResult:
"""The result of trying to validate a server's certificate chain using a specific trust store.
Attributes:
trust_store: The trust store used for validation.
verified_certificate_chain: The verified certificate chain returned by OpenSSL.
Index 0 is the leaf certificate and the last element is the anchor/CA certificate from the trust store.
Will be None if the validation failed or the verified chain could not be built.
Each certificate is parsed using the cryptography module; documentation is available at
https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object.
openssl_error_string: The result string returned by OpenSSL's validation function; None if validation was
successful.
was_validation_successful: Whether the certificate chain is trusted when using supplied the trust_stores.
"""

trust_store: "TrustStore"
verified_certificate_chain: Optional[List[Certificate]]
openssl_error_string: Optional[str]

@property
def was_validation_successful(self) -> bool:
return True if self.verified_certificate_chain else False


class TrustStore:
"""A set of root certificates to be used for certificate validation.
Expand All @@ -19,10 +44,14 @@ class TrustStore:
version: The human-readable version or date of the trust store (such as "09/2016").
"""

path: Path
name: str
version: str
ev_oids: Optional[List[ObjectIdentifier]] = None
def __init__(self, path: Path, name: str, version: str, ev_oids: Optional[List[ObjectIdentifier]] = None) -> None:
self.path = path
self.name = name
self.version = version
self.ev_oids = ev_oids

self._x509_store = crypto.X509Store()
self._x509_store.load_locations(cafile=self.path)

def is_certificate_extended_validation(self, certificate: Certificate) -> bool:
"""Is the supplied server certificate EV?"""
Expand All @@ -39,3 +68,27 @@ def is_certificate_extended_validation(self, certificate: Certificate) -> bool:
if policy.policy_identifier in self.ev_oids:
return True
return False

def verify_certificate_chain(self, certificate_chain_as_pem: List[str]) -> PathValidationResult:
certificate = crypto.load_certificate(
buffer=certificate_chain_as_pem[0].encode("ascii"), type=crypto.FILETYPE_PEM
)
chain = [
crypto.load_certificate(buffer=cert.encode("ascii"), type=crypto.FILETYPE_PEM)
for cert in certificate_chain_as_pem[1::]
]
x509_store_ctx = crypto.X509StoreContext(store=self._x509_store, certificate=certificate, chain=chain)

verified_chain: Optional[List[Certificate]]
error_message: Optional[str]
try:
verified_chain_as_x509s = x509_store_ctx.get_verified_chain()
verified_chain = [x509.to_cryptography() for x509 in verified_chain_as_x509s]
error_message = None
except crypto.X509StoreContextError as exc:
verified_chain = None
error_message = exc.args[0]

return PathValidationResult(
trust_store=self, verified_certificate_chain=verified_chain, openssl_error_string=error_message
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
from os.path import realpath

from cryptography.hazmat._oid import ObjectIdentifier
from cryptography.x509 import ObjectIdentifier

from sslyze.plugins.certificate_info.trust_stores.trust_store import TrustStore
from typing import List
Expand Down
21 changes: 0 additions & 21 deletions tests/plugins_tests/certificate_info/test_cert_chain_analyzer.py

This file was deleted.

Loading

0 comments on commit 11f135d

Please sign in to comment.