Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Added event for new microblocks #2571

Merged
merged 13 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 47 additions & 17 deletions src/chainstate/stacks/db/unconfirmed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,26 @@ use vm::costs::ExecutionCost;

pub type UnconfirmedTxMap = HashMap<Txid, (StacksTransaction, BlockHeaderHash, u16)>;

pub struct ProcessedUnconfirmedState {
pub total_burns: u128,
pub total_fees: u128,
pub receipts: Vec<StacksTransactionReceipt>,
// tuple representing (index, sequence #), where the index corresponds to the starting index of
// receipts corresponding to that sequence number in the receipts vector
pub sequence_indices_for_receipts: Vec<(usize, u16)>,
pavitthrap marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for ProcessedUnconfirmedState {
fn default() -> Self {
ProcessedUnconfirmedState {
total_burns: 0,
total_fees: 0,
receipts: vec![],
sequence_indices_for_receipts: vec![],
}
}
}

pub struct UnconfirmedState {
pub confirmed_chain_tip: StacksBlockId,
pub unconfirmed_chain_tip: StacksBlockId,
Expand Down Expand Up @@ -128,10 +148,10 @@ impl UnconfirmedState {
chainstate: &StacksChainState,
burn_dbconn: &dyn BurnStateDB,
mblocks: Vec<StacksMicroblock>,
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
) -> Result<ProcessedUnconfirmedState, Error> {
if self.last_mblock_seq == u16::max_value() {
// drop them -- nothing to do
return Ok((0, 0, vec![]));
return Ok(Default::default());
}

debug!(
Expand All @@ -147,9 +167,10 @@ impl UnconfirmedState {
let mut total_fees = 0;
let mut total_burns = 0;
let mut all_receipts = vec![];
let mut sequence_indices_for_receipts = vec![];
let mut mined_txs = UnconfirmedTxMap::new();
let mut new_cost = ExecutionCost::zero();
let mut new_bytes = 0;
let mut new_bytes;
let mut num_new_mblocks = 0;

if mblocks.len() > 0 {
Expand Down Expand Up @@ -203,6 +224,9 @@ impl UnconfirmedState {
total_fees += stx_fees;
total_burns += stx_burns;
num_new_mblocks += 1;
if receipts.len() > 0 {
sequence_indices_for_receipts.push((all_receipts.len(), seq));
}
all_receipts.append(&mut receipts);

last_mblock = Some(mblock_header);
Expand All @@ -217,6 +241,7 @@ impl UnconfirmedState {
}
total as u64
};
self.bytes_so_far += new_bytes;
pavitthrap marked this conversation as resolved.
Show resolved Hide resolved

for tx in &mblock.txs {
mined_txs.insert(
Expand All @@ -234,10 +259,14 @@ impl UnconfirmedState {
self.last_mblock_seq = last_mblock_seq;
self.mined_txs.extend(mined_txs);
self.cost_so_far = new_cost;
self.bytes_so_far += new_bytes;
self.num_mblocks_added += num_new_mblocks;

Ok((total_fees, total_burns, all_receipts))
Ok(ProcessedUnconfirmedState {
total_fees,
total_burns,
sequence_indices_for_receipts,
receipts: all_receipts,
})
}

/// Load up the Stacks microblock stream to process, composed of only the new microblocks
Expand All @@ -262,24 +291,25 @@ impl UnconfirmedState {
}

/// Update the view of the current confiremd chain tip's unconfirmed microblock state
/// Returns ProcessedUnconfirmedState for the microblocks newly added to the unconfirmed state
pub fn refresh(
&mut self,
chainstate: &StacksChainState,
burn_dbconn: &dyn BurnStateDB,
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
) -> Result<ProcessedUnconfirmedState, Error> {
assert!(
!self.readonly,
"BUG: code tried to write unconfirmed state to a read-only instance"
);

if self.last_mblock_seq == u16::max_value() {
// no-op
return Ok((0, 0, vec![]));
return Ok(Default::default());
}

match self.load_child_microblocks(chainstate)? {
Some(microblocks) => self.append_microblocks(chainstate, burn_dbconn, microblocks),
None => Ok((0, 0, vec![])),
None => Ok(Default::default()),
}
}

Expand Down Expand Up @@ -349,15 +379,15 @@ impl StacksChainState {
&self,
burn_dbconn: &dyn BurnStateDB,
anchored_block_id: StacksBlockId,
) -> Result<(UnconfirmedState, u128, u128, Vec<StacksTransactionReceipt>), Error> {
) -> Result<(UnconfirmedState, ProcessedUnconfirmedState), Error> {
debug!("Make new unconfirmed state off of {}", &anchored_block_id);
let mut unconfirmed_state = UnconfirmedState::new(self, anchored_block_id)?;
let (fees, burns, receipts) = unconfirmed_state.refresh(self, burn_dbconn)?;
let processed_unconfirmed_state = unconfirmed_state.refresh(self, burn_dbconn)?;
debug!(
"Made new unconfirmed state off of {} (at {})",
&anchored_block_id, &unconfirmed_state.unconfirmed_chain_tip
);
Ok((unconfirmed_state, fees, burns, receipts))
Ok((unconfirmed_state, processed_unconfirmed_state))
}

/// Reload the unconfirmed view from a new chain tip.
Expand All @@ -369,7 +399,7 @@ impl StacksChainState {
&mut self,
burn_dbconn: &dyn BurnStateDB,
canonical_tip: StacksBlockId,
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
) -> Result<ProcessedUnconfirmedState, Error> {
debug!("Reload unconfirmed state off of {}", &canonical_tip);

let unconfirmed_state = self.unconfirmed_state.take();
Expand Down Expand Up @@ -401,7 +431,7 @@ impl StacksChainState {
self.drop_unconfirmed_state(unconfirmed_state);
}

let (new_unconfirmed_state, fees, burns, receipts) =
let (new_unconfirmed_state, processed_unconfirmed_state) =
self.make_unconfirmed_state(burn_dbconn, canonical_tip)?;

debug!(
Expand All @@ -410,19 +440,19 @@ impl StacksChainState {
);

self.unconfirmed_state = Some(new_unconfirmed_state);
Ok((fees, burns, receipts))
Ok(processed_unconfirmed_state)
}

/// Refresh the current unconfirmed chain state
pub fn refresh_unconfirmed_state(
&mut self,
burn_dbconn: &dyn BurnStateDB,
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
) -> Result<ProcessedUnconfirmedState, Error> {
let mut unconfirmed_state = self.unconfirmed_state.take();
let res = if let Some(ref mut unconfirmed_state) = unconfirmed_state {
if !unconfirmed_state.is_readable() {
warn!("Unconfirmed state is not readable; it will soon be refreshed");
return Ok((0, 0, vec![]));
return Ok(Default::default());
}

debug!(
Expand All @@ -439,7 +469,7 @@ impl StacksChainState {
res
} else {
warn!("No unconfirmed state instantiated");
Ok((0, 0, vec![]))
Ok(Default::default())
};
self.unconfirmed_state = unconfirmed_state;
res
Expand Down
42 changes: 29 additions & 13 deletions src/net/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use rand::Rng;
use vm::costs::ExecutionCost;

use crate::chainstate::coordinator::BlockEventDispatcher;
use chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;

pub type BlocksAvailableMap = HashMap<BurnchainHeaderHash, (u64, ConsensusHash)>;

Expand Down Expand Up @@ -92,6 +93,7 @@ pub struct RelayerStats {

pub struct ProcessedNetReceipts {
pub mempool_txs_added: Vec<StacksTransaction>,
pub processed_unconfirmed_state: ProcessedUnconfirmedState,
}

/// Private trait for keeping track of messages that can be relayed, so we can identify the peers
Expand Down Expand Up @@ -757,7 +759,7 @@ impl Relayer {
Ok((new_blocks, bad_neighbors))
}

/// Prerocess all downloaded, confirmed microblock streams.
/// Preprocess all downloaded, confirmed microblock streams.
/// Does not fail on invalid blocks; just logs a warning.
/// Returns the consensus hashes for the sortitions that elected the stacks anchored blocks that produced these streams.
fn preprocess_downloaded_microblocks(
Expand Down Expand Up @@ -1112,7 +1114,7 @@ impl Relayer {
pub fn setup_unconfirmed_state(
chainstate: &mut StacksChainState,
sortdb: &SortitionDB,
) -> Result<(), Error> {
) -> Result<ProcessedUnconfirmedState, Error> {
let (canonical_consensus_hash, canonical_block_hash) =
SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())?;
let canonical_tip = StacksBlockHeader::make_index_block_hash(
Expand All @@ -1124,8 +1126,10 @@ impl Relayer {
"Reload unconfirmed state off of {}/{}",
&canonical_consensus_hash, &canonical_block_hash
);
chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?;
Ok(())
let processed_unconfirmed_state =
chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?;

Ok(processed_unconfirmed_state)
}

/// Set up unconfirmed chain state in a read-only fashion
Expand All @@ -1149,16 +1153,23 @@ impl Relayer {
Ok(())
}

pub fn refresh_unconfirmed(chainstate: &mut StacksChainState, sortdb: &mut SortitionDB) {
if let Err(e) = Relayer::setup_unconfirmed_state(chainstate, sortdb) {
if let net_error::ChainstateError(ref err_msg) = e {
if err_msg == "Stacks chainstate error: NoSuchBlockError" {
trace!("Failed to instantiate unconfirmed state: {:?}", &e);
pub fn refresh_unconfirmed(
chainstate: &mut StacksChainState,
sortdb: &mut SortitionDB,
) -> ProcessedUnconfirmedState {
match Relayer::setup_unconfirmed_state(chainstate, sortdb) {
Ok(processed_unconfirmed_state) => processed_unconfirmed_state,
Err(e) => {
if let net_error::ChainstateError(ref err_msg) = e {
if err_msg == "Stacks chainstate error: NoSuchBlockError" {
trace!("Failed to instantiate unconfirmed state: {:?}", &e);
} else {
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
}
} else {
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
}
} else {
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
Default::default()
}
}
}
Expand Down Expand Up @@ -1279,13 +1290,18 @@ impl Relayer {
}
}

let receipts = ProcessedNetReceipts { mempool_txs_added };
let mut processed_unconfirmed_state = Default::default();

// finally, refresh the unconfirmed chainstate, if need be
if network_result.has_microblocks() {
Relayer::refresh_unconfirmed(chainstate, sortdb);
processed_unconfirmed_state = Relayer::refresh_unconfirmed(chainstate, sortdb);
}

let receipts = ProcessedNetReceipts {
mempool_txs_added,
processed_unconfirmed_state,
};

Ok(receipts)
}
}
Expand Down
5 changes: 5 additions & 0 deletions testnet/stacks-node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ pub enum EventKeyType {
AssetEvent(AssetIdentifier),
STXEvent,
MemPoolTransactions,
Microblocks,
AnyEvent,
BurnchainBlocks,
}
Expand All @@ -1322,6 +1323,10 @@ impl EventKeyType {
return Some(EventKeyType::BurnchainBlocks);
}

if raw_key == "microblocks" {
return Some(EventKeyType::Microblocks);
}

let comps: Vec<_> = raw_key.split("::").collect();
if comps.len() == 1 {
let split: Vec<_> = comps[0].split(".").collect();
Expand Down
Loading