From 33052c1dd2eeab8c64f780730477a9346f6b6239 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Tue, 15 Oct 2019 20:48:45 -0700
Subject: [PATCH] Cleanup shred header structures (#6378)

automerge
---
 core/benches/shredder.rs |  10 +--
 core/src/blocktree.rs    |  22 ++---
 core/src/cluster_info.rs |  10 +--
 core/src/shred.rs        | 189 +++++++++++++++++----------------
 4 files changed, 103 insertions(+), 128 deletions(-)

diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index 4f18e15f3f..7a496397f6 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -6,7 +6,7 @@ use solana_core::entry::create_ticks;
 use solana_core::entry::Entry;
 use solana_core::shred::{
     max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE,
-    SIZE_OF_DATA_SHRED_HEADER,
+    SIZE_OF_SHRED_HEADER,
 };
 use solana_core::test_tx;
 use solana_sdk::hash::Hash;
@@ -31,7 +31,7 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec<Entry> {
diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
         // >= num_coding should fail
         {
             let mut coding_shred = Shred::new_empty_from_header(shred.clone());
-            coding_shred.headers.common_header.num_coding_shreds =
-                coding_shred.headers.common_header.position;
+            coding_shred.headers.coding_header.num_coding_shreds =
+                coding_shred.headers.coding_header.position;
             let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
             assert!(!Blocktree::should_insert_coding_shred(
                 &coding_shred,
@@ -3244,9 +3244,9 @@ pub mod tests {
         // has index > u32::MAX should fail
         {
             let mut coding_shred = Shred::new_empty_from_header(shred.clone());
-            coding_shred.headers.common_header.num_coding_shreds = 3;
-            coding_shred.headers.common_header.coding_header.index = std::u32::MAX - 1;
-            coding_shred.headers.common_header.position = 0;
+            coding_shred.headers.coding_header.num_coding_shreds = 3;
+            coding_shred.headers.coding_header.common_header.index = std::u32::MAX - 1;
+            coding_shred.headers.coding_header.position = 0;
             let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
             assert!(!Blocktree::should_insert_coding_shred(
                 &coding_shred,
@@ -3255,7 +3255,7 @@ pub mod tests {
             ));

             // Decreasing the number of num_coding_shreds will put it within the allowed limit
-            coding_shred.headers.common_header.num_coding_shreds = 2;
+            coding_shred.headers.coding_header.num_coding_shreds = 2;
             assert!(Blocktree::should_insert_coding_shred(
                 &coding_shred,
                 index.coding(),
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 3e56aeab1a..417e3e66af 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1787,7 +1787,7 @@ mod tests {
     use crate::repair_service::RepairType;
     use crate::result::Error;
     use crate::shred::max_ticks_per_n_shreds;
-    use crate::shred::{DataShredHeader, Shred};
+    use crate::shred::{Shred, ShredHeader};
     use crate::test_tx::test_tx;
     use rayon::prelude::*;
     use solana_sdk::hash::Hash;
@@ -1947,10 +1947,10 @@ mod tests {
             0,
         );
         assert!(rv.is_empty());
-        let mut data_shred = DataShredHeader::default();
-        data_shred.data_header.slot = 2;
-        data_shred.parent_offset = 1;
-        data_shred.data_header.index = 1;
+        let mut data_shred = ShredHeader::default();
+        data_shred.data_header.common_header.slot = 2;
+        data_shred.data_header.parent_offset = 1;
+        data_shred.data_header.common_header.index = 1;
         let shred_info = Shred::new_empty_from_header(data_shred);

         blocktree
diff --git a/core/src/shred.rs b/core/src/shred.rs
index 53d484fd93..35e7d45d9f 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -27,14 +27,12 @@ lazy_static! {
         { serialized_size(&CodingShredHeader::default()).unwrap() as usize };
     pub static ref SIZE_OF_DATA_SHRED_HEADER: usize =
         { serialized_size(&DataShredHeader::default()).unwrap() as usize };
-    pub static ref SIZE_OF_COMMON_SHRED_HEADER: usize =
-        { serialized_size(&ShredCommonHeader::default()).unwrap() as usize };
+    pub static ref SIZE_OF_SHRED_HEADER: usize =
+        { serialized_size(&ShredHeader::default()).unwrap() as usize };
     static ref SIZE_OF_SIGNATURE: usize =
         { bincode::serialized_size(&Signature::default()).unwrap() as usize };
-    pub static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
-    pub static ref SIZE_OF_PARENT_OFFSET: usize =
-        { bincode::serialized_size(&0u16).unwrap() as usize };
-    pub static ref SIZE_OF_FLAGS: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
+    pub static ref SIZE_OF_SHRED_TYPE: usize =
+        { bincode::serialized_size(&ShredType(DATA_SHRED)).unwrap() as usize };
 }

 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@@ -56,7 +54,10 @@ pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
 const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
 const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;

-/// A common header that is present at start of every shred
+#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+pub struct ShredType(u8);
+
+/// A common header that is present in data and code shred headers
 #[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
 pub struct ShredCommonHeader {
     pub signature: Signature,
@@ -64,59 +65,49 @@ pub struct ShredCommonHeader {
     pub index: u32,
 }

-/// A common header that is present at start of every data shred
-#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+/// The data shred header has parent offset and flags
+#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
 pub struct DataShredHeader {
-    pub common_header: CodingShredHeader,
-    pub data_header: ShredCommonHeader,
+    pub common_header: ShredCommonHeader,
     pub parent_offset: u16,
     pub flags: u8,
 }

 /// The coding shred header has FEC information
-#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
 pub struct CodingShredHeader {
-    pub shred_type: u8,
-    pub coding_header: ShredCommonHeader,
+    pub common_header: ShredCommonHeader,
     pub num_data_shreds: u16,
     pub num_coding_shreds: u16,
     pub position: u16,
 }

-impl Default for DataShredHeader {
-    fn default() -> Self {
-        DataShredHeader {
-            common_header: CodingShredHeader {
-                shred_type: DATA_SHRED,
-                ..CodingShredHeader::default()
-            },
-            data_header: ShredCommonHeader::default(),
-            parent_offset: 0,
-            flags: 0,
-        }
-    }
+/// A common header that is present at start of every shred
+#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+pub struct ShredHeader {
+    pub shred_type: ShredType,
+    pub coding_header: CodingShredHeader,
+    pub data_header: DataShredHeader,
 }

-impl Default for CodingShredHeader {
+impl Default for ShredHeader {
     fn default() -> Self {
-        CodingShredHeader {
-            shred_type: CODING_SHRED,
-            coding_header: ShredCommonHeader::default(),
-            num_data_shreds: 0,
-            num_coding_shreds: 0,
-            position: 0,
+        ShredHeader {
+            shred_type: ShredType(DATA_SHRED),
+            coding_header: CodingShredHeader::default(),
+            data_header: DataShredHeader::default(),
         }
     }
 }

 #[derive(Clone, Debug, PartialEq)]
 pub struct Shred {
-    pub headers: DataShredHeader,
+    pub headers: ShredHeader,
     pub payload: Vec<u8>,
 }

 impl Shred {
-    fn new(header: DataShredHeader, shred_buf: Vec<u8>) -> Self {
+    fn new(header: ShredHeader, shred_buf: Vec<u8>) -> Self {
         Shred {
             headers: header,
             payload: shred_buf,
@@ -132,74 +123,57 @@ impl Shred {
         is_last_in_slot: bool,
     ) -> Self {
         let mut shred_buf = vec![0; PACKET_DATA_SIZE];
-        let mut header = DataShredHeader::default();
-        header.data_header.slot = slot;
-        header.data_header.index = index;
-        header.parent_offset = parent_offset;
-        header.flags = 0;
+        let mut header = ShredHeader::default();
+        header.data_header.common_header.slot = slot;
+        header.data_header.common_header.index = index;
+        header.data_header.parent_offset = parent_offset;
+        header.data_header.flags = 0;

         if is_last_data {
-            header.flags |= DATA_COMPLETE_SHRED
+            header.data_header.flags |= DATA_COMPLETE_SHRED
         }

         if is_last_in_slot {
-            header.flags |= LAST_SHRED_IN_SLOT
+            header.data_header.flags |= LAST_SHRED_IN_SLOT
         }

         if let Some(data) = data {
-            bincode::serialize_into(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER], &header)
+            bincode::serialize_into(&mut shred_buf[..*SIZE_OF_SHRED_HEADER], &header)
                 .expect("Failed to write header into shred buffer");
-            shred_buf[*SIZE_OF_DATA_SHRED_HEADER..*SIZE_OF_DATA_SHRED_HEADER + data.len()]
+            shred_buf[*SIZE_OF_SHRED_HEADER..*SIZE_OF_SHRED_HEADER + data.len()]
                 .clone_from_slice(data);
         }

         Self::new(header, shred_buf)
     }

-    fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> result::Result<T>
-    where
-        T: Deserialize<'de>,
-    {
-        let ret = bincode::deserialize(&buf[*index..*index + size])?;
-        *index += size;
-        Ok(ret)
-    }
     pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> result::Result<Self> {
-        let shred_type: u8 = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?;
-        let header = if shred_type == CODING_SHRED {
-            let end = *SIZE_OF_CODING_SHRED_HEADER;
-            let mut header = DataShredHeader::default();
-            header.common_header = bincode::deserialize(&shred_buf[..end])?;
+        let shred_type: ShredType = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?;
+        let mut header = if shred_type == ShredType(CODING_SHRED) {
+            let start = *SIZE_OF_SHRED_TYPE;
+            let end = start + *SIZE_OF_CODING_SHRED_HEADER;
+            let mut header = ShredHeader::default();
+            header.coding_header = bincode::deserialize(&shred_buf[start..end])?;
+            header
+        } else if shred_type == ShredType(DATA_SHRED) {
+            let start = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
+            let end = start + *SIZE_OF_DATA_SHRED_HEADER;
+            let mut header = ShredHeader::default();
+            header.data_header = bincode::deserialize(&shred_buf[start..end])?;
             header
-        } else if shred_type == DATA_SHRED {
-            let mut start = *SIZE_OF_CODING_SHRED_HEADER;
-            let common_hdr: ShredCommonHeader =
-                Self::deserialize_obj(&mut start, *SIZE_OF_COMMON_SHRED_HEADER, &shred_buf)?;
-
-            let parent_offset: u16 =
-                Self::deserialize_obj(&mut start, *SIZE_OF_PARENT_OFFSET, &shred_buf)?;
-
-            let flags: u8 = Self::deserialize_obj(&mut start, *SIZE_OF_FLAGS, &shred_buf)?;
-            let mut hdr = DataShredHeader {
-                common_header: CodingShredHeader::default(),
-                data_header: common_hdr,
-                parent_offset,
-                flags,
-            };
-            hdr.common_header.shred_type = shred_type;
-            hdr
         } else {
             return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(
                 Box::new(bincode::ErrorKind::Custom("Invalid shred type".to_string())),
             )));
         };
+        header.shred_type = shred_type;
         Ok(Self::new(header, shred_buf))
     }

-    pub fn new_empty_from_header(headers: DataShredHeader) -> Self {
+    pub fn new_empty_from_header(headers: ShredHeader) -> Self {
         let mut payload = vec![0; PACKET_DATA_SIZE];
-        let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_DATA_SHRED_HEADER]);
+        let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_SHRED_HEADER]);
         bincode::serialize_into(&mut wr, &headers).expect("Failed to serialize shred");
         Shred { headers, payload }
     }
@@ -207,23 +181,23 @@ impl Shred {
     pub fn new_empty_data_shred() -> Self {
         let mut payload = vec![0; PACKET_DATA_SIZE];
         payload[0] = DATA_SHRED;
-        let headers = DataShredHeader::default();
+        let headers = ShredHeader::default();
         Shred { headers, payload }
     }

     pub fn header(&self) -> &ShredCommonHeader {
         if self.is_data() {
-            &self.headers.data_header
+            &self.headers.data_header.common_header
         } else {
-            &self.headers.common_header.coding_header
+            &self.headers.coding_header.common_header
         }
     }

     pub fn header_mut(&mut self) -> &mut ShredCommonHeader {
         if self.is_data() {
-            &mut self.headers.data_header
+            &mut self.headers.data_header.common_header
         } else {
-            &mut self.headers.common_header.coding_header
+            &mut self.headers.coding_header.common_header
         }
     }

@@ -233,7 +207,8 @@ impl Shred {
     pub fn parent(&self) -> u64 {
         if self.is_data() {
-            self.headers.data_header.slot - u64::from(self.headers.parent_offset)
+            self.headers.data_header.common_header.slot
+                - u64::from(self.headers.data_header.parent_offset)
         } else {
             std::u64::MAX
         }
     }
@@ -268,12 +243,12 @@ impl Shred {
     }

     pub fn is_data(&self) -> bool {
-        self.headers.common_header.shred_type == DATA_SHRED
+        self.headers.shred_type == ShredType(DATA_SHRED)
     }

     pub fn last_in_slot(&self) -> bool {
         if self.is_data() {
-            self.headers.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
+            self.headers.data_header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
         } else {
             false
         }
@@ -283,13 +258,13 @@ impl Shred {
     /// Use this only for test code which doesn't care about actual shred
     pub fn set_last_in_slot(&mut self) {
         if self.is_data() {
-            self.headers.flags |= LAST_SHRED_IN_SLOT
+            self.headers.data_header.flags |= LAST_SHRED_IN_SLOT
         }
     }

     pub fn data_complete(&self) -> bool {
         if self.is_data() {
-            self.headers.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
+            self.headers.data_header.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
         } else {
             false
         }
@@ -297,7 +272,7 @@ impl Shred {

     pub fn coding_params(&self) -> Option<(u16, u16, u16)> {
         if !self.is_data() {
-            let header = &self.headers.common_header;
+            let header = &self.headers.coding_header;
             Some((
                 header.num_data_shreds,
                 header.num_coding_shreds,
@@ -310,7 +285,7 @@ impl Shred {

     pub fn verify(&self, pubkey: &Pubkey) -> bool {
         let signed_payload_offset = if self.is_data() {
-            *SIZE_OF_CODING_SHRED_HEADER
+            *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE
         } else {
             *SIZE_OF_SHRED_TYPE
         } + *SIZE_OF_SIGNATURE;
@@ -373,7 +348,7 @@ impl Shredder {
             bincode::serialize(entries).expect("Expect to serialize all entries");
         let serialize_time = now.elapsed().as_millis();

-        let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
+        let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
         let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
         let last_shred_index = next_shred_index + num_shreds as u32 - 1;

@@ -406,7 +381,7 @@ impl Shredder {
                     Shredder::sign_shred(
                         &self.keypair,
                         &mut shred,
-                        *SIZE_OF_CODING_SHRED_HEADER,
+                        *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE,
                     );
                     shred
                 })
@@ -465,14 +440,14 @@ impl Shredder {
         num_data: usize,
         num_code: usize,
         position: usize,
-    ) -> DataShredHeader {
-        let mut header = DataShredHeader::default();
-        header.common_header.shred_type = CODING_SHRED;
-        header.common_header.coding_header.index = index;
-        header.common_header.coding_header.slot = slot;
-        header.common_header.num_coding_shreds = num_code as u16;
-        header.common_header.num_data_shreds = num_data as u16;
-        header.common_header.position = position as u16;
+    ) -> ShredHeader {
+        let mut header = ShredHeader::default();
+        header.shred_type = ShredType(CODING_SHRED);
+        header.coding_header.common_header.index = index;
+        header.coding_header.common_header.slot = slot;
+        header.coding_header.num_coding_shreds = num_code as u16;
+        header.coding_header.num_data_shreds = num_data as u16;
+        header.coding_header.position = position as u16;
         header
     }

@@ -492,7 +467,7 @@ impl Shredder {
         let start_index = data_shred_batch[0].header().index;

         // All information after coding shred field in a data shred is encoded
-        let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
+        let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
         let data_ptrs: Vec<_> = data_shred_batch
             .iter()
             .map(|data| &data.payload[coding_block_offset..])
@@ -588,7 +563,7 @@ impl Shredder {
         let fec_set_size = num_data + num_coding;

         if num_coding > 0 && shreds.len() < fec_set_size {
-            let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
+            let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;

             // Let's try recovering missing shreds using erasure
             let mut present = &mut vec![true; fec_set_size];
@@ -697,7 +672,7 @@ impl Shredder {
         data_shred_bufs[..num_data]
             .iter()
             .flat_map(|data| {
-                let offset = *SIZE_OF_DATA_SHRED_HEADER;
+                let offset = *SIZE_OF_SHRED_HEADER;
                 data[offset as usize..].iter()
             })
             .cloned()
@@ -711,7 +686,7 @@ pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 {
 }

 pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 {
-    let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER) as u64;
+    let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64;
     let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
     let entry_size = bincode::serialized_size(entry).unwrap();
     let count_size = vec_size - entry_size;
@@ -794,7 +769,7 @@ pub mod tests {
             .collect();

         let size = serialized_size(&entries).unwrap();
-        let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER) as u64;
+        let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64;
         let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
         let num_expected_coding_shreds =
             Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate);
@@ -807,8 +782,8 @@ pub mod tests {
         let mut data_shred_indexes = HashSet::new();
         let mut coding_shred_indexes = HashSet::new();
         for shred in data_shreds.iter() {
-            assert_eq!(shred.headers.common_header.shred_type, DATA_SHRED);
-            let index = shred.headers.data_header.index;
+            assert_eq!(shred.headers.shred_type, ShredType(DATA_SHRED));
+            let index = shred.headers.data_header.common_header.index;
             let is_last = index as u64 == num_expected_data_shreds - 1;
             verify_test_data_shred(
                 shred,
@@ -825,8 +800,8 @@ pub mod tests {
         }

         for shred in coding_shreds.iter() {
-            let index = shred.headers.data_header.index;
-            assert_eq!(shred.headers.common_header.shred_type, CODING_SHRED);
+            let index = shred.headers.data_header.common_header.index;
+            assert_eq!(shred.headers.shred_type, ShredType(CODING_SHRED));
             verify_test_code_shred(shred, index, slot, &keypair.pubkey(), true);
             assert!(!coding_shred_indexes.contains(&index));
             coding_shred_indexes.insert(index);
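
After this cleanup every serialized shred starts with the one-byte ShredType tag, followed by the CodingShredHeader and then the DataShredHeader, each of which embeds ShredCommonHeader (signature, slot, index). The standalone sketch below is illustrative only and is not part of the patch: it mirrors the new structs with a shortened [u8; 32] stand-in for the 64-byte Signature and assumes serde (with the derive feature) and bincode 1.x as dependencies. It checks the layout invariant that new_from_serialized_shred relies on, namely that the serialized ShredHeader is exactly shred_type ++ coding_header ++ data_header, so the coding header can be read at offset SIZE_OF_SHRED_TYPE and the data header at SIZE_OF_SHRED_TYPE + SIZE_OF_CODING_SHRED_HEADER. With the real Signature the same arithmetic yields the SIZE_OF_* constants defined in the lazy_static block above.

// layout_check.rs -- hypothetical scratch example, not part of core/src/shred.rs.
// Field names mirror the structs introduced in the patch; only the signature
// type is simplified, which changes the constants but not the additivity
// being demonstrated.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default)]
struct ShredType(u8);

#[derive(Serialize, Deserialize, Default)]
struct ShredCommonHeader {
    signature: [u8; 32], // stand-in for solana_sdk's 64-byte Signature
    slot: u64,
    index: u32,
}

#[derive(Serialize, Deserialize, Default)]
struct DataShredHeader {
    common_header: ShredCommonHeader,
    parent_offset: u16,
    flags: u8,
}

#[derive(Serialize, Deserialize, Default)]
struct CodingShredHeader {
    common_header: ShredCommonHeader,
    num_data_shreds: u16,
    num_coding_shreds: u16,
    position: u16,
}

#[derive(Serialize, Deserialize, Default)]
struct ShredHeader {
    shred_type: ShredType,
    coding_header: CodingShredHeader,
    data_header: DataShredHeader,
}

fn main() {
    // bincode serializes struct fields back to back in declaration order, so
    // the sub-header sizes add up to the full header size.  That is what lets
    // new_from_serialized_shred deserialize the coding header starting at
    // SIZE_OF_SHRED_TYPE and the data header starting at
    // SIZE_OF_SHRED_TYPE + SIZE_OF_CODING_SHRED_HEADER.
    let size_of_shred_type = bincode::serialized_size(&ShredType(0)).unwrap();
    let size_of_coding_header = bincode::serialized_size(&CodingShredHeader::default()).unwrap();
    let size_of_data_header = bincode::serialized_size(&DataShredHeader::default()).unwrap();
    let size_of_shred_header = bincode::serialized_size(&ShredHeader::default()).unwrap();

    assert_eq!(
        size_of_shred_header,
        size_of_shred_type + size_of_coding_header + size_of_data_header
    );
    println!(
        "type={} coding={} data={} total={}",
        size_of_shred_type, size_of_coding_header, size_of_data_header, size_of_shred_header
    );
}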