Skip to content

Commit

Permalink
PEM decryption
Browse files Browse the repository at this point in the history
  • Loading branch information
alex committed Jan 23, 2025
1 parent d9bf81e commit 3250d10
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 119 deletions.
1 change: 1 addition & 0 deletions src/rust/cryptography-crypto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
// 2.0, and the BSD License. See the LICENSE file in the root of this repository
// for complete details.

pub mod pbkdf1;
pub mod pkcs12;
109 changes: 74 additions & 35 deletions src/rust/src/backend/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,50 +64,89 @@ fn load_pem_private_key<'p>(
backend: Option<pyo3::Bound<'_, pyo3::PyAny>>,
unsafe_skip_rsa_key_validation: bool,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::PyAny>> {
let _ = backend;

let p = x509::find_in_pem(
data.as_bytes(),
|p| ["PRIVATE KEY", "ENCRYPTED PRIVATE KEY", "RSA PRIVATE KEY", "EC PRIVATE KEY", "DSA PRIVATE KEY"].contains(&p.tag()),
"Valid PEM but no BEGIN/END delimiters for a private key found. Are you sure this is a private key?"
)?;
// TODO: if proc-type is present, decrypt PEM layer.
if p.headers().get("Proc-Type").is_none() {
let pkey = match p.tag() {
"PRIVATE KEY" => cryptography_key_parsing::pkcs8::parse_private_key(p.contents())?,
"ENCRYPTED PRIVATE KEY" => {
cryptography_key_parsing::pkcs8::parse_encrypted_private_key(
p.contents(),
password.as_ref().map(|v| v.as_bytes()),
)?
}
"RSA PRIVATE KEY" => {
cryptography_key_parsing::rsa::parse_pkcs1_private_key(p.contents())?
}
"EC PRIVATE KEY" => {
cryptography_key_parsing::ec::parse_pkcs1_private_key(p.contents(), None)?
}
"DSA PRIVATE KEY" => {
cryptography_key_parsing::dsa::parse_pkcs1_private_key(p.contents())?
}
_ => unreachable!(),
};
if password.is_some() && p.tag() != "ENCRYPTED PRIVATE KEY" {
let password = password.as_ref().map(|v| v.as_bytes());
let mut password_used = false;
// TODO: Surely we can avoid this clone?
let tag = p.tag().to_string();
let data = match p.headers().get("Proc-Type") {
Some("4,ENCRYPTED") => {
password_used = true;
let Some(dek_info) = p.headers().get("DEK-Info") else {
todo!()
};
let Some((cipher_algorithm, iv)) = dek_info.split_once(',') else {
todo!()
};

let password = match password {
None | Some(b"") => {
return Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Password was not given but private key is encrypted",
),
))
}
Some(p) => p,
};

let cipher = match cipher_algorithm {
"AES-128-CBC" => openssl::symm::Cipher::aes_128_cbc(),
"AES-256-CBC" => openssl::symm::Cipher::aes_256_cbc(),
"DES-EDE3-CBC" => openssl::symm::Cipher::des_ede3_cbc(),
_ => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"Key encrypted with unknown cipher.",
),
))
}
};
let iv = utils::hex_decode(iv)?;
let key = cryptography_crypto::pbkdf1::openssl_kdf(
openssl::hash::MessageDigest::md5(),
password,
&iv,
cipher.key_len(),
)?;
openssl::symm::decrypt(cipher, &key, Some(&iv), p.contents()).map_err(|_| {
pyo3::exceptions::PyValueError::new_err("Incorrect password, could not decrypt key")
})?
}
Some(_) => {
return Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Password was given but private key is not encrypted.",
pyo3::exceptions::PyValueError::new_err(
"Proc-Type PEM header is not valid, key could not be decrypted.",
),
));
))
}
return private_key_from_pkey(py, &pkey, unsafe_skip_rsa_key_validation);
}
None => p.into_contents(),
};

let _ = backend;
let password = password.as_ref().map(CffiBuf::as_bytes);
let mut status = utils::PasswordCallbackStatus::Unused;
let pkey = openssl::pkey::PKey::private_key_from_pem_callback(
data.as_bytes(),
utils::password_callback(&mut status, password),
);
let pkey = utils::handle_key_load_result(py, pkey, status, password)?;
let pkey = match tag.as_str() {
"PRIVATE KEY" => cryptography_key_parsing::pkcs8::parse_private_key(&data)?,
"ENCRYPTED PRIVATE KEY" => {
password_used = true;
cryptography_key_parsing::pkcs8::parse_encrypted_private_key(&data, password)?
}
"RSA PRIVATE KEY" => cryptography_key_parsing::rsa::parse_pkcs1_private_key(&data)?,
"EC PRIVATE KEY" => cryptography_key_parsing::ec::parse_pkcs1_private_key(&data, None)?,
"DSA PRIVATE KEY" => cryptography_key_parsing::dsa::parse_pkcs1_private_key(&data)?,
_ => unreachable!(),
};
if password.is_some() && !password_used {
return Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Password was given but private key is not encrypted.",
),
));
}
private_key_from_pkey(py, &pkey, unsafe_skip_rsa_key_validation)
}

