Remove spy_node duplication

This commit is contained in:
Michael Vines
2019-03-06 17:08:40 -08:00
parent 96c0222b30
commit 8bc7d5a172
6 changed files with 31 additions and 41 deletions

View File

@ -41,23 +41,26 @@ fn main() {
println!("Looking for leader at {:?}", network); println!("Looking for leader at {:?}", network);
let leader = poll_gossip_for_leader(network, Some(30)).unwrap_or_else(|err| { let leader = poll_gossip_for_leader(network, Some(30)).unwrap_or_else(|err| {
println!( eprintln!(
"Error: unable to find leader on network after 30 seconds: {:?}", "Error: unable to find leader on network after 30 seconds: {:?}",
err err
); );
exit(1); exit(1);
}); });
let nodes = discover(&leader, num_nodes); let nodes = discover(&leader, num_nodes).unwrap_or_else(|err| {
eprintln!("{:?}", err);
exit(1);
});
if nodes.len() < num_nodes { if nodes.len() < num_nodes {
println!( eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more", "Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes num_nodes
); );
exit(1); exit(1);
} }
if reject_extra_nodes && nodes.len() > num_nodes { if reject_extra_nodes && nodes.len() > num_nodes {
println!( eprintln!(
"Error: Extra nodes discovered. Expecting exactly {}", "Error: Extra nodes discovered. Expecting exactly {}",
num_nodes num_nodes
); );

View File

@ -1257,21 +1257,11 @@ impl ClusterInfo {
.unwrap() .unwrap()
} }
pub fn spy_node() -> (NodeInfo, UdpSocket) { pub fn spy_node(id: &Pubkey) -> (NodeInfo, UdpSocket) {
let (_, gossip_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); let (_, gossip_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let pubkey = Keypair::new().pubkey();
let daddr = socketaddr_any!(); let daddr = socketaddr_any!();
let node = NodeInfo::new( let node = NodeInfo::new(*id, daddr, daddr, daddr, daddr, daddr, daddr, timestamp());
pubkey,
daddr,
daddr,
daddr,
daddr,
daddr,
daddr,
timestamp(),
);
(node, gossip_socket) (node, gossip_socket)
} }
} }
@ -1460,7 +1450,7 @@ mod tests {
fn test_cluster_spy_gossip() { fn test_cluster_spy_gossip() {
//check that gossip doesn't try to push to invalid addresses //check that gossip doesn't try to push to invalid addresses
let node = Node::new_localhost(); let node = Node::new_localhost();
let (spy, _) = ClusterInfo::spy_node(); let (spy, _) = ClusterInfo::spy_node(&Keypair::new().pubkey());
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
node.info, node.info,
))); )));

View File

