https://github.com/solana-labs/solana/pull/16646
removed first_coding_index since the field is currently redundant and
always equal to fec_set_index.
However, with upcoming changes to erasure coding schema, this will no
longer be the same as fec_set_index and so requires a separate field to
represent.
(cherry picked from commit 49ba09b333
)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -10,7 +10,6 @@ use {
|
|||||||
IteratorMode, LedgerColumn, Result, WriteBatch,
|
IteratorMode, LedgerColumn, Result, WriteBatch,
|
||||||
},
|
},
|
||||||
blockstore_meta::*,
|
blockstore_meta::*,
|
||||||
erasure::ErasureConfig,
|
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
next_slots_iterator::NextSlotsIterator,
|
next_slots_iterator::NextSlotsIterator,
|
||||||
shred::{Result as ShredResult, Shred, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
|
shred::{Result as ShredResult, Shred, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
|
||||||
@ -1056,21 +1055,16 @@ impl Blockstore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let set_index = u64::from(shred.common_header.fec_set_index);
|
let set_index = u64::from(shred.fec_set_index());
|
||||||
let erasure_config = ErasureConfig::new(
|
|
||||||
shred.coding_header.num_data_shreds as usize,
|
|
||||||
shred.coding_header.num_coding_shreds as usize,
|
|
||||||
);
|
|
||||||
|
|
||||||
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
|
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
|
||||||
self.erasure_meta(slot, set_index)
|
self.erasure_meta(slot, set_index)
|
||||||
.expect("Expect database get to succeed")
|
.expect("Expect database get to succeed")
|
||||||
.unwrap_or_else(|| ErasureMeta::new(set_index, erasure_config))
|
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO: handle_duplicate is not invoked and so duplicate shreds are
|
// TODO: handle_duplicate is not invoked and so duplicate shreds are
|
||||||
// not gossiped to the rest of cluster.
|
// not gossiped to the rest of cluster.
|
||||||
if erasure_config != erasure_meta.config() {
|
if !erasure_meta.check_coding_shred(&shred) {
|
||||||
metrics.num_coding_shreds_invalid_erasure_config += 1;
|
metrics.num_coding_shreds_invalid_erasure_config += 1;
|
||||||
let conflicting_shred = self.find_conflicting_coding_shred(
|
let conflicting_shred = self.find_conflicting_coding_shred(
|
||||||
&shred,
|
&shred,
|
||||||
@ -1093,7 +1087,7 @@ impl Blockstore {
|
|||||||
warn!("Received multiple erasure configs for the same erasure set!!!");
|
warn!("Received multiple erasure configs for the same erasure set!!!");
|
||||||
warn!(
|
warn!(
|
||||||
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
|
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
|
||||||
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), erasure_config
|
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
|
||||||
);
|
);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
@ -1223,7 +1217,7 @@ impl Blockstore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let set_index = u64::from(shred.common_header.fec_set_index);
|
let set_index = u64::from(shred.fec_set_index());
|
||||||
let newly_completed_data_sets = self.insert_data_shred(
|
let newly_completed_data_sets = self.insert_data_shred(
|
||||||
slot_meta,
|
slot_meta,
|
||||||
index_meta.data_mut(),
|
index_meta.data_mut(),
|
||||||
@ -5438,7 +5432,7 @@ pub mod tests {
|
|||||||
true, // is_last_in_slot
|
true, // is_last_in_slot
|
||||||
0, // reference_tick
|
0, // reference_tick
|
||||||
shred5.common_header.version,
|
shred5.common_header.version,
|
||||||
shred5.common_header.fec_set_index,
|
shred5.fec_set_index(),
|
||||||
);
|
);
|
||||||
assert!(blockstore.should_insert_data_shred(
|
assert!(blockstore.should_insert_data_shred(
|
||||||
&empty_shred,
|
&empty_shred,
|
||||||
@ -5654,7 +5648,7 @@ pub mod tests {
|
|||||||
DataShredHeader::default(),
|
DataShredHeader::default(),
|
||||||
coding.clone(),
|
coding.clone(),
|
||||||
);
|
);
|
||||||
let index = coding_shred.index() - coding_shred.common_header.fec_set_index - 1;
|
let index = coding_shred.index() - coding_shred.fec_set_index() - 1;
|
||||||
coding_shred.set_index(index as u32);
|
coding_shred.set_index(index as u32);
|
||||||
|
|
||||||
assert!(!Blockstore::should_insert_coding_shred(
|
assert!(!Blockstore::should_insert_coding_shred(
|
||||||
@ -5684,8 +5678,7 @@ pub mod tests {
|
|||||||
DataShredHeader::default(),
|
DataShredHeader::default(),
|
||||||
coding.clone(),
|
coding.clone(),
|
||||||
);
|
);
|
||||||
let num_coding_shreds =
|
let num_coding_shreds = coding_shred.index() - coding_shred.fec_set_index();
|
||||||
coding_shred.common_header.index - coding_shred.common_header.fec_set_index;
|
|
||||||
coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
|
coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
|
||||||
assert!(!Blockstore::should_insert_coding_shred(
|
assert!(!Blockstore::should_insert_coding_shred(
|
||||||
&coding_shred,
|
&coding_shred,
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
use {
|
use {
|
||||||
crate::erasure::ErasureConfig,
|
crate::{
|
||||||
|
erasure::ErasureConfig,
|
||||||
|
shred::{Shred, ShredType},
|
||||||
|
},
|
||||||
serde::{Deserialize, Serialize},
|
serde::{Deserialize, Serialize},
|
||||||
solana_sdk::{clock::Slot, hash::Hash},
|
solana_sdk::{clock::Slot, hash::Hash},
|
||||||
std::{
|
std::{
|
||||||
@ -56,9 +59,8 @@ pub struct ShredIndex {
|
|||||||
pub struct ErasureMeta {
|
pub struct ErasureMeta {
|
||||||
/// Which erasure set in the slot this is
|
/// Which erasure set in the slot this is
|
||||||
set_index: u64,
|
set_index: u64,
|
||||||
/// Deprecated field.
|
/// First coding index in the FEC set
|
||||||
#[serde(rename = "first_coding_index")]
|
first_coding_index: u64,
|
||||||
__unused_first_coding_index: u64,
|
|
||||||
/// Size of shards in this erasure set
|
/// Size of shards in this erasure set
|
||||||
#[serde(rename = "size")]
|
#[serde(rename = "size")]
|
||||||
__unused_size: usize,
|
__unused_size: usize,
|
||||||
@ -226,14 +228,40 @@ impl SlotMeta {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ErasureMeta {
|
impl ErasureMeta {
|
||||||
pub(crate) fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta {
|
pub(crate) fn from_coding_shred(shred: &Shred) -> Option<Self> {
|
||||||
ErasureMeta {
|
match shred.shred_type() {
|
||||||
set_index,
|
ShredType::Data => None,
|
||||||
|
ShredType::Code => {
|
||||||
|
let config = ErasureConfig::new(
|
||||||
|
usize::from(shred.coding_header.num_data_shreds),
|
||||||
|
usize::from(shred.coding_header.num_coding_shreds),
|
||||||
|
);
|
||||||
|
let first_coding_index = u64::from(shred.first_coding_index()?);
|
||||||
|
let erasure_meta = ErasureMeta {
|
||||||
|
set_index: u64::from(shred.fec_set_index()),
|
||||||
config,
|
config,
|
||||||
__unused_first_coding_index: 0,
|
first_coding_index,
|
||||||
__unused_size: 0,
|
__unused_size: 0,
|
||||||
|
};
|
||||||
|
Some(erasure_meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true if the erasure fields on the shred
|
||||||
|
// are consistent with the erasure-meta.
|
||||||
|
pub(crate) fn check_coding_shred(&self, shred: &Shred) -> bool {
|
||||||
|
let mut other = match Self::from_coding_shred(shred) {
|
||||||
|
Some(erasure_meta) => erasure_meta,
|
||||||
|
None => return false,
|
||||||
|
};
|
||||||
|
other.__unused_size = self.__unused_size;
|
||||||
|
// Ignore first_coding_index field for now to be backward compatible.
|
||||||
|
// TODO remove this once cluster is upgraded to always populate
|
||||||
|
// first_coding_index field.
|
||||||
|
other.first_coding_index = self.first_coding_index;
|
||||||
|
self == &other
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn config(&self) -> ErasureConfig {
|
pub(crate) fn config(&self) -> ErasureConfig {
|
||||||
self.config
|
self.config
|
||||||
@ -246,7 +274,16 @@ impl ErasureMeta {
|
|||||||
|
|
||||||
pub(crate) fn coding_shreds_indices(&self) -> Range<u64> {
|
pub(crate) fn coding_shreds_indices(&self) -> Range<u64> {
|
||||||
let num_coding = self.config.num_coding() as u64;
|
let num_coding = self.config.num_coding() as u64;
|
||||||
self.set_index..self.set_index + num_coding
|
// first_coding_index == 0 may imply that the field is not populated.
|
||||||
|
// self.set_index to be backward compatible.
|
||||||
|
// TODO remove this once cluster is upgraded to always populate
|
||||||
|
// first_coding_index field.
|
||||||
|
let first_coding_index = if self.first_coding_index == 0 {
|
||||||
|
self.set_index
|
||||||
|
} else {
|
||||||
|
self.first_coding_index
|
||||||
|
};
|
||||||
|
first_coding_index..first_coding_index + num_coding
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
|
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
|
||||||
@ -316,7 +353,12 @@ mod test {
|
|||||||
let set_index = 0;
|
let set_index = 0;
|
||||||
let erasure_config = ErasureConfig::new(8, 16);
|
let erasure_config = ErasureConfig::new(8, 16);
|
||||||
|
|
||||||
let e_meta = ErasureMeta::new(set_index, erasure_config);
|
let e_meta = ErasureMeta {
|
||||||
|
set_index,
|
||||||
|
first_coding_index: set_index,
|
||||||
|
config: erasure_config,
|
||||||
|
__unused_size: 0,
|
||||||
|
};
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
let mut index = Index::new(0);
|
let mut index = Index::new(0);
|
||||||
|
|
||||||
|
@ -53,18 +53,18 @@ pub struct ErasureConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ErasureConfig {
|
impl ErasureConfig {
|
||||||
pub fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
|
pub(crate) fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
|
||||||
ErasureConfig {
|
ErasureConfig {
|
||||||
num_data,
|
num_data,
|
||||||
num_coding,
|
num_coding,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_data(self) -> usize {
|
pub(crate) fn num_data(self) -> usize {
|
||||||
self.num_data
|
self.num_data
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_coding(self) -> usize {
|
pub(crate) fn num_coding(self) -> usize {
|
||||||
self.num_coding
|
self.num_coding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -471,6 +471,15 @@ impl Shred {
|
|||||||
self.common_header.fec_set_index
|
self.common_header.fec_set_index
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn first_coding_index(&self) -> Option<u32> {
|
||||||
|
match self.shred_type() {
|
||||||
|
ShredType::Data => None,
|
||||||
|
// TODO should be: self.index() - self.coding_header.position
|
||||||
|
// once position field is populated.
|
||||||
|
ShredType::Code => Some(self.fec_set_index()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Returns true if the shred passes sanity checks.
|
// Returns true if the shred passes sanity checks.
|
||||||
pub(crate) fn sanitize(&self) -> bool {
|
pub(crate) fn sanitize(&self) -> bool {
|
||||||
self.erasure_block_index().is_some()
|
self.erasure_block_index().is_some()
|
||||||
@ -883,7 +892,7 @@ impl Shredder {
|
|||||||
assert_eq!(fec_set_index, index);
|
assert_eq!(fec_set_index, index);
|
||||||
assert!(data.iter().all(|shred| shred.common_header.slot == slot
|
assert!(data.iter().all(|shred| shred.common_header.slot == slot
|
||||||
&& shred.common_header.version == version
|
&& shred.common_header.version == version
|
||||||
&& shred.common_header.fec_set_index == fec_set_index));
|
&& shred.fec_set_index() == fec_set_index));
|
||||||
let num_data = data.len();
|
let num_data = data.len();
|
||||||
let num_coding = if is_last_in_slot {
|
let num_coding = if is_last_in_slot {
|
||||||
(2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
|
(2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
|
||||||
@ -929,7 +938,7 @@ impl Shredder {
|
|||||||
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
|
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
|
||||||
let (slot, fec_set_index) = match shreds.first() {
|
let (slot, fec_set_index) = match shreds.first() {
|
||||||
None => return Ok(Vec::default()),
|
None => return Ok(Vec::default()),
|
||||||
Some(shred) => (shred.slot(), shred.common_header.fec_set_index),
|
Some(shred) => (shred.slot(), shred.fec_set_index()),
|
||||||
};
|
};
|
||||||
let (num_data_shreds, num_coding_shreds) = match shreds.iter().find(|shred| shred.is_code())
|
let (num_data_shreds, num_coding_shreds) = match shreds.iter().find(|shred| shred.is_code())
|
||||||
{
|
{
|
||||||
@ -939,9 +948,9 @@ impl Shredder {
|
|||||||
shred.coding_header.num_coding_shreds,
|
shred.coding_header.num_coding_shreds,
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
debug_assert!(shreds.iter().all(
|
debug_assert!(shreds
|
||||||
|shred| shred.slot() == slot && shred.common_header.fec_set_index == fec_set_index
|
.iter()
|
||||||
));
|
.all(|shred| shred.slot() == slot && shred.fec_set_index() == fec_set_index));
|
||||||
debug_assert!(shreds
|
debug_assert!(shreds
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|shred| shred.is_code())
|
.filter(|shred| shred.is_code())
|
||||||
@ -1784,7 +1793,7 @@ pub mod tests {
|
|||||||
let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
|
let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
|
||||||
data_shreds.iter().enumerate().for_each(|(i, s)| {
|
data_shreds.iter().enumerate().for_each(|(i, s)| {
|
||||||
let expected_fec_set_index = start_index + ((i / max_per_block) * max_per_block) as u32;
|
let expected_fec_set_index = start_index + ((i / max_per_block) * max_per_block) as u32;
|
||||||
assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
|
assert_eq!(s.fec_set_index(), expected_fec_set_index);
|
||||||
});
|
});
|
||||||
|
|
||||||
coding_shreds.iter().enumerate().for_each(|(i, s)| {
|
coding_shreds.iter().enumerate().for_each(|(i, s)| {
|
||||||
@ -1792,7 +1801,7 @@ pub mod tests {
|
|||||||
while expected_fec_set_index as usize > data_shreds.len() {
|
while expected_fec_set_index as usize > data_shreds.len() {
|
||||||
expected_fec_set_index -= max_per_block as u32;
|
expected_fec_set_index -= max_per_block as u32;
|
||||||
}
|
}
|
||||||
assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
|
assert_eq!(s.fec_set_index(), expected_fec_set_index);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user