Correctly remove replicator from data plane after its done repairing (#4301)
* Correctly remove replicator from data plane after its done repairing * Update discover to report nodes and replicators separately * Fix print and condition to be spy
This commit is contained in:
@ -55,10 +55,11 @@ impl GossipService {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn discover_nodes(
|
||||
/// Discover Nodes and Replicators in a cluster
|
||||
pub fn discover_cluster(
|
||||
entry_point: &SocketAddr,
|
||||
num_nodes: usize,
|
||||
) -> std::io::Result<Vec<ContactInfo>> {
|
||||
) -> std::io::Result<(Vec<ContactInfo>, Vec<ContactInfo>)> {
|
||||
discover(entry_point, Some(num_nodes), Some(30), None, None)
|
||||
}
|
||||
|
||||
@ -68,7 +69,7 @@ pub fn discover(
|
||||
timeout: Option<u64>,
|
||||
find_node: Option<Pubkey>,
|
||||
gossip_addr: Option<&SocketAddr>,
|
||||
) -> std::io::Result<Vec<ContactInfo>> {
|
||||
) -> std::io::Result<(Vec<ContactInfo>, Vec<ContactInfo>)> {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (gossip_service, spy_ref) = make_gossip_node(entry_point, &exit, gossip_addr);
|
||||
|
||||
@ -76,7 +77,8 @@ pub fn discover(
|
||||
info!("Gossip entry point: {:?}", entry_point);
|
||||
info!("Spy node id: {:?}", id);
|
||||
|
||||
let (met_criteria, secs, tvu_peers) = spy(spy_ref.clone(), num_nodes, timeout, find_node);
|
||||
let (met_criteria, secs, tvu_peers, replicators) =
|
||||
spy(spy_ref.clone(), num_nodes, timeout, find_node);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
gossip_service.join().unwrap();
|
||||
@ -87,7 +89,7 @@ pub fn discover(
|
||||
secs,
|
||||
spy_ref.read().unwrap().contact_info_trace()
|
||||
);
|
||||
return Ok(tvu_peers);
|
||||
return Ok((tvu_peers, replicators));
|
||||
}
|
||||
|
||||
if !tvu_peers.is_empty() {
|
||||
@ -95,7 +97,7 @@ pub fn discover(
|
||||
"discover failed to match criteria by timeout...\n{}",
|
||||
spy_ref.read().unwrap().contact_info_trace()
|
||||
);
|
||||
return Ok(tvu_peers);
|
||||
return Ok((tvu_peers, replicators));
|
||||
}
|
||||
|
||||
info!(
|
||||
@ -132,10 +134,11 @@ fn spy(
|
||||
num_nodes: Option<usize>,
|
||||
timeout: Option<u64>,
|
||||
find_node: Option<Pubkey>,
|
||||
) -> (bool, u64, Vec<ContactInfo>) {
|
||||
) -> (bool, u64, Vec<ContactInfo>, Vec<ContactInfo>) {
|
||||
let now = Instant::now();
|
||||
let mut met_criteria = false;
|
||||
let mut tvu_peers: Vec<ContactInfo> = Vec::new();
|
||||
let mut replicators: Vec<ContactInfo> = Vec::new();
|
||||
let mut i = 0;
|
||||
loop {
|
||||
if let Some(secs) = timeout {
|
||||
@ -143,11 +146,24 @@ fn spy(
|
||||
break;
|
||||
}
|
||||
}
|
||||
tvu_peers = spy_ref.read().unwrap().tvu_peers();
|
||||
// collect tvu peers but filter out replicators since their tvu is transient and we do not want
|
||||
// it to show up as a "node"
|
||||
tvu_peers = spy_ref
|
||||
.read()
|
||||
.unwrap()
|
||||
.tvu_peers()
|
||||
.into_iter()
|
||||
.filter(|node| !ClusterInfo::is_replicator(&node))
|
||||
.collect::<Vec<_>>();
|
||||
replicators = spy_ref.read().unwrap().storage_peers();
|
||||
if let Some(num) = num_nodes {
|
||||
if tvu_peers.len() >= num {
|
||||
if tvu_peers.len() + replicators.len() >= num {
|
||||
if let Some(pubkey) = find_node {
|
||||
if tvu_peers.iter().any(|x| x.id == pubkey) {
|
||||
if tvu_peers
|
||||
.iter()
|
||||
.chain(replicators.iter())
|
||||
.any(|x| x.id == pubkey)
|
||||
{
|
||||
met_criteria = true;
|
||||
break;
|
||||
}
|
||||
@ -158,7 +174,12 @@ fn spy(
|
||||
}
|
||||
}
|
||||
if let Some(pubkey) = find_node {
|
||||
if num_nodes.is_none() && tvu_peers.iter().any(|x| x.id == pubkey) {
|
||||
if num_nodes.is_none()
|
||||
&& tvu_peers
|
||||
.iter()
|
||||
.chain(replicators.iter())
|
||||
.any(|x| x.id == pubkey)
|
||||
{
|
||||
met_criteria = true;
|
||||
break;
|
||||
}
|
||||
@ -174,7 +195,12 @@ fn spy(
|
||||
));
|
||||
i += 1;
|
||||
}
|
||||
(met_criteria, now.elapsed().as_secs(), tvu_peers)
|
||||
(
|
||||
met_criteria,
|
||||
now.elapsed().as_secs(),
|
||||
tvu_peers,
|
||||
replicators,
|
||||
)
|
||||
}
|
||||
|
||||
/// Makes a spy or gossip node based on whether or not a gossip_addr was passed in
|
||||
@ -243,29 +269,30 @@ mod tests {
|
||||
|
||||
let spy_ref = Arc::new(RwLock::new(cluster_info));
|
||||
|
||||
let (met_criteria, secs, tvu_peers) = spy(spy_ref.clone(), None, Some(1), None);
|
||||
let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None);
|
||||
assert_eq!(met_criteria, false);
|
||||
assert_eq!(secs, 1);
|
||||
assert_eq!(tvu_peers, spy_ref.read().unwrap().tvu_peers());
|
||||
|
||||
// Find num_nodes
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), Some(1), None, None);
|
||||
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None);
|
||||
assert_eq!(met_criteria, true);
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), Some(2), None, None);
|
||||
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(2), None, None);
|
||||
assert_eq!(met_criteria, true);
|
||||
|
||||
// Find specific node by pubkey
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), None, None, Some(peer0));
|
||||
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, None, Some(peer0));
|
||||
assert_eq!(met_criteria, true);
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), None, Some(0), Some(Pubkey::new_rand()));
|
||||
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, Some(0), Some(Pubkey::new_rand()));
|
||||
assert_eq!(met_criteria, false);
|
||||
|
||||
// Find num_nodes *and* specific node by pubkey
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0));
|
||||
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0));
|
||||
assert_eq!(met_criteria, true);
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0));
|
||||
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0));
|
||||
assert_eq!(met_criteria, false);
|
||||
let (met_criteria, _, _) = spy(spy_ref.clone(), Some(1), Some(0), Some(Pubkey::new_rand()));
|
||||
let (met_criteria, _, _, _) =
|
||||
spy(spy_ref.clone(), Some(1), Some(0), Some(Pubkey::new_rand()));
|
||||
assert_eq!(met_criteria, false);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user