Skip to content

Commit

Permalink
[API] [PFNs] improve observability of view function and simulate usage (
Browse files Browse the repository at this point in the history
#11696) (#11791)

### Description

On PFNs, periodically (every minute) log the top-8 gas-using view functions and simulate transactions. This gives observability to the PFNs usage.

Utilizes a small cache that is reset every minute, because such high-cardinality data would not work with Prometheus.

Requires minimal changes to propagate gas usage.

### Test Plan

Deploy PFN locally, observe log messages.
  • Loading branch information
bchocho authored Jan 30, 2024
1 parent 53a437e commit 60f7f5a
Show file tree
Hide file tree
Showing 15 changed files with 308 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ hex = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
mime = { workspace = true }
mini-moka = { workspace = true }
move-core-types = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
Expand Down
152 changes: 150 additions & 2 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::{
accept_type::AcceptType,
metrics,
response::{
bcs_api_disabled, block_not_found_by_height, block_not_found_by_version,
block_pruned_by_height, json_api_disabled, version_not_found, version_pruned,
Expand All @@ -18,7 +19,7 @@ use aptos_api_types::{
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule};
use aptos_logger::{error, warn};
use aptos_logger::{error, info, warn, Schema};
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
use aptos_state_view::TStateView;
use aptos_storage_interface::{
Expand All @@ -45,15 +46,22 @@ use aptos_types::{
use aptos_utils::aptos_try;
use aptos_vm::data_cache::AsMoveResolver;
use futures::{channel::oneshot, SinkExt};
use mini_moka::sync::Cache;
use move_core_types::{
identifier::Identifier,
language_storage::{ModuleId, StructTag},
move_resource::MoveResource,
resolver::ModuleResolver,
};
use serde::Serialize;
use std::{
cmp::Reverse,
collections::{BTreeMap, HashMap},
ops::{Bound::Included, Deref},
sync::{Arc, RwLock, RwLockWriteGuard},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock, RwLockWriteGuard,
},
time::Instant,
};

Expand All @@ -67,6 +75,8 @@ pub struct Context {
gas_schedule_cache: Arc<RwLock<GasScheduleCache>>,
gas_estimation_cache: Arc<RwLock<GasEstimationCache>>,
gas_limit_cache: Arc<RwLock<GasLimitCache>>,
view_function_stats: Arc<FunctionStats>,
simulate_txn_stats: Arc<FunctionStats>,
}

impl std::fmt::Debug for Context {
Expand All @@ -82,6 +92,19 @@ impl Context {
mp_sender: MempoolClientSender,
node_config: NodeConfig,
) -> Self {
let (view_function_stats, simulate_txn_stats) = {
let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some();
(
Arc::new(FunctionStats::new(
FunctionType::ViewFuntion,
log_per_call_stats,
)),
Arc::new(FunctionStats::new(
FunctionType::TxnSimulation,
log_per_call_stats,
)),
)
};
Self {
chain_id,
db,
Expand All @@ -102,6 +125,8 @@ impl Context {
block_executor_onchain_config: OnChainExecutionConfig::default_if_missing()
.block_executor_onchain_config(),
})),
view_function_stats,
simulate_txn_stats,
}
}

Expand Down Expand Up @@ -1292,6 +1317,14 @@ impl Context {
.min_inclusion_prices
.len()
}

/// Borrows the gas statistics collected for view function calls.
pub fn view_function_stats(&self) -> &FunctionStats {
    self.view_function_stats.as_ref()
}

/// Borrows the gas statistics collected for transaction simulations.
pub fn simulate_txn_stats(&self) -> &FunctionStats {
    self.simulate_txn_stats.as_ref()
}
}

pub struct GasScheduleCache {
Expand Down Expand Up @@ -1324,3 +1357,118 @@ where
.await
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
}

/// Structured-log schema passed to `info!` when `FunctionStats::log_and_clear`
/// emits its periodic summary.
#[derive(Schema)]
pub struct LogSchema {
// Identifies which kind of API operation the log line reports on.
event: LogEvent,
}

impl LogSchema {
pub fn new(event: LogEvent) -> Self {
Self { event }
}
}

/// Event tag carried by `LogSchema`; one variant per tracked API operation kind.
#[derive(Serialize, Copy, Clone)]
pub enum LogEvent {
// Tag for stats about view function calls.
ViewFunction,
// Tag for stats about transaction simulations.
TxnSimulation,
}

/// Kind of API operation a `FunctionStats` instance tracks.
///
/// Each variant maps to a `LogEvent` and to an `operation_id` string in the
/// `impl FunctionType` block.
pub enum FunctionType {
// NOTE(review): `ViewFuntion` is a misspelling of `ViewFunction` (compare
// `LogEvent::ViewFunction`); renaming it requires updating every user at once.
ViewFuntion,
TxnSimulation,
}

impl FunctionType {
    /// Maps this function type to the event tag used in structured logs.
    fn log_event(&self) -> LogEvent {
        match *self {
            Self::ViewFuntion => LogEvent::ViewFunction,
            Self::TxnSimulation => LogEvent::TxnSimulation,
        }
    }

    /// Returns the string stored as `FunctionStats::operation_id`, which is
    /// later used as the label value when observing the gas histogram.
    fn operation_id(&self) -> &'static str {
        match *self {
            Self::ViewFuntion => "view_function",
            Self::TxnSimulation => "txn_simulation",
        }
    }
}

/// Gas-usage statistics for one kind of API operation.
pub struct FunctionStats {
/// Per-key `(total gas, call count)` counters; `None` when per-call stats
/// logging is disabled at construction time.
stats: Option<Cache<String, (Arc<AtomicU64>, Arc<AtomicU64>)>>,
/// Event tag attached to the log line emitted by `log_and_clear`.
log_event: LogEvent,
/// Label value used when observing `metrics::GAS_USED` in `increment`.
operation_id: String,
}

impl FunctionStats {
    /// Creates the stats tracker for one kind of API operation.
    ///
    /// The per-key cache is only allocated when `log_per_call_stats` is `true`;
    /// otherwise `increment` only feeds the gas histogram and `log_and_clear`
    /// does nothing.
    fn new(function_type: FunctionType, log_per_call_stats: bool) -> Self {
        let log_event = function_type.log_event();
        let operation_id = function_type.operation_id().to_string();
        Self {
            // The cache is created with a capacity argument of 100.
            stats: log_per_call_stats.then(|| Cache::new(100)),
            log_event,
            operation_id,
        }
    }

    /// Builds the cache key for a function as `<module>::<function>`.
    pub fn function_to_key(module: &ModuleId, function: &Identifier) -> String {
        format!("{module}::{function}")
    }

    /// Records one call identified by `key` that consumed `gas`.
    ///
    /// The gas histogram is always observed. The per-key totals are only
    /// updated when the cache was enabled at construction time.
    pub fn increment(&self, key: String, gas: u64) {
        let labels = [self.operation_id.as_str()];
        metrics::GAS_USED
            .with_label_values(&labels)
            .observe(gas as f64);

        let cache = match self.stats.as_ref() {
            Some(cache) => cache,
            None => return,
        };

        let (gas_total, call_count) = match cache.get(&key) {
            Some(counters) => counters,
            None => {
                // Two callers may both miss and insert for the same new key; the
                // later insert replaces the earlier counters, so a little data
                // can be lost. That is acceptable for these approximate stats.
                let gas_total = Arc::new(AtomicU64::new(0));
                let call_count = Arc::new(AtomicU64::new(0));
                cache.insert(
                    key.clone(),
                    (Arc::clone(&gas_total), Arc::clone(&call_count)),
                );
                (gas_total, call_count)
            },
        };
        gas_total.fetch_add(gas, Ordering::Relaxed);
        call_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Logs up to eight entries with the highest accumulated gas, then empties
    /// the cache.
    ///
    /// Does nothing when the cache is disabled or currently holds no entries.
    pub fn log_and_clear(&self) {
        let cache = match self.stats.as_ref() {
            Some(cache) => cache,
            None => return,
        };
        // No entries: skip both the log line and the invalidation.
        if cache.iter().next().is_none() {
            return;
        }

        // Snapshot every entry as (gas, count, key).
        let mut by_gas = Vec::new();
        for entry in cache.iter() {
            let (gas_total, call_count) = entry.value();
            by_gas.push((
                gas_total.load(Ordering::Relaxed),
                call_count.load(Ordering::Relaxed),
                entry.key().clone(),
            ));
        }
        // Highest gas first.
        by_gas.sort_by_key(|row| Reverse(row.0));

        info!(
            LogSchema::new(self.log_event),
            top_1 = by_gas.first(),
            top_2 = by_gas.get(1),
            top_3 = by_gas.get(2),
            top_4 = by_gas.get(3),
            top_5 = by_gas.get(4),
            top_6 = by_gas.get(5),
            top_7 = by_gas.get(6),
            top_8 = by_gas.get(7),
        );

        cache.invalidate_all();
    }
}
10 changes: 10 additions & 0 deletions api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,13 @@ pub static GAS_ESTIMATE: Lazy<HistogramVec> = Lazy::new(|| {
)
.unwrap()
});

/// Histogram of gas used by API operations, labelled by `operation_id`.
///
/// Registered lazily on first access; the `unwrap` panics if registration fails.
// NOTE(review): this reuses `BYTE_BUCKETS` for gas values — confirm that the
// bucket boundaries are suitable for gas amounts.
pub static GAS_USED: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_api_gas_used",
"Amount of gas used by each API operation",
&["operation_id"],
BYTE_BUCKETS.clone()
)
.unwrap()
});
20 changes: 19 additions & 1 deletion api/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ pub fn bootstrap(
attach_poem_to_runtime(runtime.handle(), context.clone(), config, false)
.context("Failed to attach poem to runtime")?;

let context_cloned = context.clone();
if let Some(period_ms) = config.api.periodic_gas_estimation_ms {
runtime.spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(period_ms));
loop {
interval.tick().await;
let context_cloned = context.clone();
let context_cloned = context_cloned.clone();
tokio::task::spawn_blocking(move || {
if let Ok(latest_ledger_info) =
context_cloned.get_latest_ledger_info::<crate::response::BasicError>()
Expand All @@ -66,6 +67,23 @@ pub fn bootstrap(
});
}

let context_cloned = context.clone();
if let Some(period_sec) = config.api.periodic_function_stats_sec {
runtime.spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(period_sec));
loop {
interval.tick().await;
let context_cloned = context_cloned.clone();
tokio::task::spawn_blocking(move || {
context_cloned.view_function_stats().log_and_clear();
context_cloned.simulate_txn_stats().log_and_clear();
})
.await
.unwrap_or(());
}
});
}

