From 7c8491c054c46c7ddf7ff39d41903baac796f036 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 5 Apr 2021 17:47:59 -0400 Subject: [PATCH 1/8] Feat: Added event for new microblocks --- src/net/relay.rs | 41 +++++++++++----- testnet/stacks-node/src/config.rs | 5 ++ testnet/stacks-node/src/event_dispatcher.rs | 48 ++++++++++++++++++- testnet/stacks-node/src/neon_node.rs | 6 +++ .../src/tests/neon_integrations.rs | 33 ++++++++++++- 5 files changed, 118 insertions(+), 15 deletions(-) diff --git a/src/net/relay.rs b/src/net/relay.rs index d1fe92da27..f14685d215 100644 --- a/src/net/relay.rs +++ b/src/net/relay.rs @@ -92,6 +92,7 @@ pub struct RelayerStats { pub struct ProcessedNetReceipts { pub mempool_txs_added: Vec, + pub unconfirmed_microblock_tx_receipts: Vec, } /// Private trait for keeping track of messages that can be relayed, so we can identify the peers @@ -757,7 +758,7 @@ impl Relayer { Ok((new_blocks, bad_neighbors)) } - /// Prerocess all downloaded, confirmed microblock streams. + /// Preprocess all downloaded, confirmed microblock streams. /// Does not fail on invalid blocks; just logs a warning. /// Returns the consensus hashes for the sortitions that elected the stacks anchored blocks that produced these streams. fn preprocess_downloaded_microblocks( @@ -1136,7 +1137,7 @@ impl Relayer { pub fn setup_unconfirmed_state( chainstate: &mut StacksChainState, sortdb: &SortitionDB, - ) -> Result<(), Error> { + ) -> Result, Error> { let (canonical_consensus_hash, canonical_block_hash) = SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())?; let canonical_tip = StacksBlockHeader::make_index_block_hash( @@ -1148,8 +1149,10 @@ impl Relayer { "Reload unconfirmed state off of {}/{}", &canonical_consensus_hash, &canonical_block_hash ); - chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?; - Ok(()) + let (_, _, receipts) = + chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?; + + Ok(receipts) } /// Set up unconfirmed chain state in a read-only fashion @@ -1173,16 +1176,23 @@ impl Relayer { Ok(()) } - pub fn refresh_unconfirmed(chainstate: &mut StacksChainState, sortdb: &mut SortitionDB) { - if let Err(e) = Relayer::setup_unconfirmed_state(chainstate, sortdb) { - if let net_error::ChainstateError(ref err_msg) = e { - if err_msg == "Stacks chainstate error: NoSuchBlockError" { - trace!("Failed to instantiate unconfirmed state: {:?}", &e); + pub fn refresh_unconfirmed( + chainstate: &mut StacksChainState, + sortdb: &mut SortitionDB, + ) -> Vec { + match Relayer::setup_unconfirmed_state(chainstate, sortdb) { + Ok(receipts) => receipts, + Err(e) => { + if let net_error::ChainstateError(ref err_msg) = e { + if err_msg == "Stacks chainstate error: NoSuchBlockError" { + trace!("Failed to instantiate unconfirmed state: {:?}", &e); + } else { + warn!("Failed to instantiate unconfirmed state: {:?}", &e); + } } else { warn!("Failed to instantiate unconfirmed state: {:?}", &e); } - } else { - warn!("Failed to instantiate unconfirmed state: {:?}", &e); + Vec::new() } } } @@ -1303,13 +1313,18 @@ impl Relayer { } } - let receipts = ProcessedNetReceipts { mempool_txs_added }; + let mut unconfirmed_microblock_tx_receipts = Vec::new(); // finally, refresh the unconfirmed chainstate, if need be if network_result.has_microblocks() { - Relayer::refresh_unconfirmed(chainstate, sortdb); + unconfirmed_microblock_tx_receipts = Relayer::refresh_unconfirmed(chainstate, sortdb); } + let receipts = ProcessedNetReceipts { + mempool_txs_added, + unconfirmed_microblock_tx_receipts, + }; + Ok(receipts) } } diff --git a/testnet/stacks-node/src/config.rs b/testnet/stacks-node/src/config.rs index 4dad6e3b07..53296f4c9b 100644 --- a/testnet/stacks-node/src/config.rs +++ b/testnet/stacks-node/src/config.rs @@ -1280,6 +1280,7 @@ pub enum EventKeyType { AssetEvent(AssetIdentifier), STXEvent, MemPoolTransactions, + Microblocks, AnyEvent, BurnchainBlocks, } @@ -1302,6 +1303,10 @@ impl EventKeyType { return Some(EventKeyType::BurnchainBlocks); } + if raw_key == "microblocks" { + return Some(EventKeyType::Microblocks); + } + let comps: Vec<_> = raw_key.split("::").collect(); if comps.len() == 1 { let split: Vec<_> = comps[0].split(".").collect(); diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 0b5654c605..49df8b0a57 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ b/testnet/stacks-node/src/event_dispatcher.rs @@ -42,6 +42,8 @@ const STATUS_RESP_TRUE: &str = "success"; const STATUS_RESP_NOT_COMMITTED: &str = "abort_by_response"; const STATUS_RESP_POST_CONDITION: &str = "abort_by_post_condition"; +/// Update `serve()` in `neon_integrations.rs` with any new paths that need to be tested +pub const PATH_MICROBLOCK_SUBMIT: &str = "new_microblocks"; pub const PATH_MEMPOOL_TX_SUBMIT: &str = "new_mempool_tx"; pub const PATH_MEMPOOL_TX_DROP: &str = "drop_mempool_tx"; pub const PATH_BURN_BLOCK_SUBMIT: &str = "new_burn_block"; @@ -111,6 +113,20 @@ impl EventObserver { } } + fn make_new_microblocks_payload(receipts: Vec) -> serde_json::Value { + let mut tx_index = 0; + let mut serialized_txs = Vec::new(); + for receipt in receipts.iter() { + let payload = EventObserver::make_new_block_txs_payload(receipt, tx_index); + serialized_txs.push(payload); + tx_index += 1; + } + + json!({ + "transactions": serialized_txs, + }) + } + fn make_new_mempool_txs_payload(transactions: Vec) -> serde_json::Value { let raw_txs = transactions .into_iter() @@ -224,6 +240,10 @@ impl EventObserver { self.send_payload(payload, PATH_MEMPOOL_TX_SUBMIT); } + fn send_new_microblocks(&self, payload: &serde_json::Value) { + self.send_payload(payload, PATH_MICROBLOCK_SUBMIT); + } + fn send_dropped_mempool_txs(&self, payload: &serde_json::Value) { self.send_payload(payload, PATH_MEMPOOL_TX_DROP); } @@ -287,6 +307,7 @@ pub struct EventDispatcher { assets_observers_lookup: HashMap>, burn_block_observers_lookup: HashSet, mempool_observers_lookup: HashSet, + microblock_observers_lookup: HashSet, stx_observers_lookup: HashSet, any_event_observers_lookup: HashSet, boot_receipts: Arc>>>, @@ -357,6 +378,7 @@ impl EventDispatcher { any_event_observers_lookup: HashSet::new(), burn_block_observers_lookup: HashSet::new(), mempool_observers_lookup: HashSet::new(), + microblock_observers_lookup: HashSet::new(), boot_receipts: Arc::new(Mutex::new(None)), } } @@ -541,6 +563,28 @@ impl EventDispatcher { } } + pub fn process_new_microblocks(&self, receipts: Vec) { + // lazily assemble payload only if we have observers + let interested_observers: Vec<_> = self + .registered_observers + .iter() + .enumerate() + .filter(|(obs_id, _observer)| { + self.microblock_observers_lookup.contains(&(*obs_id as u16)) + || self.any_event_observers_lookup.contains(&(*obs_id as u16)) + }) + .collect(); + if interested_observers.len() < 1 { + return; + } + + let payload = EventObserver::make_new_microblocks_payload(receipts); + + for (_, observer) in interested_observers.iter() { + observer.send_new_microblocks(&payload); + } + } + pub fn process_new_mempool_txs(&self, txs: Vec) { // lazily assemble payload only if we have observers let interested_observers: Vec<_> = self @@ -628,7 +672,6 @@ impl EventDispatcher { } pub fn register_observer(&mut self, conf: &EventObserverConfig) { - // let event_observer = EventObserver::new(&conf.address, conf.port); info!("Registering event observer at: {}", conf.endpoint); let event_observer = EventObserver { endpoint: conf.endpoint.clone(), @@ -659,6 +702,9 @@ impl EventDispatcher { EventKeyType::MemPoolTransactions => { self.mempool_observers_lookup.insert(observer_index); } + EventKeyType::Microblocks => { + self.microblock_observers_lookup.insert(observer_index); + } EventKeyType::STXEvent => { self.stx_observers_lookup.insert(observer_index); } diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 6d597ad214..033cef5ce2 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -786,6 +786,11 @@ fn spawn_miner_relayer( event_dispatcher.process_new_mempool_txs(net_receipts.mempool_txs_added); } + let num_unconfirmed_microblock_tx_receipts = net_receipts.unconfirmed_microblock_tx_receipts.len(); + if num_unconfirmed_microblock_tx_receipts > 0 { + event_dispatcher.process_new_microblocks(net_receipts.unconfirmed_microblock_tx_receipts); + } + // Dispatch retrieved attachments, if any. if net_result.has_attachments() { event_dispatcher.process_new_attachments(&net_result.attachments); @@ -1001,6 +1006,7 @@ enum LeaderKeyRegistrationState { Active(RegisteredKey), } +/// This node is used for both neon testnet and for mainnet impl InitializedNeonNode { fn new( config: Config, diff --git a/testnet/stacks-node/src/tests/neon_integrations.rs b/testnet/stacks-node/src/tests/neon_integrations.rs index 62a8a59a9f..c9a4973c84 100644 --- a/testnet/stacks-node/src/tests/neon_integrations.rs +++ b/testnet/stacks-node/src/tests/neon_integrations.rs @@ -100,6 +100,7 @@ mod test_observer { lazy_static! { pub static ref NEW_BLOCKS: Mutex> = Mutex::new(Vec::new()); + pub static ref NEW_MICROBLOCKS: Mutex> = Mutex::new(Vec::new()); pub static ref BURN_BLOCKS: Mutex> = Mutex::new(Vec::new()); pub static ref MEMTXS: Mutex> = Mutex::new(Vec::new()); pub static ref MEMTXS_DROPPED: Mutex> = Mutex::new(Vec::new()); @@ -120,6 +121,14 @@ mod test_observer { Ok(warp::http::StatusCode::OK) } + async fn handle_microblocks( + microblocks: serde_json::Value, + ) -> Result { + let mut microblock_events = NEW_MICROBLOCKS.lock().unwrap(); + microblock_events.push(microblocks); + Ok(warp::http::StatusCode::OK) + } + async fn handle_mempool_txs(txs: serde_json::Value) -> Result { let new_rawtxs = txs .as_array() @@ -175,6 +184,10 @@ mod test_observer { NEW_BLOCKS.lock().unwrap().clone() } + pub fn get_microblocks() -> Vec { + NEW_MICROBLOCKS.lock().unwrap().clone() + } + pub fn get_burn_blocks() -> Vec { BURN_BLOCKS.lock().unwrap().clone() } @@ -183,6 +196,7 @@ mod test_observer { ATTACHMENTS.lock().unwrap().clone() } + /// each path here should correspond to one of the paths listed in `event_dispatcher.rs` async fn serve() { let new_blocks = warp::path!("new_block") .and(warp::post()) @@ -204,6 +218,10 @@ mod test_observer { .and(warp::post()) .and(warp::body::json()) .and_then(handle_attachments); + let new_microblocks = warp::path!("new_microblocks") + .and(warp::post()) + .and(warp::body::json()) + .and_then(handle_microblocks); info!("Spawning warp server"); warp::serve( @@ -211,7 +229,8 @@ mod test_observer { .or(mempool_txs) .or(mempool_drop_txs) .or(new_burn_blocks) - .or(new_attachments), + .or(new_attachments) + .or(new_microblocks), ) .run(([127, 0, 0, 1], EVENT_OBSERVER_PORT)) .await @@ -1271,6 +1290,18 @@ fn microblock_integration_test() { .unwrap() ); + // wait at least two p2p refreshes so it can produce the microblock + for i in 0..30 { + debug!( + "wait {} more seconds for microblock miner to find our transaction...", + 30 - i + ); + sleep_ms(1000); + } + + let microblock_events = test_observer::get_microblocks(); + assert_eq!(microblock_events.len(), 1); + let memtx_events = test_observer::get_memtxs(); assert_eq!(memtx_events.len(), 1); assert_eq!(&memtx_events[0], &format!("0x{}", &bytes_to_hex(&tx))); From eb2afa8cf25e8f06743f221faa0f314889dc5e2b Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Tue, 6 Apr 2021 19:25:41 -0400 Subject: [PATCH 2/8] Added more fields to new microblocks event --- src/chainstate/stacks/db/unconfirmed.rs | 4 +- testnet/stacks-node/src/event_dispatcher.rs | 140 ++++++++++++------ testnet/stacks-node/src/neon_node.rs | 10 +- .../src/tests/neon_integrations.rs | 21 ++- 4 files changed, 127 insertions(+), 48 deletions(-) diff --git a/src/chainstate/stacks/db/unconfirmed.rs b/src/chainstate/stacks/db/unconfirmed.rs index 055d1d535f..dda735a09f 100644 --- a/src/chainstate/stacks/db/unconfirmed.rs +++ b/src/chainstate/stacks/db/unconfirmed.rs @@ -145,7 +145,7 @@ impl UnconfirmedState { let mut all_receipts = vec![]; let mut mined_txs = UnconfirmedTxMap::new(); let mut new_cost = ExecutionCost::zero(); - let mut new_bytes = 0; + let mut new_bytes; let mut num_new_mblocks = 0; if mblocks.len() > 0 { @@ -210,6 +210,7 @@ impl UnconfirmedState { } total as u64 }; + self.bytes_so_far += new_bytes; for tx in &mblock.txs { mined_txs.insert( @@ -227,7 +228,6 @@ impl UnconfirmedState { self.last_mblock_seq = last_mblock_seq; self.mined_txs.extend(mined_txs); self.cost_so_far = new_cost; - self.bytes_so_far += new_bytes; self.num_mblocks_added += num_new_mblocks; Ok((total_fees, total_burns, all_receipts)) diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 49df8b0a57..5c5c0e9d8a 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ b/testnet/stacks-node/src/event_dispatcher.rs @@ -113,20 +113,6 @@ impl EventObserver { } } - fn make_new_microblocks_payload(receipts: Vec) -> serde_json::Value { - let mut tx_index = 0; - let mut serialized_txs = Vec::new(); - for receipt in receipts.iter() { - let payload = EventObserver::make_new_block_txs_payload(receipt, tx_index); - serialized_txs.push(payload); - tx_index += 1; - } - - json!({ - "transactions": serialized_txs, - }) - } - fn make_new_mempool_txs_payload(transactions: Vec) -> serde_json::Value { let raw_txs = transactions .into_iter() @@ -240,8 +226,37 @@ impl EventObserver { self.send_payload(payload, PATH_MEMPOOL_TX_SUBMIT); } - fn send_new_microblocks(&self, payload: &serde_json::Value) { - self.send_payload(payload, PATH_MICROBLOCK_SUBMIT); + /// Serializes new microblocks data into a JSON payload and sends it off to the correct path + fn send_new_microblocks( + &self, + parent_index_block_hash: StacksBlockId, + filtered_events: Vec<(usize, &(bool, Txid, StacksTransactionEvent))>, + receipts: &Vec, + ) { + // Serialize events to JSON + let serialized_events: Vec = filtered_events + .iter() + .map(|(event_index, (committed, txid, event))| { + event.json_serialize(*event_index, txid, *committed) + }) + .collect(); + + // Serialize receipts + let mut tx_index = 0; + let mut serialized_txs = Vec::new(); + for receipt in receipts.iter() { + let payload = EventObserver::make_new_block_txs_payload(receipt, tx_index); + serialized_txs.push(payload); + tx_index += 1; + } + + let payload = json!({ + "parent_index_block_hash": format!("0x{}", parent_index_block_hash), + "events": serialized_events, + "transactions": serialized_txs, + }); + + self.send_payload(&payload, PATH_MICROBLOCK_SUBMIT); } fn send_dropped_mempool_txs(&self, payload: &serde_json::Value) { @@ -254,7 +269,7 @@ impl EventObserver { fn send( &self, - filtered_events: Vec<(usize, &(bool, Txid, &StacksTransactionEvent))>, + filtered_events: Vec<(usize, &(bool, Txid, StacksTransactionEvent))>, chain_tip: &ChainTip, parent_index_hash: &StacksBlockId, boot_receipts: &Vec, @@ -418,37 +433,29 @@ impl EventDispatcher { } } - pub fn process_chain_tip( + /// Iterates through tx receipts, and then the events corresponding to each receipt to + /// generate a dispatch matrix & event vector. + /// + /// # Returns + /// - dispatch_matrix: a vector where each index corresponds to the hashset of event indexes + /// that each respective event observer is subscribed to + /// - events: a vector of all events from all the tx receipts + fn create_dispatch_matrix_and_event_vector( &self, - chain_tip: &ChainTip, - parent_index_hash: &StacksBlockId, - winner_txid: Txid, - mature_rewards: Vec, - mature_rewards_info: Option, + receipts: &Vec, + ) -> ( + Vec>, + Vec<(bool, Txid, StacksTransactionEvent)>, ) { let mut dispatch_matrix: Vec> = self .registered_observers .iter() .map(|_| HashSet::new()) .collect(); - let mut events: Vec<(bool, Txid, &StacksTransactionEvent)> = vec![]; + let mut events: Vec<(bool, Txid, StacksTransactionEvent)> = vec![]; let mut i: usize = 0; - let boot_receipts = if chain_tip.metadata.block_height == 1 { - let mut boot_receipts_result = self - .boot_receipts - .lock() - .expect("Unexpected concurrent access to `boot_receipts` in the event dispatcher!"); - if let Some(val) = boot_receipts_result.take() { - val - } else { - vec![] - } - } else { - vec![] - }; - - for receipt in chain_tip.receipts.iter().chain(boot_receipts.iter()) { + for receipt in receipts { let tx_hash = receipt.transaction.txid(); for event in receipt.events.iter() { match event { @@ -514,7 +521,7 @@ impl EventDispatcher { ); } } - events.push((!receipt.post_condition_aborted, tx_hash, event)); + events.push((!receipt.post_condition_aborted, tx_hash, event.clone())); for o_i in &self.any_event_observers_lookup { dispatch_matrix[*o_i as usize].insert(i); } @@ -522,6 +529,39 @@ impl EventDispatcher { } } + (dispatch_matrix, events) + } + + pub fn process_chain_tip( + &self, + chain_tip: &ChainTip, + parent_index_hash: &StacksBlockId, + winner_txid: Txid, + mature_rewards: Vec, + mature_rewards_info: Option, + ) { + let boot_receipts = if chain_tip.metadata.block_height == 1 { + let mut boot_receipts_result = self + .boot_receipts + .lock() + .expect("Unexpected concurrent access to `boot_receipts` in the event dispatcher!"); + if let Some(val) = boot_receipts_result.take() { + val + } else { + vec![] + } + } else { + vec![] + }; + let receipts = chain_tip + .receipts + .iter() + .cloned() + .chain(boot_receipts.iter().cloned()) + .collect(); + + let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector(&receipts); + if dispatch_matrix.len() > 0 { let mature_rewards_vec = if let Some(rewards_info) = mature_rewards_info { mature_rewards @@ -563,7 +603,14 @@ impl EventDispatcher { } } - pub fn process_new_microblocks(&self, receipts: Vec) { + /// Creates a list of observers that are interested in the new microblocks event, + /// creates a mapping from observers to the event ids that are relevant to each, and then + /// sends the event to each interested observer. + pub fn process_new_microblocks( + &self, + parent_index_block_hash: StacksBlockId, + receipts: Vec, + ) { // lazily assemble payload only if we have observers let interested_observers: Vec<_> = self .registered_observers @@ -577,11 +624,16 @@ impl EventDispatcher { if interested_observers.len() < 1 { return; } + let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector(&receipts); - let payload = EventObserver::make_new_microblocks_payload(receipts); + for (obs_id, observer) in interested_observers.iter() { + let filtered_events_ids = &dispatch_matrix[*obs_id]; + let filtered_events: Vec<_> = filtered_events_ids + .iter() + .map(|event_id| (*event_id, &events[*event_id])) + .collect(); - for (_, observer) in interested_observers.iter() { - observer.send_new_microblocks(&payload); + observer.send_new_microblocks(parent_index_block_hash, filtered_events, &receipts); } } diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 033cef5ce2..fd5023d2b7 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -788,7 +788,15 @@ fn spawn_miner_relayer( let num_unconfirmed_microblock_tx_receipts = net_receipts.unconfirmed_microblock_tx_receipts.len(); if num_unconfirmed_microblock_tx_receipts > 0 { - event_dispatcher.process_new_microblocks(net_receipts.unconfirmed_microblock_tx_receipts); + let (canonical_consensus_hash, canonical_block_hash) = + SortitionDB::get_canonical_stacks_chain_tip_hash( + sortdb.conn()).expect( + "FATAL: failed to query sortition DB for canonical stacks chain tip"); + let canonical_tip = StacksBlockHeader::make_index_block_hash( + &canonical_consensus_hash, + &canonical_block_hash, + ); + event_dispatcher.process_new_microblocks(canonical_tip, net_receipts.unconfirmed_microblock_tx_receipts); } // Dispatch retrieved attachments, if any. diff --git a/testnet/stacks-node/src/tests/neon_integrations.rs b/testnet/stacks-node/src/tests/neon_integrations.rs index c9a4973c84..d6d82e4ea7 100644 --- a/testnet/stacks-node/src/tests/neon_integrations.rs +++ b/testnet/stacks-node/src/tests/neon_integrations.rs @@ -55,6 +55,7 @@ use stacks::burnchains::bitcoin::BitcoinNetworkType; use stacks::burnchains::{BurnchainHeaderHash, Txid}; use stacks::chainstate::burn::operations::{BlockstackOperationType, PreStxOp, TransferStxOp}; use stacks::chainstate::stacks::boot::boot_code_id; +use stacks::chainstate::stacks::StacksBlockId; use stacks::core::BLOCK_LIMIT_MAINNET; fn neon_integration_test_conf() -> (Config, StacksAddress) { @@ -1279,6 +1280,11 @@ fn microblock_integration_test() { .json::() .unwrap(); assert!(tip_info.stacks_tip_height >= 3); + let stacks_tip = tip_info.stacks_tip; + let stacks_tip_consensus_hash = + ConsensusHash::from_hex(&tip_info.stacks_tip_consensus_hash).unwrap(); + let stacks_id_tip = + StacksBlockHeader::make_index_block_hash(&stacks_tip_consensus_hash, &stacks_tip); eprintln!( "{:#?}", @@ -1299,8 +1305,21 @@ fn microblock_integration_test() { sleep_ms(1000); } - let microblock_events = test_observer::get_microblocks(); + let mut microblock_events = test_observer::get_microblocks(); assert_eq!(microblock_events.len(), 1); + let microblock = microblock_events.pop().unwrap(); + let transactions = microblock.get("transactions").unwrap().as_array().unwrap(); + assert_eq!(transactions.len(), 1); + let microblock_associated_hash = microblock + .get("parent_index_block_hash") + .unwrap() + .as_str() + .unwrap(); + let index_block_hash_bytes = hex_bytes(µblock_associated_hash[2..]).unwrap(); + assert_eq!( + StacksBlockId::from_vec(&index_block_hash_bytes), + Some(stacks_id_tip) + ); let memtx_events = test_observer::get_memtxs(); assert_eq!(memtx_events.len(), 1); From 7c20bafd54dd05bc16785422eec19143cd378673 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 12 Apr 2021 18:29:56 -0400 Subject: [PATCH 3/8] Added sequence of microblock & better tx_index for serialized transactions --- src/chainstate/stacks/db/unconfirmed.rs | 60 +++++++--- src/net/relay.rs | 21 ++-- testnet/stacks-node/src/event_dispatcher.rs | 87 +++++++++++--- testnet/stacks-node/src/neon_node.rs | 4 +- .../src/tests/neon_integrations.rs | 107 ++++++++++++++++-- 5 files changed, 224 insertions(+), 55 deletions(-) diff --git a/src/chainstate/stacks/db/unconfirmed.rs b/src/chainstate/stacks/db/unconfirmed.rs index dda735a09f..3835d8e39c 100644 --- a/src/chainstate/stacks/db/unconfirmed.rs +++ b/src/chainstate/stacks/db/unconfirmed.rs @@ -43,6 +43,26 @@ use vm::costs::ExecutionCost; pub type UnconfirmedTxMap = HashMap; +pub struct ProcessedUnconfirmedState { + pub total_burns: u128, + pub total_fees: u128, + pub receipts: Vec, + // tuple representing (index, sequence #), where the index corresponds to the starting index of + // receipts corresponding to that sequence number in the receipts vector + pub sequence_indices_for_receipts: Vec<(usize, u16)>, +} + +impl Default for ProcessedUnconfirmedState { + fn default() -> Self { + ProcessedUnconfirmedState { + total_burns: 0, + total_fees: 0, + receipts: vec![], + sequence_indices_for_receipts: vec![], + } + } +} + pub struct UnconfirmedState { pub confirmed_chain_tip: StacksBlockId, pub unconfirmed_chain_tip: StacksBlockId, @@ -124,10 +144,10 @@ impl UnconfirmedState { chainstate: &StacksChainState, burn_dbconn: &dyn BurnStateDB, mblocks: Vec, - ) -> Result<(u128, u128, Vec), Error> { + ) -> Result { if self.last_mblock_seq == u16::max_value() { // drop them -- nothing to do - return Ok((0, 0, vec![])); + return Ok(Default::default()); } debug!( @@ -143,6 +163,7 @@ impl UnconfirmedState { let mut total_fees = 0; let mut total_burns = 0; let mut all_receipts = vec![]; + let mut sequence_indices_for_receipts = vec![]; let mut mined_txs = UnconfirmedTxMap::new(); let mut new_cost = ExecutionCost::zero(); let mut new_bytes; @@ -196,6 +217,9 @@ impl UnconfirmedState { total_fees += stx_fees; total_burns += stx_burns; num_new_mblocks += 1; + if receipts.len() > 0 { + sequence_indices_for_receipts.push((all_receipts.len(), seq)); + } all_receipts.append(&mut receipts); last_mblock = Some(mblock_header); @@ -230,7 +254,12 @@ impl UnconfirmedState { self.cost_so_far = new_cost; self.num_mblocks_added += num_new_mblocks; - Ok((total_fees, total_burns, all_receipts)) + Ok(ProcessedUnconfirmedState { + total_fees, + total_burns, + sequence_indices_for_receipts, + receipts: all_receipts, + }) } /// Load up the Stacks microblock stream to process, composed of only the new microblocks @@ -255,11 +284,12 @@ impl UnconfirmedState { } /// Update the view of the current confiremd chain tip's unconfirmed microblock state + /// Returns ProcessedUnconfirmedState for the microblocks newly added to the unconfirmed state pub fn refresh( &mut self, chainstate: &StacksChainState, burn_dbconn: &dyn BurnStateDB, - ) -> Result<(u128, u128, Vec), Error> { + ) -> Result { assert!( !self.readonly, "BUG: code tried to write unconfirmed state to a read-only instance" @@ -267,12 +297,12 @@ impl UnconfirmedState { if self.last_mblock_seq == u16::max_value() { // no-op - return Ok((0, 0, vec![])); + return Ok(Default::default()); } match self.load_child_microblocks(chainstate)? { Some(microblocks) => self.append_microblocks(chainstate, burn_dbconn, microblocks), - None => Ok((0, 0, vec![])), + None => Ok(Default::default()), } } @@ -342,15 +372,15 @@ impl StacksChainState { &self, burn_dbconn: &dyn BurnStateDB, anchored_block_id: StacksBlockId, - ) -> Result<(UnconfirmedState, u128, u128, Vec), Error> { + ) -> Result<(UnconfirmedState, ProcessedUnconfirmedState), Error> { debug!("Make new unconfirmed state off of {}", &anchored_block_id); let mut unconfirmed_state = UnconfirmedState::new(self, anchored_block_id)?; - let (fees, burns, receipts) = unconfirmed_state.refresh(self, burn_dbconn)?; + let processed_unconfirmed_state = unconfirmed_state.refresh(self, burn_dbconn)?; debug!( "Made new unconfirmed state off of {} (at {})", &anchored_block_id, &unconfirmed_state.unconfirmed_chain_tip ); - Ok((unconfirmed_state, fees, burns, receipts)) + Ok((unconfirmed_state, processed_unconfirmed_state)) } /// Reload the unconfirmed view from a new chain tip. @@ -362,7 +392,7 @@ impl StacksChainState { &mut self, burn_dbconn: &dyn BurnStateDB, canonical_tip: StacksBlockId, - ) -> Result<(u128, u128, Vec), Error> { + ) -> Result { debug!("Reload unconfirmed state off of {}", &canonical_tip); let unconfirmed_state = self.unconfirmed_state.take(); @@ -394,7 +424,7 @@ impl StacksChainState { self.drop_unconfirmed_state(unconfirmed_state); } - let (new_unconfirmed_state, fees, burns, receipts) = + let (new_unconfirmed_state, processed_unconfirmed_state) = self.make_unconfirmed_state(burn_dbconn, canonical_tip)?; debug!( @@ -403,19 +433,19 @@ impl StacksChainState { ); self.unconfirmed_state = Some(new_unconfirmed_state); - Ok((fees, burns, receipts)) + Ok(processed_unconfirmed_state) } /// Refresh the current unconfirmed chain state pub fn refresh_unconfirmed_state( &mut self, burn_dbconn: &dyn BurnStateDB, - ) -> Result<(u128, u128, Vec), Error> { + ) -> Result { let mut unconfirmed_state = self.unconfirmed_state.take(); let res = if let Some(ref mut unconfirmed_state) = unconfirmed_state { if !unconfirmed_state.is_readable() { warn!("Unconfirmed state is not readable; it will soon be refreshed"); - return Ok((0, 0, vec![])); + return Ok(Default::default()); } debug!( @@ -432,7 +462,7 @@ impl StacksChainState { res } else { warn!("No unconfirmed state instantiated"); - Ok((0, 0, vec![])) + Ok(Default::default()) }; self.unconfirmed_state = unconfirmed_state; res diff --git a/src/net/relay.rs b/src/net/relay.rs index f14685d215..83c99afba5 100644 --- a/src/net/relay.rs +++ b/src/net/relay.rs @@ -61,6 +61,7 @@ use rand::Rng; use vm::costs::ExecutionCost; use crate::chainstate::coordinator::BlockEventDispatcher; +use chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState; pub type BlocksAvailableMap = HashMap; @@ -92,7 +93,7 @@ pub struct RelayerStats { pub struct ProcessedNetReceipts { pub mempool_txs_added: Vec, - pub unconfirmed_microblock_tx_receipts: Vec, + pub processed_unconfirmed_state: ProcessedUnconfirmedState, } /// Private trait for keeping track of messages that can be relayed, so we can identify the peers @@ -1137,7 +1138,7 @@ impl Relayer { pub fn setup_unconfirmed_state( chainstate: &mut StacksChainState, sortdb: &SortitionDB, - ) -> Result, Error> { + ) -> Result { let (canonical_consensus_hash, canonical_block_hash) = SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())?; let canonical_tip = StacksBlockHeader::make_index_block_hash( @@ -1149,10 +1150,10 @@ impl Relayer { "Reload unconfirmed state off of {}/{}", &canonical_consensus_hash, &canonical_block_hash ); - let (_, _, receipts) = + let processed_unconfirmed_state = chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?; - Ok(receipts) + Ok(processed_unconfirmed_state) } /// Set up unconfirmed chain state in a read-only fashion @@ -1179,9 +1180,9 @@ impl Relayer { pub fn refresh_unconfirmed( chainstate: &mut StacksChainState, sortdb: &mut SortitionDB, - ) -> Vec { + ) -> ProcessedUnconfirmedState { match Relayer::setup_unconfirmed_state(chainstate, sortdb) { - Ok(receipts) => receipts, + Ok(processed_unconfirmed_state) => processed_unconfirmed_state, Err(e) => { if let net_error::ChainstateError(ref err_msg) = e { if err_msg == "Stacks chainstate error: NoSuchBlockError" { @@ -1192,7 +1193,7 @@ impl Relayer { } else { warn!("Failed to instantiate unconfirmed state: {:?}", &e); } - Vec::new() + Default::default() } } } @@ -1313,16 +1314,16 @@ impl Relayer { } } - let mut unconfirmed_microblock_tx_receipts = Vec::new(); + let mut processed_unconfirmed_state = Default::default(); // finally, refresh the unconfirmed chainstate, if need be if network_result.has_microblocks() { - unconfirmed_microblock_tx_receipts = Relayer::refresh_unconfirmed(chainstate, sortdb); + processed_unconfirmed_state = Relayer::refresh_unconfirmed(chainstate, sortdb); } let receipts = ProcessedNetReceipts { mempool_txs_added, - unconfirmed_microblock_tx_receipts, + processed_unconfirmed_state, }; Ok(receipts) diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 5c5c0e9d8a..a248e9bc45 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ b/testnet/stacks-node/src/event_dispatcher.rs @@ -32,6 +32,7 @@ use stacks::vm::types::{AssetIdentifier, QualifiedContractIdentifier, Value}; use super::config::{EventKeyType, EventObserverConfig}; use super::node::ChainTip; +use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState; #[derive(Debug, Clone)] struct EventObserver { @@ -155,10 +156,10 @@ impl EventObserver { }) } - fn make_new_block_txs_payload( + /// Returns tuple of (txid, success, raw_result, raw_tx, contract_interface_json) + fn generate_payload_info_for_receipt( receipt: &StacksTransactionReceipt, - tx_index: u32, - ) -> serde_json::Value { + ) -> (String, &str, String, String, serde_json::Value) { let tx = &receipt.transaction; let success = match (receipt.post_condition_aborted, &receipt.result) { @@ -192,6 +193,37 @@ impl EventObserver { None => json!(null), } }; + (txid, success, raw_result, raw_tx, contract_interface_json) + } + + /// Returns json payload to send for new block event + fn make_new_block_txs_payload( + receipt: &StacksTransactionReceipt, + tx_index: u32, + ) -> serde_json::Value { + let (txid, success, raw_result, raw_tx, contract_interface_json) = + EventObserver::generate_payload_info_for_receipt(receipt); + + json!({ + "txid": format!("0x{}", &txid), + "tx_index": tx_index, + "status": success, + "raw_result": format!("0x{}", &raw_result), + "raw_tx": format!("0x{}", &raw_tx), + "contract_abi": contract_interface_json, + "execution_cost": receipt.execution_cost, + }) + } + + /// Returns json payload to send for new microblock event + fn make_new_microblock_txs_payload( + receipt: &StacksTransactionReceipt, + tx_index: u32, + sequence: u16, + ) -> serde_json::Value { + let (txid, success, raw_result, raw_tx, contract_interface_json) = + EventObserver::generate_payload_info_for_receipt(receipt); + json!({ "txid": format!("0x{}", &txid), "tx_index": tx_index, @@ -200,6 +232,7 @@ impl EventObserver { "raw_tx": format!("0x{}", &raw_tx), "contract_abi": contract_interface_json, "execution_cost": receipt.execution_cost, + "sequence": sequence, }) } @@ -231,7 +264,7 @@ impl EventObserver { &self, parent_index_block_hash: StacksBlockId, filtered_events: Vec<(usize, &(bool, Txid, StacksTransactionEvent))>, - receipts: &Vec, + serialized_txs: &Vec, ) { // Serialize events to JSON let serialized_events: Vec = filtered_events @@ -241,15 +274,6 @@ impl EventObserver { }) .collect(); - // Serialize receipts - let mut tx_index = 0; - let mut serialized_txs = Vec::new(); - for receipt in receipts.iter() { - let payload = EventObserver::make_new_block_txs_payload(receipt, tx_index); - serialized_txs.push(payload); - tx_index += 1; - } - let payload = json!({ "parent_index_block_hash": format!("0x{}", parent_index_block_hash), "events": serialized_events, @@ -609,7 +633,7 @@ impl EventDispatcher { pub fn process_new_microblocks( &self, parent_index_block_hash: StacksBlockId, - receipts: Vec, + processed_unconfirmed_state: ProcessedUnconfirmedState, ) { // lazily assemble payload only if we have observers let interested_observers: Vec<_> = self @@ -624,7 +648,34 @@ impl EventDispatcher { if interested_observers.len() < 1 { return; } - let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector(&receipts); + let (dispatch_matrix, events) = + self.create_dispatch_matrix_and_event_vector(&processed_unconfirmed_state.receipts); + + // Serialize receipts + let sequence_indices = processed_unconfirmed_state.sequence_indices_for_receipts; + let mut tx_index = 0; + let mut microblock_index = 0; + let mut serialized_txs = Vec::new(); + let mut curr_sequence_number = sequence_indices[microblock_index].1; + + for (i, receipt) in processed_unconfirmed_state.receipts.iter().enumerate() { + // if the current receipt index "belongs" to the next microblock, reset `tx_index` to 0, + // increment the `microblock_index`, and update the sequence number + if microblock_index + 1 < sequence_indices.len() + && i >= sequence_indices[microblock_index + 1].0 + { + microblock_index += 1; + tx_index = 0; + curr_sequence_number = sequence_indices[microblock_index].1; + } + let payload = EventObserver::make_new_microblock_txs_payload( + receipt, + tx_index, + curr_sequence_number, + ); + serialized_txs.push(payload); + tx_index += 1; + } for (obs_id, observer) in interested_observers.iter() { let filtered_events_ids = &dispatch_matrix[*obs_id]; @@ -633,7 +684,11 @@ impl EventDispatcher { .map(|event_id| (*event_id, &events[*event_id])) .collect(); - observer.send_new_microblocks(parent_index_block_hash, filtered_events, &receipts); + observer.send_new_microblocks( + parent_index_block_hash, + filtered_events, + &serialized_txs, + ); } } diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index fd5023d2b7..8ad76c1cc4 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -786,7 +786,7 @@ fn spawn_miner_relayer( event_dispatcher.process_new_mempool_txs(net_receipts.mempool_txs_added); } - let num_unconfirmed_microblock_tx_receipts = net_receipts.unconfirmed_microblock_tx_receipts.len(); + let num_unconfirmed_microblock_tx_receipts = net_receipts.processed_unconfirmed_state.receipts.len(); if num_unconfirmed_microblock_tx_receipts > 0 { let (canonical_consensus_hash, canonical_block_hash) = SortitionDB::get_canonical_stacks_chain_tip_hash( @@ -796,7 +796,7 @@ fn spawn_miner_relayer( &canonical_consensus_hash, &canonical_block_hash, ); - event_dispatcher.process_new_microblocks(canonical_tip, net_receipts.unconfirmed_microblock_tx_receipts); + event_dispatcher.process_new_microblocks(canonical_tip, net_receipts.processed_unconfirmed_state); } // Dispatch retrieved attachments, if any. diff --git a/testnet/stacks-node/src/tests/neon_integrations.rs b/testnet/stacks-node/src/tests/neon_integrations.rs index d6d82e4ea7..e4e8bb76a1 100644 --- a/testnet/stacks-node/src/tests/neon_integrations.rs +++ b/testnet/stacks-node/src/tests/neon_integrations.rs @@ -50,12 +50,14 @@ use std::{ }; use std::{env, thread}; +use crate::util::hash::{MerkleTree, Sha512Trunc256Sum}; +use crate::util::secp256k1::MessageSignature; use stacks::burnchains::bitcoin::address::{BitcoinAddress, BitcoinAddressType}; use stacks::burnchains::bitcoin::BitcoinNetworkType; use stacks::burnchains::{BurnchainHeaderHash, Txid}; use stacks::chainstate::burn::operations::{BlockstackOperationType, PreStxOp, TransferStxOp}; use stacks::chainstate::stacks::boot::boot_code_id; -use stacks::chainstate::stacks::StacksBlockId; +use stacks::chainstate::stacks::{StacksBlockId, StacksMicroblockHeader}; use stacks::core::BLOCK_LIMIT_MAINNET; fn neon_integration_test_conf() -> (Config, StacksAddress) { @@ -1113,6 +1115,32 @@ fn bitcoind_forking_test() { channel.stop_chains_coordinator(); } +/// Returns a StacksMicroblock with the given transactions, sequence, and parent block that is +/// signed with the given private key. +fn make_signed_microblock( + block_privk: &StacksPrivateKey, + txs: Vec, + parent_block: BlockHeaderHash, + seq: u16, +) -> StacksMicroblock { + let txid_vecs = txs.iter().map(|tx| tx.txid().as_bytes().to_vec()).collect(); + let merkle_tree = MerkleTree::::new(&txid_vecs); + let tx_merkle_root = merkle_tree.root(); + + let mut mblock = StacksMicroblock { + header: StacksMicroblockHeader { + version: 0x12, + sequence: seq, + prev_block: parent_block, + tx_merkle_root: tx_merkle_root, + signature: MessageSignature([0u8; 65]), + }, + txs: txs, + }; + mblock.sign(block_privk).unwrap(); + mblock +} + #[test] #[ignore] fn microblock_integration_test() { @@ -1122,6 +1150,8 @@ fn microblock_integration_test() { let spender_sk = StacksPrivateKey::from_hex(SK_1).unwrap(); let spender_addr: PrincipalData = to_addr(&spender_sk).into(); + let second_spender_sk = StacksPrivateKey::from_hex(SK_2).unwrap(); + let second_spender_addr: PrincipalData = to_addr(&second_spender_sk).into(); let (mut conf, miner_account) = neon_integration_test_conf(); @@ -1129,6 +1159,10 @@ fn microblock_integration_test() { address: spender_addr.clone(), amount: 100300, }); + conf.initial_balances.push(InitialBalance { + address: second_spender_addr.clone(), + amount: 10000, + }); conf.node.mine_microblocks = true; conf.node.wait_time_for_microblocks = 30000; @@ -1175,17 +1209,21 @@ fn microblock_integration_test() { next_block_and_wait(&mut btc_regtest_controller, &blocks_processed); // let's query the miner's account nonce: - info!("Miner account: {}", miner_account); let account = get_account(&http_origin, &miner_account); assert_eq!(account.balance, 0); assert_eq!(account.nonce, 1); - // and our spender + // and our first spender let account = get_account(&http_origin, &spender_addr); assert_eq!(account.balance, 100300); assert_eq!(account.nonce, 0); + // and our second spender + let account = get_account(&http_origin, &second_spender_addr); + assert_eq!(account.balance, 10000); + assert_eq!(account.nonce, 0); + // okay, let's push a transaction that is marked microblock only! let recipient = StacksAddress::from_string(ADDR_4).unwrap(); let tx = make_stacks_transfer_mblock_only(&spender_sk, 0, 1000, &recipient.into(), 1000); @@ -1212,18 +1250,31 @@ fn microblock_integration_test() { let account = get_account(&http_origin, &spender_addr); assert_eq!(account.nonce, 1); - // push another transaction that is marked microblock only + // push another two transactions that are marked microblock only let recipient = StacksAddress::from_string(ADDR_4).unwrap(); let unconfirmed_tx_bytes = make_stacks_transfer_mblock_only(&spender_sk, 1, 1000, &recipient.into(), 1000); let unconfirmed_tx = StacksTransaction::consensus_deserialize(&mut &unconfirmed_tx_bytes[..]).unwrap(); + let second_unconfirmed_tx_bytes = + make_stacks_transfer_mblock_only(&second_spender_sk, 0, 1000, &recipient.into(), 1500); + let second_unconfirmed_tx = + StacksTransaction::consensus_deserialize(&mut &second_unconfirmed_tx_bytes[..]).unwrap(); // TODO (hack) instantiate the sortdb in the burnchain let _ = btc_regtest_controller.sortdb_mut(); - // put it into a microblock - let microblock = { + // put each into a microblock + let (microblock, second_microblock) = { + let path = format!("{}/v2/info", &http_origin); + let tip_info = client + .get(&path) + .send() + .unwrap() + .json::() + .unwrap(); + let stacks_tip = tip_info.stacks_tip; + let (consensus_hash, stacks_block) = get_tip_anchored_block(&conf); let tip_hash = StacksBlockHeader::make_index_block_hash(&consensus_hash, &stacks_block.block_hash()); @@ -1237,15 +1288,19 @@ fn microblock_integration_test() { chainstate .reload_unconfirmed_state(&btc_regtest_controller.sortdb_ref().index_conn(), tip_hash) .unwrap(); - - make_microblock( + let first_microblock = make_microblock( &privk, &mut chainstate, &btc_regtest_controller.sortdb_ref().index_conn(), consensus_hash, - stacks_block, + stacks_block.clone(), vec![unconfirmed_tx], - ) + ); + + let second_microblock = + make_signed_microblock(&privk, vec![second_unconfirmed_tx], stacks_tip, 1); + + (first_microblock, second_microblock) }; let mut microblock_bytes = vec![]; @@ -1253,7 +1308,7 @@ fn microblock_integration_test() { .consensus_serialize(&mut microblock_bytes) .unwrap(); - // post it + // post the first microblock let path = format!("{}/v2/microblocks", &http_origin); let res: String = client .post(&path) @@ -1272,6 +1327,24 @@ fn microblock_integration_test() { assert_eq!(account.nonce, 1); assert_eq!(account.balance, 98300); + let mut second_microblock_bytes = vec![]; + second_microblock + .consensus_serialize(&mut second_microblock_bytes) + .unwrap(); + + // post the second microblock + let path = format!("{}/v2/microblocks", &http_origin); + let res: String = client + .post(&path) + .header("Content-Type", "application/octet-stream") + .body(second_microblock_bytes.clone()) + .send() + .unwrap() + .json() + .unwrap(); + + assert_eq!(res, format!("{}", &second_microblock.block_hash())); + let path = format!("{}/v2/info", &http_origin); let tip_info = client .get(&path) @@ -1305,11 +1378,15 @@ fn microblock_integration_test() { sleep_ms(1000); } + // check event observer for new microblock event (expect 2) let mut microblock_events = test_observer::get_microblocks(); - assert_eq!(microblock_events.len(), 1); + assert_eq!(microblock_events.len(), 2); + // this microblock should correspond to `second_microblock` let microblock = microblock_events.pop().unwrap(); let transactions = microblock.get("transactions").unwrap().as_array().unwrap(); assert_eq!(transactions.len(), 1); + let tx_sequence = transactions[0].get("sequence").unwrap().as_u64().unwrap(); + assert_eq!(tx_sequence, 1); let microblock_associated_hash = microblock .get("parent_index_block_hash") .unwrap() @@ -1320,6 +1397,12 @@ fn microblock_integration_test() { StacksBlockId::from_vec(&index_block_hash_bytes), Some(stacks_id_tip) ); + // this microblock should correspond to the first microblock that was posted + let microblock = microblock_events.pop().unwrap(); + let transactions = microblock.get("transactions").unwrap().as_array().unwrap(); + assert_eq!(transactions.len(), 1); + let tx_sequence = transactions[0].get("sequence").unwrap().as_u64().unwrap(); + assert_eq!(tx_sequence, 0); let memtx_events = test_observer::get_memtxs(); assert_eq!(memtx_events.len(), 1); From bbf4f9bb33e5bfe04304787169aa4583db37a093 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Tue, 13 Apr 2021 19:39:20 -0400 Subject: [PATCH 4/8] Remove new_bytes change (will be addressed in alternate PR), added randomness to version number in test --- src/chainstate/stacks/db/unconfirmed.rs | 4 ++-- testnet/stacks-node/src/tests/neon_integrations.rs | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/chainstate/stacks/db/unconfirmed.rs b/src/chainstate/stacks/db/unconfirmed.rs index 3e5ede079f..8678b6d1cf 100644 --- a/src/chainstate/stacks/db/unconfirmed.rs +++ b/src/chainstate/stacks/db/unconfirmed.rs @@ -170,7 +170,7 @@ impl UnconfirmedState { let mut sequence_indices_for_receipts = vec![]; let mut mined_txs = UnconfirmedTxMap::new(); let mut new_cost = ExecutionCost::zero(); - let mut new_bytes; + let mut new_bytes = 0; let mut num_new_mblocks = 0; if mblocks.len() > 0 { @@ -241,7 +241,6 @@ impl UnconfirmedState { } total as u64 }; - self.bytes_so_far += new_bytes; for tx in &mblock.txs { mined_txs.insert( @@ -259,6 +258,7 @@ impl UnconfirmedState { self.last_mblock_seq = last_mblock_seq; self.mined_txs.extend(mined_txs); self.cost_so_far = new_cost; + self.bytes_so_far += new_bytes; self.num_mblocks_added += num_new_mblocks; Ok(ProcessedUnconfirmedState { diff --git a/testnet/stacks-node/src/tests/neon_integrations.rs b/testnet/stacks-node/src/tests/neon_integrations.rs index c6361cc2e0..753a5a48a5 100644 --- a/testnet/stacks-node/src/tests/neon_integrations.rs +++ b/testnet/stacks-node/src/tests/neon_integrations.rs @@ -25,7 +25,8 @@ use stacks::{ use stacks::{ chainstate::stacks::{ db::StacksChainState, StacksAddress, StacksBlock, StacksBlockHeader, StacksBlockId, - StacksPrivateKey, StacksPublicKey, StacksTransaction, TransactionPayload, + StacksMicroblockHeader, StacksPrivateKey, StacksPublicKey, StacksTransaction, + TransactionPayload, }, net::RPCPoxInfoData, util::db::query_row_columns, @@ -64,9 +65,9 @@ use stacks::burnchains::bitcoin::BitcoinNetworkType; use stacks::burnchains::{BurnchainHeaderHash, Txid}; use stacks::chainstate::burn::operations::{BlockstackOperationType, PreStxOp, TransferStxOp}; use stacks::chainstate::stacks::boot::boot_code_id; -use stacks::chainstate::stacks::{StacksBlockId, StacksMicroblockHeader}; use stacks::core::BLOCK_LIMIT_MAINNET; +use rand::Rng; use rusqlite::types::ToSql; fn neon_integration_test_conf() -> (Config, StacksAddress) { @@ -1147,13 +1148,15 @@ fn make_signed_microblock( parent_block: BlockHeaderHash, seq: u16, ) -> StacksMicroblock { + let mut rng = rand::thread_rng(); + let txid_vecs = txs.iter().map(|tx| tx.txid().as_bytes().to_vec()).collect(); let merkle_tree = MerkleTree::::new(&txid_vecs); let tx_merkle_root = merkle_tree.root(); let mut mblock = StacksMicroblock { header: StacksMicroblockHeader { - version: 0x12, + version: rng.gen(), sequence: seq, prev_block: parent_block, tx_merkle_root: tx_merkle_root, @@ -1393,6 +1396,7 @@ fn microblock_integration_test() { .unwrap() ); + // todo - pipe in the PoxSyncWatchdog to the RunLoop struct to avoid flakiness here // wait at least two p2p refreshes so it can produce the microblock for i in 0..30 { debug!( @@ -1428,6 +1432,7 @@ fn microblock_integration_test() { let tx_sequence = transactions[0].get("sequence").unwrap().as_u64().unwrap(); assert_eq!(tx_sequence, 0); + // check mempool tx events let memtx_events = test_observer::get_memtxs(); assert_eq!(memtx_events.len(), 1); assert_eq!(&memtx_events[0], &format!("0x{}", &bytes_to_hex(&tx))); From 6c1803b842bff73c42f6c92e403972e6606228d6 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Wed, 14 Apr 2021 18:04:08 -0400 Subject: [PATCH 5/8] Less convoluted way to map receipts to a specific microblock sequence number --- src/chainstate/stacks/db/unconfirmed.rs | 15 +++----- testnet/stacks-node/src/event_dispatcher.rs | 40 ++++++++++----------- 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/src/chainstate/stacks/db/unconfirmed.rs b/src/chainstate/stacks/db/unconfirmed.rs index 8678b6d1cf..d3c611ac69 100644 --- a/src/chainstate/stacks/db/unconfirmed.rs +++ b/src/chainstate/stacks/db/unconfirmed.rs @@ -46,10 +46,9 @@ pub type UnconfirmedTxMap = HashMap, - // tuple representing (index, sequence #), where the index corresponds to the starting index of - // receipts corresponding to that sequence number in the receipts vector - pub sequence_indices_for_receipts: Vec<(usize, u16)>, + // each element of this vector is a tuple, where each tuple contains a microblock + // sequence number, and a vector of transaction receipts for that microblock + pub receipts: Vec<(u16, Vec)>, } impl Default for ProcessedUnconfirmedState { @@ -58,7 +57,6 @@ impl Default for ProcessedUnconfirmedState { total_burns: 0, total_fees: 0, receipts: vec![], - sequence_indices_for_receipts: vec![], } } } @@ -167,7 +165,6 @@ impl UnconfirmedState { let mut total_fees = 0; let mut total_burns = 0; let mut all_receipts = vec![]; - let mut sequence_indices_for_receipts = vec![]; let mut mined_txs = UnconfirmedTxMap::new(); let mut new_cost = ExecutionCost::zero(); let mut new_bytes = 0; @@ -224,10 +221,7 @@ impl UnconfirmedState { total_fees += stx_fees; total_burns += stx_burns; num_new_mblocks += 1; - if receipts.len() > 0 { - sequence_indices_for_receipts.push((all_receipts.len(), seq)); - } - all_receipts.append(&mut receipts); + all_receipts.push((seq, receipts)); last_mblock = Some(mblock_header); last_mblock_seq = seq; @@ -264,7 +258,6 @@ impl UnconfirmedState { Ok(ProcessedUnconfirmedState { total_fees, total_burns, - sequence_indices_for_receipts, receipts: all_receipts, }) } diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 6b79b72a9b..5ac99d7c7d 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ b/testnet/stacks-node/src/event_dispatcher.rs @@ -657,33 +657,29 @@ impl EventDispatcher { if interested_observers.len() < 1 { return; } - let (dispatch_matrix, events) = - self.create_dispatch_matrix_and_event_vector(&processed_unconfirmed_state.receipts); + let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector( + &processed_unconfirmed_state + .receipts + .iter() + .flat_map(|(_, r)| r.clone()) + .collect(), + ); // Serialize receipts - let sequence_indices = processed_unconfirmed_state.sequence_indices_for_receipts; let mut tx_index = 0; - let mut microblock_index = 0; let mut serialized_txs = Vec::new(); - let mut curr_sequence_number = sequence_indices[microblock_index].1; - - for (i, receipt) in processed_unconfirmed_state.receipts.iter().enumerate() { - // if the current receipt index "belongs" to the next microblock, reset `tx_index` to 0, - // increment the `microblock_index`, and update the sequence number - if microblock_index + 1 < sequence_indices.len() - && i >= sequence_indices[microblock_index + 1].0 - { - microblock_index += 1; - tx_index = 0; - curr_sequence_number = sequence_indices[microblock_index].1; + + for (curr_sequence_number, receipts) in processed_unconfirmed_state.receipts.iter() { + tx_index = 0; + for receipt in receipts.iter() { + let payload = EventObserver::make_new_microblock_txs_payload( + receipt, + tx_index, + *curr_sequence_number, + ); + serialized_txs.push(payload); + tx_index += 1; } - let payload = EventObserver::make_new_microblock_txs_payload( - receipt, - tx_index, - curr_sequence_number, - ); - serialized_txs.push(payload); - tx_index += 1; } for (obs_id, observer) in interested_observers.iter() { From a317ed908270a0e42ac761d45af19534797aa938 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Tue, 20 Apr 2021 14:11:32 -0600 Subject: [PATCH 6/8] Get chain tip differently in relayer thread, avoid cloning event value in event dispatcher code --- src/chainstate/stacks/db/unconfirmed.rs | 2 +- testnet/stacks-node/src/event_dispatcher.rs | 76 ++++++++++++--------- testnet/stacks-node/src/neon_node.rs | 15 ++-- 3 files changed, 50 insertions(+), 43 deletions(-) diff --git a/src/chainstate/stacks/db/unconfirmed.rs b/src/chainstate/stacks/db/unconfirmed.rs index d3c611ac69..ad52bbec6f 100644 --- a/src/chainstate/stacks/db/unconfirmed.rs +++ b/src/chainstate/stacks/db/unconfirmed.rs @@ -203,7 +203,7 @@ impl UnconfirmedState { &mblock_hash, mblock.header.sequence ); - let (stx_fees, stx_burns, mut receipts) = + let (stx_fees, stx_burns, receipts) = match StacksChainState::process_microblocks_transactions( &mut clarity_tx, &vec![mblock.clone()], diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 5ac99d7c7d..0e46cc7996 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ b/testnet/stacks-node/src/event_dispatcher.rs @@ -43,6 +43,14 @@ struct EventObserver { should_keep_running: Arc, } +struct ReceiptPayloadInfo<'a> { + txid: String, + success: &'a str, + raw_result: String, + raw_tx: String, + contract_interface_json: serde_json::Value, +} + const STATUS_RESP_TRUE: &str = "success"; const STATUS_RESP_NOT_COMMITTED: &str = "abort_by_response"; const STATUS_RESP_POST_CONDITION: &str = "abort_by_post_condition"; @@ -166,9 +174,7 @@ impl EventObserver { } /// Returns tuple of (txid, success, raw_result, raw_tx, contract_interface_json) - fn generate_payload_info_for_receipt( - receipt: &StacksTransactionReceipt, - ) -> (String, &str, String, String, serde_json::Value) { + fn generate_payload_info_for_receipt(receipt: &StacksTransactionReceipt) -> ReceiptPayloadInfo { let tx = &receipt.transaction; let success = match (receipt.post_condition_aborted, &receipt.result) { @@ -202,7 +208,13 @@ impl EventObserver { None => json!(null), } }; - (txid, success, raw_result, raw_tx, contract_interface_json) + ReceiptPayloadInfo { + txid, + success, + raw_result, + raw_tx, + contract_interface_json, + } } /// Returns json payload to send for new block event @@ -210,16 +222,15 @@ impl EventObserver { receipt: &StacksTransactionReceipt, tx_index: u32, ) -> serde_json::Value { - let (txid, success, raw_result, raw_tx, contract_interface_json) = - EventObserver::generate_payload_info_for_receipt(receipt); + let receipt_payload_info = EventObserver::generate_payload_info_for_receipt(receipt); json!({ - "txid": format!("0x{}", &txid), + "txid": format!("0x{}", &receipt_payload_info.txid), "tx_index": tx_index, - "status": success, - "raw_result": format!("0x{}", &raw_result), - "raw_tx": format!("0x{}", &raw_tx), - "contract_abi": contract_interface_json, + "status": receipt_payload_info.success, + "raw_result": format!("0x{}", &receipt_payload_info.raw_result), + "raw_tx": format!("0x{}", &receipt_payload_info.raw_tx), + "contract_abi": receipt_payload_info.contract_interface_json, "execution_cost": receipt.execution_cost, }) } @@ -230,16 +241,15 @@ impl EventObserver { tx_index: u32, sequence: u16, ) -> serde_json::Value { - let (txid, success, raw_result, raw_tx, contract_interface_json) = - EventObserver::generate_payload_info_for_receipt(receipt); + let receipt_payload_info = EventObserver::generate_payload_info_for_receipt(receipt); json!({ - "txid": format!("0x{}", &txid), + "txid": format!("0x{}", &receipt_payload_info.txid), "tx_index": tx_index, - "status": success, - "raw_result": format!("0x{}", &raw_result), - "raw_tx": format!("0x{}", &raw_tx), - "contract_abi": contract_interface_json, + "status": receipt_payload_info.success, + "raw_result": format!("0x{}", &receipt_payload_info.raw_result), + "raw_tx": format!("0x{}", &receipt_payload_info.raw_tx), + "contract_abi": receipt_payload_info.contract_interface_json, "execution_cost": receipt.execution_cost, "sequence": sequence, }) @@ -272,7 +282,7 @@ impl EventObserver { fn send_new_microblocks( &self, parent_index_block_hash: StacksBlockId, - filtered_events: Vec<(usize, &(bool, Txid, StacksTransactionEvent))>, + filtered_events: Vec<(usize, &(bool, Txid, &StacksTransactionEvent))>, serialized_txs: &Vec, ) { // Serialize events to JSON @@ -302,7 +312,7 @@ impl EventObserver { fn send( &self, - filtered_events: Vec<(usize, &(bool, Txid, StacksTransactionEvent))>, + filtered_events: Vec<(usize, &(bool, Txid, &StacksTransactionEvent))>, chain_tip: &ChainTip, parent_index_hash: &StacksBlockId, boot_receipts: &Vec, @@ -473,19 +483,19 @@ impl EventDispatcher { /// - dispatch_matrix: a vector where each index corresponds to the hashset of event indexes /// that each respective event observer is subscribed to /// - events: a vector of all events from all the tx receipts - fn create_dispatch_matrix_and_event_vector( + fn create_dispatch_matrix_and_event_vector<'a>( &self, - receipts: &Vec, + receipts: &'a Vec, ) -> ( Vec>, - Vec<(bool, Txid, StacksTransactionEvent)>, + Vec<(bool, Txid, &'a StacksTransactionEvent)>, ) { let mut dispatch_matrix: Vec> = self .registered_observers .iter() .map(|_| HashSet::new()) .collect(); - let mut events: Vec<(bool, Txid, StacksTransactionEvent)> = vec![]; + let mut events: Vec<(bool, Txid, &StacksTransactionEvent)> = vec![]; let mut i: usize = 0; for receipt in receipts { @@ -554,7 +564,7 @@ impl EventDispatcher { ); } } - events.push((!receipt.post_condition_aborted, tx_hash, event.clone())); + events.push((!receipt.post_condition_aborted, tx_hash, event)); for o_i in &self.any_event_observers_lookup { dispatch_matrix[*o_i as usize].insert(i); } @@ -657,16 +667,16 @@ impl EventDispatcher { if interested_observers.len() < 1 { return; } - let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector( - &processed_unconfirmed_state - .receipts - .iter() - .flat_map(|(_, r)| r.clone()) - .collect(), - ); + let flattened_receipts = processed_unconfirmed_state + .receipts + .iter() + .flat_map(|(_, r)| r.clone()) + .collect(); + let (dispatch_matrix, events) = + self.create_dispatch_matrix_and_event_vector(&flattened_receipts); // Serialize receipts - let mut tx_index = 0; + let mut tx_index; let mut serialized_txs = Vec::new(); for (curr_sequence_number, receipts) in processed_unconfirmed_state.receipts.iter() { diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 6b7215b44e..f8d1a0ef8c 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -814,15 +814,12 @@ fn spawn_miner_relayer( let num_unconfirmed_microblock_tx_receipts = net_receipts.processed_unconfirmed_state.receipts.len(); if num_unconfirmed_microblock_tx_receipts > 0 { - let (canonical_consensus_hash, canonical_block_hash) = - SortitionDB::get_canonical_stacks_chain_tip_hash( - sortdb.conn()).expect( - "FATAL: failed to query sortition DB for canonical stacks chain tip"); - let canonical_tip = StacksBlockHeader::make_index_block_hash( - &canonical_consensus_hash, - &canonical_block_hash, - ); - event_dispatcher.process_new_microblocks(canonical_tip, net_receipts.processed_unconfirmed_state); + if let Some(unconfirmed_state) = chainstate.unconfirmed_state.as_ref() { + let canonical_tip = unconfirmed_state.confirmed_chain_tip.clone(); + event_dispatcher.process_new_microblocks(canonical_tip, net_receipts.processed_unconfirmed_state); + } else { + warn!("Relayer: oops, unconfirmed state is uninitialized but there are microblock events"); + } } // Dispatch retrieved attachments, if any. From 35f01703740de4460c4b88fcea2837bcfce709f7 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 26 Apr 2021 15:16:18 -0600 Subject: [PATCH 7/8] Fixed import issues --- testnet/stacks-node/src/tests/neon_integrations.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/testnet/stacks-node/src/tests/neon_integrations.rs b/testnet/stacks-node/src/tests/neon_integrations.rs index f831f8cb4d..dc4859370b 100644 --- a/testnet/stacks-node/src/tests/neon_integrations.rs +++ b/testnet/stacks-node/src/tests/neon_integrations.rs @@ -26,6 +26,7 @@ use stacks::net::{ }; use stacks::types::chainstate::{ BlockHeaderHash, BurnchainHeaderHash, StacksAddress, StacksBlockHeader, StacksBlockId, + StacksMicroblockHeader, }; use stacks::util::hash::Hash160; use stacks::util::hash::{bytes_to_hex, hex_bytes}; @@ -45,8 +46,7 @@ use stacks::{ }; use stacks::{ chainstate::stacks::{ - db::StacksChainState, StacksAddress, StacksBlock, StacksBlockHeader, StacksBlockId, - StacksMicroblockHeader, StacksPrivateKey, StacksPublicKey, StacksTransaction, + db::StacksChainState, StacksBlock, StacksPrivateKey, StacksPublicKey, StacksTransaction, TransactionPayload, }, net::RPCPoxInfoData, @@ -73,7 +73,6 @@ use super::{ SK_2, }; - fn neon_integration_test_conf() -> (Config, StacksAddress) { let mut conf = super::new_test_conf(); @@ -2287,7 +2286,7 @@ fn size_overflow_unconfirmed_invalid_stream_microblocks_integration_test() { } let mut ctr = 0; - for i in 0..6 { + for _i in 0..6 { submit_tx(&http_origin, &flat_txs[ctr]); if !wait_for_microblocks(µblocks_processed, 240) { break; From 0b66a0dd5860f57fd96171c9f88650a6bc72625e Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 26 Apr 2021 15:20:26 -0600 Subject: [PATCH 8/8] Fixed rustfmt issue --- src/net/relay.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/relay.rs b/src/net/relay.rs index 54466aea3e..a5b53519f8 100644 --- a/src/net/relay.rs +++ b/src/net/relay.rs @@ -52,8 +52,8 @@ use util::hash::Sha512Trunc256Sum; use vm::costs::ExecutionCost; use crate::chainstate::coordinator::BlockEventDispatcher; -use chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState; use crate::types::chainstate::{PoxId, SortitionId}; +use chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState; use types::chainstate::BurnchainHeaderHash; pub type BlocksAvailableMap = HashMap;