From 80d780d666e795b5f299cdf03703eeb0cc545fe7 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu, 7 Nov 2019 21:36:53 -0800
Subject: [PATCH] Upgrade Repair be more intelligent and agressive (bp #6789)
 (#6793)

automerge
---
 core/benches/shredder.rs                      |   8 +-
 .../broadcast_fake_blobs_run.rs               |   1 +
 .../fail_entry_verification_broadcast_run.rs  |   1 +
 .../broadcast_stage/standard_broadcast_run.rs |  13 +-
 core/src/chacha.rs                            |   2 +-
 core/src/cluster_info.rs                      |   8 +-
 core/src/repair_service.rs                    |  20 +--
 core/src/replay_stage.rs                      |   2 +-
 core/src/window_service.rs                    |   2 +-
 ledger/src/blocktree.rs                       | 103 +++++++++++---
 ledger/src/blocktree_meta.rs                  |  19 +--
 ledger/src/shred.rs                           | 128 +++++++++++++++---
 12 files changed, 236 insertions(+), 71 deletions(-)

diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index 6536429823..081373fc2f 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -35,7 +35,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
     let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
     let entries = create_ticks(num_ticks, Hash::default());
     bencher.iter(|| {
-        let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone()).unwrap();
+        let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0).unwrap();
         shredder.entries_to_shreds(&entries, true, 0);
     })
 }
@@ -50,7 +50,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
     let entries = make_large_unchained_entries(txs_per_entry, num_entries);
     // 1Mb
     bencher.iter(|| {
-        let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone()).unwrap();
+        let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0).unwrap();
         shredder.entries_to_shreds(&entries, true, 0);
     })
 }
@@ -63,7 +63,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
     let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
     let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
     let entries = create_ticks(num_ticks, Hash::default());
-    let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp).unwrap();
+    let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0).unwrap();
     let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
     bencher.iter(|| {
         let raw = &mut Shredder::deshred(&data_shreds).unwrap();
@@ -75,7 +75,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
     let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD];
 
-    let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true);
+    let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0);
 
     bencher.iter(|| {
         let payload = shred.payload.clone();
diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs
index 5c44351351..619cca45e9 100644
--- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs
+++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs
@@ -44,6 +44,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
             bank.parent().unwrap().slot(),
             RECOMMENDED_FEC_RATE,
             keypair.clone(),
+            (bank.tick_height() % bank.ticks_per_slot()) as u8,
         )
         .expect("Expected to create a new shredder");
 
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 990592f86a..fabcd8e4c4 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -42,6 +42,7 @@ impl BroadcastRun for
FailEntryVerificationBroadcastRun { bank.parent().unwrap().slot(), RECOMMENDED_FEC_RATE, keypair.clone(), + (bank.tick_height() % bank.ticks_per_slot()) as u8, ) .expect("Expected to create a new shredder"); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index b07cf07866..c55f8ccd2c 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -2,7 +2,7 @@ use super::broadcast_utils::{self, ReceiveResults}; use super::*; use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; use solana_ledger::entry::Entry; -use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE}; +use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK}; use solana_sdk::signature::Keypair; use solana_sdk::timing::duration_as_us; use std::time::Duration; @@ -46,7 +46,7 @@ impl StandardBroadcastRun { } } - fn check_for_interrupted_slot(&mut self) -> Option { + fn check_for_interrupted_slot(&mut self, max_ticks_in_slot: u8) -> Option { let (slot, _) = self.current_slot_and_parent.unwrap(); let mut last_unfinished_slot_shred = self .unfinished_slot @@ -60,6 +60,7 @@ impl StandardBroadcastRun { None, true, true, + max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK, )) } else { None @@ -102,6 +103,7 @@ impl StandardBroadcastRun { blocktree: &Blocktree, entries: &[Entry], is_slot_end: bool, + reference_tick: u8, ) -> (Vec, Vec) { let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); let shredder = Shredder::new( @@ -109,6 +111,7 @@ impl StandardBroadcastRun { parent_slot, RECOMMENDED_FEC_RATE, self.keypair.clone(), + reference_tick, ) .expect("Expected to create a new shredder"); @@ -168,13 +171,15 @@ impl StandardBroadcastRun { let to_shreds_start = Instant::now(); // 1) Check if slot was interrupted - let last_unfinished_slot_shred = self.check_for_interrupted_slot(); + let last_unfinished_slot_shred = + self.check_for_interrupted_slot(bank.ticks_per_slot() as u8); // 2) Convert entries to shreds and coding shreds let (data_shreds, coding_shreds) = self.entries_to_shreds( blocktree, &receive_results.entries, last_tick_height == bank.max_tick_height(), + (bank.tick_height() % bank.ticks_per_slot()) as u8, ); let to_shreds_elapsed = to_shreds_start.elapsed(); @@ -362,7 +367,7 @@ mod test { // Slot 2 interrupted slot 1 let shred = run - .check_for_interrupted_slot() + .check_for_interrupted_slot(0) .expect("Expected a shred that signals an interrupt"); // Validate the shred diff --git a/core/src/chacha.rs b/core/src/chacha.rs index a94ee79f0e..af0016d6e6 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -165,7 +165,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... 
- let golden: Hash = "BdmY3efqu7zbnFuGRAeFANwa35HkDdQ7hwhYez3xGXiM" + let golden: Hash = "HLzH7Nrh4q2K5WTh3e9vPNFZ1QVYhVDRMN9u5v51GqpJ" .parse() .unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 2fea9c9869..863be3c6d1 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -820,7 +820,7 @@ impl ClusterInfo { } pub fn map_repair_request(&self, repair_request: &RepairType) -> Result> { match repair_request { - RepairType::Blob(slot, blob_index) => { + RepairType::Shred(slot, blob_index) => { datapoint_debug!( "cluster_info-repair", ("repair-slot", *slot, i64), @@ -1882,7 +1882,7 @@ mod tests { fn window_index_request() { let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me); - let rv = cluster_info.repair_request(&RepairType::Blob(0, 0)); + let rv = cluster_info.repair_request(&RepairType::Shred(0, 0)); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let gossip_addr = socketaddr!([127, 0, 0, 1], 1234); @@ -1901,7 +1901,7 @@ mod tests { ); cluster_info.insert_info(nxt.clone()); let rv = cluster_info - .repair_request(&RepairType::Blob(0, 0)) + .repair_request(&RepairType::Shred(0, 0)) .unwrap(); assert_eq!(nxt.gossip, gossip_addr); assert_eq!(rv.0, nxt.gossip); @@ -1926,7 +1926,7 @@ mod tests { while !one || !two { //this randomly picks an option, so eventually it should pick both let rv = cluster_info - .repair_request(&RepairType::Blob(0, 0)) + .repair_request(&RepairType::Shred(0, 0)) .unwrap(); if rv.0 == gossip_addr { one = true; diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index db2b4f1c7a..862ddf7aad 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -18,8 +18,8 @@ use std::{ time::Duration, }; -pub const MAX_REPAIR_LENGTH: usize = 16; -pub const REPAIR_MS: u64 = 100; +pub const MAX_REPAIR_LENGTH: usize = 1024; +pub const REPAIR_MS: u64 = 50; pub const MAX_ORPHANS: usize = 5; pub enum RepairStrategy { @@ -35,7 +35,7 @@ pub enum RepairStrategy { pub enum RepairType { Orphan(u64), HighestBlob(u64, u64), - Blob(u64, u64), + Shred(u64, u64), } pub struct RepairSlotRange { @@ -252,13 +252,13 @@ impl RepairService { } else { let reqs = blocktree.find_missing_data_indexes( slot, + slot_meta.first_shred_timestamp, slot_meta.consumed, slot_meta.received, max_repairs, ); - reqs.into_iter() - .map(|i| RepairType::Blob(slot, i)) + .map(|i| RepairType::Shred(slot, i)) .collect() } } @@ -478,12 +478,13 @@ mod test { } } blocktree.insert_shreds(shreds_to_write, None).unwrap(); - + // sleep so that the holes are ready for repair + sleep(Duration::from_secs(1)); let expected: Vec = (0..num_slots) .flat_map(|slot| { missing_indexes_per_slot .iter() - .map(move |blob_index| RepairType::Blob(slot as u64, *blob_index)) + .map(move |blob_index| RepairType::Shred(slot as u64, *blob_index)) }) .collect(); @@ -543,7 +544,8 @@ mod test { slot_shreds.remove(0); blocktree.insert_shreds(slot_shreds, None).unwrap(); } - + // sleep to make slot eligible for repair + sleep(Duration::from_secs(1)); // Iterate through all possible combinations of start..end (inclusive on both // sides of the range) for start in 0..slots.len() { @@ -555,7 +557,7 @@ mod test { ..=repair_slot_range.end) .map(|slot_index| { if slots.contains(&(slot_index as u64)) { - RepairType::Blob(slot_index as u64, 0) + RepairType::Shred(slot_index as u64, 0) } else { RepairType::HighestBlob(slot_index as u64, 0) } diff --git 
a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c88b5d47f5..bab24e9284 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1005,7 +1005,7 @@ mod test { let payload_len = SIZE_OF_DATA_SHRED_PAYLOAD; let gibberish = [0xa5u8; PACKET_DATA_SIZE]; let mut data_header = DataShredHeader::default(); - data_header.flags = DATA_COMPLETE_SHRED; + data_header.flags |= DATA_COMPLETE_SHRED; let mut shred = Shred::new_empty_from_header( ShredCommonHeader::default(), data_header, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 47e22979fd..709218a272 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -308,7 +308,7 @@ mod test { parent: u64, keypair: &Arc, ) -> Vec { - let shredder = Shredder::new(slot, parent, 0.0, keypair.clone()) + let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0) .expect("Failed to create entry shredder"); shredder.entries_to_shreds(&entries, true, 0).0 } diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index e74b07f86d..b4c0bee5bd 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -20,10 +20,11 @@ use rocksdb::DBRawIterator; use solana_measure::measure::Measure; use solana_metrics::{datapoint_debug, datapoint_error}; use solana_rayon_threadlimit::get_thread_count; -use solana_sdk::clock::Slot; +use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SECOND}; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::timing::timestamp; use std::cell::RefCell; use std::cmp; use std::collections::HashMap; @@ -41,6 +42,7 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon:: .unwrap())); pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; +pub const MAX_TURBINE_PROPAGATION_DELAY_TICKS: u64 = 16; pub type CompletedSlotsReceiver = Receiver>; @@ -833,6 +835,7 @@ impl Blocktree { slot_meta, index as u32, new_consumed, + shred.reference_tick(), ); data_index.set_present(index, true); trace!("inserted shred into slot {:?} and index {:?}", slot, index); @@ -909,7 +912,7 @@ impl Blocktree { }, |v| v, ); - let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone()) + let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone(), 0) .expect("Failed to create entry shredder"); let mut all_shreds = vec![]; let mut slot_entries = vec![]; @@ -932,8 +935,14 @@ impl Blocktree { shredder.entries_to_shreds(¤t_entries, true, start_index); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); - shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone()) - .expect("Failed to create entry shredder"); + shredder = Shredder::new( + current_slot, + parent_slot, + 0.0, + keypair.clone(), + (ticks_per_slot - remaining_ticks_in_slot) as u8, + ) + .expect("Failed to create entry shredder"); } if entry.is_tick() { @@ -970,7 +979,8 @@ impl Blocktree { // for the slot with the specified slot fn find_missing_indexes( db_iterator: &mut DBRawIterator, - slot: u64, + slot: Slot, + first_timestamp: u64, start_index: u64, end_index: u64, max_missing: usize, @@ -983,6 +993,8 @@ impl Blocktree { } let mut missing_indexes = vec![]; + let ticks_since_first_insert = + DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000; // Seek to the first shred with index >= start_index db_iterator.seek(&C::key((slot, start_index))); @@ -1010,7 +1022,15 @@ impl Blocktree { }; let upper_index = cmp::min(current_index, end_index); 
+ // the tick that will be used to figure out the timeout for this hole + let reference_tick = u64::from(Shred::reference_tick_from_data( + &db_iterator.value().expect("couldn't read value"), + )); + if ticks_since_first_insert < reference_tick + MAX_TURBINE_PROPAGATION_DELAY_TICKS { + // The higher index holes have not timed out yet + break 'outer; + } for i in prev_index..upper_index { missing_indexes.push(i); if missing_indexes.len() == max_missing { @@ -1035,7 +1055,8 @@ impl Blocktree { pub fn find_missing_data_indexes( &self, - slot: u64, + slot: Slot, + first_timestamp: u64, start_index: u64, end_index: u64, max_missing: usize, @@ -1047,6 +1068,7 @@ impl Blocktree { Self::find_missing_indexes::( &mut db_iterator, slot, + first_timestamp, start_index, end_index, max_missing, @@ -1311,10 +1333,17 @@ fn update_slot_meta( slot_meta: &mut SlotMeta, index: u32, new_consumed: u64, + reference_tick: u8, ) { + let maybe_first_insert = slot_meta.received == 0; // Index is zero-indexed, while the "received" height starts from 1, // so received = index + 1 for the same shred. slot_meta.received = cmp::max((u64::from(index) + 1) as u64, slot_meta.received); + if maybe_first_insert && slot_meta.received > 0 { + // predict the timestamp of what would have been the first shred in this slot + let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND; + slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed; + } slot_meta.consumed = new_consumed; slot_meta.last_index = { // If the last index in the slot hasn't been set before, then @@ -1707,7 +1736,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re let entries = create_ticks(ticks_per_slot, genesis_block.hash()); let last_hash = entries.last().unwrap().hash; - let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new())) + let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()), 0) .expect("Failed to create entry shredder"); let shreds = shredder.entries_to_shreds(&entries, true, 0).0; assert!(shreds.last().unwrap().last_in_slot()); @@ -1792,7 +1821,7 @@ pub fn entries_to_test_shreds( parent_slot: u64, is_full_slot: bool, ) -> Vec { - let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new())) + let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()), 0) .expect("Failed to create entry shredder"); shredder.entries_to_shreds(&entries, is_full_slot, 0).0 @@ -3163,27 +3192,27 @@ pub mod tests { // range of [0, gap) let expected: Vec = (1..gap).collect(); assert_eq!( - blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize), + blocktree.find_missing_data_indexes(slot, 0, 0, gap, gap as usize), expected ); assert_eq!( - blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize), + blocktree.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize), expected, ); assert_eq!( - blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize), + blocktree.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize), &expected[..expected.len() - 1], ); assert_eq!( - blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize), + blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize), vec![gap - 2, gap - 1], ); assert_eq!( - blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1), + blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, 1), vec![gap - 2], ); assert_eq!( - blocktree.find_missing_data_indexes(slot, 0, gap, 1), + 
blocktree.find_missing_data_indexes(slot, 0, 0, gap, 1), vec![1], ); @@ -3192,11 +3221,11 @@ pub mod tests { let mut expected: Vec = (1..gap).collect(); expected.push(gap + 1); assert_eq!( - blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize), + blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize), expected, ); assert_eq!( - blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize), + blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize), &expected[..expected.len() - 1], ); @@ -3212,6 +3241,7 @@ pub mod tests { assert_eq!( blocktree.find_missing_data_indexes( slot, + 0, j * gap, i * gap, ((i - j) * gap) as usize @@ -3225,6 +3255,34 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[test] + fn test_find_missing_data_indexes_timeout() { + let slot = 0; + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + + // Write entries + let gap: u64 = 10; + let shreds: Vec<_> = (0..64) + .map(|i| Shred::new_from_data(slot, (i * gap) as u32, 0, None, false, false, i as u8)) + .collect(); + blocktree.insert_shreds(shreds, None).unwrap(); + + let empty: Vec = vec![]; + assert_eq!( + blocktree.find_missing_data_indexes(slot, timestamp(), 0, 50, 1), + empty + ); + let expected: Vec<_> = (1..=9).collect(); + assert_eq!( + blocktree.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9), + expected + ); + + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + #[test] fn test_find_missing_data_indexes_sanity() { let slot = 0; @@ -3234,10 +3292,10 @@ pub mod tests { // Early exit conditions let empty: Vec = vec![]; - assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty); - assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty); - assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty); - assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty); + assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 0, 1), empty); + assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 5, 5, 1), empty); + assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 4, 3, 1), empty); + assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 1, 2, 0), empty); let entries = create_ticks(100, Hash::default()); let mut shreds = entries_to_test_shreds(entries, slot, 0, true); @@ -3261,7 +3319,7 @@ pub mod tests { // [i, first_index - 1] for start in 0..STARTS { let result = blocktree.find_missing_data_indexes( - slot, start, // start + slot, 0, start, // start END, //end MAX, //max ); @@ -3291,7 +3349,7 @@ pub mod tests { for i in 0..num_shreds as u64 { for j in 0..i { assert_eq!( - blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize), + blocktree.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize), empty ); } @@ -3816,6 +3874,7 @@ pub mod tests { Some(&[1, 1, 1]), true, true, + 0, )]; // With the corruption, nothing should be returned, even though an diff --git a/ledger/src/blocktree_meta.rs b/ledger/src/blocktree_meta.rs index 60774d9076..f97e34ac91 100644 --- a/ledger/src/blocktree_meta.rs +++ b/ledger/src/blocktree_meta.rs @@ -12,11 +12,13 @@ pub struct SlotMeta { // The total number of consecutive blobs starting from index 0 // we have received for this slot. pub consumed: u64, - // The index *plus one* of the highest blob received for this slot. 
Useful - // for checking if the slot has received any blobs yet, and to calculate the + // The index *plus one* of the highest shred received for this slot. Useful + // for checking if the slot has received any shreds yet, and to calculate the // range where there is one or more holes: `(consumed..received)`. pub received: u64, - // The index of the blob that is flagged as the last blob for this slot. + // The timestamp of the first time a shred was added for this slot + pub first_shred_timestamp: u64, + // The index of the shred that is flagged as the last shred for this slot. pub last_index: u64, // The slot height of the block this one derives from. pub parent_slot: u64, @@ -31,7 +33,7 @@ pub struct SlotMeta { } #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] -/// Index recording presence/absence of blobs +/// Index recording presence/absence of shreds pub struct Index { pub slot: u64, data: DataIndex, @@ -40,14 +42,14 @@ pub struct Index { #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct DataIndex { - /// Map representing presence/absence of data blobs + /// Map representing presence/absence of data shreds index: BTreeSet, } #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] /// Erasure coding information pub struct CodingIndex { - /// Map from set index, to hashmap from blob index to presence bool + /// Map from set index, to hashmap from shred index to presence bool index: BTreeSet, } @@ -145,8 +147,8 @@ impl DataIndex { impl SlotMeta { pub fn is_full(&self) -> bool { // last_index is std::u64::MAX when it has no information about how - // many blobs will fill this slot. - // Note: A full slot with zero blobs is not possible. + // many shreds will fill this slot. + // Note: A full slot with zero shreds is not possible. if self.last_index == std::u64::MAX { return false; } @@ -179,6 +181,7 @@ impl SlotMeta { slot, consumed: 0, received: 0, + first_shred_timestamp: 0, parent_slot, next_slots: vec![], is_connected: slot == 0, diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 70620cdbd5..03aa80fec1 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -8,12 +8,15 @@ use rayon::ThreadPool; use serde::{Deserialize, Serialize}; use solana_metrics::datapoint_debug; use solana_rayon_threadlimit::get_thread_count; -use solana_sdk::hash::Hash; -use solana_sdk::packet::PACKET_DATA_SIZE; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; -use std::sync::Arc; -use std::time::Instant; +use solana_sdk::{ + clock::Slot, + hash::Hash, + packet::PACKET_DATA_SIZE, + pubkey::Pubkey, + signature::{Keypair, KeypairUtil, Signature}, +}; +use std::mem::size_of; +use std::{sync::Arc, time::Instant}; /// The following constants are computed by hand, and hardcoded. /// `test_shred_constants` ensures that the values are correct. 
@@ -41,8 +44,9 @@ pub const CODING_SHRED: u8 = 0b0101_1010; pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; pub const RECOMMENDED_FEC_RATE: f32 = 1.0; -const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001; -pub const DATA_COMPLETE_SHRED: u8 = 0b0000_0010; +pub const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111; +const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000; +pub const DATA_COMPLETE_SHRED: u8 = 0b0100_0000; #[derive(Debug)] pub enum ShredError { @@ -131,6 +135,7 @@ impl Shred { data: Option<&[u8]>, is_last_data: bool, is_last_in_slot: bool, + reference_tick: u8, ) -> Self { let mut payload = vec![0; PACKET_DATA_SIZE]; let mut common_header = ShredCommonHeader::default(); @@ -139,6 +144,7 @@ impl Shred { let mut data_header = DataShredHeader::default(); data_header.parent_offset = parent_offset; + data_header.flags = reference_tick.min(SHRED_TICK_REFERENCE_MASK); if is_last_data { data_header.flags |= DATA_COMPLETE_SHRED @@ -324,6 +330,19 @@ impl Shred { } } + pub fn reference_tick(&self) -> u8 { + if self.is_data() { + self.data_header.flags & SHRED_TICK_REFERENCE_MASK + } else { + SHRED_TICK_REFERENCE_MASK + } + } + + pub fn reference_tick_from_data(data: &[u8]) -> u8 { + let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::()]; + flags & SHRED_TICK_REFERENCE_MASK + } + pub fn verify(&self, pubkey: &Pubkey) -> bool { self.signature() .verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..]) @@ -337,10 +356,17 @@ pub struct Shredder { fec_rate: f32, keypair: Arc, pub signing_coding_time: u128, + reference_tick: u8, } impl Shredder { - pub fn new(slot: u64, parent_slot: u64, fec_rate: f32, keypair: Arc) -> Result { + pub fn new( + slot: Slot, + parent_slot: Slot, + fec_rate: f32, + keypair: Arc, + reference_tick: u8, + ) -> Result { if fec_rate > 1.0 || fec_rate < 0.0 { Err(ShredError::InvalidFecRate(fec_rate)) } else if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) { @@ -352,6 +378,7 @@ impl Shredder { fec_rate, keypair, signing_coding_time: 0, + reference_tick, }) } } @@ -395,6 +422,7 @@ impl Shredder { Some(shred_data), is_last_data, is_last_in_slot, + self.reference_tick, ); Shredder::sign_shred(&self.keypair, &mut shred); @@ -797,7 +825,7 @@ pub mod tests { // Test that parent cannot be > current slot assert_matches!( - Shredder::new(slot, slot + 1, 1.00, keypair.clone()), + Shredder::new(slot, slot + 1, 1.00, keypair.clone(), 0), Err(ShredError::SlotTooLow { slot: _, parent_slot: _, @@ -805,7 +833,7 @@ pub mod tests { ); // Test that slot - parent cannot be > u16 MAX assert_matches!( - Shredder::new(slot, slot - 1 - 0xffff, 1.00, keypair.clone()), + Shredder::new(slot, slot - 1 - 0xffff, 1.00, keypair.clone(), 0), Err(ShredError::SlotTooLow { slot: _, parent_slot: _, @@ -814,7 +842,7 @@ pub mod tests { let fec_rate = 0.25; let parent_slot = slot - 5; - let shredder = Shredder::new(slot, parent_slot, fec_rate, keypair.clone()) + let shredder = Shredder::new(slot, parent_slot, fec_rate, keypair.clone(), 0) .expect("Failed in creating shredder"); let entries: Vec<_> = (0..5) @@ -889,7 +917,7 @@ pub mod tests { let slot = 1; let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone()) + let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), 0) .expect("Failed in creating shredder"); let entries: Vec<_> = (0..5) @@ -909,6 +937,72 @@ pub mod tests { assert_eq!(deserialized_shred, *data_shreds.last().unwrap()); } + #[test] + fn test_shred_reference_tick() { + let keypair = Arc::new(Keypair::new()); + 
let slot = 1; + + let parent_slot = 0; + let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), 5) + .expect("Failed in creating shredder"); + + let entries: Vec<_> = (0..5) + .map(|_| { + let keypair0 = Keypair::new(); + let keypair1 = Keypair::new(); + let tx0 = + system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); + Entry::new(&Hash::default(), 1, vec![tx0]) + }) + .collect(); + + let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + data_shreds.iter().for_each(|s| { + assert_eq!(s.reference_tick(), 5); + assert_eq!(Shred::reference_tick_from_data(&s.payload), 5); + }); + + let deserialized_shred = + Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap(); + assert_eq!(deserialized_shred.reference_tick(), 5); + } + + #[test] + fn test_shred_reference_tick_overflow() { + let keypair = Arc::new(Keypair::new()); + let slot = 1; + + let parent_slot = 0; + let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), u8::max_value()) + .expect("Failed in creating shredder"); + + let entries: Vec<_> = (0..5) + .map(|_| { + let keypair0 = Keypair::new(); + let keypair1 = Keypair::new(); + let tx0 = + system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); + Entry::new(&Hash::default(), 1, vec![tx0]) + }) + .collect(); + + let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + data_shreds.iter().for_each(|s| { + assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK); + assert_eq!( + Shred::reference_tick_from_data(&s.payload), + SHRED_TICK_REFERENCE_MASK + ); + }); + + let deserialized_shred = + Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap(); + assert_eq!( + deserialized_shred.reference_tick(), + SHRED_TICK_REFERENCE_MASK + ); + } + #[test] fn test_data_and_code_shredder() { let keypair = Arc::new(Keypair::new()); @@ -916,11 +1010,11 @@ pub mod tests { let slot = 0x123456789abcdef0; // Test that FEC rate cannot be > 1.0 assert_matches!( - Shredder::new(slot, slot - 5, 1.001, keypair.clone()), + Shredder::new(slot, slot - 5, 1.001, keypair.clone(), 0), Err(ShredError::InvalidFecRate(_)) ); - let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone()) + let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone(), 0) .expect("Failed in creating shredder"); // Create enough entries to make > 1 shred @@ -962,7 +1056,7 @@ pub mod tests { fn test_recovery_and_reassembly() { let keypair = Arc::new(Keypair::new()); let slot = 0x123456789abcdef0; - let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone()) + let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0) .expect("Failed in creating shredder"); let keypair0 = Keypair::new(); @@ -1208,7 +1302,7 @@ pub mod tests { fn test_multi_fec_block_coding() { let keypair = Arc::new(Keypair::new()); let slot = 0x123456789abcdef0; - let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone()) + let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0) .expect("Failed in creating shredder"); let num_fec_sets = 100;
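
A note on the flag encoding this patch relies on: the data shred's single `flags` byte now carries the reference tick in its low six bits (SHRED_TICK_REFERENCE_MASK), while DATA_COMPLETE_SHRED and LAST_SHRED_IN_SLOT move to the two high bits. This is also why replay_stage switches from `=` to `|=` when setting DATA_COMPLETE_SHRED, so the tick bits are not clobbered. Below is a minimal standalone sketch of that bit layout, packing the tick the same saturating way `new_from_data` does; the constants are copied from the patch, but the helper function names are illustrative and not part of the crate:

    // Bit layout of the data-shred `flags` byte introduced by this patch:
    //   bit 7: LAST_SHRED_IN_SLOT, bit 6: DATA_COMPLETE_SHRED, bits 5..0: reference tick.
    const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111;
    const DATA_COMPLETE_SHRED: u8 = 0b0100_0000;
    const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000;

    /// Pack a reference tick (saturating at 63) plus the completion bits into one byte.
    fn encode_flags(reference_tick: u8, is_last_data: bool, is_last_in_slot: bool) -> u8 {
        let mut flags = reference_tick.min(SHRED_TICK_REFERENCE_MASK);
        if is_last_data {
            flags |= DATA_COMPLETE_SHRED;
        }
        if is_last_in_slot {
            flags |= LAST_SHRED_IN_SLOT;
        }
        flags
    }

    /// Recover the reference tick from the flags byte, as `Shred::reference_tick()` does.
    fn decode_reference_tick(flags: u8) -> u8 {
        flags & SHRED_TICK_REFERENCE_MASK
    }

    fn main() {
        // A tick value above 63 saturates to the mask, matching the overflow test above.
        let flags = encode_flags(200, true, true);
        assert_eq!(decode_reference_tick(flags), SHRED_TICK_REFERENCE_MASK);
        assert_ne!(flags & DATA_COMPLETE_SHRED, 0);
        assert_ne!(flags & LAST_SHRED_IN_SLOT, 0);
        println!("flags = {:#010b}", flags);
    }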
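
On the repair timing itself: find_missing_indexes now converts the wall-clock time since the slot's first insertion into ticks and skips any hole whose neighbouring shred's reference tick plus MAX_TURBINE_PROPAGATION_DELAY_TICKS has not yet elapsed, so repair stops racing turbine for shreds that are probably still in flight. A rough standalone sketch of that predicate follows; it assumes a value of 160 for the SDK's DEFAULT_TICKS_PER_SECOND (the patch simply uses the solana_sdk constant) and the function name is illustrative:

    // Assumed value of solana_sdk::clock::DEFAULT_TICKS_PER_SECOND; the patch
    // imports the constant rather than hardcoding it.
    const DEFAULT_TICKS_PER_SECOND: u64 = 160;
    // From the patch: holes younger than ~16 ticks of turbine propagation delay wait.
    const MAX_TURBINE_PROPAGATION_DELAY_TICKS: u64 = 16;

    /// Returns true if a missing shred index should be requested via repair.
    ///
    /// `first_shred_timestamp_ms` is the back-dated SlotMeta::first_shred_timestamp,
    /// `now_ms` is the current wall clock, and `reference_tick` comes from a later
    /// shred that *was* received for the slot.
    fn hole_is_repairable(first_shred_timestamp_ms: u64, now_ms: u64, reference_tick: u8) -> bool {
        // Convert elapsed wall-clock time into ticks, as the patch does.
        let ticks_since_first_insert =
            DEFAULT_TICKS_PER_SECOND * now_ms.saturating_sub(first_shred_timestamp_ms) / 1000;
        // Eligible only once the shred's reference tick plus the allowed
        // propagation delay has elapsed; otherwise keep waiting for turbine.
        ticks_since_first_insert >= u64::from(reference_tick) + MAX_TURBINE_PROPAGATION_DELAY_TICKS
    }

    fn main() {
        // A hole next to a shred with reference tick 5, checked 50 ms after the slot
        // started: 50 ms is about 8 ticks, less than 5 + 16, so repair waits.
        assert!(!hole_is_repairable(1_000, 1_050, 5));
        // After 200 ms (about 32 ticks) the same hole times out and gets requested.
        assert!(hole_is_repairable(1_000, 1_200, 5));
    }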
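
The check above needs a start time for the slot even when shred 0 never arrives, which is why update_slot_meta back-dates SlotMeta::first_shred_timestamp from the reference tick of whichever shred lands first. A small sketch of that arithmetic, under the same assumed tick rate and with an illustrative function name:

    // Same assumption as above for the SDK default tick rate.
    const DEFAULT_TICKS_PER_SECOND: u64 = 160;

    /// Estimate when the slot's first shred would have been produced, given the
    /// insertion time and the reference tick carried by the inserted shred.
    fn predict_first_shred_timestamp(insert_time_ms: u64, reference_tick: u8) -> u64 {
        // The shred was produced `reference_tick` ticks into the slot, so the slot
        // started roughly that many ticks' worth of milliseconds before we saw it.
        let slot_time_elapsed_ms = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND;
        insert_time_ms.saturating_sub(slot_time_elapsed_ms)
    }

    fn main() {
        // A shred referencing tick 32 (about 200 ms into the slot at 160 ticks/s),
        // inserted at t = 10_000 ms, back-dates the slot start to roughly 9_800 ms.
        assert_eq!(predict_first_shred_timestamp(10_000, 32), 9_800);
    }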