Ok(runtime)
}

Expand Down
30 changes: 29 additions & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
accept_type::AcceptType,
accounts::Account,
bcs_payload::Bcs,
context::{api_spawn_blocking, Context},
context::{api_spawn_blocking, Context, FunctionStats},
failpoint::fail_point_poem,
generate_error_response, generate_success_response, metrics,
page::Page,
Expand Down Expand Up @@ -1235,6 +1235,34 @@ impl TransactionsApi {
_ => ExecutionStatus::MiscellaneousError(None),
};

let stats_key = match txn.payload() {
TransactionPayload::Script(_) => {
format!("Script::{}", txn.clone().committed_hash()).to_string()
},
TransactionPayload::ModuleBundle(_) => "ModuleBundle::unknown".to_string(),
TransactionPayload::EntryFunction(entry_function) => FunctionStats::function_to_key(
entry_function.module(),
&entry_function.function().into(),
),
TransactionPayload::Multisig(multisig) => {
if let Some(payload) = &multisig.transaction_payload {
match payload {
MultisigTransactionPayload::EntryFunction(entry_function) => {
FunctionStats::function_to_key(
entry_function.module(),
&entry_function.function().into(),
)
},
}
} else {
"Multisig::unknown".to_string()
}
},
};
self.context
.simulate_txn_stats()
.increment(stats_key, output.gas_used());