Expand Down
90 changes: 32 additions & 58 deletions src/rust/src/backend/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::backend::hashes::Hash;
use crate::error::{CryptographyError, CryptographyResult};
use crate::{error, types};
use crate::types;
use pyo3::types::{PyAnyMethods, PyBytesMethods};

pub(crate) fn py_int_to_bn(
Expand Down Expand Up @@ -403,66 +403,40 @@ pub(crate) fn calculate_digest_and_algorithm<'p>(
Ok((data, algorithm))
}

pub(crate) enum PasswordCallbackStatus {
Unused,
Used,
BufferTooSmall(usize),
}

pub(crate) fn password_callback<'a>(
status: &'a mut PasswordCallbackStatus,
password: Option<&'a [u8]>,
) -> impl FnOnce(&mut [u8]) -> Result<usize, openssl::error::ErrorStack> + 'a {
move |buf| {
*status = PasswordCallbackStatus::Used;
match password.as_ref() {
Some(p) if p.len() <= buf.len() => {
buf[..p.len()].copy_from_slice(p);
Ok(p.len())
}
Some(_) => {
*status = PasswordCallbackStatus::BufferTooSmall(buf.len());
Ok(0)
}
None => Ok(0),
}
pub(crate) fn hex_decode(v: &str) -> CryptographyResult<Vec<u8>> {
if v.len() % 2 != 0 {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("Invalid hex value - odd length"),
));
}
}

pub(crate) fn handle_key_load_result<T>(
py: pyo3::Python<'_>,
pkey: Result<openssl::pkey::PKey<T>, openssl::error::ErrorStack>,
status: PasswordCallbackStatus,
password: Option<&[u8]>,
) -> CryptographyResult<openssl::pkey::PKey<T>> {
match (pkey, status, password) {
(Ok(k), PasswordCallbackStatus::Unused, None)
| (Ok(k), PasswordCallbackStatus::Used, Some(_)) => Ok(k),

(Ok(_), PasswordCallbackStatus::Unused, Some(_)) => Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Password was given but private key is not encrypted.",
),
)),
let mut b = Vec::with_capacity(v.len() / 2);
let v = v.as_bytes();
for i in (0..v.len()).step_by(2) {
let high = match v[i] {
b @ b'0'..=b'9' => b - b'0',
b @ b'a'..=b'f' => b - b'a' + 10,
b @ b'A'..=b'F' => b - b'A' + 10,
_ => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("Invalid hex value"),
))
}
};

(_, PasswordCallbackStatus::Used, None | Some(b"")) => Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Password was not given but private key is encrypted",
),
)),
(_, PasswordCallbackStatus::BufferTooSmall(size), _) => Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(format!(
"Passwords longer than {size} bytes are not supported"
)),
)),
(Err(e), _, _) => {
let errors = error::list_from_openssl_error(py, &e);
Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err((
"Could not deserialize key data. The data may be in an incorrect format, the provided password may be incorrect, it may be encrypted with an unsupported algorithm, or it may be an unsupported key type (e.g. EC curves with explicit parameters).",
errors.unbind(),
let low = match v[i + 1] {
b @ b'0'..=b'9' => b - b'0',
b @ b'a'..=b'f' => b - b'a' + 10,
b @ b'A'..=b'F' => b - b'A' + 10,
_ => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("Invalid hex value"),
))
))
}
}
};

b.push((high << 4) | low);
}

Ok(b)
}
27 changes: 1 addition & 26 deletions tests/hazmat/backends/test_openssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


import itertools
import os

import pytest

Expand All @@ -22,10 +21,7 @@
DummyMode,
)
from ...hazmat.primitives.test_rsa import rsa_key_2048
from ...utils import (
load_vectors_from_file,
raises_unsupported_algorithm,
)
from ...utils import raises_unsupported_algorithm

# Make ruff happy since we're importing fixtures that pytest patches in as
# func args
Expand Down Expand Up @@ -200,27 +196,6 @@ def test_unsupported_mgf1_hash_algorithm_md5_decrypt(self, rsa_key_2048):
)


class TestOpenSSLSerializationWithOpenSSL:
def test_very_long_pem_serialization_password(self):
password = b"x" * 1025

with pytest.raises(ValueError, match="Passwords longer than"):
load_vectors_from_file(
os.path.join(
"asymmetric",
"Traditional_OpenSSL_Serialization",
"key1.pem",
),
lambda pemfile: (
serialization.load_pem_private_key(
pemfile.read().encode(),
password,
unsafe_skip_rsa_key_validation=False,
)
),
)


class TestRSAPEMSerialization:
def test_password_length_limit(self, rsa_key_2048):
password = b"x" * 1024
Expand Down

0 comments on commit 3250d10

Please sign in to comment.