From c65c0d9b23a33fa13a75c7a63050757df8307e4e Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 9 Jul 2018 14:53:18 -0600 Subject: [PATCH] Expose fewer exit variables --- src/bin/fullnode.rs | 7 +-- src/blob_fetch_stage.rs | 9 +++- src/fetch_stage.rs | 9 +++- src/fullnode.rs | 23 ++++---- src/ncp.rs | 19 ++++--- src/tpu.rs | 5 ++ src/tvu.rs | 10 ++-- tests/multinode.rs | 117 +++++++++++++--------------------------- 8 files changed, 89 insertions(+), 110 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index c1a265564e..5c4b55e3c7 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -14,8 +14,6 @@ use solana::service::Service; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::process::exit; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; //use std::time::Duration; fn main() -> () { @@ -68,11 +66,10 @@ fn main() -> () { } } let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr); - let exit = Arc::new(AtomicBool::new(false)); let fullnode = if let Some(t) = matches.value_of("testnet") { let testnet_address_string = t.to_string(); let testnet_addr = testnet_address_string.parse().unwrap(); - FullNode::new(node, false, InFile::StdIn, Some(testnet_addr), None, exit) + FullNode::new(node, false, InFile::StdIn, Some(testnet_addr), None) } else { node.data.current_leader_id = node.data.id.clone(); @@ -81,7 +78,7 @@ fn main() -> () { } else { OutFile::StdOut }; - FullNode::new(node, true, InFile::StdIn, None, Some(outfile), exit) + FullNode::new(node, true, InFile::StdIn, None, Some(outfile)) }; fullnode.join().expect("join"); } diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index 168adeb273..c02c0836ee 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -3,13 +3,14 @@ use packet::BlobRecycler; use service::Service; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::{self, JoinHandle}; use streamer::{self, BlobReceiver}; pub struct BlobFetchStage { + exit: Arc, thread_hdls: Vec>, } @@ -39,7 +40,11 @@ impl BlobFetchStage { }) .collect(); - (BlobFetchStage { thread_hdls }, blob_receiver) + (BlobFetchStage { exit, thread_hdls }, blob_receiver) + } + + pub fn close(&self) { + self.exit.store(true, Ordering::Relaxed); } } diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index 1cf3274533..bac6be8d8a 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -3,13 +3,14 @@ use packet::PacketRecycler; use service::Service; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::{self, JoinHandle}; use streamer::{self, PacketReceiver}; pub struct FetchStage { + exit: Arc, thread_hdls: Vec>, } @@ -39,7 +40,11 @@ impl FetchStage { }) .collect(); - (FetchStage { thread_hdls }, packet_receiver) + (FetchStage { exit, thread_hdls }, packet_receiver) + } + + pub fn close(&self) { + self.exit.store(true, Ordering::Relaxed); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 956ae49788..fa66030057 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions}; use std::io::{sink, stdin, stdout, BufReader}; use std::io::{Read, Write}; use std::net::SocketAddr; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{JoinHandle, Result}; use std::time::Duration; @@ -24,6 +24,7 @@ use tvu::Tvu; //use std::time::Duration; pub struct FullNode { + exit: Arc, thread_hdls: Vec>, } @@ -44,7 +45,6 @@ impl FullNode { infile: InFile, network_entry_for_validator: Option, outfile_for_leader: Option, - exit: Arc, ) -> FullNode { info!("creating bank..."); let bank = Bank::default(); @@ -70,6 +70,7 @@ impl FullNode { local_gossip_addr, node.data.contact_info.ncp ); let requests_addr = node.data.contact_info.rpu.clone(); + let exit = Arc::new(AtomicBool::new(false)); if !leader { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); @@ -215,7 +216,7 @@ impl FullNode { ); thread_hdls.extend(vec![t_broadcast]); - FullNode { thread_hdls } + FullNode { exit, thread_hdls } } /// Create a server instance acting as a validator. @@ -294,7 +295,12 @@ impl FullNode { ); thread_hdls.extend(tvu.thread_hdls()); thread_hdls.extend(ncp.thread_hdls()); - FullNode { thread_hdls } + FullNode { exit, thread_hdls } + } + + pub fn close(self) -> Result<()> { + self.exit.store(true, Ordering::Relaxed); + self.join() } } @@ -317,7 +323,7 @@ mod tests { use crdt::TestNode; use fullnode::FullNode; use mint::Mint; - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::AtomicBool; use std::sync::Arc; #[test] fn validator_exit() { @@ -326,10 +332,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone()); - exit.store(true, Ordering::Relaxed); - for t in v.thread_hdls { - t.join().unwrap(); - } + let v = FullNode::new_validator(bank, 0, None, tn, entry, exit); + v.close().unwrap(); } } diff --git a/src/ncp.rs b/src/ncp.rs index 76bbb6b870..b72c7a54c2 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -5,13 +5,14 @@ use packet::{BlobRecycler, SharedBlob}; use result::Result; use service::Service; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{self, JoinHandle}; use streamer; pub struct Ncp { + exit: Arc, thread_hdls: Vec>, } @@ -47,9 +48,14 @@ impl Ncp { response_sender.clone(), exit.clone(), ); - let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); + let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit.clone()); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; - Ok(Ncp { thread_hdls }) + Ok(Ncp { exit, thread_hdls }) + } + + pub fn close(self) -> thread::Result<()> { + self.exit.store(true, Ordering::Relaxed); + self.join() } } @@ -70,7 +76,7 @@ impl Service for Ncp { mod tests { use crdt::{Crdt, TestNode}; use ncp::Ncp; - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; #[test] @@ -89,9 +95,6 @@ mod tests { tn.sockets.gossip_send, exit.clone(), ).unwrap(); - exit.store(true, Ordering::Relaxed); - for t in d.thread_hdls { - t.join().expect("thread join"); - } + d.close().expect("thread join"); } } diff --git a/src/tpu.rs b/src/tpu.rs index ae213e8882..d151987c9c 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -87,6 +87,11 @@ impl Tpu { }; (tpu, blob_receiver) } + + pub fn close(self) -> thread::Result<()> { + self.fetch_stage.close(); + self.join() + } } impl Service for Tpu { diff --git a/src/tvu.rs b/src/tvu.rs index 522b2d1c9e..3316f837cd 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -101,6 +101,11 @@ impl Tvu { window_stage, } } + + pub fn close(self) -> thread::Result<()> { + self.fetch_stage.close(); + self.join() + } } impl Service for Tvu { @@ -136,7 +141,7 @@ pub mod tests { use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::net::UdpSocket; - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -279,8 +284,7 @@ pub mod tests { let bob_balance = bank.get_balance(&bob_keypair.pubkey()); assert_eq!(bob_balance, starting_balance - alice_ref_balance); - exit.store(true, Ordering::Relaxed); - tvu.join().expect("join"); + tvu.close().expect("close"); dr_l.0.join().expect("join"); dr_2.0.join().expect("join"); dr_1.0.join().expect("join"); diff --git a/tests/multinode.rs b/tests/multinode.rs index edfe57e398..8eda478508 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -11,14 +11,13 @@ use solana::fullnode::{FullNode, InFile, OutFile}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; -use solana::service::Service; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::streamer::default_window; use solana::thin_client::ThinClient; use std::fs::File; use std::mem; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -36,7 +35,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let dr = Ncp::new( + let ncp = Ncp::new( spy_ref.clone(), spy_window, spy.sockets.gossip, @@ -66,8 +65,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { sleep(Duration::new(1, 0)); } assert!(converged); - exit.store(true, Ordering::Relaxed); - dr.join().unwrap(); + ncp.close().unwrap(); rv } @@ -92,18 +90,10 @@ fn test_multi_node_validator_catchup_from_zero() { let leader = TestNode::new(); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let (alice, ledger_path) = genesis(10_000); - let server = FullNode::new( - leader, - true, - InFile::Path(ledger_path.clone()), - None, - None, - exit.clone(), - ); - let mut threads = server.thread_hdls(); + let server = FullNode::new(leader, true, InFile::Path(ledger_path.clone()), None, None); + let mut nodes = vec![server]; for _ in 0..N { let validator = TestNode::new(); let mut val = FullNode::new( @@ -112,9 +102,8 @@ fn test_multi_node_validator_catchup_from_zero() { InFile::Path(ledger_path.clone()), Some(leader_data.contact_info.ncp), None, - exit.clone(), ); - threads.append(&mut val.thread_hdls()); + nodes.push(val); } let servers = converge(&leader_data, N + 1); //contains the leader addr as well @@ -145,9 +134,8 @@ fn test_multi_node_validator_catchup_from_zero() { InFile::Path(ledger_path.clone()), Some(leader_data.contact_info.ncp), None, - exit.clone(), ); - threads.append(&mut val.thread_hdls()); + nodes.push(val); //contains the leader and new node let servers = converge(&leader_data, N + 2); @@ -180,9 +168,8 @@ fn test_multi_node_validator_catchup_from_zero() { } assert_eq!(success, servers.len()); - exit.store(true, Ordering::Relaxed); - for t in threads { - t.join().unwrap(); + for node in nodes { + node.close().unwrap(); } } @@ -194,27 +181,19 @@ fn test_multi_node_basic() { let leader = TestNode::new(); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let (alice, ledger_path) = genesis(10_000); - let server = FullNode::new( - leader, - true, - InFile::Path(ledger_path.clone()), - None, - None, - exit.clone(), - ); - let threads = server.thread_hdls(); + let server = FullNode::new(leader, true, InFile::Path(ledger_path.clone()), None, None); + let mut nodes = vec![server]; for _ in 0..N { let validator = TestNode::new(); - FullNode::new( + let val = FullNode::new( validator, false, InFile::Path(ledger_path.clone()), Some(leader_data.contact_info.ncp), None, - exit.clone(), ); + nodes.push(val); } let servers = converge(&leader_data, N + 1); //contains the leader addr as well @@ -236,9 +215,8 @@ fn test_multi_node_basic() { } assert_eq!(success, servers.len()); - exit.store(true, Ordering::Relaxed); - for t in threads { - t.join().unwrap(); + for node in nodes { + node.close().unwrap(); } std::fs::remove_file(ledger_path).unwrap(); } @@ -248,7 +226,6 @@ fn test_boot_validator_from_file() { logger::setup(); let leader = TestNode::new(); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let (alice, ledger_path) = genesis(100_000); let leader_data = leader.data.clone(); let leader_fullnode = FullNode::new( @@ -257,7 +234,6 @@ fn test_boot_validator_from_file() { InFile::Path(ledger_path.clone()), None, Some(OutFile::Path(ledger_path.clone())), - exit.clone(), ); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); @@ -274,42 +250,36 @@ fn test_boot_validator_from_file() { InFile::Path(ledger_path.clone()), Some(leader_data.contact_info.ncp), None, - exit.clone(), ); let mut client = mk_client(&validator_data); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); assert!(getbal == Some(leader_balance)); - exit.store(true, Ordering::Relaxed); - leader_fullnode.join().unwrap(); - val_fullnode.join().unwrap(); + leader_fullnode.close().unwrap(); + val_fullnode.close().unwrap(); std::fs::remove_file(ledger_path).unwrap(); } fn restart_leader( - exit: Option>, leader_fullnode: Option, ledger_path: String, -) -> (ReplicatedData, FullNode, Arc) { - if let (Some(exit), Some(leader_fullnode)) = (exit, leader_fullnode) { +) -> (ReplicatedData, FullNode) { + if let Some(leader_fullnode) = leader_fullnode { // stop the leader - exit.store(true, Ordering::Relaxed); - leader_fullnode.join().unwrap(); + leader_fullnode.close().unwrap(); } let leader = TestNode::new(); let leader_data = leader.data.clone(); - let exit = Arc::new(AtomicBool::new(false)); let leader_fullnode = FullNode::new( leader, true, InFile::Path(ledger_path.clone()), None, Some(OutFile::Path(ledger_path.clone())), - exit.clone(), ); - (leader_data, leader_fullnode, exit) + (leader_data, leader_fullnode) } #[test] @@ -322,7 +292,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { let (alice, ledger_path) = genesis(100_000); let bob_pubkey = KeyPair::new().pubkey(); - let (leader_data, leader_fullnode, exit) = restart_leader(None, None, ledger_path.clone()); + let (leader_data, leader_fullnode) = restart_leader(None, ledger_path.clone()); // lengthen the ledger let leader_balance = @@ -337,8 +307,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { .expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str()); // restart the leader - let (leader_data, leader_fullnode, exit) = - restart_leader(Some(exit), Some(leader_fullnode), ledger_path.clone()); + let (leader_data, leader_fullnode) = restart_leader(Some(leader_fullnode), ledger_path.clone()); // lengthen the ledger let leader_balance = @@ -346,8 +315,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { assert_eq!(leader_balance, 1000); // restart the leader - let (leader_data, leader_fullnode, exit) = - restart_leader(Some(exit), Some(leader_fullnode), ledger_path.clone()); + let (leader_data, leader_fullnode) = restart_leader(Some(leader_fullnode), ledger_path.clone()); // start validator from old ledger let validator = TestNode::new(); @@ -358,7 +326,6 @@ fn test_leader_restart_validator_start_from_old_ledger() { InFile::Path(stale_ledger_path.clone()), Some(leader_data.contact_info.ncp), None, - exit.clone(), ); // trigger broadcast, validator should catch up from leader, whose window contains @@ -380,14 +347,13 @@ fn test_leader_restart_validator_start_from_old_ledger() { let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); assert_eq!(getbal, Some(expected)); - exit.store(true, Ordering::Relaxed); - leader_fullnode.join().unwrap(); - val_fullnode.join().unwrap(); + leader_fullnode.close().unwrap(); + val_fullnode.close().unwrap(); std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(stale_ledger_path).unwrap(); } -//TODO: this test will run a long time so its disabled for CI +//TODO: this test will run a long time so it's disabled for CI #[test] #[ignore] fn test_multi_node_dynamic_network() { @@ -395,7 +361,6 @@ fn test_multi_node_dynamic_network() { const N: usize = 25; let leader = TestNode::new(); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let (alice, ledger_path) = genesis(100_000); let leader_data = leader.data.clone(); let server = FullNode::new( @@ -404,10 +369,8 @@ fn test_multi_node_dynamic_network() { InFile::Path(ledger_path.clone()), None, Some(OutFile::Path(ledger_path.clone())), - exit.clone(), ); info!("{:x} LEADER", leader_data.debug_id()); - let threads = server.thread_hdls(); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); assert_eq!(leader_balance, 500); @@ -415,10 +378,9 @@ fn test_multi_node_dynamic_network() { send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); assert_eq!(leader_balance, 1000); - let mut validators: Vec<(ReplicatedData, Arc, FullNode)> = (0..N) + let mut validators: Vec<(ReplicatedData, FullNode)> = (0..N) .into_iter() .map(|_| { - let exit = Arc::new(AtomicBool::new(false)); let validator = TestNode::new(); let rd = validator.data.clone(); let val = FullNode::new( @@ -427,12 +389,12 @@ fn test_multi_node_dynamic_network() { InFile::Path(ledger_path.clone()), Some(leader_data.contact_info.ncp), Some(OutFile::Path(ledger_path.clone())), - exit.clone(), ); info!("{:x} VALIDATOR", rd.debug_id()); - (rd, exit, val) + (rd, val) }) .collect(); + for i in 0..N { //verify leader can do transfer let expected = ((i + 3) * 500) as i64; @@ -475,7 +437,6 @@ fn test_multi_node_dynamic_network() { } let val = { - let exit = Arc::new(AtomicBool::new(false)); let validator = TestNode::new(); let rd = validator.data.clone(); let val = FullNode::new( @@ -484,10 +445,9 @@ fn test_multi_node_dynamic_network() { InFile::Path(ledger_path.clone()), Some(leader_data.contact_info.ncp), Some(OutFile::Path(ledger_path.clone())), - exit.clone(), ); info!("{:x} ADDED", rd.debug_id()); - (rd, exit, val) + (rd, val) }; let old_val = mem::replace(&mut validators[i], val); @@ -495,19 +455,16 @@ fn test_multi_node_dynamic_network() { // this should be almost true, or at least validators.len() - 1 while the other node catches up //assert!(success == validators.len()); //kill a validator - old_val.1.store(true, Ordering::Relaxed); - old_val.2.join().unwrap(); + old_val.1.close().unwrap(); info!("{:x} KILLED", old_val.0.debug_id()); //add a new one } - for (_, exit, val) in validators.into_iter() { - exit.store(true, Ordering::Relaxed); - val.join().unwrap(); - } - exit.store(true, Ordering::Relaxed); - for t in threads { - t.join().unwrap(); + + for (_, node) in validators { + node.close().unwrap(); } + server.close().unwrap(); + std::fs::remove_file(ledger_path).unwrap(); }