diff --git a/Cargo.lock b/Cargo.lock
index 57f14c7ae9..e7650a6caf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4626,6 +4626,7 @@ dependencies = [
  "solana-transaction-status",
  "solana-version",
  "solana-vote-program",
+ "static_assertions",
  "symlink",
  "systemstat",
  "tempfile",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 6bc198786b..c752d0e253 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -82,6 +82,7 @@ serde_json = "1.0.56"
 serial_test = "0.5.1"
 solana-stake-program = { path = "../programs/stake", version = "=1.8.0" }
 solana-version = { path = "../version", version = "=1.8.0" }
+static_assertions = "1.1.0"
 symlink = "0.1.0"
 systemstat = "0.1.8"
 tokio_02 = { version = "0.2", package = "tokio", features = ["full"] }
diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs
index f5099f7720..0e47cc34f9 100644
--- a/core/src/cluster_slots.rs
+++ b/core/src/cluster_slots.rs
@@ -1,4 +1,4 @@
-use crate::serve_repair::RepairType;
+use crate::serve_repair::ShredRepairType;
 use itertools::Itertools;
 use solana_gossip::{
     cluster_info::ClusterInfo, contact_info::ContactInfo, crds::Cursor, epoch_slots::EpochSlots,
@@ -186,14 +186,14 @@ impl ClusterSlots {
         &self,
         self_id: &Pubkey,
         root: Slot,
-    ) -> Vec<RepairType> {
+    ) -> Vec<ShredRepairType> {
         let my_slots = self.collect(self_id);
         self.cluster_slots
             .read()
             .unwrap()
             .keys()
             .filter(|x| **x > root && !my_slots.contains(*x))
-            .map(|x| RepairType::HighestShred(*x, 0))
+            .map(|x| ShredRepairType::HighestShred(*x, 0))
             .collect()
     }
 }
@@ -390,7 +390,7 @@ mod tests {
         let self_id = solana_sdk::pubkey::new_rand();
         assert_eq!(
             cs.generate_repairs_for_missing_slots(&self_id, 0),
-            vec![RepairType::HighestShred(1, 0)]
+            vec![ShredRepairType::HighestShred(1, 0)]
         )
     }
diff --git a/core/src/outstanding_requests.rs b/core/src/outstanding_requests.rs
index fe542e92b1..27d6ba5d05 100644
--- a/core/src/outstanding_requests.rs
+++ b/core/src/outstanding_requests.rs
@@ -73,13 +73,13 @@ pub struct RequestStatus<T> {
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
-    use crate::serve_repair::RepairType;
+    use crate::serve_repair::ShredRepairType;
     use solana_ledger::shred::Shred;
     use solana_sdk::timing::timestamp;

     #[test]
     fn test_add_request() {
-        let repair_type = RepairType::Orphan(9);
+        let repair_type = ShredRepairType::Orphan(9);
         let mut outstanding_requests = OutstandingRequests::default();
         let nonce = outstanding_requests.add_request(repair_type, timestamp());
         let request_status = outstanding_requests.requests.get(&nonce).unwrap();
@@ -92,7 +92,7 @@ pub(crate) mod tests {

     #[test]
     fn test_timeout_expired_remove() {
-        let repair_type = RepairType::Orphan(9);
+        let repair_type = ShredRepairType::Orphan(9);
         let mut outstanding_requests = OutstandingRequests::default();
         let nonce = outstanding_requests.add_request(repair_type, timestamp());
         let shred = Shred::new_empty_data_shred();
@@ -109,7 +109,7 @@ pub(crate) mod tests {

     #[test]
     fn test_register_response() {
-        let repair_type = RepairType::Orphan(9);
+        let repair_type = ShredRepairType::Orphan(9);
         let mut outstanding_requests = OutstandingRequests::default();
         let nonce = outstanding_requests.add_request(repair_type, timestamp());
diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs
index fef52628c7..ecd8b5d7c2 100644
--- a/core/src/repair_response.rs
+++ b/core/src/repair_response.rs
@@ -17,23 +17,23 @@ pub fn repair_response_packet(
         .get_data_shred(slot, shred_index)
         .expect("Blockstore could not get data shred");
     shred
-        .map(|shred| repair_response_packet_from_shred(shred, dest, nonce))
+        .map(|shred| repair_response_packet_from_bytes(shred, dest, nonce))
         .unwrap_or(None)
 }

-pub fn repair_response_packet_from_shred(
-    shred: Vec<u8>,
+pub fn repair_response_packet_from_bytes(
+    bytes: Vec<u8>,
     dest: &SocketAddr,
     nonce: Nonce,
 ) -> Option<Packet> {
     let mut packet = Packet::default();
-    packet.meta.size = shred.len() + SIZE_OF_NONCE;
+    packet.meta.size = bytes.len() + SIZE_OF_NONCE;
     if packet.meta.size > packet.data.len() {
         return None;
     }
     packet.meta.set_addr(dest);
-    packet.data[..shred.len()].copy_from_slice(&shred);
-    let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]);
+    packet.data[..bytes.len()].copy_from_slice(&bytes);
+    let mut wr = io::Cursor::new(&mut packet.data[bytes.len()..]);
     bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
     Some(packet)
 }
@@ -77,7 +77,7 @@ mod test {
         Shredder::sign_shred(&keypair, &mut shred);
         trace!("signature {}", shred.common_header.signature);
         let nonce = 9;
-        let mut packet = repair_response_packet_from_shred(
+        let mut packet = repair_response_packet_from_bytes(
             shred.payload,
             &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
             nonce,
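Note on the rename above: `repair_response_packet_from_bytes` makes the response framing explicit. The packet carries the raw payload bytes (a serialized shred, or now any serialized response) followed by a bincode-encoded `u32` nonce in the last four bytes of the used region. A minimal standalone sketch of that framing; the `split_payload_and_nonce` helper below is hypothetical and not part of this change:

```rust
use std::convert::TryInto;

const SIZE_OF_NONCE: usize = 4; // Nonce is a u32

// Hypothetical receiver-side helper: split a packet's used bytes into the
// payload and the trailing nonce, mirroring the layout written above.
fn split_payload_and_nonce(data: &[u8]) -> Option<(&[u8], u32)> {
    if data.len() < SIZE_OF_NONCE {
        return None;
    }
    let (payload, nonce_bytes) = data.split_at(data.len() - SIZE_OF_NONCE);
    // bincode's default fixed-int encoding writes the u32 little-endian
    let nonce = u32::from_le_bytes(nonce_bytes.try_into().ok()?);
    Some((payload, nonce))
}

fn main() {
    let mut packet = b"shred-bytes".to_vec();
    packet.extend_from_slice(&9u32.to_le_bytes()); // nonce = 9, as in the test above
    let (payload, nonce) = split_payload_and_nonce(&packet).unwrap();
    assert_eq!(payload, b"shred-bytes");
    assert_eq!(nonce, 9);
}
```

This is the same trailer the tests later strip with `&packet.data[..packet.meta.size - SIZE_OF_NONCE]` before deserializing a response.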
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 9e781cfebf..8b5e2f1f91 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -7,7 +7,7 @@ use crate::{
     repair_weight::RepairWeight,
     replay_stage::DUPLICATE_THRESHOLD,
     result::Result,
-    serve_repair::{RepairType, ServeRepair, REPAIR_PEERS_CACHE_CAPACITY},
+    serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
 use lru::LruCache;
@@ -40,8 +40,7 @@ pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
 pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
 pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
-
-pub type OutstandingRepairs = OutstandingRequests<RepairType>;
+pub type OutstandingRepairs = OutstandingRequests<ShredRepairType>;

 #[derive(Default, Debug)]
 pub struct SlotRepairs {
@@ -366,9 +365,9 @@ impl RepairService {
         blockstore: &Blockstore,
         max_repairs: usize,
         repair_range: &RepairSlotRange,
-    ) -> Result<Vec<RepairType>> {
+    ) -> Result<Vec<ShredRepairType>> {
         // Slot height and shred indexes for shreds we want to repair
-        let mut repairs: Vec<RepairType> = vec![];
+        let mut repairs: Vec<ShredRepairType> = vec![];
         for slot in repair_range.start..=repair_range.end {
             if repairs.len() >= max_repairs {
                 break;
@@ -399,11 +398,11 @@ impl RepairService {
         slot: Slot,
         slot_meta: &SlotMeta,
         max_repairs: usize,
-    ) -> Vec<RepairType> {
+    ) -> Vec<ShredRepairType> {
         if max_repairs == 0 || slot_meta.is_full() {
             vec![]
         } else if slot_meta.consumed == slot_meta.received {
-            vec![RepairType::HighestShred(slot, slot_meta.received)]
+            vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
         } else {
             let reqs = blockstore.find_missing_data_indexes(
                 slot,
@@ -413,7 +412,7 @@ impl RepairService {
                 max_repairs,
             );
             reqs.into_iter()
-                .map(|i| RepairType::Shred(slot, i))
+                .map(|i| ShredRepairType::Shred(slot, i))
                 .collect()
         }
     }
@@ -421,7 +420,7 @@ impl RepairService {
     /// Repairs any fork starting at the input slot
     pub fn generate_repairs_for_fork<'a>(
         blockstore: &Blockstore,
-        repairs: &mut Vec<RepairType>,
+        repairs: &mut Vec<ShredRepairType>,
         max_repairs: usize,
         slot: Slot,
         duplicate_slot_repair_statuses: &impl Contains<'a, Slot>,
@@ -453,7 +452,7 @@ impl RepairService {
     fn generate_duplicate_repairs_for_slot(
         blockstore: &Blockstore,
         slot: Slot,
-    ) -> Option<Vec<RepairType>> {
+    ) -> Option<Vec<ShredRepairType>> {
         if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
             if slot_meta.is_full() {
                 // If the slot is full, no further need to repair this slot
@@ -527,7 +526,7 @@ impl RepairService {

     #[allow(dead_code)]
     fn serialize_and_send_request(
-        repair_type: &RepairType,
+        repair_type: &ShredRepairType,
         repair_socket: &UdpSocket,
         repair_pubkey: &Pubkey,
         to: &SocketAddr,
@@ -709,7 +708,10 @@ mod test {
                 &HashSet::default(),
                 None,
             ),
-            vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
+            vec![
+                ShredRepairType::Orphan(2),
+                ShredRepairType::HighestShred(0, 0)
+            ]
         );
     }
@@ -740,7 +742,7 @@ mod test {
                 &HashSet::default(),
                 None
             ),
-            vec![RepairType::HighestShred(0, 0)]
+            vec![ShredRepairType::HighestShred(0, 0)]
         );
         }
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
@@ -776,11 +778,11 @@ mod test {
             .unwrap();
         // sleep so that the holes are ready for repair
         sleep(Duration::from_secs(1));
-        let expected: Vec<RepairType> = (0..num_slots)
+        let expected: Vec<ShredRepairType> = (0..num_slots)
             .flat_map(|slot| {
                 missing_indexes_per_slot
                     .iter()
-                    .map(move |shred_index| RepairType::Shred(slot as u64, *shred_index))
+                    .map(move |shred_index| ShredRepairType::Shred(slot as u64, *shred_index))
             })
             .collect();
@@ -832,8 +834,8 @@ mod test {
         blockstore.insert_shreds(shreds, None, false).unwrap();

         // We didn't get the last shred for this slot, so ask for the highest shred for that slot
-        let expected: Vec<RepairType> =
-            vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
+        let expected: Vec<ShredRepairType> =
+            vec![ShredRepairType::HighestShred(0, num_shreds_per_slot - 1)];

         let mut repair_weight = RepairWeight::new(0);
         assert_eq!(
@@ -876,13 +878,13 @@ mod test {
             start: slots[start],
             end: slots[end],
         };
-        let expected: Vec<RepairType> = (repair_slot_range.start
+        let expected: Vec<ShredRepairType> = (repair_slot_range.start
             ..=repair_slot_range.end)
             .map(|slot_index| {
                 if slots.contains(&(slot_index as u64)) {
-                    RepairType::Shred(slot_index as u64, 0)
+                    ShredRepairType::Shred(slot_index as u64, 0)
                 } else {
-                    RepairType::HighestShred(slot_index as u64, 0)
+                    ShredRepairType::HighestShred(slot_index as u64, 0)
                 }
             })
             .collect();
@@ -922,10 +924,10 @@ mod test {
         }

         let end = 4;
-        let expected: Vec<RepairType> = vec![
-            RepairType::HighestShred(end - 2, 0),
-            RepairType::HighestShred(end - 1, 0),
-            RepairType::HighestShred(end, 0),
+        let expected: Vec<ShredRepairType> = vec![
+            ShredRepairType::HighestShred(end - 2, 0),
+            ShredRepairType::HighestShred(end - 1, 0),
+            ShredRepairType::HighestShred(end, 0),
         ];

         let repair_slot_range = RepairSlotRange { start: 2, end };
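Aside from the mechanical rename, the per-slot logic in `generate_repairs_for_slot` above is unchanged: a full slot needs nothing, a slot with no holes (`consumed == received`) asks for the highest shred, and otherwise each missing data-shred index becomes a `Shred` request. A self-contained sketch of that branch structure, with local stand-in types for illustration only and the `max_repairs` capping omitted for brevity:

```rust
type Slot = u64;

#[derive(Debug, PartialEq)]
enum ShredRepairType {
    HighestShred(Slot, u64),
    Shred(Slot, u64),
}

// Mirrors the branch structure of generate_repairs_for_slot; `missing_indexes`
// stands in for what blockstore.find_missing_data_indexes would return.
fn repairs_for_slot(
    slot: Slot,
    is_full: bool,
    consumed: u64,
    received: u64,
    missing_indexes: &[u64],
) -> Vec<ShredRepairType> {
    if is_full {
        vec![]
    } else if consumed == received {
        vec![ShredRepairType::HighestShred(slot, received)]
    } else {
        missing_indexes
            .iter()
            .map(|i| ShredRepairType::Shred(slot, *i))
            .collect()
    }
}

fn main() {
    // Slot 7 has shreds up to index 10, but indexes 3 and 5 never arrived.
    assert_eq!(
        repairs_for_slot(7, false, 3, 10, &[3, 5]),
        vec![ShredRepairType::Shred(7, 3), ShredRepairType::Shred(7, 5)]
    );
}
```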
diff --git a/core/src/repair_weight.rs b/core/src/repair_weight.rs
index fe080518a5..dad4e18eaa 100644
--- a/core/src/repair_weight.rs
+++ b/core/src/repair_weight.rs
@@ -1,6 +1,6 @@
 use crate::{
     heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairTiming,
-    repair_weighted_traversal, serve_repair::RepairType, tree_diff::TreeDiff,
+    repair_weighted_traversal, serve_repair::ShredRepairType, tree_diff::TreeDiff,
 };
 use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore};
 use solana_measure::measure::Measure;
@@ -147,7 +147,7 @@ impl RepairWeight {
         max_new_shreds: usize,
         ignore_slots: &impl Contains<'a, Slot>,
         repair_timing: Option<&mut RepairTiming>,
-    ) -> Vec<RepairType> {
+    ) -> Vec<ShredRepairType> {
         let mut repairs = vec![];
         let mut get_best_orphans_elapsed = Measure::start("get_best_orphans");
         // Update the orphans in order from heaviest to least heavy
@@ -248,7 +248,7 @@ impl RepairWeight {
     fn get_best_shreds<'a>(
         &mut self,
         blockstore: &Blockstore,
-        repairs: &mut Vec<RepairType>,
+        repairs: &mut Vec<ShredRepairType>,
         max_new_shreds: usize,
         ignore_slots: &impl Contains<'a, Slot>,
     ) {
@@ -265,7 +265,7 @@ impl RepairWeight {
     fn get_best_orphans(
         &mut self,
         blockstore: &Blockstore,
-        repairs: &mut Vec<RepairType>,
+        repairs: &mut Vec<ShredRepairType>,
         epoch_stakes: &HashMap<Epoch, EpochStakes>,
         epoch_schedule: &EpochSchedule,
         max_new_orphans: usize,
@@ -306,7 +306,7 @@ impl RepairWeight {
             if let Some(new_orphan_root) = new_orphan_root {
                 if new_orphan_root != self.root && !best_orphans.contains(&new_orphan_root) {
                     best_orphans.insert(new_orphan_root);
-                    repairs.push(RepairType::Orphan(new_orphan_root));
+                    repairs.push(ShredRepairType::Orphan(new_orphan_root));
                 }
             }
         }
@@ -317,7 +317,7 @@ impl RepairWeight {
         if best_orphans.len() < max_new_orphans {
             for new_orphan in blockstore.orphans_iterator(self.root + 1).unwrap() {
                 if !best_orphans.contains(&new_orphan) {
-                    repairs.push(RepairType::Orphan(new_orphan));
+                    repairs.push(ShredRepairType::Orphan(new_orphan));
                     best_orphans.insert(new_orphan);
                 }
diff --git a/core/src/repair_weighted_traversal.rs b/core/src/repair_weighted_traversal.rs
index 8b6cd0ceb4..80c27c8020 100644
--- a/core/src/repair_weighted_traversal.rs
+++ b/core/src/repair_weighted_traversal.rs
@@ -1,6 +1,6 @@
 use crate::{
     heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairService,
-    serve_repair::RepairType, tree_diff::TreeDiff,
+    serve_repair::ShredRepairType, tree_diff::TreeDiff,
 };
 use solana_ledger::blockstore::Blockstore;
 use solana_runtime::contains::Contains;
@@ -73,7 +73,7 @@ impl<'a> Iterator for RepairWeightTraversal<'a> {
 pub fn get_best_repair_shreds<'a>(
     tree: &HeaviestSubtreeForkChoice,
     blockstore: &Blockstore,
-    repairs: &mut Vec<RepairType>,
+    repairs: &mut Vec<ShredRepairType>,
     max_new_shreds: usize,
     ignore_slots: &impl Contains<'a, Slot>,
 ) {
@@ -227,7 +227,7 @@ pub mod test {
             repairs,
             [0, 1, 2, 4, 3, 5]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
@@ -254,7 +254,7 @@ pub mod test {
             repairs,
             [0, 1, 2, 4, 6, 7]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
@@ -289,7 +289,7 @@ pub mod test {
             repairs,
             [1, 7, 3, 5]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
@@ -308,7 +308,7 @@ pub mod test {
             repairs,
             [1, 7, 8, 3]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
     }
@@ -332,7 +332,7 @@ pub mod test {
             repairs,
             [0, 1, 2, 4, 6, 7, 3, 5]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
     }
@@ -357,7 +357,7 @@ pub mod test {
             repairs,
             [0, 2, 4, 5]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
@@ -377,7 +377,7 @@ pub mod test {
             repairs,
             [0, 4, 6, 7, 5]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
@@ -396,7 +396,7 @@ pub mod test {
             repairs,
             [0, 4, 5]
                 .iter()
-                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
     }
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 0740f651cf..5de8e5efa3 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -17,13 +17,16 @@ use solana_gossip::{
     weighted_shuffle::weighted_best,
 };
 use solana_ledger::{
+    ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
     blockstore::Blockstore,
-    shred::{Nonce, Shred},
+    shred::{Nonce, Shred, SIZE_OF_NONCE},
 };
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_debug;
 use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
-use solana_sdk::{clock::Slot, pubkey::Pubkey, timing::duration_as_ms};
+use solana_sdk::{
+    clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
+};
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
     collections::HashSet,
@@ -34,52 +37,99 @@ use std::{
     time::{Duration, Instant},
 };

+type SlotHash = (Slot, Hash);
+
 /// the number of slots to respond with when responding to `Orphan` requests
 pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
 // Number of slots to cache their respective repair peers and sampling weights.
 pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
 // Limit cache entries ttl in order to avoid re-using outdated data.
 const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
+pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
+    PACKET_DATA_SIZE -
+    SIZE_OF_NONCE -
+    4 /*(response version enum discriminator)*/ -
+    4 /*slot_hash length*/;
+pub const MAX_ANCESTOR_RESPONSES: usize =
+    MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<SlotHash>();
+#[cfg(test)]
+static_assertions::const_assert_eq!(MAX_ANCESTOR_RESPONSES, 30);

 #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum RepairType {
+pub enum ShredRepairType {
     Orphan(Slot),
     HighestShred(Slot, u64),
     Shred(Slot, u64),
 }

-impl RepairType {
+impl ShredRepairType {
     pub fn slot(&self) -> Slot {
         match self {
-            RepairType::Orphan(slot) => *slot,
-            RepairType::HighestShred(slot, _) => *slot,
-            RepairType::Shred(slot, _) => *slot,
+            ShredRepairType::Orphan(slot) => *slot,
+            ShredRepairType::HighestShred(slot, _) => *slot,
+            ShredRepairType::Shred(slot, _) => *slot,
         }
     }
 }

-impl RequestResponse for RepairType {
+impl RequestResponse for ShredRepairType {
     type Response = Shred;
     fn num_expected_responses(&self) -> u32 {
         match self {
-            RepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES + 1) as u32, // run_orphan uses <= MAX_ORPHAN_REPAIR_RESPONSES
-            RepairType::HighestShred(_, _) => 1,
-            RepairType::Shred(_, _) => 1,
+            ShredRepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES + 1) as u32, // run_orphan uses <= MAX_ORPHAN_REPAIR_RESPONSES
+            ShredRepairType::HighestShred(_, _) => 1,
+            ShredRepairType::Shred(_, _) => 1,
         }
     }
     fn verify_response(&self, response_shred: &Shred) -> bool {
         match self {
-            RepairType::Orphan(slot) => response_shred.slot() <= *slot,
-            RepairType::HighestShred(slot, index) => {
+            ShredRepairType::Orphan(slot) => response_shred.slot() <= *slot,
+            ShredRepairType::HighestShred(slot, index) => {
                 response_shred.slot() as u64 == *slot && response_shred.index() as u64 >= *index
             }
-            RepairType::Shred(slot, index) => {
+            ShredRepairType::Shred(slot, index) => {
                 response_shred.slot() as u64 == *slot && response_shred.index() as u64 == *index
             }
         }
     }
 }

+pub struct AncestorHashesRepair(Slot);
+#[derive(Serialize, Deserialize)]
+pub enum AncestorHashesResponseVersion {
+    Current(Vec<SlotHash>),
+}
+impl AncestorHashesResponseVersion {
+    #[cfg(test)]
+    fn into_slot_hashes(self) -> Vec<SlotHash> {
+        match self {
+            AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
+        }
+    }
+
+    fn slot_hashes(&self) -> &[SlotHash] {
+        match self {
+            AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
+        }
+    }
+
+    fn max_ancestors_in_response(&self) -> usize {
+        match self {
+            AncestorHashesResponseVersion::Current(_) => MAX_ANCESTOR_RESPONSES,
+        }
+    }
+}
+
+impl RequestResponse for AncestorHashesRepair {
+    type Response = AncestorHashesResponseVersion;
+    fn num_expected_responses(&self) -> u32 {
+        1
+    }
+    fn verify_response(&self, response: &AncestorHashesResponseVersion) -> bool {
+        response.slot_hashes().len() <= response.max_ancestors_in_response()
+    }
+}
+
 #[derive(Default)]
 pub struct ServeRepairStats {
     pub total_packets: usize,
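The new constants above budget one UDP packet for an ancestor-hashes reply. A standalone check of the arithmetic pinned by the `const_assert_eq!`; the sizes are mirrored locally for illustration (`PACKET_DATA_SIZE` is solana-sdk's 1232-byte payload limit, and a `SlotHash` is an 8-byte slot plus a 32-byte hash):

```rust
const PACKET_DATA_SIZE: usize = 1232; // mirrors solana_sdk::packet::PACKET_DATA_SIZE
const SIZE_OF_NONCE: usize = 4; // trailing u32 nonce
type SlotHash = (u64, [u8; 32]); // local stand-in for (Slot, Hash)

fn main() {
    let max_ancestor_bytes = PACKET_DATA_SIZE
        - SIZE_OF_NONCE
        - 4  // response version enum discriminator
        - 4; // slot_hash vector length
    assert_eq!(std::mem::size_of::<SlotHash>(), 40); // 8 + 32, no padding
    let max_responses = max_ancestor_bytes / std::mem::size_of::<SlotHash>();
    assert_eq!(max_responses, 30); // (1232 - 12) / 40 = 30, matching the const_assert_eq!
}
```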
@@ -89,17 +139,19 @@ pub struct ServeRepairStats {
     pub window_index: usize,
     pub highest_window_index: usize,
     pub orphan: usize,
+    pub ancestor_hashes: usize,
 }

 /// Window protocol messages
 #[derive(Serialize, Deserialize, Debug)]
 pub enum RepairProtocol {
-    WindowIndex(ContactInfo, u64, u64),
-    HighestWindowIndex(ContactInfo, u64, u64),
-    Orphan(ContactInfo, u64),
-    WindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
-    HighestWindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
-    OrphanWithNonce(ContactInfo, u64, Nonce),
+    WindowIndex(ContactInfo, Slot, u64),
+    HighestWindowIndex(ContactInfo, Slot, u64),
+    Orphan(ContactInfo, Slot),
+    WindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
+    HighestWindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
+    OrphanWithNonce(ContactInfo, Slot, Nonce),
+    AncestorHashes(ContactInfo, Slot, Nonce),
 }

 #[derive(Clone)]
@@ -168,6 +220,7 @@ impl ServeRepair {
             RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from,
             RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from,
             RepairProtocol::OrphanWithNonce(ref from, _, _) => from,
+            RepairProtocol::AncestorHashes(ref from, _, _) => from,
         }
     }
@@ -235,6 +288,13 @@ impl ServeRepair {
                         "OrphanWithNonce",
                     )
                 }
+                RepairProtocol::AncestorHashes(_, slot, nonce) => {
+                    stats.ancestor_hashes += 1;
+                    (
+                        Self::run_ancestor_hashes(recycler, from_addr, blockstore, *slot, *nonce),
+                        "AncestorHashes",
+                    )
+                }
                 _ => (None, "Unsupported repair type"),
             }
         };
@@ -319,7 +379,10 @@ impl ServeRepair {
             stats.highest_window_index
         );
         inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);
-
+        inc_new_counter_debug!(
+            "serve_repair-request-ancestor-hashes",
+            stats.ancestor_hashes
+        );
         *stats = ServeRepairStats::default();
     }
@@ -420,7 +483,7 @@ impl ServeRepair {
     pub(crate) fn repair_request(
         &self,
         cluster_slots: &ClusterSlots,
-        repair_request: RepairType,
+        repair_request: ShredRepairType,
         peers_cache: &mut LruCache<Slot, RepairPeers>,
         repair_stats: &mut RepairStats,
         repair_validators: &Option<HashSet<Pubkey>>,
@@ -464,25 +527,25 @@ impl ServeRepair {

     pub fn map_repair_request(
         &self,
-        repair_request: &RepairType,
+        repair_request: &ShredRepairType,
         repair_peer_id: &Pubkey,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<Vec<u8>> {
         match repair_request {
-            RepairType::Shred(slot, shred_index) => {
+            ShredRepairType::Shred(slot, shred_index) => {
                 repair_stats
                     .shred
                     .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
             }
-            RepairType::HighestShred(slot, shred_index) => {
+            ShredRepairType::HighestShred(slot, shred_index) => {
                 repair_stats
                     .highest_shred
                     .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
             }
-            RepairType::Orphan(slot) => {
+            ShredRepairType::Orphan(slot) => {
                 repair_stats.orphan.update(repair_peer_id, *slot, 0);
                 Ok(self.orphan_bytes(*slot, nonce)?)
             }
@@ -620,6 +683,40 @@ impl ServeRepair {
         }
         Some(res)
     }
+
+    fn run_ancestor_hashes(
+        recycler: &PacketsRecycler,
+        from_addr: &SocketAddr,
+        blockstore: Option<&Arc<Blockstore>>,
+        slot: Slot,
+        nonce: Nonce,
+    ) -> Option<Packets> {
+        let blockstore = blockstore?;
+        let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) {
+            let ancestor_iterator =
+                AncestorIteratorWithHash::from(AncestorIterator::new_inclusive(slot, blockstore));
+            ancestor_iterator.take(MAX_ANCESTOR_RESPONSES).collect()
+        } else {
+            // If this slot is not duplicate confirmed, return nothing
+            vec![]
+        };
+        let response = AncestorHashesResponseVersion::Current(ancestor_slot_hashes);
+        let serialized_response = serialize(&response).ok()?;
+
+        // Could probably directly write response into packet via `serialize_into()`
+        // instead of incurring extra copy in `repair_response_packet_from_bytes`, but
+        // serialize_into doesn't return the written size...
+        let packet = repair_response::repair_response_packet_from_bytes(
+            serialized_response,
+            from_addr,
+            nonce,
+        )?;
+        Some(Packets::new_unpinned_with_recycler_data(
+            recycler,
+            "run_ancestor_hashes",
+            vec![packet],
+        ))
+    }
 }

 #[cfg(test)]
@@ -676,7 +773,7 @@ mod tests {
                 nonce,
             )
             .expect("packets");
-            let request = RepairType::HighestShred(slot, index);
+            let request = ShredRepairType::HighestShred(slot, index);
             verify_responses(&request, rv.packets.iter());

             let rv: Vec<Shred> = rv
@@ -762,7 +859,7 @@ mod tests {
                 nonce,
             )
             .expect("packets");
-            let request = RepairType::Shred(slot, index);
+            let request = ShredRepairType::Shred(slot, index);
             verify_responses(&request, rv.packets.iter());
             let rv: Vec<Shred> = rv
                 .packets
@@ -788,7 +885,7 @@ mod tests {
         let mut outstanding_requests = OutstandingRepairs::default();
         let rv = serve_repair.repair_request(
             &cluster_slots,
-            RepairType::Shred(0, 0),
+            ShredRepairType::Shred(0, 0),
             &mut LruCache::new(100),
             &mut RepairStats::default(),
             &None,
@@ -816,7 +913,7 @@ mod tests {
         let rv = serve_repair
             .repair_request(
                 &cluster_slots,
-                RepairType::Shred(0, 0),
+                ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
@@ -850,7 +947,7 @@ mod tests {
         let rv = serve_repair
             .repair_request(
                 &cluster_slots,
-                RepairType::Shred(0, 0),
+                ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
@@ -923,7 +1020,7 @@ mod tests {
             .collect();

         // Verify responses
-        let request = RepairType::Orphan(slot);
+        let request = ShredRepairType::Orphan(slot);
         verify_responses(&request, rv.iter());

         let expected: Vec<_> = (slot..slot + num_slots)
@@ -1006,6 +1103,89 @@ mod tests {
         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
     }

+    #[test]
+    fn test_run_ancestor_hashes() {
+        solana_logger::setup();
+        let recycler = PacketsRecycler::default();
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let slot = 0;
+            let num_slots = MAX_ANCESTOR_RESPONSES as u64;
+            let nonce = 10;
+
+            let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+
+            // Create slots [slot, slot + num_slots) with 5 shreds apiece
+            let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
+
+            blockstore
+                .insert_shreds(shreds, None, false)
+                .expect("Expect successful ledger write");
+
+            // We don't have slot `slot + num_slots`, so we return empty
+            let rv = ServeRepair::run_ancestor_hashes(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                slot + num_slots,
+                nonce,
+            )
+            .expect("run_ancestor_hashes packets")
+            .packets;
+            assert_eq!(rv.len(), 1);
+            let packet = &rv[0];
+            let ancestor_hashes_response: AncestorHashesResponseVersion =
+                limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap();
+            assert!(ancestor_hashes_response.into_slot_hashes().is_empty());
+
+            // `slot + num_slots - 1` is not marked duplicate confirmed so nothing should return
+            // empty
+            let rv = ServeRepair::run_ancestor_hashes(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                slot + num_slots - 1,
+                nonce,
+            )
+            .expect("run_ancestor_hashes packets")
+            .packets;
+            assert_eq!(rv.len(), 1);
+            let packet = &rv[0];
+            let ancestor_hashes_response: AncestorHashesResponseVersion =
+                limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap();
+            assert!(ancestor_hashes_response.into_slot_hashes().is_empty());
+
+            // Set duplicate confirmed
+            let mut expected_ancestors = Vec::with_capacity(num_slots as usize);
+            expected_ancestors.resize(num_slots as usize, (0, Hash::default()));
+            for (i, duplicate_confirmed_slot) in (slot..slot + num_slots).enumerate() {
+                let frozen_hash = Hash::new_unique();
+                expected_ancestors[num_slots as usize - i - 1] =
+                    (duplicate_confirmed_slot, frozen_hash);
+                blockstore.insert_bank_hash(duplicate_confirmed_slot, frozen_hash, true);
+            }
+            let rv = ServeRepair::run_ancestor_hashes(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                slot + num_slots - 1,
+                nonce,
+            )
+            .expect("run_ancestor_hashes packets")
+            .packets;
+            assert_eq!(rv.len(), 1);
+            let packet = &rv[0];
+            let ancestor_hashes_response: AncestorHashesResponseVersion =
+                limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap();
+            assert_eq!(
+                ancestor_hashes_response.into_slot_hashes(),
+                expected_ancestors
+            );
+        }
+
+        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     fn test_repair_with_repair_validators() {
         let cluster_slots = ClusterSlots::default();
@@ -1031,7 +1211,7 @@ mod tests {
         assert!(serve_repair
             .repair_request(
                 &cluster_slots,
-                RepairType::Shred(0, 0),
+                ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &trusted_validators,
@@ -1048,7 +1228,7 @@ mod tests {
         assert!(serve_repair
             .repair_request(
                 &cluster_slots,
-                RepairType::Shred(0, 0),
+                ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &trusted_validators,
@@ -1069,7 +1249,7 @@ mod tests {
         assert!(serve_repair
             .repair_request(
                 &cluster_slots,
-                RepairType::Shred(0, 0),
+                ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
@@ -1079,13 +1259,13 @@ mod tests {
     }

     #[test]
-    fn test_verify_response() {
-        let repair = RepairType::Orphan(9);
+    fn test_verify_shred_response() {
+        let repair = ShredRepairType::Orphan(9);
         // Ensure new options are addded to this test
         match repair {
-            RepairType::Orphan(_) => (),
-            RepairType::HighestShred(_, _) => (),
-            RepairType::Shred(_, _) => (),
+            ShredRepairType::Orphan(_) => (),
+            ShredRepairType::HighestShred(_, _) => (),
+            ShredRepairType::Shred(_, _) => (),
         };

         let slot = 9;
@@ -1094,7 +1274,7 @@ mod tests {
         // Orphan
         let mut shred = Shred::new_empty_data_shred();
         shred.set_slot(slot);
-        let request = RepairType::Orphan(slot);
+        let request = ShredRepairType::Orphan(slot);
         assert!(request.verify_response(&shred));
         shred.set_slot(slot - 1);
         assert!(request.verify_response(&shred));
@@ -1105,7 +1285,7 @@ mod tests {
         shred = Shred::new_empty_data_shred();
         shred.set_slot(slot);
         shred.set_index(index);
-        let request = RepairType::HighestShred(slot, index as u64);
+        let request = ShredRepairType::HighestShred(slot, index as u64);
         assert!(request.verify_response(&shred));
         shred.set_index(index + 1);
         assert!(request.verify_response(&shred));
@@ -1121,7 +1301,7 @@ mod tests {
         shred = Shred::new_empty_data_shred();
         shred.set_slot(slot);
         shred.set_index(index);
-        let request = RepairType::Shred(slot, index as u64);
+        let request = ShredRepairType::Shred(slot, index as u64);
         assert!(request.verify_response(&shred));
         shred.set_index(index + 1);
         assert!(!request.verify_response(&shred));
@@ -1130,11 +1310,26 @@ mod tests {
         assert!(!request.verify_response(&shred));
     }

-    fn verify_responses<'a>(request: &RepairType, packets: impl Iterator<Item = &'a Packet>) {
+    fn verify_responses<'a>(request: &ShredRepairType, packets: impl Iterator<Item = &'a Packet>) {
         for packet in packets {
             let shred_payload = packet.data.to_vec();
             let shred = Shred::new_from_serialized_shred(shred_payload).unwrap();
             request.verify_response(&shred);
         }
     }
+
+    #[test]
+    fn test_verify_ancestor_response() {
+        let request_slot = MAX_ANCESTOR_RESPONSES as Slot;
+        let repair = AncestorHashesRepair(request_slot);
+        let mut response: Vec<SlotHash> = (0..request_slot)
+            .into_iter()
+            .map(|slot| (slot, Hash::new_unique()))
+            .collect();
+        assert!(repair.verify_response(&AncestorHashesResponseVersion::Current(response.clone())));
+
+        // over the allowed limit, should fail
+        response.push((request_slot, Hash::new_unique()));
+        assert!(!repair.verify_response(&AncestorHashesResponseVersion::Current(response)));
+    }
 }
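One design note on the response type used above: wrapping the payload in `AncestorHashesResponseVersion` spends the 4 discriminator bytes budgeted in `MAX_ANCESTOR_BYTES_IN_PACKET`, but leaves room to add future variants without breaking the wire format, and `verify_response` enforces the one invariant a requester can check locally, namely that the reply fits one packet's ancestor budget. A minimal standalone mirror of that check (local stand-in types, illustrative only):

```rust
#![allow(dead_code)]

type Slot = u64;
type SlotHash = (Slot, [u8; 32]); // local stand-in for (Slot, Hash)

const MAX_ANCESTOR_RESPONSES: usize = 30;

enum AncestorHashesResponseVersion {
    Current(Vec<SlotHash>),
}

struct AncestorHashesRepair(Slot);

impl AncestorHashesRepair {
    // Mirrors the RequestResponse impl above: accept at most one packet's
    // worth of (slot, hash) pairs.
    fn verify_response(&self, response: &AncestorHashesResponseVersion) -> bool {
        let AncestorHashesResponseVersion::Current(slot_hashes) = response;
        slot_hashes.len() <= MAX_ANCESTOR_RESPONSES
    }
}

fn main() {
    let repair = AncestorHashesRepair(30);
    let ok = AncestorHashesResponseVersion::Current(vec![(29, [0u8; 32]); 30]);
    assert!(repair.verify_response(&ok));

    let too_big = AncestorHashesResponseVersion::Current(vec![(29, [0u8; 32]); 31]);
    assert!(!repair.verify_response(&too_big));
}
```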
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 7530147379..0a5621f2bf 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -806,7 +806,7 @@ mod test {

     #[test]
     fn test_prune_shreds() {
-        use crate::serve_repair::RepairType;
+        use crate::serve_repair::ShredRepairType;
         use std::net::{IpAddr, Ipv4Addr};
         solana_logger::setup();
         let (common, coding) = Shredder::new_coding_shred_header(5, 5, 5, 6, 6, 0);
@@ -818,7 +818,7 @@ mod test {
             nonce: 0,
         };
         let outstanding_requests = Arc::new(RwLock::new(OutstandingRepairs::default()));
-        let repair_type = RepairType::Orphan(9);
+        let repair_type = ShredRepairType::Orphan(9);
         let nonce = outstanding_requests
             .write()
             .unwrap()