Fix issues in ClusterInfoRepairListener (#4418)

* Sort repairmen before shuffling so order is the same across all validators

* Reduce repair redundancy to 1 for now

* Fix local cache of roots so that 1) Timestamps are only updated to acknowledge a repair was sent 2) Roots are updated even when timestamps aren't updated to keep in sync with network

* Refactor code, add test
This commit is contained in:
carllin
2019-05-24 00:47:51 -07:00
committed by GitHub
parent cfe5afd34c
commit 57f8a15b96

View File

@ -10,7 +10,7 @@ use rand_chacha::ChaChaRng;
use solana_metrics::datapoint; use solana_metrics::datapoint;
use solana_runtime::epoch_schedule::EpochSchedule; use solana_runtime::epoch_schedule::EpochSchedule;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::cmp::min; use std::cmp;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -21,7 +21,7 @@ use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
pub const REPAIRMEN_SLEEP_MILLIS: usize = 100; pub const REPAIRMEN_SLEEP_MILLIS: usize = 100;
pub const REPAIR_REDUNDANCY: usize = 3; pub const REPAIR_REDUNDANCY: usize = 1;
pub const NUM_BUFFER_SLOTS: usize = 50; pub const NUM_BUFFER_SLOTS: usize = 50;
pub const GOSSIP_DELAY_SLOTS: usize = 2; pub const GOSSIP_DELAY_SLOTS: usize = 2;
pub const NUM_SLOTS_PER_UPDATE: usize = 2; pub const NUM_SLOTS_PER_UPDATE: usize = 2;
@ -87,7 +87,10 @@ impl ClusterInfoRepairListener {
let thread = Builder::new() let thread = Builder::new()
.name("solana-cluster_info_repair_listener".to_string()) .name("solana-cluster_info_repair_listener".to_string())
.spawn(move || { .spawn(move || {
// Maps a peer to the last timestamp and root they gossiped // Maps a peer to
// 1) The latest timestamp of the EpochSlots gossip message at which a repair was
// sent to this peer
// 2) The latest root the peer gossiped
let mut peer_roots: HashMap<Pubkey, (u64, u64)> = HashMap::new(); let mut peer_roots: HashMap<Pubkey, (u64, u64)> = HashMap::new();
let _ = Self::recv_loop( let _ = Self::recv_loop(
&blocktree, &blocktree,
@ -125,31 +128,14 @@ impl ClusterInfoRepairListener {
// Iterate through all the known nodes in the network, looking for ones that // Iterate through all the known nodes in the network, looking for ones that
// need repairs // need repairs
for peer in peers { for peer in peers {
let last_update_ts = Self::get_last_ts(peer.id, peer_roots); if let Some(repairee_epoch_slots) = Self::process_potential_repairee(
let my_root = &my_pubkey,
Self::read_my_gossiped_root(&my_pubkey, cluster_info, &mut my_gossiped_root); &peer.id,
{ cluster_info,
let r_cluster_info = cluster_info.read().unwrap(); peer_roots,
&mut my_gossiped_root,
// Update our local map with the updated peers' information ) {
if let Some((peer_epoch_slots, ts)) = peers_needing_repairs.insert(peer.id, repairee_epoch_slots);
r_cluster_info.get_epoch_state_for_node(&peer.id, last_update_ts)
{
// Following logic needs to be fast because it holds the lock
// preventing updates on gossip
if Self::should_repair_peer(
my_root,
peer_epoch_slots.root,
NUM_BUFFER_SLOTS,
) {
// Only update our local timestamp for this peer if we are going to
// repair them
peer_roots.insert(peer.id, (ts, peer_epoch_slots.root));
// Clone out EpochSlots structure to avoid holding lock on gossip
peers_needing_repairs.insert(peer.id, peer_epoch_slots.clone());
}
}
} }
} }
@ -169,6 +155,46 @@ impl ClusterInfoRepairListener {
} }
} }
fn process_potential_repairee(
my_id: &Pubkey,
peer_id: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
my_gossiped_root: &mut u64,
) -> Option<EpochSlots> {
let last_cached_repair_ts = Self::get_last_ts(peer_id, peer_roots);
let my_root = Self::read_my_gossiped_root(&my_id, cluster_info, my_gossiped_root);
{
let r_cluster_info = cluster_info.read().unwrap();
// Update our local map with the updated peers' information
if let Some((peer_epoch_slots, updated_ts)) =
r_cluster_info.get_epoch_state_for_node(&peer_id, last_cached_repair_ts)
{
let peer_entry = peer_roots.entry(*peer_id).or_default();
let peer_root = cmp::max(peer_epoch_slots.root, peer_entry.1);
let mut result = None;
let last_repair_ts = {
// Following logic needs to be fast because it holds the lock
// preventing updates on gossip
if Self::should_repair_peer(my_root, peer_epoch_slots.root, NUM_BUFFER_SLOTS) {
// Clone out EpochSlots structure to avoid holding lock on gossip
result = Some(peer_epoch_slots.clone());
updated_ts
} else {
// No repairs were sent, don't need to update the timestamp
peer_entry.0
}
};
*peer_entry = (last_repair_ts, peer_root);
result
} else {
None
}
}
}
fn serve_repairs( fn serve_repairs(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
blocktree: &Blocktree, blocktree: &Blocktree,
@ -341,9 +367,9 @@ impl ClusterInfoRepairListener {
repair_redundancy: usize, repair_redundancy: usize,
) -> Option<BlobIndexesToRepairIterator> { ) -> Option<BlobIndexesToRepairIterator> {
let total_blobs = num_blobs_in_slot * repair_redundancy; let total_blobs = num_blobs_in_slot * repair_redundancy;
let total_repairmen_for_slot = min(total_blobs, eligible_repairmen.len()); let total_repairmen_for_slot = cmp::min(total_blobs, eligible_repairmen.len());
let blobs_per_repairman = min( let blobs_per_repairman = cmp::min(
(total_blobs + total_repairmen_for_slot - 1) / total_repairmen_for_slot, (total_blobs + total_repairmen_for_slot - 1) / total_repairmen_for_slot,
num_blobs_in_slot, num_blobs_in_slot,
); );
@ -389,9 +415,11 @@ impl ClusterInfoRepairListener {
.collect(); .collect();
repairmen.push(my_pubkey); repairmen.push(my_pubkey);
repairmen.sort();
repairmen repairmen
} }
// Read my root out of gossip, and update the cached `old_root`
fn read_my_gossiped_root( fn read_my_gossiped_root(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -422,8 +450,8 @@ impl ClusterInfoRepairListener {
repairman_root > repairee_root + num_buffer_slots as u64 repairman_root > repairee_root + num_buffer_slots as u64
} }
fn get_last_ts(pubkey: Pubkey, peer_roots: &mut HashMap<Pubkey, (u64, u64)>) -> Option<u64> { fn get_last_ts(pubkey: &Pubkey, peer_roots: &mut HashMap<Pubkey, (u64, u64)>) -> Option<u64> {
peer_roots.get(&pubkey).map(|(last_ts, _)| *last_ts) peer_roots.get(pubkey).map(|(last_ts, _)| *last_ts)
} }
} }
@ -443,6 +471,7 @@ mod tests {
use super::*; use super::*;
use crate::blocktree::get_tmp_ledger_path; use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_many_slot_entries; use crate::blocktree::tests::make_many_slot_entries;
use crate::cluster_info::Node;
use crate::packet::{Blob, SharedBlob}; use crate::packet::{Blob, SharedBlob};
use crate::streamer; use crate::streamer;
use std::collections::BTreeSet; use std::collections::BTreeSet;
@ -503,6 +532,79 @@ mod tests {
} }
} }
#[test]
fn test_process_potential_repairee() {
// Set up node ids
let my_id = Pubkey::new_rand();
let peer_id = Pubkey::new_rand();
// Set up cluster_info
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
Node::new_localhost().info,
)));
// Push a repairee's epoch slots into cluster info
let repairee_root = 0;
let repairee_slots = BTreeSet::new();
cluster_info.write().unwrap().push_epoch_slots(
peer_id,
repairee_root,
repairee_slots.clone(),
);
// Set up locally cached information
let mut peer_roots = HashMap::new();
let mut my_gossiped_root = repairee_root;
// Root is not sufficiently far ahead, we shouldn't repair
assert!(ClusterInfoRepairListener::process_potential_repairee(
&my_id,
&peer_id,
&cluster_info,
&mut peer_roots,
&mut my_gossiped_root,
)
.is_none());
// Update the root to be sufficiently far ahead. A repair should now occur even if the
// object in gossip is not updated
my_gossiped_root = repairee_root + NUM_BUFFER_SLOTS as u64 + 1;
assert!(ClusterInfoRepairListener::process_potential_repairee(
&my_id,
&peer_id,
&cluster_info,
&mut peer_roots,
&mut my_gossiped_root,
)
.is_some());
// An repair was already sent, so if gossip is not updated, no repair should be sent again,
// even if our root moves forward
my_gossiped_root += 4;
assert!(ClusterInfoRepairListener::process_potential_repairee(
&my_id,
&peer_id,
&cluster_info,
&mut peer_roots,
&mut my_gossiped_root,
)
.is_none());
// Update the gossiped EpochSlots. Now a repair should be sent again
cluster_info
.write()
.unwrap()
.push_epoch_slots(peer_id, repairee_root, repairee_slots);
assert!(ClusterInfoRepairListener::process_potential_repairee(
&my_id,
&peer_id,
&cluster_info,
&mut peer_roots,
&mut my_gossiped_root,
)
.is_some());
}
#[test] #[test]
fn test_serve_repairs_to_repairee() { fn test_serve_repairs_to_repairee() {
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
@ -840,7 +942,7 @@ mod tests {
// 1) If there are a sufficient number of repairmen, then each blob should be sent // 1) If there are a sufficient number of repairmen, then each blob should be sent
// `repair_redundancy` OR `repair_redundancy + 1` times. // `repair_redundancy` OR `repair_redundancy + 1` times.
let num_expected_redundancy = min(num_repairmen, repair_redundancy); let num_expected_redundancy = cmp::min(num_repairmen, repair_redundancy);
for b in results.keys() { for b in results.keys() {
assert!( assert!(
results[b] == num_expected_redundancy || results[b] == num_expected_redundancy + 1 results[b] == num_expected_redundancy || results[b] == num_expected_redundancy + 1