adds methods to obtain shreds' erasure coded block and index (#21325)
This commit is contained in:
@ -460,6 +460,44 @@ impl Shred {
|
|||||||
self.common_header.version
|
self.common_header.version
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the block index within the erasure coding set.
|
||||||
|
fn erasure_block_index(&self) -> Option<usize> {
|
||||||
|
let fec_set_index = self.common_header.fec_set_index;
|
||||||
|
let index = self.index().checked_sub(fec_set_index)? as usize;
|
||||||
|
match self.shred_type() {
|
||||||
|
ShredType::Data => Some(index),
|
||||||
|
ShredType::Code => {
|
||||||
|
let num_data_shreds = self.coding_header.num_data_shreds as usize;
|
||||||
|
let num_coding_shreds = self.coding_header.num_coding_shreds as usize;
|
||||||
|
let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?;
|
||||||
|
let index = index.checked_add(num_data_shreds)?;
|
||||||
|
(index < fec_set_size).then(|| index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the portion of the shred's payload which is erasure coded.
|
||||||
|
fn erasure_block(self) -> Vec<u8> {
|
||||||
|
let shred_type = self.shred_type();
|
||||||
|
let mut block = self.payload;
|
||||||
|
match shred_type {
|
||||||
|
ShredType::Data => {
|
||||||
|
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
|
||||||
|
// is never used and is not part of erasure coding.
|
||||||
|
let size = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
|
||||||
|
block.resize(size, 0u8);
|
||||||
|
}
|
||||||
|
ShredType::Code => {
|
||||||
|
// SIZE_OF_CODING_SHRED_HEADERS bytes at the begining of the
|
||||||
|
// coding shreds contains the header and is not part of erasure
|
||||||
|
// coding.
|
||||||
|
let offset = SIZE_OF_CODING_SHRED_HEADERS.min(block.len());
|
||||||
|
block.drain(..offset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
block
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_index(&mut self, index: u32) {
|
pub fn set_index(&mut self, index: u32) {
|
||||||
self.common_header.index = index;
|
self.common_header.index = index;
|
||||||
Self::serialize_obj_into(
|
Self::serialize_obj_into(
|
||||||
@ -869,7 +907,6 @@ impl Shredder {
|
|||||||
));
|
));
|
||||||
let num_data_shreds = num_data_shreds as usize;
|
let num_data_shreds = num_data_shreds as usize;
|
||||||
let num_coding_shreds = num_coding_shreds as usize;
|
let num_coding_shreds = num_coding_shreds as usize;
|
||||||
let fec_set_index = fec_set_index as usize;
|
|
||||||
let fec_set_size = num_data_shreds + num_coding_shreds;
|
let fec_set_size = num_data_shreds + num_coding_shreds;
|
||||||
if num_coding_shreds == 0 || shreds.len() >= fec_set_size {
|
if num_coding_shreds == 0 || shreds.len() >= fec_set_size {
|
||||||
return Ok(Vec::default());
|
return Ok(Vec::default());
|
||||||
@ -878,42 +915,28 @@ impl Shredder {
|
|||||||
let mut mask = vec![false; num_data_shreds];
|
let mut mask = vec![false; num_data_shreds];
|
||||||
let mut blocks = vec![None; fec_set_size];
|
let mut blocks = vec![None; fec_set_size];
|
||||||
for shred in shreds {
|
for shred in shreds {
|
||||||
if (shred.index() as usize) < fec_set_index {
|
let index = match shred.erasure_block_index() {
|
||||||
return Err(InvalidIndex);
|
Some(index) if index < fec_set_size => index,
|
||||||
}
|
_ => return Err(InvalidIndex),
|
||||||
let shred_is_data = shred.is_data();
|
|
||||||
let offset = if shred_is_data { 0 } else { num_data_shreds };
|
|
||||||
let index = offset + shred.index() as usize - fec_set_index;
|
|
||||||
let mut block = shred.payload;
|
|
||||||
if shred_is_data {
|
|
||||||
if index >= num_data_shreds {
|
|
||||||
return Err(InvalidIndex);
|
|
||||||
}
|
|
||||||
mask[index] = true;
|
|
||||||
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
|
|
||||||
// is never used and is not part of erasure coding.
|
|
||||||
block.resize(SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS, 0u8);
|
|
||||||
} else {
|
|
||||||
if index >= fec_set_size {
|
|
||||||
return Err(InvalidIndex);
|
|
||||||
}
|
|
||||||
// SIZE_OF_CODING_SHRED_HEADERS bytes at the begining of the
|
|
||||||
// coding shreds contains the header and is not part of erasure
|
|
||||||
// coding.
|
|
||||||
block.drain(..SIZE_OF_CODING_SHRED_HEADERS);
|
|
||||||
};
|
};
|
||||||
blocks[index] = Some(block);
|
blocks[index] = Some(shred.erasure_block());
|
||||||
|
if index < num_data_shreds {
|
||||||
|
mask[index] = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Session::new(num_data_shreds, num_coding_shreds)?.decode_blocks(&mut blocks)?;
|
Session::new(num_data_shreds, num_coding_shreds)?.decode_blocks(&mut blocks)?;
|
||||||
let data_shred_indices = fec_set_index..fec_set_index + num_data_shreds;
|
|
||||||
let recovered_data = mask
|
let recovered_data = mask
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(blocks)
|
.zip(blocks)
|
||||||
.filter(|(mask, _)| !mask)
|
.filter(|(mask, _)| !mask)
|
||||||
.filter_map(|(_, block)| Shred::new_from_serialized_shred(block?).ok())
|
.filter_map(|(_, block)| Shred::new_from_serialized_shred(block?).ok())
|
||||||
.filter(|shred| {
|
.filter(|shred| {
|
||||||
let index = shred.index() as usize;
|
shred.slot() == slot
|
||||||
shred.slot() == slot && data_shred_indices.contains(&index)
|
&& shred.is_data()
|
||||||
|
&& match shred.erasure_block_index() {
|
||||||
|
Some(index) => index < num_data_shreds,
|
||||||
|
None => false,
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
Ok(recovered_data)
|
Ok(recovered_data)
|
||||||
|
Reference in New Issue
Block a user