This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

adds ErasureSetId identifying erasure coding sets of shreds #21928

Merged (1 commit) on Dec 16, 2021
ledger/src/blockstore.rs: 37 changes (19 additions & 18 deletions)
@@ -13,8 +13,8 @@ use {
         leader_schedule_cache::LeaderScheduleCache,
         next_slots_iterator::NextSlotsIterator,
         shred::{
-            max_ticks_per_n_shreds, Result as ShredResult, Shred, ShredId, ShredType, Shredder,
-            SHRED_PAYLOAD_SIZE,
+            max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType,
+            Shredder, SHRED_PAYLOAD_SIZE,
         },
     },
     bincode::deserialize,
@@ -551,8 +551,8 @@ impl Blockstore {
         false
     }
 
-    pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> {
-        self.erasure_meta_cf.get((slot, set_index))
+    fn erasure_meta(&self, erasure_set: ErasureSetId) -> Result<Option<ErasureMeta>> {
+        self.erasure_meta_cf.get(erasure_set.store_key())
     }
 
     pub fn orphan(&self, slot: Slot) -> Result<Option<bool>> {
@@ -731,7 +731,7 @@ impl Blockstore {
 
     fn try_shred_recovery(
         db: &Database,
-        erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
+        erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         prev_inserted_shreds: &HashMap<ShredId, Shred>,
     ) -> Vec<Shred> {
@@ -743,7 +743,8 @@
         // 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
         // 3. Before trying recovery, check if enough number of shreds have been received
         // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
-        for (&(slot, _fec_set_index), erasure_meta) in erasure_metas.iter() {
+        for (erasure_set, erasure_meta) in erasure_metas.iter() {
+            let slot = erasure_set.slot();
             let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
             let index = &mut index_meta_entry.index;
             match erasure_meta.status(index) {
@@ -940,8 +941,8 @@ impl Blockstore {
             &mut write_batch,
         )?;
 
-        for ((slot, set_index), erasure_meta) in erasure_metas {
-            write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
+        for (erasure_set, erasure_meta) in erasure_metas {
+            write_batch.put::<cf::ErasureMeta>(erasure_set.store_key(), &erasure_meta)?;
         }
 
         for (&slot, index_working_set_entry) in index_working_set.iter() {
@@ -1032,7 +1033,7 @@ impl Blockstore {
     fn check_insert_coding_shred<F>(
         &self,
         shred: Shred,
-        erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
+        erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         write_batch: &mut WriteBatch,
         just_received_shreds: &mut HashMap<ShredId, Shred>,
@@ -1069,9 +1070,9 @@ impl Blockstore {
             }
         }
 
-        let set_index = u64::from(shred.fec_set_index());
-        let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
-            self.erasure_meta(slot, set_index)
+        let erasure_set = shred.erasure_set();
+        let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
+            self.erasure_meta(erasure_set)
                 .expect("Expect database get to succeed")
                 .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
         });
@@ -1100,8 +1101,8 @@ impl Blockstore {
             // ToDo: This is a potential slashing condition
             warn!("Received multiple erasure configs for the same erasure set!!!");
             warn!(
-                "Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
-                slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
+                "Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
+                slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
             );
 
             return false;
@@ -1200,7 +1201,7 @@ impl Blockstore {
     fn check_insert_data_shred<F>(
         &self,
         shred: Shred,
-        erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
+        erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
         write_batch: &mut WriteBatch,
@@ -1264,7 +1265,7 @@ impl Blockstore {
             }
         }
 
-        let set_index = u64::from(shred.fec_set_index());
+        let erasure_set = shred.erasure_set();
         let newly_completed_data_sets = self.insert_data_shred(
             slot_meta,
             index_meta.data_mut(),
@@ -1275,8 +1276,8 @@
         just_inserted_shreds.insert(shred.id(), shred);
         index_meta_working_set_entry.did_insert_occur = true;
         slot_meta_entry.did_insert_occur = true;
-        if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
-            if let Some(meta) = self.erasure_meta(slot, set_index).unwrap() {
+        if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) {
+            if let Some(meta) = self.erasure_meta(erasure_set).unwrap() {
                 entry.insert(meta);
             }
         }
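The blockstore change above is essentially a re-keying: the erasure_metas working map and the ErasureMeta column lookups switch from a raw (Slot, u64) tuple to the new ErasureSetId type, while store_key() keeps the existing on-disk key shape. What follows is a minimal, self-contained sketch of that pattern, not code from the PR; Slot and ErasureMeta here are simplified stand-ins for the real blockstore types, and the HashMap stands in for both the working set and the column lookups.

use std::collections::HashMap;

type Slot = u64;

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ErasureSetId(Slot, /*fec_set_index:*/ u32);

impl ErasureSetId {
    fn slot(&self) -> Slot {
        self.0
    }

    // Same key shape the blockstore column already uses, so only the
    // in-memory indexing changes, not the stored layout.
    fn store_key(&self) -> (Slot, u64) {
        (self.0, u64::from(self.1))
    }
}

#[derive(Debug, Default)]
struct ErasureMeta; // stand-in for the real metadata

fn main() {
    let mut erasure_metas: HashMap<ErasureSetId, ErasureMeta> = HashMap::new();
    let key = ErasureSetId(42, 7);
    erasure_metas.entry(key).or_insert_with(ErasureMeta::default);

    // The slot and the on-disk key are still recoverable from the id:
    assert_eq!(key.slot(), 42);
    assert_eq!(key.store_key(), (42, 7));
    println!("{} erasure set(s) tracked", erasure_metas.len());
}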
ledger/src/shred.rs: 22 changes (21 additions & 1 deletion)
@@ -236,7 +236,7 @@ pub struct Shred {
     pub payload: Vec<u8>,
 }
 
-/// Tuple which should uniquely identify a shred if it exists.
+/// Tuple which uniquely identifies a shred should it exists.
 #[derive(Clone, Copy, Eq, Hash, PartialEq)]
 pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
@@ -250,6 +250,21 @@ impl ShredId {
     }
 }
 
+/// Tuple which identifies erasure coding set that the shred belongs to.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub(crate) struct ErasureSetId(Slot, /*fec_set_index:*/ u32);
+
+impl ErasureSetId {
+    pub(crate) fn slot(&self) -> Slot {
+        self.0
+    }
+
+    // Storage key for ErasureMeta in blockstore db.
+    pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) {
+        (self.0, u64::from(self.1))
+    }
+}
+
 impl Shred {
     fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
     where
@@ -518,6 +533,11 @@ impl Shred {
         self.common_header.version
     }
 
+    // Identifier for the erasure coding set that the shred belongs to.
+    pub(crate) fn erasure_set(&self) -> ErasureSetId {
+        ErasureSetId(self.slot(), self.fec_set_index())
+    }
+
     // Returns the block index within the erasure coding set.
     fn erasure_block_index(&self) -> Option<usize> {
         let index = self.index().checked_sub(self.fec_set_index())?;
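For reference, a compact sketch (again not part of the diff) of how a shred's fields determine its ErasureSetId: every data or coding shred carrying the same (slot, fec_set_index) pair resolves to the same id, which is what lets recovery group one FEC set's shreds under a single erasure_metas entry. The Shred struct below is a trimmed-down stand-in with only the fields needed for the illustration.

type Slot = u64;

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ErasureSetId(Slot, /*fec_set_index:*/ u32);

// Trimmed-down stand-in: only the fields needed to derive the id.
struct Shred {
    slot: Slot,
    index: u32,
    fec_set_index: u32,
}

impl Shred {
    // Every shred carrying the same (slot, fec_set_index) belongs to the
    // same erasure coding set.
    fn erasure_set(&self) -> ErasureSetId {
        ErasureSetId(self.slot, self.fec_set_index)
    }
}

fn main() {
    let data = Shred { slot: 3, index: 32, fec_set_index: 32 };
    let coding = Shred { slot: 3, index: 35, fec_set_index: 32 };

    // Data and coding shreds of one FEC set resolve to the same id, so they
    // share a single erasure_metas entry during recovery.
    assert_eq!(data.erasure_set(), coding.erasure_set());
    assert_ne!(data.index, coding.index);
    println!("both shreds map to {:?}", data.erasure_set());
}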