// Build up a transaction from the outputs
// All state hashes are invalid, and will be filled with 0s
let txn = aptos_types::transaction::Transaction::UserTransaction(txn);
Expand Down
23 changes: 14 additions & 9 deletions api/src/view_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
accept_type::AcceptType,
bcs_payload::Bcs,
context::api_spawn_blocking,
context::{api_spawn_blocking, FunctionStats},
failpoint::fail_point_poem,
response::{
BadRequestError, BasicErrorWith404, BasicResponse, BasicResponseStatus, BasicResultWith404,
Expand Down Expand Up @@ -135,22 +135,22 @@ fn view_request(
));
}

let return_vals = AptosVM::execute_view_function(
let output = AptosVM::execute_view_function(
&state_view,
view_function.module.clone(),
view_function.function.clone(),
view_function.ty_args.clone(),
view_function.args.clone(),
context.node_config.api.max_gas_view_function,
)
.map_err(|err| {
);
let values = output.values.map_err(|err| {
BasicErrorWith404::bad_request_with_code_no_info(err, AptosErrorCode::InvalidInput)
})?;
match accept_type {
let result = match accept_type {
AcceptType::Bcs => {
// The return values are already BCS encoded, but we still need to encode the outside
// vector without re-encoding the inside values
let num_vals = return_vals.len();
let num_vals = values.len();

// Push the length of the return values
let mut length = vec![];
Expand All @@ -163,7 +163,7 @@ fn view_request(
})?;

// Combine all of the return values
let values = return_vals.into_iter().concat();
let values = values.into_iter().concat();
let ret = [length, values].concat();

BasicResponse::try_from_encoded((ret, &ledger_info, BasicResponseStatus::Ok))
Expand All @@ -186,7 +186,7 @@ fn view_request(
)
})?;

let move_vals = return_vals
let move_vals = values
.into_iter()
.zip(return_types.into_iter())
.map(|(v, ty)| {
Expand All @@ -205,5 +205,10 @@ fn view_request(

BasicResponse::try_from_json((move_vals, &ledger_info, BasicResponseStatus::Ok))
},
}
};
context.view_function_stats().increment(
FunctionStats::function_to_key(&view_function.module, &view_function.function),
output.gas_used,
);
result
}
Loading

0 comments on commit 60f7f5a

Please sign in to comment.