From a0ffbf50a5df14d57fc888003384baec5d14feb0 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 16 May 2019 07:14:58 -0700 Subject: [PATCH] Correctly remove replicator from data plane after its done repairing (#4301) * Correctly remove replicator from data plane after its done repairing * Update discover to report nodes and replicators separately * Fix print and condition to be spy --- bench-exchange/src/bench.rs | 6 ++-- bench-exchange/src/main.rs | 9 ++--- bench-tps/src/main.rs | 11 +++--- core/src/cluster_info.rs | 38 ++++++++++++++++++--- core/src/cluster_tests.rs | 8 ++--- core/src/fullnode.rs | 4 +-- core/src/gossip_service.rs | 67 ++++++++++++++++++++++++++----------- core/src/local_cluster.rs | 6 ++-- core/src/replicator.rs | 4 ++- core/tests/local_cluster.rs | 6 ++-- core/tests/replicator.rs | 13 ++++--- gossip/src/main.rs | 4 +-- 12 files changed, 119 insertions(+), 57 deletions(-) diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index f478e5b3f9..2cf9865e13 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -893,7 +893,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair, mod tests { use super::*; use solana::fullnode::FullnodeConfig; - use solana::gossip_service::{discover_nodes, get_clients}; + use solana::gossip_service::{discover_cluster, get_clients}; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana_drone::drone::run_local_drone; use solana_exchange_api::exchange_processor::process_instruction; @@ -946,8 +946,8 @@ mod tests { let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap(); info!("Connecting to the cluster"); - let nodes = - discover_nodes(&cluster.entry_point_info.gossip, NUM_NODES).unwrap_or_else(|err| { + let (nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, NUM_NODES) + .unwrap_or_else(|err| { error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); exit(1); }); diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index 301ca2dd1b..ae5155ecc8 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -7,7 +7,7 @@ extern crate solana_exchange_program; use crate::bench::{airdrop_lamports, do_bench_exchange, Config}; use log::*; -use solana::gossip_service::{discover_nodes, get_clients}; +use solana::gossip_service::{discover_cluster, get_clients}; use solana_sdk::signature::KeypairUtil; fn main() { @@ -33,9 +33,10 @@ fn main() { } = cli_config; info!("Connecting to the cluster"); - let nodes = discover_nodes(&entrypoint_addr, num_nodes).unwrap_or_else(|_| { - panic!("Failed to discover nodes"); - }); + let (nodes, _replicators) = + discover_cluster(&entrypoint_addr, num_nodes).unwrap_or_else(|_| { + panic!("Failed to discover nodes"); + }); let clients = get_clients(&nodes); diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 36f937cf95..38744eef89 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -2,7 +2,7 @@ mod bench; mod cli; use crate::bench::{do_bench_tps, generate_and_fund_keypairs, Config, NUM_LAMPORTS_PER_ACCOUNT}; -use solana::gossip_service::{discover_nodes, get_clients}; +use solana::gossip_service::{discover_cluster, get_clients}; use std::process::exit; fn main() { @@ -25,10 +25,11 @@ fn main() { } = cli_config; println!("Connecting to the cluster"); - let nodes = discover_nodes(&entrypoint_addr, num_nodes).unwrap_or_else(|err| { - eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); - exit(1); - }); + let (nodes, _replicators) = + discover_cluster(&entrypoint_addr, num_nodes).unwrap_or_else(|err| { + eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); + exit(1); + }); if nodes.len() < num_nodes { eprintln!( "Error: Insufficient nodes discovered. Expecting {} or more", diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 58548d5b74..d6dd532007 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -242,6 +242,7 @@ impl ClusterInfo { pub fn contact_info_trace(&self) -> String { let now = timestamp(); let mut spy_nodes = 0; + let mut replicators = 0; let my_id = self.my_data().id; let nodes: Vec<_> = self .all_peers() @@ -249,6 +250,8 @@ impl ClusterInfo { .map(|(node, last_updated)| { if Self::is_spy_node(&node) { spy_nodes += 1; + } else if Self::is_replicator(&node) { + replicators += 1; } fn addr_to_string(addr: &SocketAddr) -> String { if ContactInfo::is_valid_address(addr) { @@ -276,9 +279,14 @@ impl ClusterInfo { " Node contact info | Age | Node identifier \n\ -------------------------------+---------+-----------------------------------\n\ {}\ - Nodes: {}{}", + Nodes: {}{}{}", nodes.join(""), - nodes.len() - spy_nodes, + nodes.len() - spy_nodes - replicators, + if replicators > 0 { + format!("\nReplicators: {}", replicators) + } else { + "".to_string() + }, if spy_nodes > 0 { format!("\nSpies: {}", spy_nodes) } else { @@ -378,7 +386,7 @@ impl ClusterInfo { .collect() } - /// compute broadcast table + /// all peers that have a valid tvu port. pub fn tvu_peers(&self) -> Vec { let me = self.my_data().id; self.gossip @@ -392,6 +400,20 @@ impl ClusterInfo { .collect() } + /// all peers that have a valid storage addr + pub fn storage_peers(&self) -> Vec { + let me = self.my_data().id; + self.gossip + .crds + .table + .values() + .filter_map(|x| x.value.contact_info()) + .filter(|x| ContactInfo::is_valid_address(&x.storage_addr)) + .filter(|x| x.id != me) + .cloned() + .collect() + } + /// all peers that have a valid tvu pub fn retransmit_peers(&self) -> Vec { let me = self.my_data().id; @@ -417,9 +439,15 @@ impl ClusterInfo { } fn is_spy_node(contact_info: &ContactInfo) -> bool { - !ContactInfo::is_valid_address(&contact_info.tpu) + (!ContactInfo::is_valid_address(&contact_info.tpu) || !ContactInfo::is_valid_address(&contact_info.gossip) - || !ContactInfo::is_valid_address(&contact_info.tvu) + || !ContactInfo::is_valid_address(&contact_info.tvu)) + && !ContactInfo::is_valid_address(&contact_info.storage_addr) + } + + pub fn is_replicator(contact_info: &ContactInfo) -> bool { + ContactInfo::is_valid_address(&contact_info.storage_addr) + && !ContactInfo::is_valid_address(&contact_info.tpu) } fn sort_by_stake( diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs index ca41f31568..e5a2b0262b 100644 --- a/core/src/cluster_tests.rs +++ b/core/src/cluster_tests.rs @@ -6,7 +6,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::FULLNODE_PORT_RANGE; use crate::contact_info::ContactInfo; use crate::entry::{Entry, EntrySlice}; -use crate::gossip_service::discover_nodes; +use crate::gossip_service::discover_cluster; use crate::locktower::VOTE_THRESHOLD_DEPTH; use crate::poh_service::PohServiceConfig; use solana_client::thin_client::create_client; @@ -30,7 +30,7 @@ pub fn spend_and_verify_all_nodes( funding_keypair: &Keypair, nodes: usize, ) { - let cluster_nodes = discover_nodes(&entry_point_info.gossip, nodes).unwrap(); + let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap(); assert!(cluster_nodes.len() >= nodes); for ingress_node in &cluster_nodes { let random_keypair = Keypair::new(); @@ -81,7 +81,7 @@ pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num } pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { - let cluster_nodes = discover_nodes(&entry_point_info.gossip, nodes).unwrap(); + let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap(); assert!(cluster_nodes.len() >= nodes); for node in &cluster_nodes { let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE); @@ -153,7 +153,7 @@ pub fn kill_entry_and_spend_and_verify_rest( slot_millis: u64, ) { solana_logger::setup(); - let cluster_nodes = discover_nodes(&entry_point_info.gossip, nodes).unwrap(); + let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap(); assert!(cluster_nodes.len() >= nodes); let client = create_client(entry_point_info.client_facing_addr(), FULLNODE_PORT_RANGE); let first_two_epoch_slots = MINIMUM_SLOT_LENGTH * 3; diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 543923cdc8..ad2ee545d9 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -5,7 +5,7 @@ use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::blocktree_processor::{self, BankForksInfo}; use crate::cluster_info::{ClusterInfo, Node}; use crate::contact_info::ContactInfo; -use crate::gossip_service::{discover_nodes, GossipService}; +use crate::gossip_service::{discover_cluster, GossipService}; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::PohRecorder; use crate::poh_service::{PohService, PohServiceConfig}; @@ -369,7 +369,7 @@ pub fn new_fullnode_for_tests() -> (Fullnode, ContactInfo, Keypair, String) { None, &FullnodeConfig::default(), ); - discover_nodes(&contact_info.gossip, 1).expect("Node startup failed"); + discover_cluster(&contact_info.gossip, 1).expect("Node startup failed"); (node, contact_info, mint_keypair, ledger_path) } diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 69758e46e7..4d0b41ea4b 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -55,10 +55,11 @@ impl GossipService { } } -pub fn discover_nodes( +/// Discover Nodes and Replicators in a cluster +pub fn discover_cluster( entry_point: &SocketAddr, num_nodes: usize, -) -> std::io::Result> { +) -> std::io::Result<(Vec, Vec)> { discover(entry_point, Some(num_nodes), Some(30), None, None) } @@ -68,7 +69,7 @@ pub fn discover( timeout: Option, find_node: Option, gossip_addr: Option<&SocketAddr>, -) -> std::io::Result> { +) -> std::io::Result<(Vec, Vec)> { let exit = Arc::new(AtomicBool::new(false)); let (gossip_service, spy_ref) = make_gossip_node(entry_point, &exit, gossip_addr); @@ -76,7 +77,8 @@ pub fn discover( info!("Gossip entry point: {:?}", entry_point); info!("Spy node id: {:?}", id); - let (met_criteria, secs, tvu_peers) = spy(spy_ref.clone(), num_nodes, timeout, find_node); + let (met_criteria, secs, tvu_peers, replicators) = + spy(spy_ref.clone(), num_nodes, timeout, find_node); exit.store(true, Ordering::Relaxed); gossip_service.join().unwrap(); @@ -87,7 +89,7 @@ pub fn discover( secs, spy_ref.read().unwrap().contact_info_trace() ); - return Ok(tvu_peers); + return Ok((tvu_peers, replicators)); } if !tvu_peers.is_empty() { @@ -95,7 +97,7 @@ pub fn discover( "discover failed to match criteria by timeout...\n{}", spy_ref.read().unwrap().contact_info_trace() ); - return Ok(tvu_peers); + return Ok((tvu_peers, replicators)); } info!( @@ -132,10 +134,11 @@ fn spy( num_nodes: Option, timeout: Option, find_node: Option, -) -> (bool, u64, Vec) { +) -> (bool, u64, Vec, Vec) { let now = Instant::now(); let mut met_criteria = false; let mut tvu_peers: Vec = Vec::new(); + let mut replicators: Vec = Vec::new(); let mut i = 0; loop { if let Some(secs) = timeout { @@ -143,11 +146,24 @@ fn spy( break; } } - tvu_peers = spy_ref.read().unwrap().tvu_peers(); + // collect tvu peers but filter out replicators since their tvu is transient and we do not want + // it to show up as a "node" + tvu_peers = spy_ref + .read() + .unwrap() + .tvu_peers() + .into_iter() + .filter(|node| !ClusterInfo::is_replicator(&node)) + .collect::>(); + replicators = spy_ref.read().unwrap().storage_peers(); if let Some(num) = num_nodes { - if tvu_peers.len() >= num { + if tvu_peers.len() + replicators.len() >= num { if let Some(pubkey) = find_node { - if tvu_peers.iter().any(|x| x.id == pubkey) { + if tvu_peers + .iter() + .chain(replicators.iter()) + .any(|x| x.id == pubkey) + { met_criteria = true; break; } @@ -158,7 +174,12 @@ fn spy( } } if let Some(pubkey) = find_node { - if num_nodes.is_none() && tvu_peers.iter().any(|x| x.id == pubkey) { + if num_nodes.is_none() + && tvu_peers + .iter() + .chain(replicators.iter()) + .any(|x| x.id == pubkey) + { met_criteria = true; break; } @@ -174,7 +195,12 @@ fn spy( )); i += 1; } - (met_criteria, now.elapsed().as_secs(), tvu_peers) + ( + met_criteria, + now.elapsed().as_secs(), + tvu_peers, + replicators, + ) } /// Makes a spy or gossip node based on whether or not a gossip_addr was passed in @@ -243,29 +269,30 @@ mod tests { let spy_ref = Arc::new(RwLock::new(cluster_info)); - let (met_criteria, secs, tvu_peers) = spy(spy_ref.clone(), None, Some(1), None); + let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None); assert_eq!(met_criteria, false); assert_eq!(secs, 1); assert_eq!(tvu_peers, spy_ref.read().unwrap().tvu_peers()); // Find num_nodes - let (met_criteria, _, _) = spy(spy_ref.clone(), Some(1), None, None); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None); assert_eq!(met_criteria, true); - let (met_criteria, _, _) = spy(spy_ref.clone(), Some(2), None, None); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(2), None, None); assert_eq!(met_criteria, true); // Find specific node by pubkey - let (met_criteria, _, _) = spy(spy_ref.clone(), None, None, Some(peer0)); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, None, Some(peer0)); assert_eq!(met_criteria, true); - let (met_criteria, _, _) = spy(spy_ref.clone(), None, Some(0), Some(Pubkey::new_rand())); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, Some(0), Some(Pubkey::new_rand())); assert_eq!(met_criteria, false); // Find num_nodes *and* specific node by pubkey - let (met_criteria, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0)); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0)); assert_eq!(met_criteria, true); - let (met_criteria, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0)); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0)); assert_eq!(met_criteria, false); - let (met_criteria, _, _) = spy(spy_ref.clone(), Some(1), Some(0), Some(Pubkey::new_rand())); + let (met_criteria, _, _, _) = + spy(spy_ref.clone(), Some(1), Some(0), Some(Pubkey::new_rand())); assert_eq!(met_criteria, false); } } diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index b3a8e54e05..34ec186772 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -4,7 +4,7 @@ use crate::cluster_info::{Node, FULLNODE_PORT_RANGE}; use crate::contact_info::ContactInfo; use crate::fullnode::{Fullnode, FullnodeConfig}; use crate::genesis_utils::create_genesis_block_with_leader; -use crate::gossip_service::discover_nodes; +use crate::gossip_service::discover_cluster; use crate::replicator::Replicator; use crate::service::Service; use solana_client::thin_client::create_client; @@ -176,7 +176,7 @@ impl LocalCluster { }; (0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0)); - discover_nodes( + discover_cluster( &cluster.entry_point_info.gossip, config.node_stakes.len() + config.num_listeners as usize, ) @@ -186,7 +186,7 @@ impl LocalCluster { cluster.add_replicator(); } - discover_nodes( + discover_cluster( &cluster.entry_point_info.gossip, config.node_stakes.len() + config.num_replicators as usize, ) diff --git a/core/src/replicator.rs b/core/src/replicator.rs index d2d99baf0e..8f4418a559 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -26,6 +26,7 @@ use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::message::Message; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_transaction; +use solana_sdk::timing::timestamp; use solana_sdk::transaction::Transaction; use solana_sdk::transport::TransportError; use solana_storage_api::{get_segment_from_slot, storage_instruction, SLOTS_PER_SEGMENT}; @@ -209,7 +210,7 @@ impl Replicator { ); info!("Connecting to the cluster via {:?}", cluster_entrypoint); - let nodes = crate::gossip_service::discover_nodes(&cluster_entrypoint.gossip, 1)?; + let (nodes, _) = crate::gossip_service::discover_cluster(&cluster_entrypoint.gossip, 1)?; let client = crate::gossip_service::get_client(&nodes); let (storage_blockhash, storage_slot) = Self::poll_for_blockhash_and_slot(&cluster_info)?; @@ -349,6 +350,7 @@ impl Replicator { // Remove replicator from the data plane let mut contact_info = node_info.clone(); contact_info.tvu = "0.0.0.0:0".parse().unwrap(); + contact_info.wallclock = timestamp(); { let mut cluster_info_w = cluster_info.write().unwrap(); cluster_info_w.insert_self(contact_info); diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs index a078dd28b3..5fc2b91c88 100644 --- a/core/tests/local_cluster.rs +++ b/core/tests/local_cluster.rs @@ -3,7 +3,7 @@ extern crate solana; use solana::cluster::Cluster; use solana::cluster_tests; use solana::fullnode::FullnodeConfig; -use solana::gossip_service::discover_nodes; +use solana::gossip_service::discover_cluster; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::poh_service::PohServiceConfig; use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH; @@ -153,7 +153,7 @@ fn test_forwarding() { }; let cluster = LocalCluster::new(&config); - let cluster_nodes = discover_nodes(&cluster.entry_point_info.gossip, 2).unwrap(); + let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 2).unwrap(); assert!(cluster_nodes.len() >= 2); let leader_id = cluster.entry_point_info.id; @@ -203,6 +203,6 @@ fn test_listener_startup() { ..ClusterConfig::default() }; let cluster = LocalCluster::new(&config); - let cluster_nodes = discover_nodes(&cluster.entry_point_info.gossip, 4).unwrap(); + let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 4).unwrap(); assert_eq!(cluster_nodes.len(), 4); } diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs index 3bf71fd526..3521f6e2b9 100644 --- a/core/tests/replicator.rs +++ b/core/tests/replicator.rs @@ -9,7 +9,7 @@ use solana::blocktree::{create_new_tmp_ledger, Blocktree}; use solana::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use solana::contact_info::ContactInfo; use solana::fullnode::FullnodeConfig; -use solana::gossip_service::discover_nodes; +use solana::gossip_service::discover_cluster; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::replicator::Replicator; use solana::replicator::ReplicatorRequest; @@ -114,15 +114,18 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) { }; let cluster = LocalCluster::new(&config); - let cluster_nodes = discover_nodes( + let (cluster_nodes, cluster_replicators) = discover_cluster( &cluster.entry_point_info.gossip, num_nodes + num_replicators, ) .unwrap(); - assert_eq!(cluster_nodes.len(), num_nodes + num_replicators); + assert_eq!( + cluster_nodes.len() + cluster_replicators.len(), + num_nodes + num_replicators + ); let mut replicator_count = 0; let mut replicator_info = ContactInfo::default(); - for node in &cluster_nodes { + for node in &cluster_replicators { info!("storage: {:?} rpc: {:?}", node.storage_addr, node.rpc); if ContactInfo::is_valid_address(&node.storage_addr) { replicator_count += 1; @@ -225,7 +228,7 @@ fn test_account_setup() { }; let cluster = LocalCluster::new(&config); - let _ = discover_nodes( + let _ = discover_cluster( &cluster.entry_point_info.gossip, num_nodes + num_replicators as usize, ) diff --git a/gossip/src/main.rs b/gossip/src/main.rs index af7111be61..602db4c5fa 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -133,7 +133,7 @@ fn main() -> Result<(), Box> { Some(addr) }; - let nodes = discover( + let (nodes, _replicators) = discover( &entrypoint_addr, num_nodes, timeout, @@ -174,7 +174,7 @@ fn main() -> Result<(), Box> { .unwrap() .parse::() .unwrap(); - let nodes = discover(&entrypoint_addr, None, None, Some(pubkey), None)?; + let (nodes, _replicators) = discover(&entrypoint_addr, None, None, Some(pubkey), None)?; let node = nodes.iter().find(|x| x.id == pubkey).unwrap(); if !ContactInfo::is_valid_address(&node.rpc) {