
[API] [PFNs] improve observability of view function and simulate usage #11696

Merged · 8 commits · Jan 26, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions api/Cargo.toml
@@ -39,6 +39,7 @@ hex = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
mime = { workspace = true }
mini-moka = { workspace = true }
move-core-types = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
152 changes: 150 additions & 2 deletions api/src/context.rs
@@ -4,6 +4,7 @@

use crate::{
    accept_type::AcceptType,
    metrics,
    response::{
        bcs_api_disabled, block_not_found_by_height, block_not_found_by_version,
        block_pruned_by_height, json_api_disabled, version_not_found, version_pruned,
@@ -18,7 +19,7 @@ use aptos_api_types::{
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule};
-use aptos_logger::{error, warn};
+use aptos_logger::{error, info, warn, Schema};
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
use aptos_storage_interface::{
    state_view::{DbStateView, DbStateViewAtVersion, LatestDbStateCheckpointView},
@@ -45,14 +46,21 @@ use aptos_types::{
use aptos_utils::aptos_try;
use aptos_vm::{data_cache::AsMoveResolver, move_vm_ext::AptosMoveResolver};
use futures::{channel::oneshot, SinkExt};
use mini_moka::sync::Cache;
use move_core_types::{
    identifier::Identifier,
    language_storage::{ModuleId, StructTag},
    move_resource::MoveResource,
};
use serde::Serialize;
use std::{
    cmp::Reverse,
    collections::{BTreeMap, HashMap},
    ops::{Bound::Included, Deref},
-    sync::{Arc, RwLock, RwLockWriteGuard},
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, RwLock, RwLockWriteGuard,
+    },
    time::Instant,
};

@@ -66,6 +74,8 @@ pub struct Context {
    gas_schedule_cache: Arc<RwLock<GasScheduleCache>>,
    gas_estimation_cache: Arc<RwLock<GasEstimationCache>>,
    gas_limit_cache: Arc<RwLock<GasLimitCache>>,
    view_function_stats: Arc<FunctionStats>,
    simulate_txn_stats: Arc<FunctionStats>,
}

impl std::fmt::Debug for Context {
@@ -81,6 +91,19 @@ impl Context {
        mp_sender: MempoolClientSender,
        node_config: NodeConfig,
    ) -> Self {
        let (view_function_stats, simulate_txn_stats) = {
            let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some();
            (
                Arc::new(FunctionStats::new(
                    FunctionType::ViewFunction,
                    log_per_call_stats,
                )),
                Arc::new(FunctionStats::new(
                    FunctionType::TxnSimulation,
                    log_per_call_stats,
                )),
            )
        };
        Self {
            chain_id,
            db,
@@ -101,6 +124,8 @@
                block_executor_onchain_config: OnChainExecutionConfig::default_if_missing()
                    .block_executor_onchain_config(),
            })),
            view_function_stats,
            simulate_txn_stats,
        }
    }

@@ -1297,6 +1322,14 @@ impl Context {
            .min_inclusion_prices
            .len()
    }

    pub fn view_function_stats(&self) -> &FunctionStats {
        &self.view_function_stats
    }

    pub fn simulate_txn_stats(&self) -> &FunctionStats {
        &self.simulate_txn_stats
    }
}

pub struct GasScheduleCache {
@@ -1329,3 +1362,118 @@ where
        .await
        .map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
}

#[derive(Schema)]
pub struct LogSchema {
    event: LogEvent,
}

impl LogSchema {
    pub fn new(event: LogEvent) -> Self {
        Self { event }
    }
}

#[derive(Serialize, Copy, Clone)]
pub enum LogEvent {
    ViewFunction,
    TxnSimulation,
}

pub enum FunctionType {
    ViewFunction,
    TxnSimulation,
}

impl FunctionType {
    fn log_event(&self) -> LogEvent {
        match self {
            FunctionType::ViewFunction => LogEvent::ViewFunction,
            FunctionType::TxnSimulation => LogEvent::TxnSimulation,
        }
    }

    fn operation_id(&self) -> &'static str {
        match self {
            FunctionType::ViewFunction => "view_function",
            FunctionType::TxnSimulation => "txn_simulation",
        }
    }
}

pub struct FunctionStats {
    stats: Option<Cache<String, (Arc<AtomicU64>, Arc<AtomicU64>)>>,
    log_event: LogEvent,
    operation_id: String,
}

impl FunctionStats {
    fn new(function_type: FunctionType, log_per_call_stats: bool) -> Self {
        let stats = if log_per_call_stats {
            Some(Cache::new(100))
        } else {
            None
        };
        FunctionStats {
            stats,
            log_event: function_type.log_event(),
            operation_id: function_type.operation_id().to_string(),
        }
    }

    pub fn function_to_key(module: &ModuleId, function: &Identifier) -> String {
        format!("{}::{}", module, function)
    }

    pub fn increment(&self, key: String, gas: u64) {
        metrics::GAS_USED
            .with_label_values(&[&self.operation_id])
            .observe(gas as f64);
        if let Some(stats) = &self.stats {
            let (prev_gas, prev_count) = stats.get(&key).unwrap_or_else(|| {
                // Note: a race on inserting a new entry can lose a few samples,
                // but that is acceptable for these stats.
                let new_gas = Arc::new(AtomicU64::new(0));
                let new_count = Arc::new(AtomicU64::new(0));
                stats.insert(key.clone(), (new_gas.clone(), new_count.clone()));
                (new_gas, new_count)
            });
            prev_gas.fetch_add(gas, Ordering::Relaxed);
            prev_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    pub fn log_and_clear(&self) {
        if let Some(stats) = &self.stats {
            if stats.iter().next().is_none() {
                return;
            }

            let mut sorted: Vec<_> = stats
                .iter()
                .map(|entry| {
                    let (gas_used, count) = entry.value();
                    (
                        gas_used.load(Ordering::Relaxed),
                        count.load(Ordering::Relaxed),
                        entry.key().clone(),
                    )
                })
                .collect();
            sorted.sort_by_key(|(gas_used, ..)| Reverse(*gas_used));

            info!(
                LogSchema::new(self.log_event),
                top_1 = sorted.get(0),
                top_2 = sorted.get(1),
                top_3 = sorted.get(2),
                top_4 = sorted.get(3),
                top_5 = sorted.get(4),
                top_6 = sorted.get(5),
                top_7 = sorted.get(6),
                top_8 = sorted.get(7),
            );

            stats.invalidate_all();
        }
    }
}
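
Taken together, FunctionStats keeps an optional mini-moka cache (capped at 100 entries) of per-function (gas, call count) pairs, and log_and_clear periodically reports the top gas consumers before resetting. A minimal usage sketch — the key and gas values are made up, and it assumes code living in this module, since FunctionStats::new is private:

    // Minimal sketch, assuming it runs inside api/src/context.rs where
    // FunctionStats::new is visible; key and gas values are illustrative.
    let stats = FunctionStats::new(FunctionType::ViewFunction, true);

    // Record two calls to the same view function.
    stats.increment("0x1::coin::balance".to_string(), 1_200);
    stats.increment("0x1::coin::balance".to_string(), 1_350);

    // Emits a single info! line carrying up to the top 8 gas consumers,
    // then clears the cache for the next interval.
    stats.log_and_clear();
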
10 changes: 10 additions & 0 deletions api/src/metrics.rs
@@ -77,3 +77,13 @@ pub static GAS_ESTIMATE: Lazy<HistogramVec> = Lazy::new(|| {
    )
    .unwrap()
});

pub static GAS_USED: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "aptos_api_gas_used",
        "Amount of gas used by each API operation",
        &["operation_id"],
        BYTE_BUCKETS.clone()
    )
    .unwrap()
});
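
For reference, this histogram is fed from FunctionStats::increment above, with operation_id taking one of the two values returned by FunctionType::operation_id. A sketch with a made-up gas value:

    // Record 1,500 gas units consumed by one view-function call.
    GAS_USED
        .with_label_values(&["view_function"])
        .observe(1_500.0);

Reusing BYTE_BUCKETS for gas amounts looks like a pragmatic choice; presumably the existing bucket boundaries span a similar range of magnitudes.
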
20 changes: 19 additions & 1 deletion api/src/runtime.rs
@@ -43,12 +43,13 @@ pub fn bootstrap(
    attach_poem_to_runtime(runtime.handle(), context.clone(), config, false)
        .context("Failed to attach poem to runtime")?;

    let context_cloned = context.clone();
    if let Some(period_ms) = config.api.periodic_gas_estimation_ms {
        runtime.spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(period_ms));
            loop {
                interval.tick().await;
-                let context_cloned = context.clone();
+                let context_cloned = context_cloned.clone();
                tokio::task::spawn_blocking(move || {
                    if let Ok(latest_ledger_info) =
                        context_cloned.get_latest_ledger_info::<crate::response::BasicError>()
@@ -66,6 +67,23 @@
        });
    }

    let context_cloned = context.clone();
    if let Some(period_sec) = config.api.periodic_function_stats_sec {
        runtime.spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(period_sec));
            loop {
                interval.tick().await;
                let context_cloned = context_cloned.clone();
                tokio::task::spawn_blocking(move || {
                    context_cloned.view_function_stats().log_and_clear();
                    context_cloned.simulate_txn_stats().log_and_clear();
                })
                .await
                .unwrap_or(());
            }
        });
    }

    Ok(runtime)
}

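
Both periodic tasks are opt-in via config. A hypothetical way to enable the new stats logger when building a NodeConfig programmatically — this assumes NodeConfig implements Default and that periodic_function_stats_sec defaults to None, neither of which is shown in this diff:

    use aptos_config::config::NodeConfig;

    let mut node_config = NodeConfig::default();
    // Log and reset per-function view/simulation stats every 60 seconds.
    node_config.api.periodic_function_stats_sec = Some(60);
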
30 changes: 29 additions & 1 deletion api/src/transactions.rs
@@ -6,7 +6,7 @@ use crate::{
    accept_type::AcceptType,
    accounts::Account,
    bcs_payload::Bcs,
-    context::{api_spawn_blocking, Context},
+    context::{api_spawn_blocking, Context, FunctionStats},
    failpoint::fail_point_poem,
    generate_error_response, generate_success_response, metrics,
    page::Page,
@@ -1235,6 +1235,34 @@ impl TransactionsApi {
            _ => ExecutionStatus::MiscellaneousError(None),
        };

        let stats_key = match txn.payload() {
            TransactionPayload::Script(_) => {
                format!("Script::{}", txn.clone().committed_hash())
            },
            TransactionPayload::ModuleBundle(_) => "ModuleBundle::unknown".to_string(),
            TransactionPayload::EntryFunction(entry_function) => FunctionStats::function_to_key(
                entry_function.module(),
                &entry_function.function().into(),
            ),
            TransactionPayload::Multisig(multisig) => {
                if let Some(payload) = &multisig.transaction_payload {
                    match payload {
                        MultisigTransactionPayload::EntryFunction(entry_function) => {
                            FunctionStats::function_to_key(
                                entry_function.module(),
                                &entry_function.function().into(),
                            )
                        },
                    }
                } else {
                    "Multisig::unknown".to_string()
                }
            },
        };
        self.context
            .simulate_txn_stats()
            .increment(stats_key, output.gas_used());

        // Build up a transaction from the outputs
        // All state hashes are invalid, and will be filled with 0s
        let txn = aptos_types::transaction::Transaction::UserTransaction(txn);
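
The match above normalizes every payload variant into a stats key. A sketch of the resulting key shapes — the address and names are illustrative, not taken from real traffic:

    use move_core_types::{
        account_address::AccountAddress, identifier::Identifier, language_storage::ModuleId,
    };

    // EntryFunction            -> "<address>::coin::transfer"
    // Script                   -> "Script::<committed hash of the txn>"
    // Multisig with payload    -> key of the inner entry function
    // Multisig without payload -> "Multisig::unknown"
    // ModuleBundle             -> "ModuleBundle::unknown"
    let module = ModuleId::new(AccountAddress::ONE, Identifier::new("coin").unwrap());
    let function = Identifier::new("transfer").unwrap();
    let key = FunctionStats::function_to_key(&module, &function);
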
23 changes: 14 additions & 9 deletions api/src/view_function.rs
@@ -4,7 +4,7 @@
use crate::{
    accept_type::AcceptType,
    bcs_payload::Bcs,
-    context::api_spawn_blocking,
+    context::{api_spawn_blocking, FunctionStats},
    failpoint::fail_point_poem,
    response::{
        BadRequestError, BasicErrorWith404, BasicResponse, BasicResponseStatus, BasicResultWith404,
@@ -135,22 +135,22 @@ fn view_request(
        ));
    }

-    let return_vals = AptosVM::execute_view_function(
+    let output = AptosVM::execute_view_function(
        &state_view,
        view_function.module.clone(),
        view_function.function.clone(),
        view_function.ty_args.clone(),
        view_function.args.clone(),
        context.node_config.api.max_gas_view_function,
-    )
-    .map_err(|err| {
+    );
+    let values = output.values.map_err(|err| {
        BasicErrorWith404::bad_request_with_code_no_info(err, AptosErrorCode::InvalidInput)
    })?;
-    match accept_type {
+    let result = match accept_type {
        AcceptType::Bcs => {
            // The return values are already BCS encoded, but we still need to encode the outside
            // vector without re-encoding the inside values
-            let num_vals = return_vals.len();
+            let num_vals = values.len();

            // Push the length of the return values
            let mut length = vec![];
@@ -163,7 +163,7 @@
            })?;

            // Combine all of the return values
-            let values = return_vals.into_iter().concat();
+            let values = values.into_iter().concat();
            let ret = [length, values].concat();

            BasicResponse::try_from_encoded((ret, &ledger_info, BasicResponseStatus::Ok))
@@ -186,7 +186,7 @@
                )
            })?;

-            let move_vals = return_vals
+            let move_vals = values
                .into_iter()
                .zip(return_types.into_iter())
                .map(|(v, ty)| {
@@ -205,5 +205,10 @@

            BasicResponse::try_from_json((move_vals, &ledger_info, BasicResponseStatus::Ok))
        },
-    }
+    };
+    context.view_function_stats().increment(
+        FunctionStats::function_to_key(&view_function.module, &view_function.function),
+        output.gas_used,
+    );
+    result
}
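
This refactor relies on AptosVM::execute_view_function now returning a struct that carries both the (possibly failed) return values and the gas used, so gas can be recorded even though the values are consumed later. A rough shape inferred from the usage above — the type name and field types are assumptions, not the definitive definition:

    // Reconstructed from `output.values` / `output.gas_used`; not the exact type.
    pub struct ViewFunctionOutput {
        pub values: anyhow::Result<Vec<Vec<u8>>>,
        pub gas_used: u64,
    }
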