From 385086359c10255c79b72ac30f66f60e53b496e2 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 12 Sep 2019 10:10:25 -0700 Subject: [PATCH] Reduce serializations/deserializations of shreds (#5879) --- core/src/blocktree.rs | 25 +-- core/src/broadcast_stage/broadcast_utils.rs | 9 +- core/src/shred.rs | 203 +++++++++++--------- core/src/window_service.rs | 6 +- 4 files changed, 124 insertions(+), 119 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index e27085cb33..b31d7f0a42 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -890,11 +890,8 @@ impl Blocktree { parent_slot = current_slot - 1; remaining_ticks_in_slot = ticks_per_slot; shredder.finalize_slot(); - let shreds: Vec = shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect(); + let shreds: Vec = + shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); all_shreds.extend(shreds); shredder = Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0) @@ -917,11 +914,7 @@ impl Blocktree { if is_full_slot && remaining_ticks_in_slot != 0 { shredder.finalize_slot(); } - let shreds: Vec = shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect(); + let shreds: Vec = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); all_shreds.extend(shreds); let num_shreds = all_shreds.len(); @@ -1630,11 +1623,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); - let shreds: Vec = shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect(); + let shreds: Vec = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); blocktree.insert_shreds(shreds, None)?; blocktree.set_roots(&[0])?; @@ -1709,11 +1698,7 @@ pub fn entries_to_test_shreds( shredder.finalize_data(); } - let shreds: Vec = shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect(); + let shreds: Vec = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); shreds } diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 5f26f38805..627077d25f 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -101,16 +101,13 @@ pub(super) fn entries_to_shreds( shredder.finalize_data(); } - let mut shreds: Vec = shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect(); + let (mut shreds, mut shred_bufs): (Vec, Vec>) = + shredder.shred_tuples.into_iter().unzip(); trace!("Inserting {:?} shreds in blocktree", shreds.len()); latest_shred_index = u64::from(shredder.index); all_shreds.append(&mut shreds); - all_shred_bufs.append(&mut shredder.shreds); + all_shred_bufs.append(&mut shred_bufs); }); (all_shreds, all_shred_bufs, latest_shred_index) } diff --git a/core/src/shred.rs b/core/src/shred.rs index d41c146f54..a1bc7d9099 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -95,6 +95,16 @@ impl Shred { } } + pub fn set_signature(&mut self, sig: Signature) { + match self { + Shred::FirstInSlot(s) + | Shred::Data(s) + | Shred::DataComplete(s) + | Shred::LastInSlot(s) => s.header.common_header.signature = sig, + Shred::Coding(s) => s.header.common_header.signature = sig, + }; + } + pub fn seed(&self) -> [u8; 32] { let mut seed = [0; 32]; let seed_len = seed.len(); @@ -264,7 +274,7 @@ pub struct Shredder { parent_offset: u16, fec_rate: f32, signer: Arc, - pub shreds: Vec>, + pub shred_tuples: Vec<(Shred, Vec)>, fec_set_shred_start: usize, active_shred: Option, active_offset: usize, @@ -371,20 +381,28 @@ impl Shredder { /// Serialize the payload, sign it and store the signature in the shred /// Store the signed shred in the vector of shreds - fn finalize_shred(&mut self, mut shred: Vec, signature_offset: usize) { + fn finalize_shred( + &mut self, + mut shred: Shred, + mut shred_buf: Vec, + signature_offset: usize, + ) { let data_offset = signature_offset + bincode::serialized_size(&Signature::default()).unwrap() as usize; - let signature = bincode::serialize(&self.signer.sign_message(&shred[data_offset..])) - .expect("Failed to generate serialized signature"); - shred[signature_offset..signature_offset + signature.len()].copy_from_slice(&signature); - self.shreds.push(shred); + let signature = self.signer.sign_message(&shred_buf[data_offset..]); + let serialized_signature = + bincode::serialize(&signature).expect("Failed to generate serialized signature"); + shred.set_signature(signature); + shred_buf[signature_offset..signature_offset + serialized_signature.len()] + .copy_from_slice(&serialized_signature); + self.shred_tuples.push((shred, shred_buf)); } /// Finalize a data shred. Update the shred index for the next shred fn finalize_data_shred(&mut self, shred: Shred) { let data = bincode::serialize(&shred).expect("Failed to serialize shred"); - self.finalize_shred(data, CodingShred::overhead()); + self.finalize_shred(shred, data, CodingShred::overhead()); self.active_offset = 0; self.index += 1; } @@ -425,9 +443,9 @@ impl Shredder { // All information after "reserved" field (coding shred header) in a data shred is encoded let coding_block_offset = CodingShred::overhead(); - let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..] + let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..] .iter() - .map(|data| &data[coding_block_offset..]) + .map(|(_, data)| &data[coding_block_offset..]) .collect(); // Create empty coding shreds, with correctly populated headers @@ -462,11 +480,12 @@ impl Shredder { as usize; // Finalize the coding blocks (sign and append to the shred list) - coding_shreds - .into_iter() - .for_each(|code| self.finalize_shred(code, coding_header_offset)); + coding_shreds.into_iter().for_each(|code| { + let shred: Shred = bincode::deserialize(&code).unwrap(); + self.finalize_shred(shred, code, coding_header_offset) + }); self.fec_set_index = self.index; - self.fec_set_shred_start = self.shreds.len(); + self.fec_set_shred_start = self.shred_tuples.len(); } } @@ -733,7 +752,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); @@ -743,13 +762,13 @@ mod tests { // Test0: Write some data to shred. Not enough to create a signed shred let data: Vec = (0..25).collect(); assert_eq!(shredder.write(&data).unwrap(), data.len()); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_ne!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 25); // Test1: Write some more data to shred. Not enough to create a signed shred assert_eq!(shredder.write(&data).unwrap(), data.len()); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_offset, 50); // Test2: Write enough data to create a shred (> PACKET_DATA_SIZE) @@ -758,14 +777,14 @@ mod tests { let offset = shredder.write(&data).unwrap(); assert_ne!(offset, data.len()); // Assert that we have atleast one signed shred - assert!(!shredder.shreds.is_empty()); + assert!(!shredder.shred_tuples.is_empty()); // Assert that a new active shred was also created assert_ne!(shredder.active_shred, None); // Assert that the new active shred was not populated assert_eq!(shredder.active_offset, 0); // Test3: Assert that the first shred in slot was created (since we gave a parent to shredder) - let shred = shredder.shreds.pop().unwrap(); + let (_, shred) = shredder.shred_tuples.pop().unwrap(); assert_eq!(shred.len(), PACKET_DATA_SIZE); info!("Len: {}", shred.len()); info!("{:?}", shred); @@ -786,17 +805,17 @@ mod tests { shredder.write(&data[offset..]).unwrap(); // It shouldn't generate a signed shred - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); // Test6: Let's finalize the FEC block. That should result in the current shred to morph into // a signed LastInFECBlock shred shredder.finalize_data(); // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); + assert!(!shredder.shred_tuples.is_empty()); // Must be Last in FEC Set - let shred = shredder.shreds.pop().unwrap(); + let (_, shred) = shredder.shred_tuples.pop().unwrap(); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -816,9 +835,9 @@ mod tests { assert_ne!(offset, data.len()); // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); + assert!(!shredder.shred_tuples.is_empty()); - let shred = shredder.shreds.pop().unwrap(); + let (_, shred) = shredder.shred_tuples.pop().unwrap(); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -833,10 +852,10 @@ mod tests { assert_ne!(offset, data.len()); // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); + assert!(!shredder.shred_tuples.is_empty()); // Must be a Data shred - let shred = shredder.shreds.pop().unwrap(); + let (_, shred) = shredder.shred_tuples.pop().unwrap(); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -854,10 +873,10 @@ mod tests { shredder.finalize_slot(); // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); + assert!(!shredder.shred_tuples.is_empty()); // Must be LastInSlot - let shred = shredder.shreds.pop().unwrap(); + let (_, shred) = shredder.shred_tuples.pop().unwrap(); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -876,7 +895,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); @@ -885,14 +904,14 @@ mod tests { let _ = shredder.write(&data).unwrap(); // We should have 0 shreds now - assert_eq!(shredder.shreds.len(), 0); + assert_eq!(shredder.shred_tuples.len(), 0); shredder.finalize_data(); // We should have 2 shreds now (FirstInSlot, and LastInFECBlock) - assert_eq!(shredder.shreds.len(), 2); + assert_eq!(shredder.shred_tuples.len(), 2); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::FirstInSlot(_)); @@ -901,7 +920,7 @@ mod tests { assert_eq!(deserialized_shred.parent(), slot - 5); assert!(deserialized_shred.verify(&keypair.pubkey())); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::DataComplete(_)); @@ -913,7 +932,7 @@ mod tests { let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 0.0, &keypair, 2) .expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); @@ -922,13 +941,13 @@ mod tests { let _ = shredder.write(&data).unwrap(); // We should have 0 shreds now - assert_eq!(shredder.shreds.len(), 0); + assert_eq!(shredder.shred_tuples.len(), 0); shredder.finalize_data(); // We should have 1 shred now (LastInFECBlock) - assert_eq!(shredder.shreds.len(), 1); - let shred = shredder.shreds.remove(0); + assert_eq!(shredder.shred_tuples.len(), 1); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::DataComplete(_)); @@ -949,7 +968,7 @@ mod tests { let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, &keypair, 0) .expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); @@ -960,13 +979,13 @@ mod tests { let _ = shredder.write(&data).unwrap(); // We should have 2 shreds now - assert_eq!(shredder.shreds.len(), 2); + assert_eq!(shredder.shred_tuples.len(), 2); shredder.finalize_data(); // Finalize must have created 1 final data shred and 3 coding shreds // assert_eq!(shredder.shreds.len(), 6); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::FirstInSlot(_)); @@ -975,7 +994,7 @@ mod tests { assert_eq!(deserialized_shred.parent(), slot - 5); assert!(deserialized_shred.verify(&keypair.pubkey())); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); @@ -984,7 +1003,7 @@ mod tests { assert_eq!(deserialized_shred.parent(), slot - 5); assert!(deserialized_shred.verify(&keypair.pubkey())); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::DataComplete(_)); @@ -993,7 +1012,7 @@ mod tests { assert_eq!(deserialized_shred.parent(), slot - 5); assert!(deserialized_shred.verify(&keypair.pubkey())); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::Coding(_)); @@ -1001,7 +1020,7 @@ mod tests { assert_eq!(deserialized_shred.slot(), slot); assert!(deserialized_shred.verify(&keypair.pubkey())); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::Coding(_)); @@ -1009,7 +1028,7 @@ mod tests { assert_eq!(deserialized_shred.slot(), slot); assert!(deserialized_shred.verify(&keypair.pubkey())); - let shred = shredder.shreds.remove(0); + let (_, shred) = shredder.shred_tuples.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); assert_matches!(deserialized_shred, Shred::Coding(_)); @@ -1025,7 +1044,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); + assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); @@ -1040,7 +1059,7 @@ mod tests { // We should have some shreds now assert_eq!( - shredder.shreds.len(), + shredder.shred_tuples.len(), data.len() / approx_shred_payload_size ); assert_eq!(offset, data.len()); @@ -1049,12 +1068,12 @@ mod tests { // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shreds.len(), expected_shred_count); + assert_eq!(shredder.shred_tuples.len(), expected_shred_count); let shreds: Vec = shredder - .shreds + .shred_tuples .iter() - .map(|s| bincode::deserialize(s).unwrap()) + .map(|(s, _)| s.clone()) .collect(); // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail @@ -1087,16 +1106,18 @@ mod tests { // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work let mut shreds: Vec = shredder - .shreds + .shred_tuples .iter() .enumerate() - .filter_map(|(i, s)| { - if i % 2 == 0 { - Some(bincode::deserialize(s).unwrap()) - } else { - None - } - }) + .filter_map( + |(i, (s, _))| { + if i % 2 == 0 { + Some(s.clone()) + } else { + None + } + }, + ) .collect(); let mut result = Shredder::try_recovery( @@ -1158,16 +1179,18 @@ mod tests { // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work let mut shreds: Vec = shredder - .shreds + .shred_tuples .iter() .enumerate() - .filter_map(|(i, s)| { - if i % 2 != 0 { - Some(bincode::deserialize(s).unwrap()) - } else { - None - } - }) + .filter_map( + |(i, (s, _))| { + if i % 2 != 0 { + Some(s.clone()) + } else { + None + } + }, + ) .collect(); let mut result = Shredder::try_recovery( @@ -1240,7 +1263,7 @@ mod tests { // We should have some shreds now assert_eq!( - shredder.shreds.len(), + shredder.shred_tuples.len(), data.len() / approx_shred_payload_size ); assert_eq!(offset, data.len()); @@ -1249,19 +1272,21 @@ mod tests { // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shreds.len(), expected_shred_count); + assert_eq!(shredder.shred_tuples.len(), expected_shred_count); let mut shreds: Vec = shredder - .shreds + .shred_tuples .iter() .enumerate() - .filter_map(|(i, s)| { - if i % 2 != 0 { - Some(bincode::deserialize(s).unwrap()) - } else { - None - } - }) + .filter_map( + |(i, (s, _))| { + if i % 2 != 0 { + Some(s.clone()) + } else { + None + } + }, + ) .collect(); let mut result = Shredder::try_recovery( @@ -1323,12 +1348,12 @@ mod tests { // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail let shreds: Vec = shredder - .shreds + .shred_tuples .iter() .enumerate() - .filter_map(|(i, s)| { + .filter_map(|(i, (s, _))| { if (i < 5 && i % 2 != 0) || (i >= 5 && i % 2 == 0) { - Some(bincode::deserialize(s).unwrap()) + Some(s.clone()) } else { None } @@ -1354,7 +1379,7 @@ mod tests { // We should have some shreds now assert_eq!( - shredder.shreds.len(), + shredder.shred_tuples.len(), data.len() / approx_shred_payload_size ); assert_eq!(offset, data.len()); @@ -1363,19 +1388,21 @@ mod tests { // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shreds.len(), expected_shred_count); + assert_eq!(shredder.shred_tuples.len(), expected_shred_count); let mut shreds: Vec = shredder - .shreds + .shred_tuples .iter() .enumerate() - .filter_map(|(i, s)| { - if i % 2 != 0 { - Some(bincode::deserialize(s).unwrap()) - } else { - None - } - }) + .filter_map( + |(i, (s, _))| { + if i % 2 != 0 { + Some(s.clone()) + } else { + None + } + }, + ) .collect(); let mut result = Shredder::try_recovery( diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 153d762a59..b5c8bfb338 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -302,11 +302,7 @@ mod test { bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); - shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect() + shredder.shred_tuples.into_iter().map(|(s, _)| s).collect() } #[test]