Skip to content

Commit

Permalink
feat: tx trace api
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Jan 14, 2019
1 parent 2661c9a commit 2a8a35d
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 65 deletions.
1 change: 0 additions & 1 deletion network/src/tests/peers_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
};
use ckb_util::RwLock;
use faketime::unix_time_as_millis;
use std::default::Default;
use std::sync::Arc;

fn new_peer_store() -> impl PeerStore {
Expand Down
1 change: 1 addition & 0 deletions pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ env_logger = "0.6"
ckb-db = { path = "../db" }
hash = {path = "../util/hash"}
ckb-chain = { path = "../chain" }
tempfile = "3.0"
103 changes: 68 additions & 35 deletions pool/src/tests/pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::txs_pool::pool::TransactionPoolService;
use crate::txs_pool::trace::{Action, TxTrace};
use crate::txs_pool::types::*;
use channel::select;
use channel::{self, Receiver};
Expand All @@ -22,6 +23,7 @@ use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use std::time;
use tempfile::TempPath;

macro_rules! expect_output_parent {
($pool:expr, $expected:pat, $( $output:expr ),+ ) => {
Expand Down Expand Up @@ -469,20 +471,14 @@ fn test_switch_fork() {
assert_eq!(mtxs, vec![txs[3].clone(), txs[6].clone(), txs[5].clone()]);
}

#[cfg(not(disable_faketime))]
#[test]
fn test_tx_trace() {
let mut pool = TestPool::<ChainKVStore<MemoryKeyValueDB>>::simple();

let faketime_file = faketime::millis_tempfile(8102).expect("create faketime file");
faketime::enable(&faketime_file);

fn prepare_trace(
pool: &mut TestPool<ChainKVStore<MemoryKeyValueDB>>,
faketime_file: &TempPath,
) -> Transaction {
let tx = test_transaction(vec![OutPoint::new(pool.tx_hash.clone(), 0)], 2);

let block_number = { pool.shared.tip_header().read().number() };

let tx_hash = tx.hash();

pool.service.trace_transaction(tx.clone()).unwrap();
let prop_ids = pool.service.prepare_proposal(10);

Expand All @@ -495,41 +491,78 @@ fn test_tx_trace() {
.proposal_transactions(vec![tx.proposal_short_id()])
.build();

faketime::write_millis(&faketime_file, 9102).expect("write millis");
faketime::write_millis(faketime_file, 9102).expect("write millis");

pool.service.reconcile_block(&block);
tx
}

let trace = pool.service.get_transaction_traces(&tx_hash);
#[cfg(not(disable_faketime))]
#[test]
fn test_get_transaction_traces() {
let mut pool = TestPool::<ChainKVStore<MemoryKeyValueDB>>::simple();
let faketime_file = faketime::millis_tempfile(8102).expect("create faketime file");
faketime::enable(&faketime_file);

assert_eq!(
format!("{:?}", trace),
concat!(
"Some([",
"Trace { action: AddPending, info: unknown tx, add to pending, time: 8102 }, ",
"Trace { action: Proposed, info: ProposalShortId(0xda495f694cac79513d00) proposed in block number(2)-hash(0xb42c5305777987f80112e862a3e722c1d0e68c671f1d8920d16ebfc6783a6467), time: 9102 }, ",
"Trace { action: AddCommit, info: add to commit pool, time: 9102 }",
"])"
let tx = prepare_trace(&mut pool, &faketime_file);
let tx_hash = tx.hash();

let trace = pool.service.get_transaction_traces(&tx_hash);
match trace.map(|t| t.as_slice()) {
Some(
[TxTrace {
action: Action::AddPending,
info: _,
time: 8102,
}, TxTrace {
action: Action::Proposed,
info: proposal_info,
time: 9102,
}, TxTrace {
action: Action::AddCommit,
info: _,
time: 9102,
}],
) => assert_eq!(
proposal_info,
concat!("ProposalShortId(0xda495f694cac79513d00) proposed ",
"in block number(2)-hash(0xb42c5305777987f80112e862a3e722c1d0e68c671f1d8920d16ebfc6783a6467)")
),
);
_ => assert!(false),
}

faketime::write_millis(&faketime_file, 9103).expect("write millis");

let block = apply_transactions(vec![tx.clone()], vec![], &mut pool);

let trace = pool.service.get_transaction_traces(&tx_hash);
assert_eq!(
format!("{:?}", trace),
format!(
"{}{}{}{}Trace {{ action: Committed, info: committed in block number({:?})-hash({:#x}), time: 9103 }}{}",
"Some([",
"Trace { action: AddPending, info: unknown tx, add to pending, time: 8102 }, ",
"Trace { action: Proposed, info: ProposalShortId(0xda495f694cac79513d00) proposed in block number(2)-hash(0xb42c5305777987f80112e862a3e722c1d0e68c671f1d8920d16ebfc6783a6467), time: 9102 }, ",
"Trace { action: AddCommit, info: add to commit pool, time: 9102 }, ",
block.header().number(),
block.header().hash(),
"])"
match trace.map(|t| t.as_slice()) {
Some(
[TxTrace {
action: Action::AddPending,
info: _,
time: 8102,
}, TxTrace {
action: Action::Proposed,
info: _,
time: 9102,
}, TxTrace {
action: Action::AddCommit,
info: _,
time: 9102,
}, TxTrace {
action: Action::Committed,
info: committed_info,
time: 9103,
}],
) => assert_eq!(
committed_info,
&format!(
"committed in block number({:?})-hash({:#x})",
block.header().number(),
block.header().hash()
)
),
);
_ => assert!(false),
}
}

struct TestPool<CI> {
Expand Down
1 change: 1 addition & 0 deletions pool/src/txs_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod trace;
pub mod types;

pub use self::pool::{TransactionPoolController, TransactionPoolService};
pub use self::trace::TxTrace;
pub use self::types::{
Orphan, PendingQueue, Pool, PoolConfig, PoolError, ProposedQueue, TxStage, TxoStatus,
};
44 changes: 39 additions & 5 deletions pool/src/txs_pool/pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Top-level Pool type, methods, and tests
use super::trace::{Trace, TraceMap};
use super::trace::{TxTrace, TxTraceMap};
use super::types::{
InsertionResult, Orphan, PendingQueue, Pool, PoolConfig, PoolError, ProposedQueue, TxStage,
TxoStatus,
Expand Down Expand Up @@ -33,6 +33,8 @@ pub struct TransactionPoolController {
contains_key_sender: Sender<Request<ProposalShortId, bool>>,
get_transaction_sender: Sender<Request<ProposalShortId, Option<Transaction>>>,
add_transaction_sender: Sender<Request<Transaction, Result<InsertionResult, PoolError>>>,
reg_trace_sender: Sender<Request<Transaction, Result<InsertionResult, PoolError>>>,
get_trace_sender: Sender<Request<H256, Option<Vec<TxTrace>>>>,
}

pub struct TransactionPoolReceivers {
Expand All @@ -41,6 +43,8 @@ pub struct TransactionPoolReceivers {
contains_key_receiver: Receiver<Request<ProposalShortId, bool>>,
get_transaction_receiver: Receiver<Request<ProposalShortId, Option<Transaction>>>,
add_transaction_receiver: Receiver<Request<Transaction, Result<InsertionResult, PoolError>>>,
reg_trace_receiver: Receiver<Request<Transaction, Result<InsertionResult, PoolError>>>,
get_trace_receiver: Receiver<Request<H256, Option<Vec<TxTrace>>>>,
}

impl TransactionPoolController {
Expand All @@ -54,20 +58,26 @@ impl TransactionPoolController {
channel::bounded(DEFAULT_CHANNEL_SIZE);
let (add_transaction_sender, add_transaction_receiver) =
channel::bounded(DEFAULT_CHANNEL_SIZE);
let (reg_trace_sender, reg_trace_receiver) = channel::bounded(DEFAULT_CHANNEL_SIZE);
let (get_trace_sender, get_trace_receiver) = channel::bounded(DEFAULT_CHANNEL_SIZE);
(
TransactionPoolController {
get_proposal_commit_transactions_sender,
get_potential_transactions_sender,
contains_key_sender,
get_transaction_sender,
add_transaction_sender,
reg_trace_sender,
get_trace_sender,
},
TransactionPoolReceivers {
get_proposal_commit_transactions_receiver,
get_potential_transactions_receiver,
contains_key_receiver,
get_transaction_receiver,
add_transaction_receiver,
reg_trace_receiver,
get_trace_receiver,
},
)
}
Expand Down Expand Up @@ -100,6 +110,14 @@ impl TransactionPoolController {
pub fn add_transaction(&self, tx: Transaction) -> Result<InsertionResult, PoolError> {
Request::call(&self.add_transaction_sender, tx).expect("add_transaction() failed")
}

pub fn trace_transaction(&self, tx: Transaction) -> Result<InsertionResult, PoolError> {
Request::call(&self.reg_trace_sender, tx).expect("trace_transaction() failed")
}

pub fn get_transaction_trace(&self, hash: H256) -> Option<Vec<TxTrace>> {
Request::call(&self.get_trace_sender, hash).expect("trace_transaction() failed")
}
}

/// The pool itself.
Expand All @@ -119,7 +137,7 @@ pub struct TransactionPoolService<CI> {
shared: Shared<CI>,
notify: NotifyController,

trace: TraceMap,
trace: TxTraceMap,
}

impl<CI> CellProvider for TransactionPoolService<CI>
Expand Down Expand Up @@ -164,7 +182,7 @@ where
cache: LruCache::new(cache_size),
shared,
notify,
trace: TraceMap::new(trace_size),
trace: TxTraceMap::new(trace_size),
}
}

Expand Down Expand Up @@ -221,6 +239,22 @@ where
_ => {
error!(target: "txs_pool", "channel add_transaction_receiver closed");
}
},
recv(receivers.reg_trace_receiver) -> msg => match msg {
Ok(Request { responder, arguments: tx }) => {
let _ = responder.send(self.trace_transaction(tx));
}
_ => {
error!(target: "txs_pool", "channel reg_trace_receiver closed");
}
},
recv(receivers.get_trace_receiver) -> msg => match msg {
Ok(Request { responder, arguments: hash }) => {
let _ = responder.send(self.get_transaction_traces(&hash).cloned());
}
_ => {
error!(target: "txs_pool", "channel get_trace_receiver closed");
}
}
}
}).expect("Start TransactionPoolService failed!")
Expand Down Expand Up @@ -406,7 +440,7 @@ where
}
}

pub(crate) fn get_transaction_traces(&self, hash: &H256) -> Option<&Vec<Trace>> {
pub(crate) fn get_transaction_traces(&self, hash: &H256) -> Option<&Vec<TxTrace>> {
self.trace.get(hash)
}

Expand Down Expand Up @@ -500,7 +534,7 @@ where
} else {
if self.config.trace_enable() {
self.trace
.add_commit(&tx.hash(), format!("add to commit pool"));
.add_commit(&tx.hash(), "add to commit pool".to_string());
}
self.pool.add_transaction(tx.clone());
self.reconcile_orphan(&tx);
Expand Down
Loading

0 comments on commit 2a8a35d

Please sign in to comment.