From 57f8a15b965f7fc76d61afc2a63c34f725b9ed8d Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 24 May 2019 00:47:51 -0700 Subject: [PATCH] Fix issues in ClusterInfoRepairListener (#4418) * Sort repairmen before shuffling so order is the same across all validators * Reduce repair redundancy to 1 for now * Fix local cache of roots so that 1) Timestamps are only updated to acknowledge a repair was sent 2) Roots are updated even when timestamps aren't updated to keep in sync with network * Refactor code, add test --- core/src/cluster_info_repair_listener.rs | 168 ++++++++++++++++++----- 1 file changed, 135 insertions(+), 33 deletions(-) diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index dd7316cbc1..c35cab0ccf 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -10,7 +10,7 @@ use rand_chacha::ChaChaRng; use solana_metrics::datapoint; use solana_runtime::epoch_schedule::EpochSchedule; use solana_sdk::pubkey::Pubkey; -use std::cmp::min; +use std::cmp; use std::collections::HashMap; use std::mem; use std::net::SocketAddr; @@ -21,7 +21,7 @@ use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; pub const REPAIRMEN_SLEEP_MILLIS: usize = 100; -pub const REPAIR_REDUNDANCY: usize = 3; +pub const REPAIR_REDUNDANCY: usize = 1; pub const NUM_BUFFER_SLOTS: usize = 50; pub const GOSSIP_DELAY_SLOTS: usize = 2; pub const NUM_SLOTS_PER_UPDATE: usize = 2; @@ -87,7 +87,10 @@ impl ClusterInfoRepairListener { let thread = Builder::new() .name("solana-cluster_info_repair_listener".to_string()) .spawn(move || { - // Maps a peer to the last timestamp and root they gossiped + // Maps a peer to + // 1) The latest timestamp of the EpochSlots gossip message at which a repair was + // sent to this peer + // 2) The latest root the peer gossiped let mut peer_roots: HashMap = HashMap::new(); let _ = Self::recv_loop( &blocktree, @@ -125,31 +128,14 @@ impl ClusterInfoRepairListener { // Iterate through all the known nodes in the network, looking for ones that // need repairs for peer in peers { - let last_update_ts = Self::get_last_ts(peer.id, peer_roots); - let my_root = - Self::read_my_gossiped_root(&my_pubkey, cluster_info, &mut my_gossiped_root); - { - let r_cluster_info = cluster_info.read().unwrap(); - - // Update our local map with the updated peers' information - if let Some((peer_epoch_slots, ts)) = - r_cluster_info.get_epoch_state_for_node(&peer.id, last_update_ts) - { - // Following logic needs to be fast because it holds the lock - // preventing updates on gossip - if Self::should_repair_peer( - my_root, - peer_epoch_slots.root, - NUM_BUFFER_SLOTS, - ) { - // Only update our local timestamp for this peer if we are going to - // repair them - peer_roots.insert(peer.id, (ts, peer_epoch_slots.root)); - - // Clone out EpochSlots structure to avoid holding lock on gossip - peers_needing_repairs.insert(peer.id, peer_epoch_slots.clone()); - } - } + if let Some(repairee_epoch_slots) = Self::process_potential_repairee( + &my_pubkey, + &peer.id, + cluster_info, + peer_roots, + &mut my_gossiped_root, + ) { + peers_needing_repairs.insert(peer.id, repairee_epoch_slots); } } @@ -169,6 +155,46 @@ impl ClusterInfoRepairListener { } } + fn process_potential_repairee( + my_id: &Pubkey, + peer_id: &Pubkey, + cluster_info: &Arc>, + peer_roots: &mut HashMap, + my_gossiped_root: &mut u64, + ) -> Option { + let last_cached_repair_ts = Self::get_last_ts(peer_id, peer_roots); + let my_root = 
Self::read_my_gossiped_root(&my_id, cluster_info, my_gossiped_root); + { + let r_cluster_info = cluster_info.read().unwrap(); + + // Update our local map with the updated peers' information + if let Some((peer_epoch_slots, updated_ts)) = + r_cluster_info.get_epoch_state_for_node(&peer_id, last_cached_repair_ts) + { + let peer_entry = peer_roots.entry(*peer_id).or_default(); + let peer_root = cmp::max(peer_epoch_slots.root, peer_entry.1); + let mut result = None; + let last_repair_ts = { + // Following logic needs to be fast because it holds the lock + // preventing updates on gossip + if Self::should_repair_peer(my_root, peer_epoch_slots.root, NUM_BUFFER_SLOTS) { + // Clone out EpochSlots structure to avoid holding lock on gossip + result = Some(peer_epoch_slots.clone()); + updated_ts + } else { + // No repairs were sent, don't need to update the timestamp + peer_entry.0 + } + }; + + *peer_entry = (last_repair_ts, peer_root); + result + } else { + None + } + } + } + fn serve_repairs( my_pubkey: &Pubkey, blocktree: &Blocktree, @@ -341,9 +367,9 @@ impl ClusterInfoRepairListener { repair_redundancy: usize, ) -> Option { let total_blobs = num_blobs_in_slot * repair_redundancy; - let total_repairmen_for_slot = min(total_blobs, eligible_repairmen.len()); + let total_repairmen_for_slot = cmp::min(total_blobs, eligible_repairmen.len()); - let blobs_per_repairman = min( + let blobs_per_repairman = cmp::min( (total_blobs + total_repairmen_for_slot - 1) / total_repairmen_for_slot, num_blobs_in_slot, ); @@ -389,9 +415,11 @@ impl ClusterInfoRepairListener { .collect(); repairmen.push(my_pubkey); + repairmen.sort(); repairmen } + // Read my root out of gossip, and update the cached `old_root` fn read_my_gossiped_root( my_pubkey: &Pubkey, cluster_info: &Arc>, @@ -422,8 +450,8 @@ impl ClusterInfoRepairListener { repairman_root > repairee_root + num_buffer_slots as u64 } - fn get_last_ts(pubkey: Pubkey, peer_roots: &mut HashMap) -> Option { - peer_roots.get(&pubkey).map(|(last_ts, _)| *last_ts) + fn get_last_ts(pubkey: &Pubkey, peer_roots: &mut HashMap) -> Option { + peer_roots.get(pubkey).map(|(last_ts, _)| *last_ts) } } @@ -443,6 +471,7 @@ mod tests { use super::*; use crate::blocktree::get_tmp_ledger_path; use crate::blocktree::tests::make_many_slot_entries; + use crate::cluster_info::Node; use crate::packet::{Blob, SharedBlob}; use crate::streamer; use std::collections::BTreeSet; @@ -503,6 +532,79 @@ mod tests { } } + #[test] + fn test_process_potential_repairee() { + // Set up node ids + let my_id = Pubkey::new_rand(); + let peer_id = Pubkey::new_rand(); + + // Set up cluster_info + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + Node::new_localhost().info, + ))); + + // Push a repairee's epoch slots into cluster info + let repairee_root = 0; + let repairee_slots = BTreeSet::new(); + cluster_info.write().unwrap().push_epoch_slots( + peer_id, + repairee_root, + repairee_slots.clone(), + ); + + // Set up locally cached information + let mut peer_roots = HashMap::new(); + let mut my_gossiped_root = repairee_root; + + // Root is not sufficiently far ahead, we shouldn't repair + assert!(ClusterInfoRepairListener::process_potential_repairee( + &my_id, + &peer_id, + &cluster_info, + &mut peer_roots, + &mut my_gossiped_root, + ) + .is_none()); + + // Update the root to be sufficiently far ahead. 
A repair should now occur even if the + // object in gossip is not updated + my_gossiped_root = repairee_root + NUM_BUFFER_SLOTS as u64 + 1; + assert!(ClusterInfoRepairListener::process_potential_repairee( + &my_id, + &peer_id, + &cluster_info, + &mut peer_roots, + &mut my_gossiped_root, + ) + .is_some()); + + // An repair was already sent, so if gossip is not updated, no repair should be sent again, + // even if our root moves forward + my_gossiped_root += 4; + assert!(ClusterInfoRepairListener::process_potential_repairee( + &my_id, + &peer_id, + &cluster_info, + &mut peer_roots, + &mut my_gossiped_root, + ) + .is_none()); + + // Update the gossiped EpochSlots. Now a repair should be sent again + cluster_info + .write() + .unwrap() + .push_epoch_slots(peer_id, repairee_root, repairee_slots); + assert!(ClusterInfoRepairListener::process_potential_repairee( + &my_id, + &peer_id, + &cluster_info, + &mut peer_roots, + &mut my_gossiped_root, + ) + .is_some()); + } + #[test] fn test_serve_repairs_to_repairee() { let blocktree_path = get_tmp_ledger_path!(); @@ -840,7 +942,7 @@ mod tests { // 1) If there are a sufficient number of repairmen, then each blob should be sent // `repair_redundancy` OR `repair_redundancy + 1` times. - let num_expected_redundancy = min(num_repairmen, repair_redundancy); + let num_expected_redundancy = cmp::min(num_repairmen, repair_redundancy); for b in results.keys() { assert!( results[b] == num_expected_redundancy || results[b] == num_expected_redundancy + 1
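
A few notes on the changes above. The first bullet (sorting repairmen before shuffling) matters because every validator independently computes the repairman-to-blob assignment from a deterministically seeded shuffle (the file pulls in ChaChaRng for this), so the shuffle only agrees across validators if its input list is identically ordered; `repairmen.sort()` pins that order down before the shuffle. A minimal, self-contained sketch of the idea, with a toy PRNG and String keys standing in for the crate's actual ChaChaRng and Pubkey:

// Deterministic Fisher-Yates shuffle driven by a caller-supplied seed.
// Toy splitmix64-style generator; only determinism matters for this sketch.
fn seeded_shuffle<T>(items: &mut [T], seed: u64) {
    let mut state = seed.wrapping_add(0x9E37_79B9_7F4A_7C15);
    for i in (1..items.len()).rev() {
        state ^= state >> 30;
        state = state.wrapping_mul(0xBF58_476D_1CE4_E5B9);
        state ^= state >> 27;
        let j = (state % (i as u64 + 1)) as usize;
        items.swap(i, j);
    }
}

fn assignment_order(mut repairmen: Vec<String>, slot: u64) -> Vec<String> {
    // Without this sort, two validators that learned about the same peers in a
    // different order would compute different shuffles from the same seed, so
    // blobs could be repaired twice or not at all.
    repairmen.sort();
    seeded_shuffle(&mut repairmen, slot);
    repairmen
}

fn main() {
    // Two validators see the same repairmen, but in different orders.
    let view_a = vec!["carol".to_string(), "alice".to_string(), "bob".to_string()];
    let view_b = vec!["bob".to_string(), "carol".to_string(), "alice".to_string()];
    assert_eq!(assignment_order(view_a, 42), assignment_order(view_b, 42));
}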
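
The REPAIR_REDUNDANCY drop from 3 to 1 feeds straight into the (otherwise unchanged) assignment math in the hunk above: a slot wants `num_blobs_in_slot * repair_redundancy` blob sends, spread over at most that many repairmen, and each chosen repairman sends a ceiling-divided share capped at `num_blobs_in_slot`. A quick worked check of those two expressions; the function name and parameters here are local to this sketch, not the crate's actual signature:

use std::cmp;

// Mirrors the blob-assignment arithmetic from the diff above.
fn slot_repair_plan(
    num_blobs_in_slot: usize,
    num_eligible_repairmen: usize,
    repair_redundancy: usize,
) -> (usize, usize) {
    let total_blobs = num_blobs_in_slot * repair_redundancy;
    let total_repairmen_for_slot = cmp::min(total_blobs, num_eligible_repairmen);
    // Ceiling division, capped so nobody is asked for more blobs than the slot holds.
    let blobs_per_repairman = cmp::min(
        (total_blobs + total_repairmen_for_slot - 1) / total_repairmen_for_slot,
        num_blobs_in_slot,
    );
    (total_repairmen_for_slot, blobs_per_repairman)
}

fn main() {
    // Redundancy 3: a 10-blob slot with 4 eligible repairmen asks 4 of them for 8 blobs each.
    assert_eq!(slot_repair_plan(10, 4, 3), (4, 8));
    // Redundancy 1 (this patch): the same slot now needs only 3 blobs from each of the 4.
    assert_eq!(slot_repair_plan(10, 4, 1), (4, 3));
}

This is also consistent with the assertion at the bottom of the diff that each blob is sent either `repair_redundancy` or `repair_redundancy + 1` times: the ceiling division can hand some blobs one extra send.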
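
The root-cache bullet is the subtle part of the change. `peer_roots` now maps a peer to (the timestamp of the last EpochSlots update for which a repair was actually sent, the latest root the peer gossiped), and `process_potential_repairee` advances the two fields under different rules: the timestamp moves only when a repair is going to be sent, so skipping a peer does not mark its latest EpochSlots as handled, while the cached root always moves forward to the max of the cached and freshly gossiped values so the cache stays in sync with the network. A simplified sketch of that update rule, with String in place of Pubkey and the gossip lookup replaced by plain parameters:

use std::cmp;
use std::collections::HashMap;

// (last timestamp at which a repair was sent to the peer, latest root it gossiped)
type PeerRoots = HashMap<String, (u64, u64)>;

// Returns true if a repair should be sent, applying the cache-update
// semantics introduced by this patch.
fn update_peer_cache(
    peer_roots: &mut PeerRoots,
    peer: &str,
    gossiped_root: u64,
    gossip_ts: u64,
    my_root: u64,
    num_buffer_slots: u64,
) -> bool {
    let entry = peer_roots.entry(peer.to_string()).or_default();
    // The cached root always tracks the network, repair or not.
    let new_root = cmp::max(gossiped_root, entry.1);
    let will_repair = my_root > gossiped_root + num_buffer_slots;
    // The timestamp only advances when a repair is actually sent, so skipping
    // a peer does not mark its current EpochSlots as handled.
    let new_ts = if will_repair { gossip_ts } else { entry.0 };
    *entry = (new_ts, new_root);
    will_repair
}

fn main() {
    let mut cache = PeerRoots::new();
    // Peer is only 1 slot behind: no repair, but its root is still recorded.
    assert!(!update_peer_cache(&mut cache, "peer", 10, 100, 11, 50));
    assert_eq!(cache["peer"], (0, 10));
    // Peer has fallen far behind: a repair is sent and the timestamp advances.
    assert!(update_peer_cache(&mut cache, "peer", 10, 101, 80, 50));
    assert_eq!(cache["peer"], (101, 10));
}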