From 9a8d2f1130a835bee1d407dd4ec6f3fc0c3a772d Mon Sep 17 00:00:00 2001 From: etorreborre Date: Tue, 6 Feb 2024 15:34:03 +0100 Subject: [PATCH] refactor(rust): extract a credential refresher from the credential retriever --- Cargo.lock | 1 + examples/rust/get_started/Cargo.toml | 1 + .../06-credentials-exchange-client.rs | 2 +- .../06-credentials-exchange-server.rs | 2 +- ...bute-based-authentication-control-plane.rs | 26 +- ...tribute-based-authentication-edge-plane.rs | 25 +- .../ockam/ockam_api/src/cli_state/trust.rs | 19 +- .../ockam_api/src/cloud/secure_clients.rs | 20 +- .../rust/ockam/ockam_api/src/lib.rs | 1 + .../rust/ockam/ockam_api/src/nodes/service.rs | 56 +-- .../src/nodes/service/kafka_services.rs | 9 +- .../src/nodes/service/secure_channel.rs | 16 +- .../rust/ockam/ockam_api/src/util.rs | 7 +- .../ockam/ockam_api/tests/common/common.rs | 2 +- .../src/credentials/credentials.rs | 9 +- .../credentials/retriever/cache_retriever.rs | 135 ------- .../retriever/cached_credential_retriever.rs | 94 +++++ .../retriever/credential_retriever.rs | 25 +- .../credentials/retriever/memory_retriever.rs | 40 +- .../src/credentials/retriever/mod.rs | 5 +- .../credential_retriever_options.rs | 71 ++++ .../retriever/remote_retriever/info.rs | 27 -- .../retriever/remote_retriever/mod.rs | 14 +- .../remote_credential_refresher.rs | 352 +++++++++++++++++ .../remote_credential_retriever.rs | 268 +++++++++++++ .../remote_retriever/remote_retriever.rs | 370 ------------------ .../remote_retriever_creator.rs | 120 ------ .../remote_retriever_trait_impl.rs | 63 --- .../rust/ockam/ockam_identity/src/error.rs | 6 +- .../src/secure_channel/encryptor_worker.rs | 51 +-- .../handshake/handshake_state_machine.rs | 4 +- .../handshake/handshake_worker.rs | 74 +++- .../src/secure_channel/listener.rs | 13 +- .../src/secure_channel/options.rs | 55 +-- .../src/secure_channels/secure_channels.rs | 11 +- .../src/secure_channels/secure_client.rs | 22 +- .../ockam/ockam_identity/tests/channel.rs | 4 +- .../ockam/ockam_identity/tests/credentials.rs | 8 +- .../tests/credentials_refresh.rs | 60 +-- .../ockam_node/src/context/transports.rs | 18 +- 40 files changed, 1060 insertions(+), 1046 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs create mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/cached_credential_retriever.rs create mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/credential_retriever_options.rs delete mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs create mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_refresher.rs create mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_retriever.rs delete mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs delete mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs delete mode 100644 implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_trait_impl.rs diff --git a/Cargo.lock b/Cargo.lock index 73e6aa0ca8d..ca3c29023f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3077,6 +3077,7 @@ dependencies = [ "ockam_core", "ockam_multiaddr", "ockam_node", + "ockam_transport_core", "ockam_transport_tcp", "ockam_transport_udp", "ockam_transport_uds", diff --git a/examples/rust/get_started/Cargo.toml b/examples/rust/get_started/Cargo.toml index fac641e8d9e..5d38165be04 100644 --- a/examples/rust/get_started/Cargo.toml +++ b/examples/rust/get_started/Cargo.toml @@ -38,6 +38,7 @@ ockam_api = { path = "../../../implementations/rust/ockam/ockam_api" } ockam_core = { path = "../../../implementations/rust/ockam/ockam_core" } ockam_multiaddr = { path = "../../../implementations/rust/ockam/ockam_multiaddr" } ockam_node = { path = "../../../implementations/rust/ockam/ockam_node" } +ockam_transport_core = { path = "../../../implementations/rust/ockam/ockam_transport_core" } ockam_transport_tcp = { path = "../../../implementations/rust/ockam/ockam_transport_tcp" } ockam_transport_udp = { path = "../../../implementations/rust/ockam/ockam_transport_udp" } ockam_transport_uds = { path = "../../../implementations/rust/ockam/ockam_transport_uds" } diff --git a/examples/rust/get_started/examples/06-credentials-exchange-client.rs b/examples/rust/get_started/examples/06-credentials-exchange-client.rs index 30a8c8ce299..f434c460f67 100644 --- a/examples/rust/get_started/examples/06-credentials-exchange-client.rs +++ b/examples/rust/get_started/examples/06-credentials-exchange-client.rs @@ -73,7 +73,7 @@ async fn main(ctx: Context) -> Result<()> { route![server_connection, DefaultAddress::SECURE_CHANNEL_LISTENER], SecureChannelOptions::new() .with_authority(issuer.clone()) - .with_credential(credential)?, + .with_credential(credential), ) .await?; diff --git a/examples/rust/get_started/examples/06-credentials-exchange-server.rs b/examples/rust/get_started/examples/06-credentials-exchange-server.rs index 012eee89bed..7d1ec1b35a2 100644 --- a/examples/rust/get_started/examples/06-credentials-exchange-server.rs +++ b/examples/rust/get_started/examples/06-credentials-exchange-server.rs @@ -77,7 +77,7 @@ async fn main(ctx: Context) -> Result<()> { let tcp_listener_options = TcpListenerOptions::new(); let sc_listener_options = SecureChannelListenerOptions::new() .with_authority(issuer.clone()) - .with_credential(credential)? + .with_credential(credential) .as_consumer(&tcp_listener_options.spawner_flow_control_id()); node.flow_controls().add_consumer( diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs index d195786233e..f8222f13f5e 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs @@ -1,10 +1,8 @@ -use std::sync::Arc; - use hello_ockam::{create_token, import_project}; use ockam::abac::AbacAccessControl; use ockam::identity::{ - RemoteCredentialRetrieverCreator, RemoteCredentialRetrieverInfo, SecureChannelListenerOptions, - SecureChannelOptions, TrustMultiIdentifiersPolicy, + CredentialRetrieverOptions, RemoteCredentialRetrieverInfo, SecureChannelListenerOptions, SecureChannelOptions, + TrustMultiIdentifiersPolicy, }; use ockam::remote::RemoteRelayOptions; use ockam::{node, Context, Result, TcpOutletOptions, TcpTransportExtension}; @@ -12,8 +10,8 @@ use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; use ockam_api::nodes::NodeManager; use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route, DefaultAddress}; -use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; +use ockam_transport_core::Transport; /// This node supports a "control" server on which several "edge" devices can connect /// @@ -75,16 +73,12 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime let project_authority_route = multiaddr_to_transport_route(&project.authority_route()).unwrap(); // FIXME: Handle error // Create a credential retriever that will be used to obtain credentials - let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new( - node.context().async_try_clone().await?, - Arc::new(tcp.clone()), - node.secure_channels(), - RemoteCredentialRetrieverInfo::new( - project.authority_identifier(), - project_authority_route, - DefaultAddress::CREDENTIAL_ISSUER.into(), - ), - )); + let credential_retriever_info = RemoteCredentialRetrieverInfo::new( + project.authority_identifier(), + project_authority_route, + DefaultAddress::CREDENTIAL_ISSUER.into(), + tcp.transport_type(), + ); // 3. create an access control policy checking the value of the "component" attribute of the caller let access_control = AbacAccessControl::create( @@ -106,7 +100,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime let tcp_project_route = multiaddr_to_route(&project.route(), &tcp).await.unwrap(); // FIXME: Handle error let project_options = SecureChannelOptions::new() - .with_credential_retriever_creator(credential_retriever)? + .with_credential_retriever_options(CredentialRetrieverOptions::remote_default(credential_retriever_info)) .with_authority(project.authority_identifier()) .with_trust_policy(TrustMultiIdentifiersPolicy::new(vec![project.identifier()])); diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs index 272677cb626..72742e3be58 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs @@ -1,7 +1,7 @@ use hello_ockam::{create_token, import_project}; use ockam::abac::AbacAccessControl; use ockam::identity::{ - identities, RemoteCredentialRetrieverCreator, RemoteCredentialRetrieverInfo, SecureChannelOptions, + identities, CredentialRetrieverOptions, RemoteCredentialRetrieverInfo, SecureChannelOptions, TrustMultiIdentifiersPolicy, }; use ockam::node; @@ -10,9 +10,8 @@ use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; use ockam_api::nodes::NodeManager; use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route, DefaultAddress}; -use ockam_core::compat::sync::Arc; -use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; +use ockam_transport_core::Transport; use ockam_transport_tcp::{TcpInletOptions, TcpTransportExtension}; /// This node supports an "edge" server which can connect to a "control" node @@ -74,17 +73,13 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime let project_authority_route = multiaddr_to_transport_route(&project.route()).unwrap(); // FIXME: Handle error - // Create a credential retriever that will be used to obtain credentials - let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new( - node.context().async_try_clone().await?, - Arc::new(tcp.clone()), - node.secure_channels(), - RemoteCredentialRetrieverInfo::new( - project.authority_identifier(), - project_authority_route, - DefaultAddress::CREDENTIAL_ISSUER.into(), - ), - )); + // Information used to access a retriever that will be used to obtain credentials + let credential_retriever_info = RemoteCredentialRetrieverInfo::new( + project.authority_identifier(), + project_authority_route, + DefaultAddress::CREDENTIAL_ISSUER.into(), + tcp.transport_type(), + ); // 3. create an access control policy checking the value of the "component" attribute of the caller let access_control = AbacAccessControl::create( @@ -98,7 +93,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime let tcp_project_route = multiaddr_to_route(&project.route(), &tcp).await.unwrap(); // FIXME: Handle error let project_options = SecureChannelOptions::new() - .with_credential_retriever_creator(credential_retriever)? + .with_credential_retriever_options(CredentialRetrieverOptions::remote_default(credential_retriever_info)) .with_authority(project.authority_identifier()) .with_trust_policy(TrustMultiIdentifiersPolicy::new(vec![project.identifier()])); diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs index d02286a455b..f29569d6575 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs @@ -1,9 +1,12 @@ -use crate::nodes::service::{NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions}; +use crate::nodes::service::NodeManagerTrustOptions; use crate::{multiaddr_to_transport_route, CliState, DefaultAddress}; -use ockam::identity::{IdentitiesVerification, RemoteCredentialRetrieverInfo}; +use ockam::identity::{ + CredentialRetrieverOptions, IdentitiesVerification, RemoteCredentialRetrieverInfo, +}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Error, Result}; use ockam_multiaddr::MultiAddr; +use ockam_transport_tcp::TCP; use ockam_vault::SoftwareVaultForVerifyingSignatures; impl CliState { @@ -71,10 +74,11 @@ impl CliState { authority_identifier.clone(), authority_route, DefaultAddress::CREDENTIAL_ISSUER.into(), + TCP, ); let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::Remote(info), + CredentialRetrieverOptions::remote_default(info), Some(authority_identifier.clone()), ); @@ -86,7 +90,7 @@ impl CliState { trust_options } else if expect_cached_credential { let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::CacheOnly(authority_identifier.clone()), + CredentialRetrieverOptions::CacheOnly(authority_identifier.clone()), Some(authority_identifier.clone()), ); @@ -98,7 +102,7 @@ impl CliState { trust_options } else { let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::None, + CredentialRetrieverOptions::None, Some(authority_identifier.clone()), ); @@ -123,7 +127,7 @@ impl CliState { None => { info!("TrustOptions configured: No Authority. No Credentials"); return Ok(NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::None, + CredentialRetrieverOptions::None, None, )); } @@ -141,10 +145,11 @@ impl CliState { authority_identifier.clone(), authority_route, DefaultAddress::CREDENTIAL_ISSUER.into(), + TCP, ); let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::Remote(info), + CredentialRetrieverOptions::remote_default(info), Some(authority_identifier.clone()), ); diff --git a/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs b/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs index 2c6948bfe9a..da7ced283d9 100644 --- a/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs +++ b/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use std::time::Duration; use ockam::identity::{ - CredentialRetrieverCreator, Identifier, SecureChannels, SecureClient, DEFAULT_TIMEOUT, + CredentialRetrieverOptions, Identifier, SecureChannels, SecureClient, DEFAULT_TIMEOUT, }; use ockam_core::compat::sync::Arc; use ockam_core::env::{get_env, get_env_with_default, FromString}; @@ -85,15 +85,15 @@ impl NodeManager { caller_identifier: &Identifier, credentials_enabled: CredentialsEnabled, ) -> Result { - let credential_retriever_creator = match credentials_enabled { - CredentialsEnabled::On => self.credential_retriever_creator.clone(), - CredentialsEnabled::Off => None, + let credential_retriever_options = match credentials_enabled { + CredentialsEnabled::On => self.credential_retriever_options(), + CredentialsEnabled::Off => CredentialRetrieverOptions::None, }; NodeManager::project_node_client( &self.tcp_transport, self.secure_channels.clone(), - credential_retriever_creator, + credential_retriever_options, project_identifier, project_multiaddr, caller_identifier, @@ -131,7 +131,7 @@ impl NodeManager { Ok(ControllerClient { secure_client: SecureClient::new( secure_channels, - None, + CredentialRetrieverOptions::None, Arc::new(tcp_transport.clone()), controller_route, &controller_identifier, @@ -159,7 +159,7 @@ impl NodeManager { Ok(AuthorityNodeClient { secure_client: SecureClient::new( secure_channels, - None, + CredentialRetrieverOptions::None, Arc::new(tcp_transport.clone()), authority_route, authority_identifier, @@ -174,7 +174,7 @@ impl NodeManager { pub async fn project_node_client( tcp_transport: &TcpTransport, secure_channels: Arc, - credential_retriever_creator: Option>, + credential_retriever_options: CredentialRetrieverOptions, project_identifier: &Identifier, project_multiaddr: &MultiAddr, caller_identifier: &Identifier, @@ -188,7 +188,7 @@ impl NodeManager { Ok(ProjectNodeClient { secure_client: SecureClient::new( secure_channels, - credential_retriever_creator, + credential_retriever_options, Arc::new(tcp_transport.clone()), project_route, project_identifier, @@ -215,7 +215,7 @@ impl NodeManager { Ok(GenericSecureClient { secure_client: SecureClient::new( secure_channels, - None, + CredentialRetrieverOptions::None, Arc::new(tcp_transport.clone()), route, identifier, diff --git a/implementations/rust/ockam/ockam_api/src/lib.rs b/implementations/rust/ockam/ockam_api/src/lib.rs index bd9cd344d06..eb02448ab56 100644 --- a/implementations/rust/ockam/ockam_api/src/lib.rs +++ b/implementations/rust/ockam/ockam_api/src/lib.rs @@ -47,6 +47,7 @@ mod util; pub use cli_state::*; pub use influxdb_token_lease::*; pub use nodes::service::default_address::*; +pub use nodes::service::*; pub use session::sessions::ConnectionStatus; pub use util::*; pub use version::*; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service.rs b/implementations/rust/ockam/ockam_api/src/nodes/service.rs index 8ecfc7e9183..417f62bb68c 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service.rs @@ -11,11 +11,7 @@ use minicbor::{Decoder, Encode}; use opentelemetry::global; use opentelemetry::trace::{FutureExt, TraceContextExt, Tracer}; -use ockam::identity::models::CredentialAndPurposeKey; -use ockam::identity::{ - CachedCredentialRetrieverCreator, CredentialRetrieverCreator, MemoryCredentialRetrieverCreator, - RemoteCredentialRetrieverCreator, RemoteCredentialRetrieverInfo, -}; +use ockam::identity::CredentialRetrieverOptions; use ockam::identity::{Identifier, SecureChannels}; use ockam::{ Address, Context, RelayService, RelayServiceOptions, Result, Routed, TcpTransport, Worker, @@ -26,7 +22,7 @@ use ockam_abac::{Action, Env, Expr, Policy, Resource}; use ockam_core::api::{Method, RequestHeader, Response}; use ockam_core::compat::{string::String, sync::Arc}; use ockam_core::flow_control::FlowControlId; -use ockam_core::{AllowAll, AsyncTryClone, IncomingAccessControl}; +use ockam_core::{AllowAll, IncomingAccessControl}; use ockam_multiaddr::MultiAddr; use ockam_node::OpenTelemetryContext; @@ -97,8 +93,7 @@ pub struct NodeManager { api_transport_flow_control_id: FlowControlId, pub(crate) tcp_transport: TcpTransport, pub(crate) secure_channels: Arc, - pub(crate) credential_retriever_creator: Option>, - authority: Option, + pub(crate) trust_options: NodeManagerTrustOptions, pub(crate) registry: Arc, pub(crate) medic_handle: MedicHandle, } @@ -119,12 +114,12 @@ impl NodeManager { } } - pub fn credential_retriever_creator(&self) -> Option> { - self.credential_retriever_creator.clone() + pub fn authority(&self) -> Option { + self.trust_options.authority.clone() } - pub fn authority(&self) -> Option { - self.authority.clone() + pub fn credential_retriever_options(&self) -> CredentialRetrieverOptions { + self.trust_options.credential_retriever_options.clone() } pub fn node_name(&self) -> String { @@ -316,22 +311,14 @@ impl NodeManagerTransportOptions { } } -#[derive(Debug)] -pub enum NodeManagerCredentialRetrieverOptions { - None, - CacheOnly(Identifier), - Remote(RemoteCredentialRetrieverInfo), - InMemory(CredentialAndPurposeKey), -} - pub struct NodeManagerTrustOptions { - credential_retriever_options: NodeManagerCredentialRetrieverOptions, + credential_retriever_options: CredentialRetrieverOptions, authority: Option, } impl NodeManagerTrustOptions { pub fn new( - credential_retriever_options: NodeManagerCredentialRetrieverOptions, + credential_retriever_options: CredentialRetrieverOptions, authority: Option, ) -> Self { Self { @@ -375,28 +362,6 @@ impl NodeManager { .await? .identifier(); - let credential_retriever_creator: Option> = - match trust_options.credential_retriever_options { - NodeManagerCredentialRetrieverOptions::None => None, - NodeManagerCredentialRetrieverOptions::CacheOnly(issuer) => { - Some(Arc::new(CachedCredentialRetrieverCreator::new( - issuer.clone(), - secure_channels.identities().cached_credentials_repository(), - ))) - } - NodeManagerCredentialRetrieverOptions::Remote(info) => { - Some(Arc::new(RemoteCredentialRetrieverCreator::new( - ctx.async_try_clone().await?, - Arc::new(transport_options.tcp_transport.clone()), - secure_channels.clone(), - info.clone(), - ))) - } - NodeManagerCredentialRetrieverOptions::InMemory(credential) => { - Some(Arc::new(MemoryCredentialRetrieverCreator::new(credential))) - } - }; - let mut s = Self { cli_state, node_name: general_options.node_name, @@ -404,8 +369,7 @@ impl NodeManager { api_transport_flow_control_id: transport_options.api_transport_flow_control_id, tcp_transport: transport_options.tcp_transport, secure_channels, - credential_retriever_creator, - authority: trust_options.authority, + trust_options, registry, medic_handle, }; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs index 835543ca74e..cc7318322c1 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs @@ -171,8 +171,7 @@ impl InMemoryNode { })?; let authority_identifier = self - .authority - .clone() + .authority() .ok_or(ApiError::core("NodeManager has no authority"))?; let default_policy_expression = kafka_default_policy_expression(); @@ -289,8 +288,7 @@ impl InMemoryNode { ); let authority_identifier = self - .authority - .clone() + .authority() .ok_or(ApiError::core("NodeManager has no authority"))?; let secure_channels = self.secure_channels.clone(); @@ -389,8 +387,7 @@ impl NodeManager { .await?; let authority_id = self - .authority - .clone() + .authority() .ok_or(ApiError::core("NodeManager has no authority"))?; let outlet_policy_expression = None; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index 3bb50207434..e875ca0d930 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -207,12 +207,8 @@ impl NodeManager { None => options, }; - let options = match self.credential_retriever_creator.as_ref() { - None => options, - Some(credential_retriever_creator) => { - options.with_credential_retriever_creator(credential_retriever_creator.clone())? - } - }; + let options = + options.with_credential_retriever_options(self.credential_retriever_options().clone()); let options = match authorized_identifiers.clone() { Some(ids) => options.with_trust_policy(TrustMultiIdentifiersPolicy::new(ids)), @@ -315,12 +311,8 @@ impl NodeManager { None => options, }; - let options = match self.credential_retriever_creator.as_ref() { - None => options, - Some(credential_retriever_creator) => { - options.with_credential_retriever_creator(credential_retriever_creator.clone())? - } - }; + let options = + options.with_credential_retriever_options(self.credential_retriever_options()); let listener = secure_channels .create_secure_channel_listener(ctx, &identifier, address.clone(), options) diff --git a/implementations/rust/ockam/ockam_api/src/util.rs b/implementations/rust/ockam/ockam_api/src/util.rs index 3b414af744e..da7cd43d949 100644 --- a/implementations/rust/ockam/ockam_api/src/util.rs +++ b/implementations/rust/ockam/ockam_api/src/util.rs @@ -375,7 +375,7 @@ pub fn local_worker(code: &Code) -> Result { #[cfg(test)] pub mod test_utils { use ockam::identity::utils::AttributesBuilder; - use ockam::identity::SecureChannels; + use ockam::identity::{CredentialRetrieverOptions, SecureChannels}; use ockam::Result; use ockam_core::compat::sync::Arc; use ockam_core::flow_control::FlowControls; @@ -388,8 +388,7 @@ pub mod test_utils { }; use crate::cli_state::{random_name, CliState}; use crate::nodes::service::{ - NodeManagerCredentialRetrieverOptions, NodeManagerGeneralOptions, - NodeManagerTransportOptions, NodeManagerTrustOptions, + NodeManagerGeneralOptions, NodeManagerTransportOptions, NodeManagerTrustOptions, }; use crate::nodes::InMemoryNode; use crate::nodes::{NodeManagerWorker, NODEMANAGER_ADDR}; @@ -454,7 +453,7 @@ pub mod test_utils { tcp.async_try_clone().await?, ), NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::InMemory(credential), + CredentialRetrieverOptions::InMemory(credential), Some(identifier.clone()), ), ) diff --git a/implementations/rust/ockam/ockam_api/tests/common/common.rs b/implementations/rust/ockam/ockam_api/tests/common/common.rs index 96474386d5a..273fdb7afe3 100644 --- a/implementations/rust/ockam/ockam_api/tests/common/common.rs +++ b/implementations/rust/ockam/ockam_api/tests/common/common.rs @@ -135,7 +135,7 @@ pub fn change_client_identifier( let client = client.get_secure_client(); let client = SecureClient::new( client.secure_channels(), - client.credential_retriever_creator(), + client.credential_retriever_options(), client.transport(), client.secure_route().clone(), client.server_identifier(), diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/credentials.rs b/implementations/rust/ockam/ockam_identity/src/credentials/credentials.rs index 8a9544cfeb6..082f4527635 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/credentials.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/credentials.rs @@ -4,7 +4,7 @@ use ockam_vault::{VaultForSigning, VaultForVerifyingSignatures}; use crate::models::{CredentialData, PurposeKeyAttestationData}; use crate::{ CredentialsCreation, CredentialsVerification, IdentitiesCreation, IdentityAttributesRepository, - PurposeKeys, + PurposeKeys, TimestampInSeconds, }; /// Structure with both [`CredentialData`] and [`PurposeKeyAttestationData`] that we get @@ -17,6 +17,13 @@ pub struct CredentialAndPurposeKeyData { pub purpose_key_data: PurposeKeyAttestationData, } +impl CredentialAndPurposeKeyData { + /// Return the expires_at timestamp + pub fn expires_at(&self) -> TimestampInSeconds { + self.credential_data.expires_at + } +} + /// Service for managing [`Credential`]s pub struct Credentials { credential_vault: Arc, diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs deleted file mode 100644 index 0b53c878f28..00000000000 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs +++ /dev/null @@ -1,135 +0,0 @@ -use crate::models::CredentialAndPurposeKey; -use crate::utils::now; -use crate::{ - CredentialRepository, CredentialRetriever, CredentialRetrieverCreator, Identifier, - IdentityError, TimestampInSeconds, -}; -use async_trait::async_trait; -use ockam_core::compat::boxed::Box; -use ockam_core::compat::sync::Arc; -use ockam_core::{Address, Result}; -use tracing::{debug, error}; - -/// Credential is considered already expired if it expires in less than this gap to account for a machine with a -/// wrong time -pub const DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP: TimestampInSeconds = TimestampInSeconds(60); - -/// Credentials retriever for credentials located in a local cache -pub struct CachedCredentialRetriever { - issuer: Identifier, - subject: Identifier, - cache: Arc, - clock_skew_gap: TimestampInSeconds, -} - -impl CachedCredentialRetriever { - /// Create a new cache credential retriever - pub fn new( - issuer: Identifier, - subject: Identifier, - cache: Arc, - ) -> Self { - Self { - issuer, - subject, - cache, - clock_skew_gap: DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP, - } - } - - /// Retrieve a credential from the credentials storage and check its expiration - pub async fn retrieve_impl( - issuer: &Identifier, - for_identity: &Identifier, - now: TimestampInSeconds, - cache: Arc, - clock_skew_gap: TimestampInSeconds, - ) -> Result> { - debug!( - "Requested credential for: {} from: {}", - for_identity, issuer - ); - - // check if we have a valid cached credential - if let Some(cached_credential) = cache.get(for_identity, issuer).await? { - // add an extra minute to have a bit of leeway for clock skew - if cached_credential.get_expires_at()? > now + clock_skew_gap { - debug!("Found valid cached credential for: {}", for_identity); - Ok(Some(cached_credential)) - } else { - debug!( - "Found expired cached credential for: {}. Deleting...", - for_identity - ); - let delete_res = cache.delete(for_identity, issuer).await; - - if let Some(err) = delete_res.err() { - error!( - "Error deleting expired credential for {} from {}. Err={}", - for_identity, issuer, err - ); - } - - Ok(None) - } - } else { - debug!("Found no cached credential for: {}", for_identity); - Ok(None) - } - } -} - -/// Creator for [`CachedCredentialRetriever`] -pub struct CachedCredentialRetrieverCreator { - issuer: Identifier, - cache: Arc, -} - -impl CachedCredentialRetrieverCreator { - /// Constructor - pub fn new(issuer: Identifier, cache: Arc) -> Self { - Self { issuer, cache } - } -} - -#[async_trait] -impl CredentialRetrieverCreator for CachedCredentialRetrieverCreator { - async fn create(&self, subject: &Identifier) -> Result> { - Ok(Arc::new(CachedCredentialRetriever::new( - self.issuer.clone(), - subject.clone(), - self.cache.clone(), - ))) - } -} - -#[async_trait] -impl CredentialRetriever for CachedCredentialRetriever { - async fn initialize(&self) -> Result<()> { - Ok(()) - } - - async fn retrieve(&self) -> Result { - let now = now()?; - match Self::retrieve_impl( - &self.issuer, - &self.subject, - now, - self.cache.clone(), - self.clock_skew_gap, - ) - .await? - { - Some(credential) => Ok(credential), - None => Err(IdentityError::NoCredential)?, - } - } - - fn subscribe(&self, _address: &Address) -> Result<()> { - Ok(()) - } - - fn unsubscribe(&self, _address: &Address) -> Result<()> { - Ok(()) - } -} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cached_credential_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cached_credential_retriever.rs new file mode 100644 index 00000000000..a927fa9ae1b --- /dev/null +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cached_credential_retriever.rs @@ -0,0 +1,94 @@ +use crate::models::CredentialAndPurposeKey; +use crate::utils::now; +use crate::{ + CredentialRepository, CredentialRetriever, Identifier, IdentityError, TimestampInSeconds, +}; +use async_trait::async_trait; +use ockam_core::compat::boxed::Box; +use ockam_core::compat::sync::Arc; +use ockam_core::Result; +use tracing::{debug, error}; + +/// Credential is considered already expired if it expires in less than this gap to account for a machine with a +/// wrong time +pub const DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP: TimestampInSeconds = TimestampInSeconds(60); + +/// Credentials retriever for credentials located in a local cache +pub struct CachedCredentialRetriever { + issuer: Identifier, + cache: Arc, + clock_skew_gap: TimestampInSeconds, +} + +impl CachedCredentialRetriever { + /// Create a new cache credential retriever + pub fn new( + issuer: Identifier, + cache: Arc, + clock_skew_gap: TimestampInSeconds, + ) -> Self { + Self { + issuer, + cache, + clock_skew_gap, + } + } + + /// Retrieve a credential from the credentials storage and check its expiration + pub async fn retrieve(&self, subject: &Identifier) -> Result { + debug!( + "Requested credential for: {} from: {}", + subject, self.issuer + ); + + // check if we have a valid cached credential + if let Some(cached_credential) = self.cache.get(subject, &self.issuer).await? { + // add an extra minute to have a bit of leeway for clock skew + if cached_credential.get_expires_at()? > now()? + self.clock_skew_gap { + debug!("Found valid cached credential for: {}.", subject); + Ok(cached_credential) + } else { + debug!( + "Found expired cached credential for: {}. Deleting...", + subject + ); + let delete_res = self.cache.delete(subject, &self.issuer).await; + + if let Some(err) = delete_res.err() { + error!( + "Error deleting expired credential for {} from {}. Err={}", + subject, self.issuer, err + ); + } + Err(IdentityError::NoCredential)? + } + } else { + debug!("Found no cached credential for: {}", subject); + Err(IdentityError::NoCredential)? + } + } + + /// Store a newly retrieved credential locally + pub async fn store( + &self, + subject: &Identifier, + expires_at: TimestampInSeconds, + credential_and_purpose_key: CredentialAndPurposeKey, + ) -> Result<()> { + self.cache + .put( + subject, + &self.issuer, + expires_at, + credential_and_purpose_key, + ) + .await + } +} + +#[async_trait] +impl CredentialRetriever for CachedCredentialRetriever { + async fn retrieve(&self, subject: &Identifier) -> Result { + self.retrieve(subject).await + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/credential_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/credential_retriever.rs index 5a4b0a338b6..0f5fd6ce581 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/credential_retriever.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/credential_retriever.rs @@ -1,5 +1,4 @@ use ockam_core::compat::boxed::Box; -use ockam_core::compat::sync::Arc; use ockam_core::{async_trait, Address, Result}; use crate::models::CredentialAndPurposeKey; @@ -8,22 +7,20 @@ use crate::Identifier; /// Trait for retrieving a credential for a given identity #[async_trait] pub trait CredentialRetriever: Send + Sync + 'static { - /// Initialization of the retriever. Might load initial state, or start scheduled refresh events. - async fn initialize(&self) -> Result<()>; - /// Retrieve a credential for an identity. - async fn retrieve(&self) -> Result; + async fn retrieve(&self, subject: &Identifier) -> Result; +} + +/// This trait refreshes a given credential in the background and can be used to be +/// notified when new values are available +#[async_trait] +pub trait CredentialRefresher: Send + Sync + 'static { + /// Start the background refresher + async fn initialize(&self) -> Result<()>; - /// Subscribe to credential refresh + /// Subscribe to refreshed CredentialAndPurposeKey fn subscribe(&self, address: &Address) -> Result<()>; - /// Unsubscribe from credential refresh + /// Unsubscribe to refreshed CredentialAndPurposeKey fn unsubscribe(&self, address: &Address) -> Result<()>; } - -/// Creator for [`CredentialRetriever`] implementation -#[async_trait] -pub trait CredentialRetrieverCreator: Send + Sync + 'static { - /// Retrieve a credential for an identity. - async fn create(&self, subject: &Identifier) -> Result>; -} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/memory_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/memory_retriever.rs index ff30741f7b9..0c2e6241b10 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/memory_retriever.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/memory_retriever.rs @@ -1,9 +1,8 @@ use ockam_core::compat::boxed::Box; -use ockam_core::compat::sync::Arc; -use ockam_core::{async_trait, Address, Result}; +use ockam_core::{async_trait, Result}; use crate::models::CredentialAndPurposeKey; -use crate::{CredentialRetriever, CredentialRetrieverCreator, Identifier}; +use crate::{CredentialRetriever, Identifier}; /// Credentials retriever that retrieves a credential from memory pub struct MemoryCredentialRetriever { @@ -19,40 +18,7 @@ impl MemoryCredentialRetriever { #[async_trait] impl CredentialRetriever for MemoryCredentialRetriever { - async fn initialize(&self) -> Result<()> { - Ok(()) - } - - async fn retrieve(&self) -> Result { + async fn retrieve(&self, _subject: &Identifier) -> Result { Ok(self.credential.clone()) } - - fn subscribe(&self, _address: &Address) -> Result<()> { - Ok(()) - } - - fn unsubscribe(&self, _address: &Address) -> Result<()> { - Ok(()) - } -} - -/// Creator for [`MemoryCredentialRetriever`] -pub struct MemoryCredentialRetrieverCreator { - credential: CredentialAndPurposeKey, -} - -impl MemoryCredentialRetrieverCreator { - /// Constructor - pub fn new(credential: CredentialAndPurposeKey) -> Self { - Self { credential } - } -} - -#[async_trait] -impl CredentialRetrieverCreator for MemoryCredentialRetrieverCreator { - async fn create(&self, _subject: &Identifier) -> Result> { - Ok(Arc::new(MemoryCredentialRetriever::new( - self.credential.clone(), - ))) - } } diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/mod.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/mod.rs index 6ded8962b93..397ab8600f2 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/mod.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/mod.rs @@ -1,10 +1,9 @@ -mod cache_retriever; -#[allow(clippy::module_inception)] +mod cached_credential_retriever; mod credential_retriever; mod memory_retriever; mod remote_retriever; -pub use cache_retriever::*; +pub use cached_credential_retriever::*; pub use credential_retriever::*; pub use memory_retriever::*; pub use remote_retriever::*; diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/credential_retriever_options.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/credential_retriever_options.rs new file mode 100644 index 00000000000..ba3fd0a09ba --- /dev/null +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/credential_retriever_options.rs @@ -0,0 +1,71 @@ +use serde::{Deserialize, Serialize}; + +use ockam_core::{Address, Route, TransportType}; + +use crate::models::CredentialAndPurposeKey; +use crate::{ + Identifier, RemoteCredentialRefresherTimingOptions, RemoteCredentialRetrieverTimingOptions, +}; + +/// Options for retrieving credentials +#[derive(Debug, Clone)] +pub enum CredentialRetrieverOptions { + /// No credential retrieval is required + None, + /// Credentials must be retrieved from cache, for a given issuer + CacheOnly(Identifier), + /// Credentials are retrieved via a remote authority + Remote { + /// Routing information to the issuer + retriever_info: RemoteCredentialRetrieverInfo, + /// Timing options for retrieving credentials + retriever_timing_options: RemoteCredentialRetrieverTimingOptions, + /// Timing options for refreshing credentials + refresher_timing_options: RemoteCredentialRefresherTimingOptions, + }, + /// Credentials have been provided in-memory + InMemory(CredentialAndPurposeKey), +} + +impl CredentialRetrieverOptions { + /// Create remote retriever options with default timing options + pub fn remote_default( + retriever_info: RemoteCredentialRetrieverInfo, + ) -> CredentialRetrieverOptions { + CredentialRetrieverOptions::Remote { + retriever_info, + retriever_timing_options: Default::default(), + refresher_timing_options: Default::default(), + } + } +} + +/// Information necessary to connect to a remote credential retriever +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RemoteCredentialRetrieverInfo { + /// Issuer identity, used to validate retrieved credentials + pub issuer: Identifier, + /// Route used to establish a secure channel to the remote node + pub route: Route, + /// Address of the credentials service on the remote node + pub service_address: Address, + /// Transport used by the SecureClient + pub transport_type: TransportType, +} + +impl RemoteCredentialRetrieverInfo { + /// Create new information for a credential retriever + pub fn new( + issuer: Identifier, + route: Route, + service_address: Address, + transport_type: TransportType, + ) -> Self { + Self { + issuer, + route, + service_address, + transport_type, + } + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs deleted file mode 100644 index d375ade74f7..00000000000 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs +++ /dev/null @@ -1,27 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use ockam_core::{Address, Route}; - -use crate::Identifier; - -/// Information necessary to connect to a remote credential retriever -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RemoteCredentialRetrieverInfo { - /// Issuer identity, used to validate retrieved credentials - pub issuer: Identifier, - /// Route used to establish a secure channel to the remote node - pub route: Route, - /// Address of the credentials service on the remote node - pub service_address: Address, -} - -impl RemoteCredentialRetrieverInfo { - /// Create new information for a credential retriever - pub fn new(issuer: Identifier, route: Route, service_address: Address) -> Self { - Self { - issuer, - route, - service_address, - } - } -} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/mod.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/mod.rs index 9312ca99d75..8068b66640c 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/mod.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/mod.rs @@ -1,9 +1,7 @@ -mod info; -#[allow(clippy::module_inception)] -mod remote_retriever; -mod remote_retriever_creator; -mod remote_retriever_trait_impl; +mod credential_retriever_options; +mod remote_credential_refresher; +mod remote_credential_retriever; -pub use info::*; -pub use remote_retriever::*; -pub use remote_retriever_creator::*; +pub use credential_retriever_options::*; +pub use remote_credential_refresher::*; +pub use remote_credential_retriever::*; diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_refresher.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_refresher.rs new file mode 100644 index 00000000000..56d1d8eb576 --- /dev/null +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_refresher.rs @@ -0,0 +1,352 @@ +use async_trait::async_trait; +use core::cmp::max; +use ockam_core::compat::boxed::Box; +use tracing::{debug, error, info, warn}; + +use ockam_core::compat::sync::{Arc, RwLock}; +use ockam_core::compat::time::Duration; +use ockam_core::compat::vec::Vec; +use ockam_core::{route, Address, Decodable, Encodable, Encoded, Message, Result}; +use ockam_node::compat::asynchronous::Mutex; +use ockam_node::Context; + +use crate::models::CredentialAndPurposeKey; +use crate::utils::now; +use crate::{ + CachedCredentialRetriever, CredentialRefresher, CredentialRetriever, Identifier, IdentityError, + RemoteCredentialRetriever, TimestampInSeconds, DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP, +}; + +/// This is the default interval before a credential expiration when we'll query for +/// a new credential to avoid it expiring before we got a new one. +pub const DEFAULT_PROACTIVE_REFRESH_CREDENTIAL_TIME_GAP: TimestampInSeconds = + TimestampInSeconds(60); + +/// Default minimal interval before 2 refreshed in case we retry the refresh. +pub const DEFAULT_MIN_REFRESH_CREDENTIAL_INTERVAL: Duration = Duration::from_secs(10); + +/// Start refresh in the background before it expires +pub const DEFAULT_CREDENTIAL_PROACTIVE_REFRESH_GAP: TimestampInSeconds = TimestampInSeconds(60); + +/// Timing options for refreshing remote credentials +#[derive(Debug, Clone, Copy)] +pub struct RemoteCredentialRefresherTimingOptions { + /// Minimum interval before refresh requests to the Authority node + pub min_refresh_interval: Duration, + /// Time gap used to request a new credential before the old one actually expires + pub proactive_refresh_gap: TimestampInSeconds, + /// Time gap used to consider credential expired before its actual expiration + /// to account for time errors on different machines + pub clock_skew_gap: TimestampInSeconds, +} + +impl Default for RemoteCredentialRefresherTimingOptions { + fn default() -> Self { + Self { + min_refresh_interval: DEFAULT_MIN_REFRESH_CREDENTIAL_INTERVAL, + proactive_refresh_gap: DEFAULT_PROACTIVE_REFRESH_CREDENTIAL_TIME_GAP, + clock_skew_gap: DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP, + } + } +} + +/// Credentials refresher for credentials located on a different node +#[derive(Clone)] +pub struct RemoteCredentialRefresher { + ctx: Arc, + /// This retriever is used to get fresh credentials + remote_credential_retriever: Arc, + /// This retriever is only used to store fresh credentials + cached_credential_retriever: Arc, + /// Identifier for which we want to refresh credentials + subject: Identifier, + /// Options used to tune the refresh behavior + timing_options: RemoteCredentialRefresherTimingOptions, + /// This mutex makes sure that we only set-up the initial refresh once + is_initialized: Arc>, + /// Subscribers addresses that we will notify when credential is refreshed + subscribers: Arc>>, +} + +#[async_trait] +impl CredentialRefresher for RemoteCredentialRefresher { + async fn initialize(&self) -> Result<()> { + self.initialize().await + } + + fn subscribe(&self, address: &Address) -> Result<()> { + self.subscribe(address) + } + + fn unsubscribe(&self, address: &Address) -> Result<()> { + self.unsubscribe(address) + } +} + +impl RemoteCredentialRefresher { + /// Create a new remote credential refresher + pub fn new( + ctx: Arc, + remote_credential_retriever: Arc, + cached_credential_retriever: Arc, + subject: Identifier, + timing_options: RemoteCredentialRefresherTimingOptions, + ) -> Self { + debug!("Creation of RemoteCredentialRefresher for: {}", subject); + + Self { + ctx, + remote_credential_retriever, + cached_credential_retriever, + subject, + timing_options, + is_initialized: Arc::new(Mutex::new(false)), + subscribers: Default::default(), + } + } + + /// Make sure that an initial valid credential is available + pub async fn initialize(&self) -> Result<()> { + let mut is_initialized = self.is_initialized.lock().await; + if *is_initialized { + return Ok(()); + } + + debug!( + "Initialization of RemoteCredentialRefresher for: {}", + self.subject + ); + + let refresh_in = self.compute_refresh_duration(now()?, false).await?; + + if !refresh_in.has_valid_credential { + // We don't have a valid credential - refresh immediately and wait for the result + debug!( + "Creation of RemoteCredentialRefresher for: {} requires immediate credential refresh", + self.subject + ); + self.get_new_credential().await?; + } else { + // We still have a valid credential - schedule refresh in the background + self.schedule_credentials_refresh_impl(refresh_in.duration, false) + .await?; + } + + *is_initialized = true; + + Ok(()) + } + + /// The subscribe method can be used by a worker to subscribe to a refresh credential. + /// See [`Encryptor`] + pub fn subscribe(&self, address: &Address) -> Result<()> { + let mut subscribers = self.subscribers.write().unwrap(); + + if subscribers.contains(address) { + return Err(IdentityError::AddressAlreadySubscribedForThatCredentialRefresher)?; + } + + subscribers.push(address.clone()); + + Ok(()) + } + + /// The unsubscribe method is used to stop subscribing to credential refresh events + pub fn unsubscribe(&self, address: &Address) -> Result<()> { + let mut subscribers = self.subscribers.write().unwrap(); + + if let Some(i) = subscribers.iter().position(|x| x == address) { + subscribers.remove(i); + Ok(()) + } else { + Err(IdentityError::AddressIsNotSubscribedForThatCredentialRefresher)? + } + } +} + +struct RefreshDuration { + duration: Duration, + has_valid_credential: bool, +} + +impl RemoteCredentialRefresher { + async fn compute_refresh_duration( + &self, + now: TimestampInSeconds, + is_retry: bool, + ) -> Result { + let expires_at = match self + .cached_credential_retriever + .retrieve(&self.subject) + .await + { + Ok(c) => c.get_expires_at()?, + _ => now, + }; + + let mut has_valid_credential = false; + let refresh_in = if expires_at <= now + self.timing_options.clock_skew_gap { + // Credential is considered expired. We already need to refresh. + 0.into() + } else if expires_at + <= now + self.timing_options.clock_skew_gap + self.timing_options.proactive_refresh_gap + { + // Credential is not expired, but it's already time to refresh it + has_valid_credential = true; + 0.into() + } else { + // Credential is not expired, and will need refresh later + expires_at + - now + - self.timing_options.clock_skew_gap + - self.timing_options.proactive_refresh_gap + }; + let refresh_in = Duration::from(refresh_in); + + let refresh_in = if is_retry { + // Avoid too many request to the credential_retriever, the refresh can't be sooner than + // self.min_credential_refresh_interval if it's a retry + max(self.timing_options.min_refresh_interval, refresh_in) + } else { + refresh_in + }; + + Ok(RefreshDuration { + duration: refresh_in, + has_valid_credential, + }) + } + + /// Schedule a DelayedEvent that will at specific point in time put a message + /// into EncryptorWorker's own internal mailbox which it will use as a trigger to get a new + /// credential and present it to the other side. + async fn schedule_credentials_refresh( + &self, + now: TimestampInSeconds, + is_retry: bool, + ) -> Result<()> { + let refresh_in = self.compute_refresh_duration(now, is_retry).await?; + self.schedule_credentials_refresh_impl(refresh_in.duration, is_retry) + .await + } + + async fn notify_subscribers( + &self, + credential_and_purpose_key: CredentialAndPurposeKey, + ) -> Result<()> { + let subscribers = self.subscribers.read().unwrap().clone(); + for subscriber in subscribers { + match self + .ctx + .send( + route![subscriber.clone()], + CredentialAndPurposeKeyMessage(credential_and_purpose_key.clone()), + ) + .await + { + Ok(_) => { + debug!( + "Notified RemoteCredentialRetriever subscriber {}", + subscriber + ) + } + Err(_err) => { + warn!( + "Error notifying RemoteCredentialRetriever subscriber {}", + subscriber + ); + } + } + } + + Ok(()) + } + + /// Schedule a DelayedEvent that will at specific point in time put a message + /// into EncryptorWorker's own internal mailbox which it will use as a trigger to get a new + /// credential and present it to the other side. + async fn schedule_credentials_refresh_impl( + &self, + refresh_in: Duration, + is_retry: bool, + ) -> Result<()> { + let is_retry_str = if is_retry { " retry " } else { " " }; + info!( + "Scheduling background credentials refresh{} in {} seconds", + is_retry_str, + refresh_in.as_secs() + ); + + self.request_new_credential_in_background(refresh_in, is_retry); + Ok(()) + } +} + +impl RemoteCredentialRefresher { + async fn get_new_credential(&self) -> Result<()> { + let credential_and_purpose_key = self + .remote_credential_retriever + .retrieve(&self.subject) + .await?; + self.cached_credential_retriever + .store( + &self.subject, + credential_and_purpose_key.get_expires_at()?, + credential_and_purpose_key.clone(), + ) + .await?; + self.notify_subscribers(credential_and_purpose_key).await?; + self.schedule_credentials_refresh(now()?, false).await + } + + fn request_new_credential_in_background(&self, wait: Duration, is_retry: bool) { + let s = self.clone(); + ockam_node::spawn(async move { + let is_retry_str = if is_retry { " retry " } else { " " }; + info!( + "Scheduled background credentials refresh {} in {} seconds", + is_retry_str, + wait.as_secs() + ); + let now = now().unwrap(); + + s.ctx.sleep_long_until(*now + wait.as_secs()).await; + info!("Executing background credentials refresh {}", is_retry_str,); + let res = s.get_new_credential().await; + + if let Some(err) = res.err() { + error!( + "Error refreshing credential for {} in the background: {}", + s.subject, err + ); + + if let Err(e) = s.schedule_credentials_refresh(now, true).await { + error!( + "Error scheduling a credential refresh for {}: {}", + s.subject, e + ); + } + }; + }); + } +} + +/// This message is sent to an Encryptor when a refreshed credential is available +#[derive(Clone, Debug)] +pub struct CredentialAndPurposeKeyMessage(pub(crate) CredentialAndPurposeKey); + +impl Encodable for CredentialAndPurposeKeyMessage { + fn encode(&self) -> Result { + self.0.encode_as_cbor_bytes() + } +} + +impl Decodable for CredentialAndPurposeKeyMessage { + fn decode(e: &[u8]) -> Result { + Ok(CredentialAndPurposeKeyMessage( + CredentialAndPurposeKey::decode_from_cbor_bytes(e)?, + )) + } +} + +impl Message for CredentialAndPurposeKeyMessage {} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_retriever.rs new file mode 100644 index 00000000000..3937ccf2a4e --- /dev/null +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_credential_retriever.rs @@ -0,0 +1,268 @@ +use async_trait::async_trait; +use ockam_core::compat::boxed::Box; +use ockam_core::compat::collections::BTreeMap; +use ockam_core::compat::sync::Arc; +use ockam_node::compat::asynchronous::RwLock; +use tracing::{debug, info, trace}; + +use ockam_core::api::Request; +use ockam_core::compat::time::Duration; +use ockam_core::{Address, AllowAll, DenyAll, Mailboxes, Result}; +use ockam_node::Context; + +use crate::models::CredentialAndPurposeKey; +use crate::{ + CachedCredentialRetriever, CredentialRefresher, CredentialRetriever, + CredentialRetrieverOptions, Identifier, RemoteCredentialRefresher, + RemoteCredentialRefresherTimingOptions, RemoteCredentialRetrieverInfo, SecureChannels, + SecureClient, +}; + +/// Default timeout for requesting credential from the authority +pub const DEFAULT_CREDENTIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); + +/// Default timeout for creating secure channel to the authority +pub const DEFAULT_CREDENTIAL_SECURE_CHANNEL_CREATION_TIMEOUT: Duration = Duration::from_secs(30); + +/// Timing options for retrieving remote credentials +#[derive(Debug, Clone, Copy)] +pub struct RemoteCredentialRetrieverTimingOptions { + /// Timeout for request to the Authority node + pub request_timeout: Duration, + /// Timeout for creating secure channel to the Authority node + pub secure_channel_creation_timeout: Duration, +} + +impl Default for RemoteCredentialRetrieverTimingOptions { + fn default() -> Self { + Self { + request_timeout: DEFAULT_CREDENTIAL_REQUEST_TIMEOUT, + secure_channel_creation_timeout: DEFAULT_CREDENTIAL_SECURE_CHANNEL_CREATION_TIMEOUT, + } + } +} + +/// Credentials retriever for credentials located on a different node, issued by a specific authority. +/// The credentials are cached locally and can be periodically refreshed. +#[derive(Clone)] +pub struct RemoteCachedCredentialRetriever { + ctx: Arc, + /// This retriever is used to get new credentials from a specific issuer + remote_credential_retriever: Arc, + /// This retriever is used to get local credentials if they are not expired + /// and store new credentials retrieved with the remote credential retriever + cached_credential_retriever: Arc, + /// These options are used to create credential refreshers + refresher_timing_options: RemoteCredentialRefresherTimingOptions, + /// List of credential refreshers. Each refresher refreshes the credential of a given identity + /// by calling the remote_credential_retriever above (i.e. against the same authority) + refreshers: Arc>>>, +} + +impl RemoteCachedCredentialRetriever { + /// Create a new remote credential retriever + pub fn new( + ctx: Arc, + secure_channels: Arc, + issuer_info: RemoteCredentialRetrieverInfo, + retriever_timing_options: RemoteCredentialRetrieverTimingOptions, + refresher_timing_options: RemoteCredentialRefresherTimingOptions, + ) -> Self { + debug!( + "Creation of RemoteCredentialRetriever for authority: {}", + issuer_info.issuer + ); + + let remote_credential_retriever = Arc::new(RemoteCredentialRetriever::new( + ctx.clone(), + secure_channels.clone(), + issuer_info.clone(), + retriever_timing_options, + )); + + let cached_credential_retriever = Arc::new(CachedCredentialRetriever::new( + issuer_info.issuer.clone(), + secure_channels.identities().cached_credentials_repository(), + refresher_timing_options.clock_skew_gap, + )); + + Self { + ctx, + remote_credential_retriever, + cached_credential_retriever, + refresher_timing_options, + refreshers: Default::default(), + } + } +} + +#[async_trait] +impl CredentialRetriever for RemoteCachedCredentialRetriever { + async fn retrieve(&self, subject: &Identifier) -> Result { + self.retrieve(subject).await + } +} + +impl RemoteCachedCredentialRetriever { + /// Retrieve the credential for a given identifier by first checking if some valid credentials + /// are available locally. If not, use the remote retriever and store the new credentials locally. + async fn retrieve(&self, subject: &Identifier) -> Result { + if let Ok(credential_and_purpose_key) = + self.cached_credential_retriever.retrieve(subject).await + { + return Ok(credential_and_purpose_key); + }; + + let credential_and_purpose_key = self.remote_credential_retriever.retrieve(subject).await?; + self.cached_credential_retriever + .store( + subject, + credential_and_purpose_key.get_expires_at()?, + credential_and_purpose_key.clone(), + ) + .await?; + Ok(credential_and_purpose_key) + } + + /// Return a struct which will refresh the subject credential in the background. + /// The CredentialRefresher interface allows workers to subscribe to refreshed credentials + pub async fn make_refresher( + &self, + subject: &Identifier, + ) -> Result> { + debug!( + "Requested RemoteCredentialRefresher for: {} and authority {}", + subject, + self.issuer() + ); + let refreshers = self.refreshers.read().await; + if let Some(existing_refresher) = refreshers.get(subject) { + debug!( + "Returning existing RemoteCredentialRefresher for: {} and authority {}", + subject, + self.issuer() + ); + return Ok(existing_refresher.clone()); + } + drop(refreshers); + + let mut refreshers = self.refreshers.write().await; + if let Some(existing_refresher) = refreshers.get(subject) { + debug!( + "Returning existing RemoteCredentialRefresher for: {} and authority {}", + subject, + self.issuer() + ); + return Ok(existing_refresher.clone()); + } + + debug!( + "Creating new RemoteCredentialRefresher for: {}, authority: {}", + subject, + self.remote_credential_retriever.issuer() + ); + + let mailboxes = Mailboxes::main( + Address::random_tagged("RemoteCredentialRefresher"), + Arc::new(DenyAll), + Arc::new(AllowAll), + ); + let ctx = self.ctx.new_detached_with_mailboxes(mailboxes).await?; + let refresher = Arc::new(RemoteCredentialRefresher::new( + Arc::new(ctx), + self.remote_credential_retriever.clone(), + self.cached_credential_retriever.clone(), + subject.clone(), + self.refresher_timing_options, + )); + refresher.initialize().await?; + + refreshers.insert(subject.clone(), refresher.clone()); + Ok(refresher) + } + + /// Identifier of the credential issuer + fn issuer(&self) -> &Identifier { + self.remote_credential_retriever.issuer() + } +} + +/// Credentials retriever for credentials located on a different node +#[derive(Clone)] +pub struct RemoteCredentialRetriever { + ctx: Arc, + secure_channels: Arc, + issuer_info: RemoteCredentialRetrieverInfo, + retriever_timing_options: RemoteCredentialRetrieverTimingOptions, +} + +#[async_trait] +impl CredentialRetriever for RemoteCredentialRetriever { + async fn retrieve(&self, subject: &Identifier) -> Result { + self.retrieve(subject).await + } +} + +impl RemoteCredentialRetriever { + /// Create a new remote credential retriever + pub fn new( + ctx: Arc, + secure_channels: Arc, + issuer_info: RemoteCredentialRetrieverInfo, + retriever_timing_options: RemoteCredentialRetrieverTimingOptions, + ) -> Self { + RemoteCredentialRetriever { + ctx, + secure_channels, + issuer_info, + retriever_timing_options, + } + } + + /// Retrieve credentials for a given identity by calling an issuer on a remote node + async fn retrieve(&self, subject: &Identifier) -> Result { + let transport = self + .ctx + .get_registered_transport(self.issuer_info.transport_type)?; + let client = SecureClient::new( + self.secure_channels.clone(), + CredentialRetrieverOptions::None, + transport, + self.issuer_info.route.clone(), + &self.issuer_info.issuer, + subject, + self.retriever_timing_options + .secure_channel_creation_timeout, + self.retriever_timing_options.request_timeout, + ); + + let credential_and_purpose_key = client + .ask(&self.ctx, "credential_issuer", Request::post("/")) + .await? + .success()?; + + info!( + "Retrieved a new credential for {} from {}", + subject, &self.issuer_info.route + ); + + let _ = self + .secure_channels + .identities() + .credentials() + .credentials_verification() + .verify_credential( + Some(subject), + &[self.issuer().clone()], + &credential_and_purpose_key, + ) + .await?; + trace!("The retrieved credential is valid"); + Ok(credential_and_purpose_key) + } + + /// Identifier of the credential issuer + fn issuer(&self) -> &Identifier { + &self.issuer_info.issuer + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs deleted file mode 100644 index ba560174faa..00000000000 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs +++ /dev/null @@ -1,370 +0,0 @@ -use core::cmp::max; -use tracing::{debug, error, info, trace, warn}; - -use ockam_core::api::Request; -use ockam_core::compat::sync::{Arc, RwLock}; -use ockam_core::compat::time::Duration; -use ockam_core::compat::vec::Vec; -use ockam_core::{route, Address, Result}; -use ockam_node::compat::asynchronous::Mutex; -use ockam_node::Context; -use ockam_transport_core::Transport; - -use crate::models::CredentialAndPurposeKey; -use crate::utils::now; -use crate::{ - CachedCredentialRetriever, Identifier, RemoteCredentialRetrieverInfo, SecureChannels, - SecureClient, TimestampInSeconds, DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP, -}; - -/// This is the default interval before a credential expiration when we'll query for -/// a new credential to avoid it expiring before we got a new one. -pub const DEFAULT_PROACTIVE_REFRESH_CREDENTIAL_TIME_GAP: TimestampInSeconds = - TimestampInSeconds(60); - -/// Default minimal interval before 2 refreshed in case we retry the refresh. -pub const DEFAULT_MIN_REFRESH_CREDENTIAL_INTERVAL: Duration = Duration::from_secs(10); - -/// Default timeout for requesting credential from the authority -pub const DEFAULT_CREDENTIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); - -/// Default timeout for creating secure channel to the authority -pub const DEFAULT_CREDENTIAL_SECURE_CHANNEL_CREATION_TIMEOUT: Duration = Duration::from_secs(30); - -/// Start refresh in the background before it expires -pub const DEFAULT_CREDENTIAL_PROACTIVE_REFRESH_GAP: TimestampInSeconds = TimestampInSeconds(60); - -/// Timing options for retrieving remote credentials -#[derive(Clone, Copy)] -pub struct RemoteCredentialRetrieverTimingOptions { - /// Timeout for request to the Authority node - pub request_timeout: Duration, - /// Timeout for creating secure channel to the Authority node - pub secure_channel_creation_timeout: Duration, - /// Minimum interval before refresh requests to the Authority node - pub min_refresh_interval: Duration, - /// Time gap used to request a new credential before the old one actually expires - pub proactive_refresh_gap: TimestampInSeconds, - /// Time gap used to consider credential expired before its actual expiration - /// to account for time errors on different machines - pub clock_skew_gap: TimestampInSeconds, -} - -impl Default for RemoteCredentialRetrieverTimingOptions { - fn default() -> Self { - Self { - request_timeout: DEFAULT_CREDENTIAL_REQUEST_TIMEOUT, - secure_channel_creation_timeout: DEFAULT_CREDENTIAL_SECURE_CHANNEL_CREATION_TIMEOUT, - min_refresh_interval: DEFAULT_MIN_REFRESH_CREDENTIAL_INTERVAL, - proactive_refresh_gap: DEFAULT_PROACTIVE_REFRESH_CREDENTIAL_TIME_GAP, - clock_skew_gap: DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP, - } - } -} - -#[derive(Clone)] -pub(super) struct LastPresentedCredential { - pub(super) credential: CredentialAndPurposeKey, - pub(super) expires_at: TimestampInSeconds, -} - -/// Credentials retriever for credentials located on a different node -#[derive(Clone)] -pub struct RemoteCredentialRetriever { - ctx: Arc, - transport: Arc, - secure_channels: Arc, - pub(super) issuer_info: RemoteCredentialRetrieverInfo, - pub(super) subject: Identifier, - pub(super) timing_options: RemoteCredentialRetrieverTimingOptions, - - is_initialized: Arc>, - pub(super) last_presented_credential: Arc>>, - /// Subscribers addresses that we will notify when credential is refreshed - pub(super) subscribers: Arc>>, -} - -impl RemoteCredentialRetriever { - /// Create a new remote credential retriever - pub fn new( - ctx: Context, - transport: Arc, - secure_channels: Arc, - issuer_info: RemoteCredentialRetrieverInfo, - subject: Identifier, - timing_options: RemoteCredentialRetrieverTimingOptions, - ) -> Self { - debug!( - "Creation of RemoteCredentialRetriever for: {}, authority: {}", - subject, issuer_info.issuer - ); - - Self { - ctx: Arc::new(ctx), - transport, - secure_channels, - issuer_info, - subject, - timing_options, - is_initialized: Arc::new(Mutex::new(false)), - last_presented_credential: Arc::new(RwLock::new(None)), - subscribers: Default::default(), - } - } - - pub(super) async fn initialize_impl(&self) -> Result<()> { - let mut is_initialized = self.is_initialized.lock().await; - if *is_initialized { - return Ok(()); - } - - debug!( - "Initialization of RemoteCredentialRetriever for: {}, authority: {}", - self.subject, self.issuer_info.issuer - ); - - let now = now()?; - - // Get a credential from the storage - let last_presented_credential = match CachedCredentialRetriever::retrieve_impl( - &self.issuer_info.issuer, - &self.subject, - now, - self.secure_channels - .identities - .cached_credentials_repository(), - self.timing_options.clock_skew_gap, - ) - .await? - { - None => None, - Some(last_presented_credential) => { - let expires_at = last_presented_credential.get_expires_at()?; - Some(LastPresentedCredential { - credential: last_presented_credential, - expires_at, - }) - } - }; - - *self.last_presented_credential.write().unwrap() = last_presented_credential; - - let refresh_in = self.compute_refresh_duration(now, false); - - if !refresh_in.has_valid_credential { - // We don't have a valid credential - refresh immediately and wait for the result - debug!( - "Creation of RemoteCredentialRetriever for: {}, authority: {} requires immediate credential refresh", - self.subject, self.issuer_info.issuer - ); - self.get_new_credential().await?; - } else { - // We still have a valid credential - schedule refresh in the background - self.schedule_credentials_refresh_impl(refresh_in.duration, false); - } - - *is_initialized = true; - - Ok(()) - } -} - -struct RefreshDuration { - duration: Duration, - has_valid_credential: bool, -} - -impl RemoteCredentialRetriever { - fn compute_refresh_duration(&self, now: TimestampInSeconds, is_retry: bool) -> RefreshDuration { - let last_presented_credential_expires_at = self - .last_presented_credential - .read() - .unwrap() - .as_ref() - .map(|c| c.expires_at) - .unwrap_or(now); - - let mut has_valid_credential = false; - let refresh_in = if last_presented_credential_expires_at - <= now + self.timing_options.clock_skew_gap - { - // Credential is considered expired. We already need to refresh. - 0.into() - } else if last_presented_credential_expires_at - <= now + self.timing_options.clock_skew_gap + self.timing_options.proactive_refresh_gap - { - // Credential is not expired, but it's already time to refresh it - has_valid_credential = true; - 0.into() - } else { - // Credential is not expired, and will need refresh later - last_presented_credential_expires_at - - now - - self.timing_options.clock_skew_gap - - self.timing_options.proactive_refresh_gap - }; - let refresh_in = Duration::from(refresh_in); - - let refresh_in = if is_retry { - // Avoid too many request to the credential_retriever, the refresh can't be sooner than - // self.min_credential_refresh_interval if it's a retry - max(self.timing_options.min_refresh_interval, refresh_in) - } else { - refresh_in - }; - - RefreshDuration { - duration: refresh_in, - has_valid_credential, - } - } - - /// Schedule a DelayedEvent that will at specific point in time put a message - /// into EncryptorWorker's own internal mailbox which it will use as a trigger to get a new - /// credential and present it to the other side. - fn schedule_credentials_refresh(&self, now: TimestampInSeconds, is_retry: bool) { - let refresh_in = self.compute_refresh_duration(now, is_retry); - - self.schedule_credentials_refresh_impl(refresh_in.duration, is_retry); - } - - async fn notify_subscribers(&self) -> Result<()> { - let subscribers = self.subscribers.read().unwrap().clone(); - for subscriber in subscribers { - match self.ctx.send(route![subscriber.clone()], ()).await { - Ok(_) => { - debug!( - "Notified RemoteCredentialRetriever subscriber {}", - subscriber - ) - } - Err(_err) => { - warn!( - "Error notifying RemoteCredentialRetriever subscriber {}", - subscriber - ); - } - } - } - - Ok(()) - } - - /// Schedule a DelayedEvent that will at specific point in time put a message - /// into EncryptorWorker's own internal mailbox which it will use as a trigger to get a new - /// credential and present it to the other side. - fn schedule_credentials_refresh_impl(&self, refresh_in: Duration, is_retry: bool) { - let is_retry_str = if is_retry { " retry " } else { " " }; - info!( - "Scheduling background credentials refresh{}from {} in {} seconds", - is_retry_str, - self.issuer_info.issuer, - refresh_in.as_secs() - ); - - self.request_new_credential_in_background(refresh_in, is_retry); - } -} - -impl RemoteCredentialRetriever { - async fn get_new_credential(&self) -> Result<()> { - let cache = self - .secure_channels - .identities - .cached_credentials_repository(); - - let client = SecureClient::new( - self.secure_channels.clone(), - None, - self.transport.clone(), - self.issuer_info.route.clone(), - &self.issuer_info.issuer, - &self.subject, - self.timing_options.secure_channel_creation_timeout, - self.timing_options.request_timeout, - ); - - let credential = client - .ask(&self.ctx, "credential_issuer", Request::post("/")) - .await? - .success()?; - - info!( - "Retrieved a new credential for {} from {}", - self.subject, &self.issuer_info.route - ); - - let credential_and_purpose_key_data = self - .secure_channels - .identities() - .credentials() - .credentials_verification() - .verify_credential( - Some(&self.subject), - &[self.issuer_info.issuer.clone()], - &credential, - ) - .await?; - let expires_at = credential_and_purpose_key_data.credential_data.expires_at; - - trace!("The retrieved credential is valid"); - - *self.last_presented_credential.write().unwrap() = Some(LastPresentedCredential { - credential: credential.clone(), - expires_at, - }); - - let caching_res = cache - .put( - &self.subject, - &self.issuer_info.issuer, - expires_at, - credential, - ) - .await; - - if let Some(err) = caching_res.err() { - error!( - "Error caching credential for {} from {}. Err={}", - self.subject, &self.issuer_info.issuer, err - ); - } - - self.notify_subscribers().await?; - let now = now()?; - - self.schedule_credentials_refresh(now, false); - - Ok(()) - } - - fn request_new_credential_in_background(&self, wait: Duration, is_retry: bool) { - let s = self.clone(); - ockam_node::spawn(async move { - let is_retry_str = if is_retry { " retry " } else { " " }; - info!( - "Scheduled background credentials refresh{}from {} in {} seconds", - is_retry_str, - s.issuer_info.issuer, - wait.as_secs() - ); - s.ctx - .sleep_long_until(*now().unwrap() + wait.as_secs()) - .await; - info!( - "Executing background credentials refresh{}from {}", - is_retry_str, s.issuer_info.issuer, - ); - let res = s.get_new_credential().await; - - if let Some(err) = res.err() { - error!( - "Error refreshing credential for {} in the background: {}", - s.subject, err - ); - - s.schedule_credentials_refresh(now().unwrap(), true); - } - }); - } -} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs deleted file mode 100644 index 616598b572a..00000000000 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs +++ /dev/null @@ -1,120 +0,0 @@ -use ockam_core::compat::boxed::Box; -use ockam_core::compat::collections::BTreeMap; -use ockam_core::compat::sync::Arc; -use ockam_core::{async_trait, Address, AllowAll, DenyAll, Mailboxes, Result}; -use ockam_node::compat::asynchronous::RwLock; -use ockam_node::Context; -use ockam_transport_core::Transport; -use tracing::debug; - -use crate::{ - CredentialRetriever, CredentialRetrieverCreator, Identifier, RemoteCredentialRetriever, - RemoteCredentialRetrieverInfo, RemoteCredentialRetrieverTimingOptions, SecureChannels, -}; - -/// Creator for [`RemoteCredentialRetriever`] -pub struct RemoteCredentialRetrieverCreator { - ctx: Context, - transport: Arc, - secure_channels: Arc, - info: RemoteCredentialRetrieverInfo, - timing_options: RemoteCredentialRetrieverTimingOptions, - - // Should be only one retriever per subject Identifier - registry: RwLock>>, -} - -impl RemoteCredentialRetrieverCreator { - /// Constructor - pub fn new( - ctx: Context, - transport: Arc, - secure_channels: Arc, - info: RemoteCredentialRetrieverInfo, - ) -> Self { - Self { - ctx, - transport, - secure_channels, - info, - timing_options: Default::default(), - registry: Default::default(), - } - } - - /// Constructor - pub fn new_extended( - ctx: Context, - transport: Arc, - secure_channels: Arc, - info: RemoteCredentialRetrieverInfo, - timing_options: RemoteCredentialRetrieverTimingOptions, - ) -> Self { - Self { - ctx, - transport, - secure_channels, - info, - timing_options, - registry: Default::default(), - } - } -} - -#[async_trait] -impl CredentialRetrieverCreator for RemoteCredentialRetrieverCreator { - async fn create(&self, subject: &Identifier) -> Result> { - debug!( - "Requested RemoteCredentialRetriever for: {}, authority: {}", - subject, self.info.issuer - ); - - let registry = self.registry.read().await; - if let Some(existing_retriever) = registry.get(subject) { - debug!( - "Returning existing RemoteCredentialRetriever for: {}, authority: {}", - subject, self.info.issuer - ); - return Ok(existing_retriever.clone()); - } - - drop(registry); - - let mut registry = self.registry.write().await; - if let Some(existing_retriever) = registry.get(subject) { - debug!( - "Returning existing RemoteCredentialRetriever for: {}, authority: {}", - subject, self.info.issuer - ); - return Ok(existing_retriever.clone()); - } - - debug!( - "Creating new RemoteCredentialRetriever for: {}, authority: {}", - subject, self.info.issuer - ); - let mailboxes = Mailboxes::main( - Address::random_tagged("RemoteCredentialRetriever"), - Arc::new(DenyAll), - Arc::new(AllowAll), - ); - let ctx = self.ctx.new_detached_with_mailboxes(mailboxes).await?; - let retriever = RemoteCredentialRetriever::new( - ctx, - self.transport.clone(), - self.secure_channels.clone(), - self.info.clone(), - subject.clone(), - self.timing_options, - ); - debug!( - "Created RemoteCredentialRetriever for: {}, authority: {}", - subject, self.info.issuer - ); - let retriever = Arc::new(retriever); - - registry.insert(subject.clone(), retriever.clone()); - - Ok(retriever) - } -} diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_trait_impl.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_trait_impl.rs deleted file mode 100644 index b08e138d401..00000000000 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_trait_impl.rs +++ /dev/null @@ -1,63 +0,0 @@ -use tracing::debug; - -use ockam_core::compat::boxed::Box; -use ockam_core::{async_trait, Address, Result}; - -use crate::models::CredentialAndPurposeKey; -use crate::utils::now; -use crate::{CredentialRetriever, IdentityError, RemoteCredentialRetriever}; - -#[async_trait] -impl CredentialRetriever for RemoteCredentialRetriever { - async fn initialize(&self) -> Result<()> { - self.initialize_impl().await - } - - async fn retrieve(&self) -> Result { - debug!( - "Requested credential for: {} from: {}", - self.subject, self.issuer_info.issuer - ); - - // Try to get last cached in memory credential - let last_presented_credential = match self.last_presented_credential.read().unwrap().clone() - { - Some(last_presented_credential) => last_presented_credential, - None => return Err(IdentityError::NoCredential)?, - }; - - let now = now()?; - // Check if it's still valid - if last_presented_credential.expires_at > now + self.timing_options.clock_skew_gap { - // Valid, let's return it - return Ok(last_presented_credential.credential); - } - - // TODO: Sometimes worth blocking and waiting for the refresh to happen - - Err(IdentityError::NoCredential)? - } - - fn subscribe(&self, address: &Address) -> Result<()> { - let mut subscribers = self.subscribers.write().unwrap(); - - if subscribers.contains(address) { - return Err(IdentityError::AddressAlreadySubscribedForThatCredentialRetriever)?; - } - - subscribers.push(address.clone()); - - Ok(()) - } - - fn unsubscribe(&self, address: &Address) -> Result<()> { - let mut subscribers = self.subscribers.write().unwrap(); - - if let Some(i) = subscribers.iter().position(|x| x == address) { - subscribers.remove(i); - Ok(()) - } else { - Err(IdentityError::AddressIsNotSubscribedForThatCredentialRetriever)? - } - } -} diff --git a/implementations/rust/ockam/ockam_identity/src/error.rs b/implementations/rust/ockam/ockam_identity/src/error.rs index 055d731bbb6..f26d786c4a4 100644 --- a/implementations/rust/ockam/ockam_identity/src/error.rs +++ b/implementations/rust/ockam/ockam_identity/src/error.rs @@ -58,14 +58,12 @@ pub enum IdentityError { ConsistencyError, /// Secret Key doesn't correspond to the Identity WrongSecretKey, - /// CredentialRetriever was already set - CredentialRetrieverCreatorAlreadySet, /// Credential is missing in the cache CachedCredentialMissing, /// Given address is already a subscriber for that RemoteCredentialRetriever - AddressAlreadySubscribedForThatCredentialRetriever, + AddressAlreadySubscribedForThatCredentialRefresher, /// Given address hasn't been subscribed for that RemoteCredentialRetriever - AddressIsNotSubscribedForThatCredentialRetriever, + AddressIsNotSubscribedForThatCredentialRefresher, /// Credential retriever couldn't return a credential NoCredential, } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index e0d76490a8e..82bc00befc6 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -15,8 +15,8 @@ use crate::secure_channel::addresses::Addresses; use crate::secure_channel::api::{EncryptionRequest, EncryptionResponse}; use crate::secure_channel::encryptor::Encryptor; use crate::{ - ChangeHistoryRepository, CredentialRetriever, Identifier, IdentityError, - PlaintextPayloadMessage, RefreshCredentialsMessage, SecureChannelMessage, + ChangeHistoryRepository, CredentialAndPurposeKeyMessage, CredentialRefresher, Identifier, + IdentityError, PlaintextPayloadMessage, RefreshCredentialsMessage, SecureChannelMessage, }; #[derive(Debug, Clone)] @@ -33,7 +33,7 @@ pub(crate) struct EncryptorWorker { encryptor: Encryptor, my_identifier: Identifier, change_history_repository: Arc, - credential_retriever: Option>, + credential_refresher: Option>, last_presented_credential: Option, shared_state: SecureChannelSharedState, } @@ -47,7 +47,7 @@ impl EncryptorWorker { encryptor: Encryptor, my_identifier: Identifier, change_history_repository: Arc, - credential_retriever: Option>, + credential_refresher: Option>, last_presented_credential: Option, shared_state: SecureChannelSharedState, ) -> Self { @@ -58,7 +58,7 @@ impl EncryptorWorker { encryptor, my_identifier, change_history_repository, - credential_retriever, + credential_refresher, last_presented_credential, shared_state, } @@ -160,29 +160,17 @@ impl EncryptorWorker { /// Asks credential retriever for a new credential and presents it to the other side, including /// the latest change_history - async fn handle_refresh_credentials(&mut self, ctx: &::Context) -> Result<()> { + async fn handle_refresh_credentials( + &mut self, + ctx: &::Context, + credential_and_purpose_key: CredentialAndPurposeKey, + ) -> Result<()> { debug!( "Started credentials refresh for {}", self.addresses.encryptor ); - let credential_retriever = match &self.credential_retriever { - Some(credential_retriever) => credential_retriever, - None => return Err(IdentityError::NoCredentialRetriever)?, - }; - - let credential = match credential_retriever.retrieve().await { - Ok(credential) => credential, - Err(err) => { - error!( - "Credentials refresh failed for {} with error={}", - self.addresses.encryptor, err, - ); - return Err(err); - } - }; - - if Some(&credential) == self.last_presented_credential.as_ref() { + if Some(&credential_and_purpose_key) == self.last_presented_credential.as_ref() { // Credential hasn't actually changed warn!( "Credentials refresh for {} cancelled since credential hasn't changed", @@ -208,7 +196,7 @@ impl EncryptorWorker { let msg = RefreshCredentialsMessage { change_history, - credentials: vec![credential.clone()], + credentials: vec![credential_and_purpose_key.clone()], }; let msg = SecureChannelMessage::RefreshCredentials(msg); @@ -227,7 +215,7 @@ impl EncryptorWorker { ) .await?; - self.last_presented_credential = Some(credential); + self.last_presented_credential = Some(credential_and_purpose_key); Ok(()) } @@ -256,10 +244,9 @@ impl Worker for EncryptorWorker { type Context = Context; async fn initialize(&mut self, _ctx: &mut Self::Context) -> Result<()> { - if let Some(credential_retriever) = &self.credential_retriever { - credential_retriever.subscribe(&self.addresses.encryptor_internal)?; + if let Some(remote_credential_refresher) = &self.credential_refresher { + remote_credential_refresher.subscribe(&self.addresses.encryptor_internal)?; } - Ok(()) } @@ -275,7 +262,9 @@ impl Worker for EncryptorWorker { } else if msg_addr == self.addresses.encryptor_api { self.handle_encrypt_api(ctx, msg).await?; } else if msg_addr == self.addresses.encryptor_internal { - self.handle_refresh_credentials(ctx).await?; + let credential_and_purpose_key = CredentialAndPurposeKeyMessage::decode(msg.payload())?; + self.handle_refresh_credentials(ctx, credential_and_purpose_key.0) + .await?; } else { return Err(IdentityError::UnknownChannelMsgDestination)?; } @@ -284,8 +273,8 @@ impl Worker for EncryptorWorker { } async fn shutdown(&mut self, context: &mut Self::Context) -> Result<()> { - if let Some(credential_retriever) = &self.credential_retriever { - credential_retriever.unsubscribe(&self.addresses.encryptor_internal)?; + if let Some(remote_credential_refresher) = &self.credential_refresher { + remote_credential_refresher.unsubscribe(&self.addresses.encryptor_internal)?; } let _ = context diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs index 8470c6ac0fa..8419d3c4d1b 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs @@ -107,7 +107,9 @@ impl CommonStateMachine { // prepare the payload that will be sent either in message 2 or message 3 let change_history = self.identities.get_change_history(&self.identifier).await?; let credential = match &self.credential_retriever { - Some(credential_retriever) => Some(credential_retriever.retrieve().await?), + Some(credential_retriever) => { + Some(credential_retriever.retrieve(&self.identifier).await?) + } None => None, }; diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs index 238b9866249..383df12144f 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs @@ -4,8 +4,8 @@ use core::time::Duration; use ockam_core::compat::{boxed::Box, vec::Vec}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{ - AllowAll, Any, Decodable, DenyAll, Error, Mailbox, Mailboxes, OutgoingAccessControl, Route, - Routed, + AllowAll, Any, AsyncTryClone, Decodable, DenyAll, Error, Mailbox, Mailboxes, + OutgoingAccessControl, Route, Routed, }; use ockam_core::{AllowOnwardAddress, Result, Worker}; use ockam_node::callback::CallbackSender; @@ -27,8 +27,10 @@ use crate::secure_channel::handshake::initiator_state_machine::InitiatorStateMac use crate::secure_channel::handshake::responder_state_machine::ResponderStateMachine; use crate::secure_channel::{Addresses, Role}; use crate::{ - ChangeHistoryRepository, CredentialRetriever, IdentityError, SecureChannelPurposeKey, - SecureChannelRegistryEntry, SecureChannels, TrustPolicy, + CachedCredentialRetriever, ChangeHistoryRepository, CredentialRefresher, CredentialRetriever, + CredentialRetrieverOptions, IdentityError, MemoryCredentialRetriever, + RemoteCachedCredentialRetriever, SecureChannelPurposeKey, SecureChannelRegistryEntry, + SecureChannels, TrustPolicy, }; /// This struct implements a Worker receiving and sending messages @@ -46,7 +48,7 @@ pub(crate) struct HandshakeWorker { authority: Option, change_history_repository: Arc, - credential_retriever: Option>, + credential_refresher: Option>, shared_state: SecureChannelSharedState, } @@ -59,8 +61,8 @@ impl Worker for HandshakeWorker { /// Initialize the state machine with an `Initialize` event /// Depending on the state machine role there might be a message to send to the other party async fn initialize(&mut self, context: &mut Self::Context) -> Result<()> { - if let Some(credential_retriever) = &self.credential_retriever { - credential_retriever.initialize().await?; + if let Some(remote_credential_refresher) = &self.credential_refresher { + remote_credential_refresher.initialize().await?; } match self.state_machine.on_event(Initialize).await? { @@ -167,14 +169,21 @@ impl HandshakeWorker { purpose_key: SecureChannelPurposeKey, trust_policy: Arc, decryptor_outgoing_access_control: Arc, - credential_retriever: Option>, + credential_retriever_options: CredentialRetrieverOptions, authority: Option, remote_route: Option, timeout: Option, role: Role, ) -> Result<()> { - let vault = secure_channels.identities.vault().secure_channel_vault; let identities = secure_channels.identities(); + let vault = identities.vault().secure_channel_vault; + let (credential_retriever, credential_refresher) = make_credential_retriever( + context, + credential_retriever_options, + secure_channels.clone(), + &identifier, + ) + .await?; let state_machine: Box = if role.is_initiator() { Box::new( @@ -214,6 +223,7 @@ impl HandshakeWorker { let shared_state = SecureChannelSharedState { should_send_close: Arc::new(AtomicBool::new(true)), }; + let worker = Self { secure_channels, callback_sender, @@ -223,7 +233,7 @@ impl HandshakeWorker { remote_route: remote_route.clone(), addresses: addresses.clone(), decryptor_handler: None, - credential_retriever, + credential_refresher, authority, change_history_repository: identities.change_history_repository(), shared_state, @@ -339,7 +349,7 @@ impl HandshakeWorker { ), self.identifier.clone(), self.change_history_repository.clone(), - self.credential_retriever.clone(), + self.credential_refresher.clone(), handshake_results.presented_credential, self.shared_state.clone(), ); @@ -402,3 +412,45 @@ impl HandshakeWorker { Ok(decryptor) } } + +async fn make_credential_retriever( + context: &Context, + options: CredentialRetrieverOptions, + secure_channels: Arc, + identifier: &Identifier, +) -> Result<( + Option>, + Option>, +)> { + match options { + CredentialRetrieverOptions::None => Ok((None, None)), + CredentialRetrieverOptions::CacheOnly(issuer) => { + let credential_retriever = Arc::new(CachedCredentialRetriever::new( + issuer.clone(), + secure_channels.identities().cached_credentials_repository(), + 0.into(), + )); + Ok((Some(credential_retriever), None)) + } + + CredentialRetrieverOptions::InMemory(credential) => Ok(( + Some(Arc::new(MemoryCredentialRetriever::new(credential))), + None, + )), + CredentialRetrieverOptions::Remote { + retriever_info, + retriever_timing_options, + refresher_timing_options, + } => { + let credential_retriever = Arc::new(RemoteCachedCredentialRetriever::new( + Arc::new(context.async_try_clone().await?), + secure_channels.clone(), + retriever_info.clone(), + retriever_timing_options, + refresher_timing_options, + )); + let credential_refresher = credential_retriever.make_refresher(identifier).await?; + Ok((Some(credential_retriever), Some(credential_refresher))) + } + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs index 1a4fec3528a..c2c428372b2 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs @@ -75,17 +75,6 @@ impl Worker for SecureChannelListenerWorker { .get_or_create_secure_channel_purpose_key(&self.identifier) .await?; - let credential_retriever = match &self.options.credential_retriever_creator { - Some(credential_retriever_creator) => { - // Only create, initialization should not happen here to avoid blocking listener - let credential_retriever = credential_retriever_creator - .create(&self.identifier) - .await?; - Some(credential_retriever) - } - None => None, - }; - HandshakeWorker::create( ctx, self.secure_channels.clone(), @@ -94,7 +83,7 @@ impl Worker for SecureChannelListenerWorker { purpose_key, self.options.trust_policy.clone(), access_control.decryptor_outgoing_access_control, - credential_retriever, + self.options.credential_retriever_options.clone(), self.options.authority.clone(), None, None, diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs index 67c86d6e2a6..5e66d69960d 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs @@ -5,10 +5,7 @@ use ockam_core::{Address, OutgoingAccessControl, Result}; use crate::models::CredentialAndPurposeKey; use crate::secure_channel::Addresses; -use crate::{ - CredentialRetrieverCreator, Identifier, IdentityError, MemoryCredentialRetrieverCreator, - TrustEveryonePolicy, TrustPolicy, -}; +use crate::{CredentialRetrieverOptions, Identifier, TrustEveryonePolicy, TrustPolicy}; use core::fmt; use core::fmt::Formatter; @@ -24,7 +21,7 @@ pub struct SecureChannelOptions { // To verify other party's credentials pub(crate) authority: Option, // To obtain our credentials - pub(crate) credential_retriever_creator: Option>, + pub(crate) credential_retriever_options: CredentialRetrieverOptions, pub(crate) timeout: Duration, } @@ -46,7 +43,7 @@ impl SecureChannelOptions { flow_control_id: FlowControls::generate_flow_control_id(), trust_policy: Arc::new(TrustEveryonePolicy), authority: None, - credential_retriever_creator: None, + credential_retriever_options: CredentialRetrieverOptions::None, timeout: DEFAULT_TIMEOUT, } } @@ -57,23 +54,18 @@ impl SecureChannelOptions { self } - /// Set [`CredentialRetrieverCreator`] - pub fn with_credential_retriever_creator( + /// Set [`CredentialRetriever`] + pub fn with_credential_retriever_options( mut self, - credential_retriever_creator: Arc, - ) -> Result { - if self.credential_retriever_creator.is_some() { - return Err(IdentityError::CredentialRetrieverCreatorAlreadySet.into()); - } - self.credential_retriever_creator = Some(credential_retriever_creator); - Ok(self) + credential_retriever_options: CredentialRetrieverOptions, + ) -> Self { + self.credential_retriever_options = credential_retriever_options; + self } /// Set credential - pub fn with_credential(self, credential: CredentialAndPurposeKey) -> Result { - self.with_credential_retriever_creator(Arc::new(MemoryCredentialRetrieverCreator::new( - credential, - ))) + pub fn with_credential(self, credential: CredentialAndPurposeKey) -> Self { + self.with_credential_retriever_options(CredentialRetrieverOptions::InMemory(credential)) } /// Sets Trusted Authority @@ -143,7 +135,7 @@ pub struct SecureChannelListenerOptions { // To verify other party's credentials pub(crate) authority: Option, // To obtain our credentials - pub(crate) credential_retriever_creator: Option>, + pub(crate) credential_retriever_options: CredentialRetrieverOptions, } impl fmt::Debug for SecureChannelListenerOptions { @@ -163,7 +155,7 @@ impl SecureChannelListenerOptions { flow_control_id: FlowControls::generate_flow_control_id(), trust_policy: Arc::new(TrustEveryonePolicy), authority: None, - credential_retriever_creator: None, + credential_retriever_options: CredentialRetrieverOptions::None, } } @@ -176,23 +168,18 @@ impl SecureChannelListenerOptions { self } - /// Set [`CredentialRetrieverCreator`] - pub fn with_credential_retriever_creator( + /// Set [`CredentialRetrieverOptions`] + pub fn with_credential_retriever_options( mut self, - credential_retriever_creator: Arc, - ) -> Result { - if self.credential_retriever_creator.is_some() { - return Err(IdentityError::CredentialRetrieverCreatorAlreadySet.into()); - } - self.credential_retriever_creator = Some(credential_retriever_creator); - Ok(self) + credential_retriever_options: CredentialRetrieverOptions, + ) -> Self { + self.credential_retriever_options = credential_retriever_options; + self } /// Set credential - pub fn with_credential(self, credential: CredentialAndPurposeKey) -> Result { - self.with_credential_retriever_creator(Arc::new(MemoryCredentialRetrieverCreator::new( - credential, - ))) + pub fn with_credential(self, credential: CredentialAndPurposeKey) -> Self { + self.with_credential_retriever_options(CredentialRetrieverOptions::InMemory(credential)) } /// Sets Trusted Authority diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs index 3ecb68d744a..1829aac9138 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs @@ -113,15 +113,6 @@ impl SecureChannels { .get_or_create_secure_channel_purpose_key(identifier) .await?; - let credential_retriever = match &options.credential_retriever_creator { - Some(credential_retriever_creator) => { - let credential_retriever = credential_retriever_creator.create(identifier).await?; - credential_retriever.initialize().await?; - Some(credential_retriever) - } - None => None, - }; - HandshakeWorker::create( ctx, Arc::new(self.clone()), @@ -130,7 +121,7 @@ impl SecureChannels { purpose_key, options.trust_policy, access_control.decryptor_outgoing_access_control, - credential_retriever, + options.credential_retriever_options.clone(), options.authority, Some(route), Some(options.timeout), diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs index af7a5efe7fd..6963b9465a1 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs @@ -1,4 +1,4 @@ -use crate::{CredentialRetrieverCreator, Identifier, SecureChannelOptions, TrustIdentifierPolicy}; +use crate::{CredentialRetrieverOptions, Identifier, SecureChannelOptions, TrustIdentifierPolicy}; use minicbor::{Decode, Encode}; use tracing::error; @@ -30,8 +30,8 @@ use ockam_transport_core::Transport; pub struct SecureClient { // secure_channels is used to create a secure channel before sending a request secure_channels: Arc, - // Credential retriever - credential_retriever_creator: Option>, + // Credential retriever options if a credential retriever is required + credential_retriever_options: CredentialRetrieverOptions, // transport to instantiate connections transport: Arc, // destination for the secure channel @@ -51,7 +51,7 @@ impl SecureClient { #[allow(clippy::too_many_arguments)] pub fn new( secure_channels: Arc, - credential_retriever_creator: Option>, + credential_retriever_options: CredentialRetrieverOptions, transport: Arc, server_route: Route, server_identifier: &Identifier, @@ -61,7 +61,7 @@ impl SecureClient { ) -> SecureClient { Self { secure_channels, - credential_retriever_creator, + credential_retriever_options, transport, secure_route: server_route, server_identifier: server_identifier.clone(), @@ -76,9 +76,9 @@ impl SecureClient { self.secure_channels.clone() } - /// CredentialRetriever - pub fn credential_retriever_creator(&self) -> Option> { - self.credential_retriever_creator.clone() + /// CredentialRetrieverOptions + pub fn credential_retriever_options(&self) -> CredentialRetrieverOptions { + self.credential_retriever_options.clone() } /// Transport @@ -237,11 +237,7 @@ impl SecureClient { .with_timeout(self.secure_channel_timeout); let options = - if let Some(credential_retriever_creator) = self.credential_retriever_creator.clone() { - options.with_credential_retriever_creator(credential_retriever_creator)? - } else { - options - }; + options.with_credential_retriever_options(self.credential_retriever_options.clone()); let secure_channel = self .secure_channels diff --git a/implementations/rust/ockam/ockam_identity/tests/channel.rs b/implementations/rust/ockam/ockam_identity/tests/channel.rs index ec0f2faad22..28ec3f56d35 100644 --- a/implementations/rust/ockam/ockam_identity/tests/channel.rs +++ b/implementations/rust/ockam/ockam_identity/tests/channel.rs @@ -125,7 +125,7 @@ async fn test_channel_send_credentials(context: &mut Context) -> Result<()> { "bob_listener", SecureChannelListenerOptions::new() .with_authority(authority.clone()) - .with_credential(bob_credential_2)?, + .with_credential(bob_credential_2), ) .await?; @@ -164,7 +164,7 @@ async fn test_channel_send_credentials(context: &mut Context) -> Result<()> { route!["bob_listener"], SecureChannelOptions::new() .with_authority(authority.clone()) - .with_credential(alice_credential_2)?, + .with_credential(alice_credential_2), ) .await?; diff --git a/implementations/rust/ockam/ockam_identity/tests/credentials.rs b/implementations/rust/ockam/ockam_identity/tests/credentials.rs index 958d32c8598..b4df4970014 100644 --- a/implementations/rust/ockam/ockam_identity/tests/credentials.rs +++ b/implementations/rust/ockam/ockam_identity/tests/credentials.rs @@ -53,7 +53,7 @@ async fn full_flow_oneway(ctx: &mut Context) -> Result<()> { route!["listener"], SecureChannelOptions::new() .with_trust_policy(TrustIdentifierPolicy::new(server.clone())) - .with_credential(credential)?, + .with_credential(credential), ) .await?; @@ -102,7 +102,7 @@ async fn full_flow_twoway(ctx: &mut Context) -> Result<()> { "listener", SecureChannelListenerOptions::new() .with_authority(authority.clone()) - .with_credential(credential)?, + .with_credential(credential), ) .await?; @@ -125,7 +125,7 @@ async fn full_flow_twoway(ctx: &mut Context) -> Result<()> { route!["listener"], SecureChannelOptions::new() .with_authority(authority.clone()) - .with_credential(credential)?, + .with_credential(credential), ) .await?; @@ -195,7 +195,7 @@ async fn access_control(ctx: &mut Context) -> Result<()> { route!["listener"], SecureChannelOptions::new() .with_trust_policy(TrustIdentifierPolicy::new(server.clone())) - .with_credential(credential1)?, + .with_credential(credential1), ) .await?; let channel2 = secure_channels diff --git a/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs b/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs index bd57a7bdbc5..2501a9458a7 100644 --- a/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs +++ b/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs @@ -3,17 +3,19 @@ use std::time::Duration; use ockam_core::api::Response; use ockam_core::compat::sync::Arc; -use ockam_core::{async_trait, Any, AsyncTryClone, Routed, Worker}; +use ockam_core::{async_trait, Any, Routed, Worker}; use ockam_core::{route, Result}; use ockam_identity::models::CredentialSchemaIdentifier; use ockam_identity::secure_channels::secure_channels; use ockam_identity::utils::AttributesBuilder; use ockam_identity::{ - Credentials, Identifier, IdentitySecureChannelLocalInfo, RemoteCredentialRetrieverCreator, - RemoteCredentialRetrieverInfo, RemoteCredentialRetrieverTimingOptions, - SecureChannelListenerOptions, SecureChannelOptions, SecureChannels, + CredentialRetrieverOptions, Credentials, Identifier, IdentitySecureChannelLocalInfo, + RemoteCredentialRefresherTimingOptions, RemoteCredentialRetrieverInfo, + RemoteCredentialRetrieverTimingOptions, SecureChannelListenerOptions, SecureChannelOptions, + SecureChannels, }; use ockam_node::Context; +use ockam_transport_core::Transport; use ockam_transport_tcp::TcpTransport; struct CredentialIssuer { @@ -48,7 +50,7 @@ impl Worker for CredentialIssuer { &self.authority, &subject, AttributesBuilder::with_schema(CredentialSchemaIdentifier(1)) - .with_attribute(b"key", b"value") + .with_attribute(*b"key", *b"value") .build(), self.ttl, ) @@ -67,18 +69,21 @@ impl Worker for CredentialIssuer { #[ockam_macros::test] async fn autorefresh(ctx: &mut Context) -> Result<()> { - let timing_options = RemoteCredentialRetrieverTimingOptions { - min_refresh_interval: Duration::from_secs(1), - proactive_refresh_gap: 1.into(), - clock_skew_gap: 0.into(), + let retriever_timing_options = RemoteCredentialRetrieverTimingOptions { request_timeout: Duration::from_secs(2), ..Default::default() }; + let refresher_timing_options = RemoteCredentialRefresherTimingOptions { + clock_skew_gap: 0.into(), + min_refresh_interval: Duration::from_secs(1), + proactive_refresh_gap: 1.into(), + }; let res = init( ctx, Duration::from_secs(0), Duration::from_secs(5), - timing_options, + retriever_timing_options, + refresher_timing_options, ) .await?; @@ -90,7 +95,7 @@ async fn autorefresh(ctx: &mut Context) -> Result<()> { &res.client, route!["server_api"], SecureChannelOptions::new() - .with_credential_retriever_creator(res.retriever)? + .with_credential_retriever_options(res.retriever_options) .with_authority(res.authority.clone()), ) .await?; @@ -152,18 +157,21 @@ async fn autorefresh(ctx: &mut Context) -> Result<()> { #[ockam_macros::test] async fn init_fail(ctx: &mut Context) -> Result<()> { - let timing_options = RemoteCredentialRetrieverTimingOptions { + let retriever_timing_options = RemoteCredentialRetrieverTimingOptions { + request_timeout: Duration::from_secs(2), + ..Default::default() + }; + let refresher_timing_options = RemoteCredentialRefresherTimingOptions { min_refresh_interval: Duration::from_secs(1), proactive_refresh_gap: 1.into(), clock_skew_gap: 0.into(), - request_timeout: Duration::from_secs(2), - ..Default::default() }; let res = init( ctx, Duration::from_secs(0), Duration::from_secs(5), - timing_options, + retriever_timing_options, + refresher_timing_options, ) .await?; @@ -176,7 +184,7 @@ async fn init_fail(ctx: &mut Context) -> Result<()> { &res.client, route!["server_api"], SecureChannelOptions::new() - .with_credential_retriever_creator(res.retriever)? + .with_credential_retriever_options(res.retriever_options) .with_authority(res.authority.clone()), ) .await; @@ -199,14 +207,15 @@ struct InitResult { server_secure_channels: Arc, authority_secure_channels: Arc, - retriever: Arc, + retriever_options: CredentialRetrieverOptions, } async fn init( ctx: &Context, delay: Duration, ttl: Duration, - timing_options: RemoteCredentialRetrieverTimingOptions, + retriever_timing_options: RemoteCredentialRetrieverTimingOptions, + refresher_timing_options: RemoteCredentialRefresherTimingOptions, ) -> Result { let tcp = TcpTransport::create(ctx).await?; @@ -273,17 +282,16 @@ async fn init( ) .await?; - let retriever = Arc::new(RemoteCredentialRetrieverCreator::new_extended( - ctx.async_try_clone().await?, - Arc::new(tcp), - client_secure_channels.clone(), - RemoteCredentialRetrieverInfo::new( + let retriever_options = CredentialRetrieverOptions::Remote { + retriever_info: RemoteCredentialRetrieverInfo::new( authority.clone(), route!["authority_api"], "credential_issuer".into(), + tcp.transport_type(), ), - timing_options, - )); + retriever_timing_options, + refresher_timing_options, + }; Ok(InitResult { call_counter, @@ -294,6 +302,6 @@ async fn init( client_secure_channels, server_secure_channels, authority_secure_channels, - retriever, + retriever_options, }) } diff --git a/implementations/rust/ockam/ockam_node/src/context/transports.rs b/implementations/rust/ockam/ockam_node/src/context/transports.rs index 1cff221df0c..ff57281ca2f 100644 --- a/implementations/rust/ockam/ockam_node/src/context/transports.rs +++ b/implementations/rust/ockam/ockam_node/src/context/transports.rs @@ -7,12 +7,28 @@ use ockam_transport_core::Transport; use crate::Context; impl Context { - /// Return the list of supported transports + /// Register a new transport pub fn register_transport(&self, transport: Arc) { let mut transports = self.transports.write().unwrap(); transports.insert(transport.transport_type(), transport); } + /// Get a registered transport given its transport type + pub fn get_registered_transport( + &self, + transport_type: TransportType, + ) -> Result> { + let transports = self.transports.read().unwrap(); + match transports.get(&transport_type) { + Some(transport) => Ok(transport.clone()), + None => Err(Error::new( + Origin::Core, + Kind::NotFound, + format!("transport {transport_type} not found"), + )), + } + } + /// Return true if a given transport has already been registered pub fn is_transport_registered(&self, transport_type: TransportType) -> bool { let transports = self.transports.read().unwrap();