@ -19,7 +19,7 @@ pub fn spend_and_verify_all_nodes(
funding_keypair: &Keypair, funding_keypair: &Keypair,
nodes: usize, nodes: usize,
) { ) {
let cluster_nodes = discover(&entry_point_info, nodes); let cluster_nodes = discover(&entry_point_info, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
for ingress_node in &cluster_nodes { for ingress_node in &cluster_nodes {
let random_keypair = Keypair::new(); let random_keypair = Keypair::new();
@ -46,7 +46,7 @@ pub fn spend_and_verify_all_nodes(
} }
pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
let cluster_nodes = discover(&entry_point_info, nodes); let cluster_nodes = discover(&entry_point_info, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
for node in &cluster_nodes { for node in &cluster_nodes {
let mut client = mk_client(&node); let mut client = mk_client(&node);
@ -65,7 +65,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
nodes: usize, nodes: usize,
) { ) {
solana_logger::setup(); solana_logger::setup();
let cluster_nodes = discover(&entry_point_info, nodes); let cluster_nodes = discover(&entry_point_info, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
let mut client = mk_client(&entry_point_info); let mut client = mk_client(&entry_point_info);
info!("sleeping for an epoch"); info!("sleeping for an epoch");

View File

@ -77,11 +77,15 @@ pub fn make_listening_node(
(gossip_service, new_node_cluster_info_ref, new_node, id) (gossip_service, new_node_cluster_info_ref, new_node, id)
} }
pub fn discover(entry_point_info: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> { pub fn discover(
entry_point_info: &NodeInfo,
num_nodes: usize,
) -> Result<Vec<NodeInfo>, &'static str> {
info!("Wait for convergence with {} nodes", num_nodes); info!("Wait for convergence with {} nodes", num_nodes);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, spy_ref, id) = make_spy_node(entry_point_info, &exit); let (gossip_service, spy_ref) = make_spy_node(entry_point_info, &exit);
let id = spy_ref.read().unwrap().keypair.pubkey();
trace!( trace!(
"discover: spy_node {} looking for at least {} nodes", "discover: spy_node {} looking for at least {} nodes",
id, id,
@ -100,7 +104,7 @@ pub fn discover(entry_point_info: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo>
if rpc_peers.len() >= num_nodes { if rpc_peers.len() >= num_nodes {
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
gossip_service.join().unwrap(); gossip_service.join().unwrap();
return rpc_peers; return Ok(rpc_peers);
} }
debug!( debug!(
"discover: expecting an additional {} nodes", "discover: expecting an additional {} nodes",
@ -108,11 +112,11 @@ pub fn discover(entry_point_info: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo>
); );
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
panic!("Failed to converge"); Err("Failed to converge")
} }
fn make_spy_node( pub fn make_spy_node(
leader: &NodeInfo, entry_point: &NodeInfo,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> (GossipService, Arc<RwLock<ClusterInfo>>) { ) -> (GossipService, Arc<RwLock<ClusterInfo>>) {
let keypair = Arc::new(Keypair::new()); let keypair = Arc::new(Keypair::new());
@ -122,9 +126,8 @@ fn make_spy_node(
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let gossip_service = let gossip_service =
GossipService::new(&spy_cluster_info_ref, None, None, spy.sockets.gossip, exit); GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
(gossip_service, cluster_info)
(gossip_service, spy_cluster_info_ref, id)
} }
impl Service for GossipService { impl Service for GossipService {

View File

@ -102,7 +102,7 @@ impl LocalCluster {
); );
fullnodes.push(validator_server); fullnodes.push(validator_server);
} }
discover(&leader_node_info, num_nodes); discover(&leader_node_info, num_nodes).unwrap();
Self { Self {
funding_keypair: mint_keypair, funding_keypair: mint_keypair,
entry_point_info: leader_node_info, entry_point_info: leader_node_info,

View File

@ -3,9 +3,9 @@
//! messages to the network directly. The binary encoding of its messages are //! messages to the network directly. The binary encoding of its messages are
//! unstable and may change in future releases. //! unstable and may change in future releases.
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; use crate::cluster_info::{ClusterInfoError, NodeInfo};
use crate::fullnode::{Fullnode, FullnodeConfig}; use crate::fullnode::{Fullnode, FullnodeConfig};
use crate::gossip_service::GossipService; use crate::gossip_service::make_spy_node;
use crate::packet::PACKET_DATA_SIZE; use crate::packet::PACKET_DATA_SIZE;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler};
@ -26,7 +26,7 @@ use std;
use std::io; use std::io;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@ -392,14 +392,8 @@ impl Drop for ThinClient {
pub fn poll_gossip_for_leader(gossip_addr: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> { pub fn poll_gossip_for_leader(gossip_addr: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (node, gossip_socket) = ClusterInfo::spy_node();
let my_addr = gossip_socket.local_addr().unwrap();
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(node)));
let gossip_service =
GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
let entry_point = NodeInfo::new_entry_point(&gossip_addr); let entry_point = NodeInfo::new_entry_point(&gossip_addr);
cluster_info.write().unwrap().insert_info(entry_point); let (gossip_service, cluster_info) = make_spy_node(&entry_point, &exit);
let deadline = match timeout { let deadline = match timeout {
Some(timeout) => Duration::new(timeout, 0), Some(timeout) => Duration::new(timeout, 0),
@ -408,7 +402,7 @@ pub fn poll_gossip_for_leader(gossip_addr: SocketAddr, timeout: Option<u64>) ->
let now = Instant::now(); let now = Instant::now();
let result = loop { let result = loop {
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
trace!("polling {:?} for leader from {:?}", gossip_addr, my_addr); trace!("polling {:?} for leader", gossip_addr);
if let Some(leader) = cluster_info.read().unwrap().get_gossip_top_leader() { if let Some(leader) = cluster_info.read().unwrap().get_gossip_top_leader() {
if log_enabled!(log::Level::Trace) { if log_enabled!(log::Level::Trace) {