Reduce serializations/deserializations of shreds (#5879)

This commit is contained in:
Pankaj Garg
2019-09-12 10:10:25 -07:00
committed by GitHub
parent 176c7d8b13
commit 385086359c
4 changed files with 124 additions and 119 deletions

View File

@ -890,11 +890,8 @@ impl Blocktree {
parent_slot = current_slot - 1;
remaining_ticks_in_slot = ticks_per_slot;
shredder.finalize_slot();
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
let shreds: Vec<Shred> =
shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
all_shreds.extend(shreds);
shredder =
Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0)
@ -917,11 +914,7 @@ impl Blocktree {
if is_full_slot && remaining_ticks_in_slot != 0 {
shredder.finalize_slot();
}
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
let shreds: Vec<Shred> = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
all_shreds.extend(shreds);
let num_shreds = all_shreds.len();
@ -1630,11 +1623,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
shredder.finalize_slot();
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
let shreds: Vec<Shred> = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
blocktree.insert_shreds(shreds, None)?;
blocktree.set_roots(&[0])?;
@ -1709,11 +1698,7 @@ pub fn entries_to_test_shreds(
shredder.finalize_data();
}
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
let shreds: Vec<Shred> = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
shreds
}

View File

@ -101,16 +101,13 @@ pub(super) fn entries_to_shreds(
shredder.finalize_data();
}
let mut shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
let (mut shreds, mut shred_bufs): (Vec<Shred>, Vec<Vec<u8>>) =
shredder.shred_tuples.into_iter().unzip();
trace!("Inserting {:?} shreds in blocktree", shreds.len());
latest_shred_index = u64::from(shredder.index);
all_shreds.append(&mut shreds);
all_shred_bufs.append(&mut shredder.shreds);
all_shred_bufs.append(&mut shred_bufs);
});
(all_shreds, all_shred_bufs, latest_shred_index)
}

View File

