Skip to content

Commit

Permalink
feat: implement threshold strategy (#287)
Browse files Browse the repository at this point in the history
Follow-up on #286 to implement the threshold strategy to aggregate
responses from multiple providers:

- Introduce a top-level `MultiCallResults::reduce` method which takes as
parameter a `ConsensusStrategy` to aggregate the various responses. All
existing methods, including `eth_fee_history`, were refactor to use the
new `reduce` method.
- In case of `ConsensusStrategy::Threshold`, the reduction strategy
simply consists of returning the most frequent (non-error) response if
it appeared at least the specified threshold number of times.
- Refactor `MultiCallResults` to stored ok results and errors
separately.
  • Loading branch information
gregorydemay authored Sep 24, 2024
1 parent 4661cef commit 691f840
Show file tree
Hide file tree
Showing 13 changed files with 544 additions and 559 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ ic-config = { git = "https://github.com/dfinity/ic", rev = "release-2023-09-27_2
ic-crypto-test-utils-reproducible-rng = { git = "https://github.com/dfinity/ic", rev = "release-2024-09-12_01-30-base" }
ic-state-machine-tests = { git = "https://github.com/dfinity/ic", rev = "release-2023-09-27_23-01" }
ic-test-utilities-load-wasm = { git = "https://github.com/dfinity/ic", rev = "release-2023-09-27_23-01" }
itertools = "0.13"
maplit = "1"
proptest = { workspace = true }
serde_bytes = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion candid/evm_rpc.did
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ type RequestCostResult = variant { Ok : nat; Err : RpcError };
type RpcConfig = record { responseSizeEstimate : opt nat64; responseConsensus : opt ConsensusStrategy };
type ConsensusStrategy = variant {
Equality;
Threshold : record { num_providers : opt nat; min_num_ok : nat };
Threshold : record {
// Total number of providers to be queried. Can be omitted, if that number can be inferred (e.g., providers are specified in the request).
total : opt nat;
// Minimum number of providers that must return the same (non-error) result.
min : nat;
};
};
type RpcError = variant {
JsonRpcError : JsonRpcError;
Expand Down
10 changes: 5 additions & 5 deletions evm_rpc_types/src/result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ impl<T> From<RpcResult<T>> for MultiRpcResult<T> {
}
}

#[derive(Clone, Debug, PartialEq, Eq, CandidType, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, CandidType, Deserialize)]
pub enum RpcError {
ProviderError(ProviderError),
HttpOutcallError(HttpOutcallError),
JsonRpcError(JsonRpcError),
ValidationError(ValidationError),
}

#[derive(Clone, Debug, PartialEq, Eq, CandidType, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, CandidType, Deserialize)]
pub enum ProviderError {
NoPermission,
TooFewCycles { expected: u128, received: u128 },
Expand All @@ -80,7 +80,7 @@ pub enum ProviderError {
InvalidRpcConfig(String),
}

#[derive(Clone, Debug, Eq, PartialEq, CandidType, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, CandidType, Deserialize)]
pub enum HttpOutcallError {
/// Error from the IC system API.
IcError {
Expand All @@ -98,13 +98,13 @@ pub enum HttpOutcallError {
},
}

#[derive(Clone, Debug, Eq, PartialEq, CandidType, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, CandidType, Deserialize)]
pub struct JsonRpcError {
pub code: i64,
pub message: String,
}

#[derive(Clone, Debug, Eq, PartialEq, CandidType, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, CandidType, Deserialize)]
pub enum ValidationError {
Custom(String),
InvalidHex(String),
Expand Down
8 changes: 5 additions & 3 deletions evm_rpc_types/src/rpc_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ pub enum ConsensusStrategy {
/// All providers must return the same non-error result.
#[default]
Equality,

/// A subset of providers must return the same non-error result.
Threshold {
/// Number of providers to be queried:
/// Total number of providers to be queried:
/// * If `None`, will be set to the number of providers manually specified in `RpcServices`.
/// * If `Some`, must correspond to the number of manually specified providers in `RpcServices`;
/// or if they are none indicating that default providers should be used, select the corresponding number of providers.
num_providers: Option<u8>,
total: Option<u8>,

/// Minimum number of providers that must return the same (non-error) result.
min_num_ok: u8,
min: u8,
},
}

Expand Down
33 changes: 15 additions & 18 deletions src/candid_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ fn process_result<T>(method: RpcMethod, result: Result<T, MultiCallError<T>>) ->
Err(err) => match err {
MultiCallError::ConsistentError(err) => MultiRpcResult::Consistent(Err(err)),
MultiCallError::InconsistentResults(multi_call_results) => {
multi_call_results.results.iter().for_each(|(service, _)| {
let results = multi_call_results.into_vec();
results.iter().for_each(|(service, _service_result)| {
if let Ok(ResolvedRpcService::Provider(provider)) =
resolve_rpc_service(service.clone())
{
Expand All @@ -35,7 +36,7 @@ fn process_result<T>(method: RpcMethod, result: Result<T, MultiCallError<T>>) ->
)
}
});
MultiRpcResult::Inconsistent(multi_call_results.results.into_iter().collect())
MultiRpcResult::Inconsistent(results)
}
},
}
Expand Down Expand Up @@ -121,8 +122,7 @@ impl CandidRpcClient {
RpcMethod::EthGetTransactionCount,
self.client
.eth_get_transaction_count(into_get_transaction_count_params(args))
.await
.reduce_with_equality(),
.await,
)
.map(Nat256::from)
}
Expand Down Expand Up @@ -193,21 +193,20 @@ mod test {
process_result(
method,
Err(MultiCallError::<()>::InconsistentResults(
MultiCallResults {
results: Default::default()
}
MultiCallResults::default()
))
),
MultiRpcResult::Inconsistent(vec![])
);
assert_eq!(
process_result(
method,
Err(MultiCallError::InconsistentResults(MultiCallResults {
results: vec![(RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5))]
.into_iter()
.collect(),
}))
Err(MultiCallError::InconsistentResults(
MultiCallResults::from_non_empty_iter(vec![(
RpcService::EthMainnet(EthMainnetService::Ankr),
Ok(5)
)])
))
),
MultiRpcResult::Inconsistent(vec![(
RpcService::EthMainnet(EthMainnetService::Ankr),
Expand All @@ -217,17 +216,15 @@ mod test {
assert_eq!(
process_result(
method,
Err(MultiCallError::InconsistentResults(MultiCallResults {
results: vec![
Err(MultiCallError::InconsistentResults(
MultiCallResults::from_non_empty_iter(vec![
(RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5)),
(
RpcService::EthMainnet(EthMainnetService::Cloudflare),
Err(RpcError::ProviderError(ProviderError::NoPermission))
)
]
.into_iter()
.collect(),
}))
])
))
),
MultiRpcResult::Inconsistent(vec![
(RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5)),
Expand Down
10 changes: 0 additions & 10 deletions src/rpc_client/eth_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,6 @@ pub fn is_response_too_large(code: &RejectionCode, message: &str) -> bool {
&& (message.contains("size limit") || message.contains("length limit"))
}

pub fn are_errors_consistent<T: PartialEq>(
left: &Result<T, RpcError>,
right: &Result<T, RpcError>,
) -> bool {
match (left, right) {
(Ok(_), _) | (_, Ok(_)) => true,
_ => left == right,
}
}

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ResponseSizeEstimate(u64);

Expand Down
Loading

0 comments on commit 691f840

Please sign in to comment.