@ -95,6 +95,16 @@ impl Shred {
}
}
pub fn set_signature(&mut self, sig: Signature) {
match self {
Shred::FirstInSlot(s)
| Shred::Data(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => s.header.common_header.signature = sig,
Shred::Coding(s) => s.header.common_header.signature = sig,
};
}
pub fn seed(&self) -> [u8; 32] {
let mut seed = [0; 32];
let seed_len = seed.len();
@ -264,7 +274,7 @@ pub struct Shredder {
parent_offset: u16,
fec_rate: f32,
signer: Arc<Keypair>,
pub shreds: Vec<Vec<u8>>,
pub shred_tuples: Vec<(Shred, Vec<u8>)>,
fec_set_shred_start: usize,
active_shred: Option<Shred>,
active_offset: usize,
@ -371,20 +381,28 @@ impl Shredder {
/// Serialize the payload, sign it and store the signature in the shred
/// Store the signed shred in the vector of shreds
fn finalize_shred(&mut self, mut shred: Vec<u8>, signature_offset: usize) {
fn finalize_shred(
&mut self,
mut shred: Shred,
mut shred_buf: Vec<u8>,
signature_offset: usize,
) {
let data_offset =
signature_offset + bincode::serialized_size(&Signature::default()).unwrap() as usize;
let signature = bincode::serialize(&self.signer.sign_message(&shred[data_offset..]))
.expect("Failed to generate serialized signature");
shred[signature_offset..signature_offset + signature.len()].copy_from_slice(&signature);
self.shreds.push(shred);
let signature = self.signer.sign_message(&shred_buf[data_offset..]);
let serialized_signature =
bincode::serialize(&signature).expect("Failed to generate serialized signature");
shred.set_signature(signature);
shred_buf[signature_offset..signature_offset + serialized_signature.len()]
.copy_from_slice(&serialized_signature);
self.shred_tuples.push((shred, shred_buf));
}
/// Finalize a data shred. Update the shred index for the next shred
fn finalize_data_shred(&mut self, shred: Shred) {
let data = bincode::serialize(&shred).expect("Failed to serialize shred");
self.finalize_shred(data, CodingShred::overhead());
self.finalize_shred(shred, data, CodingShred::overhead());
self.active_offset = 0;
self.index += 1;
}
@ -425,9 +443,9 @@ impl Shredder {
// All information after "reserved" field (coding shred header) in a data shred is encoded
let coding_block_offset = CodingShred::overhead();
let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..]
let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..]
.iter()
.map(|data| &data[coding_block_offset..])
.map(|(_, data)| &data[coding_block_offset..])
.collect();
// Create empty coding shreds, with correctly populated headers
@ -462,11 +480,12 @@ impl Shredder {
as usize;
// Finalize the coding blocks (sign and append to the shred list)
coding_shreds
.into_iter()
.for_each(|code| self.finalize_shred(code, coding_header_offset));
coding_shreds.into_iter().for_each(|code| {
let shred: Shred = bincode::deserialize(&code).unwrap();
self.finalize_shred(shred, code, coding_header_offset)
});
self.fec_set_index = self.index;
self.fec_set_shred_start = self.shreds.len();
self.fec_set_shred_start = self.shred_tuples.len();
}
}
@ -733,7 +752,7 @@ mod tests {
let mut shredder =
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
@ -743,13 +762,13 @@ mod tests {
// Test0: Write some data to shred. Not enough to create a signed shred
let data: Vec<u8> = (0..25).collect();
assert_eq!(shredder.write(&data).unwrap(), data.len());
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_ne!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 25);
// Test1: Write some more data to shred. Not enough to create a signed shred
assert_eq!(shredder.write(&data).unwrap(), data.len());
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_offset, 50);
// Test2: Write enough data to create a shred (> PACKET_DATA_SIZE)
@ -758,14 +777,14 @@ mod tests {
let offset = shredder.write(&data).unwrap();
assert_ne!(offset, data.len());
// Assert that we have atleast one signed shred
assert!(!shredder.shreds.is_empty());
assert!(!shredder.shred_tuples.is_empty());
// Assert that a new active shred was also created
assert_ne!(shredder.active_shred, None);
// Assert that the new active shred was not populated
assert_eq!(shredder.active_offset, 0);
// Test3: Assert that the first shred in slot was created (since we gave a parent to shredder)
let shred = shredder.shreds.pop().unwrap();
let (_, shred) = shredder.shred_tuples.pop().unwrap();
assert_eq!(shred.len(), PACKET_DATA_SIZE);
info!("Len: {}", shred.len());
info!("{:?}", shred);
@ -786,17 +805,17 @@ mod tests {
shredder.write(&data[offset..]).unwrap();
// It shouldn't generate a signed shred
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
// Test6: Let's finalize the FEC block. That should result in the current shred to morph into
// a signed LastInFECBlock shred
shredder.finalize_data();
// We should have a new signed shred
assert!(!shredder.shreds.is_empty());
assert!(!shredder.shred_tuples.is_empty());
// Must be Last in FEC Set
let shred = shredder.shreds.pop().unwrap();
let (_, shred) = shredder.shred_tuples.pop().unwrap();
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -816,9 +835,9 @@ mod tests {
assert_ne!(offset, data.len());
// We should have a new signed shred
assert!(!shredder.shreds.is_empty());
assert!(!shredder.shred_tuples.is_empty());
let shred = shredder.shreds.pop().unwrap();
let (_, shred) = shredder.shred_tuples.pop().unwrap();
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -833,10 +852,10 @@ mod tests {
assert_ne!(offset, data.len());
// We should have a new signed shred
assert!(!shredder.shreds.is_empty());
assert!(!shredder.shred_tuples.is_empty());
// Must be a Data shred
let shred = shredder.shreds.pop().unwrap();
let (_, shred) = shredder.shred_tuples.pop().unwrap();
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -854,10 +873,10 @@ mod tests {
shredder.finalize_slot();
// We should have a new signed shred
assert!(!shredder.shreds.is_empty());
assert!(!shredder.shred_tuples.is_empty());
// Must be LastInSlot
let shred = shredder.shreds.pop().unwrap();
let (_, shred) = shredder.shred_tuples.pop().unwrap();
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -876,7 +895,7 @@ mod tests {
let mut shredder =
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
@ -885,14 +904,14 @@ mod tests {
let _ = shredder.write(&data).unwrap();
// We should have 0 shreds now
assert_eq!(shredder.shreds.len(), 0);
assert_eq!(shredder.shred_tuples.len(), 0);
shredder.finalize_data();
// We should have 2 shreds now (FirstInSlot, and LastInFECBlock)
assert_eq!(shredder.shreds.len(), 2);
assert_eq!(shredder.shred_tuples.len(), 2);
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::FirstInSlot(_));
@ -901,7 +920,7 @@ mod tests {
assert_eq!(deserialized_shred.parent(), slot - 5);
assert!(deserialized_shred.verify(&keypair.pubkey()));
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::DataComplete(_));
@ -913,7 +932,7 @@ mod tests {
let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 0.0, &keypair, 2)
.expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
@ -922,13 +941,13 @@ mod tests {
let _ = shredder.write(&data).unwrap();
// We should have 0 shreds now
assert_eq!(shredder.shreds.len(), 0);
assert_eq!(shredder.shred_tuples.len(), 0);
shredder.finalize_data();
// We should have 1 shred now (LastInFECBlock)
assert_eq!(shredder.shreds.len(), 1);
let shred = shredder.shreds.remove(0);
assert_eq!(shredder.shred_tuples.len(), 1);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::DataComplete(_));
@ -949,7 +968,7 @@ mod tests {
let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, &keypair, 0)
.expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
@ -960,13 +979,13 @@ mod tests {
let _ = shredder.write(&data).unwrap();
// We should have 2 shreds now
assert_eq!(shredder.shreds.len(), 2);
assert_eq!(shredder.shred_tuples.len(), 2);
shredder.finalize_data();
// Finalize must have created 1 final data shred and 3 coding shreds
// assert_eq!(shredder.shreds.len(), 6);
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::FirstInSlot(_));
@ -975,7 +994,7 @@ mod tests {
assert_eq!(deserialized_shred.parent(), slot - 5);
assert!(deserialized_shred.verify(&keypair.pubkey()));
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_));
@ -984,7 +1003,7 @@ mod tests {
assert_eq!(deserialized_shred.parent(), slot - 5);
assert!(deserialized_shred.verify(&keypair.pubkey()));
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::DataComplete(_));
@ -993,7 +1012,7 @@ mod tests {
assert_eq!(deserialized_shred.parent(), slot - 5);
assert!(deserialized_shred.verify(&keypair.pubkey()));
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::Coding(_));
@ -1001,7 +1020,7 @@ mod tests {
assert_eq!(deserialized_shred.slot(), slot);
assert!(deserialized_shred.verify(&keypair.pubkey()));
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::Coding(_));
@ -1009,7 +1028,7 @@ mod tests {
assert_eq!(deserialized_shred.slot(), slot);
assert!(deserialized_shred.verify(&keypair.pubkey()));
let shred = shredder.shreds.remove(0);
let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::Coding(_));
@ -1025,7 +1044,7 @@ mod tests {
let mut shredder =
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
@ -1040,7 +1059,7 @@ mod tests {
// We should have some shreds now
assert_eq!(
shredder.shreds.len(),
shredder.shred_tuples.len(),
data.len() / approx_shred_payload_size
);
assert_eq!(offset, data.len());
@ -1049,12 +1068,12 @@ mod tests {
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shreds.len(), expected_shred_count);
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let shreds: Vec<Shred> = shredder
.shreds
.shred_tuples
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.map(|(s, _)| s.clone())
.collect();
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
@ -1087,16 +1106,18 @@ mod tests {
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
let mut shreds: Vec<Shred> = shredder
.shreds
.shred_tuples
.iter()
.enumerate()
.filter_map(|(i, s)| {
.filter_map(
|(i, (s, _))| {
if i % 2 == 0 {
Some(bincode::deserialize(s).unwrap())
Some(s.clone())
} else {
None
}
})
},
)
.collect();
let mut result = Shredder::try_recovery(
@ -1158,16 +1179,18 @@ mod tests {
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
let mut shreds: Vec<Shred> = shredder
.shreds
.shred_tuples
.iter()
.enumerate()
.filter_map(|(i, s)| {
.filter_map(
|(i, (s, _))| {
if i % 2 != 0 {
Some(bincode::deserialize(s).unwrap())
Some(s.clone())
} else {
None
}
})
},
)
.collect();
let mut result = Shredder::try_recovery(
@ -1240,7 +1263,7 @@ mod tests {
// We should have some shreds now
assert_eq!(
shredder.shreds.len(),
shredder.shred_tuples.len(),
data.len() / approx_shred_payload_size
);
assert_eq!(offset, data.len());
@ -1249,19 +1272,21 @@ mod tests {
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shreds.len(), expected_shred_count);
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let mut shreds: Vec<Shred> = shredder
.shreds
.shred_tuples
.iter()
.enumerate()
.filter_map(|(i, s)| {
.filter_map(
|(i, (s, _))| {
if i % 2 != 0 {
Some(bincode::deserialize(s).unwrap())
Some(s.clone())
} else {
None
}
})
},
)
.collect();
let mut result = Shredder::try_recovery(
@ -1323,12 +1348,12 @@ mod tests {
// Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
let shreds: Vec<Shred> = shredder
.shreds
.shred_tuples
.iter()
.enumerate()
.filter_map(|(i, s)| {
.filter_map(|(i, (s, _))| {
if (i < 5 && i % 2 != 0) || (i >= 5 && i % 2 == 0) {
Some(bincode::deserialize(s).unwrap())
Some(s.clone())
} else {
None
}
@ -1354,7 +1379,7 @@ mod tests {
// We should have some shreds now
assert_eq!(
shredder.shreds.len(),
shredder.shred_tuples.len(),
data.len() / approx_shred_payload_size
);
assert_eq!(offset, data.len());
@ -1363,19 +1388,21 @@ mod tests {
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shreds.len(), expected_shred_count);
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let mut shreds: Vec<Shred> = shredder
.shreds
.shred_tuples
.iter()
.enumerate()
.filter_map(|(i, s)| {
.filter_map(
|(i, (s, _))| {
if i % 2 != 0 {
Some(bincode::deserialize(s).unwrap())
Some(s.clone())
} else {
None
}
})
},
)
.collect();
let mut result = Shredder::try_recovery(

View File

@ -302,11 +302,7 @@ mod test {
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
shredder.finalize_slot();
shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect()
shredder.shred_tuples.into_iter().map(|(s, _)| s).collect()
}
#[test]