diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index b2f911d351..6ed6aa599b 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -8,6 +8,7 @@ use solana::crdt::{Crdt, TestNode}; use solana::logger; use solana::ncp::Ncp; use solana::packet::Blob; +use solana::service::Service; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -58,10 +59,8 @@ where sleep(Duration::new(1, 0)); } exit.store(true, Ordering::Relaxed); - for (c, dr, _) in listen.into_iter() { - for j in dr.thread_hdls.into_iter() { - j.join().unwrap(); - } + for (c, dr, _) in listen { + dr.join().unwrap(); // make it clear what failed // protocol is to chatty, updates should stop after everyone receives `num` assert!(c.read().unwrap().update_index <= num as u64); @@ -175,13 +174,9 @@ pub fn crdt_retransmit() { //r1 was the sender, so it should fail to receive the packet assert_eq!(res, [true, false, false]); exit.store(true, Ordering::Relaxed); - let mut threads = vec![]; - threads.extend(dr1.thread_hdls.into_iter()); - threads.extend(dr2.thread_hdls.into_iter()); - threads.extend(dr3.thread_hdls.into_iter()); - for t in threads.into_iter() { - t.join().unwrap(); - } + dr1.join().unwrap(); + dr2.join().unwrap(); + dr3.join().unwrap(); } #[test] @@ -255,13 +250,8 @@ fn test_external_liveness_table() { // Shutdown validators c2 and c3 c2_c3_exit.store(true, Ordering::Relaxed); - let mut threads = vec![]; - threads.extend(dr2.thread_hdls.into_iter()); - threads.extend(dr3.thread_hdls.into_iter()); - - for t in threads { - t.join().unwrap(); - } + dr2.join().unwrap(); + dr3.join().unwrap(); // Allow communication between c1 and c4, make sure that c1's external_liveness table // entry for c4 gets cleared @@ -281,11 +271,6 @@ fn test_external_liveness_table() { // Shutdown validators c1 and c4 c1_c4_exit.store(true, Ordering::Relaxed); - let mut threads = vec![]; - threads.extend(dr1.thread_hdls.into_iter()); - threads.extend(dr4.thread_hdls.into_iter()); - - for t in threads { - t.join().unwrap(); - } + dr1.join().unwrap(); + dr4.join().unwrap(); } diff --git a/tests/multinode.rs b/tests/multinode.rs index ba25556130..1437c448ff 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -11,10 +11,12 @@ use solana::fullnode::{FullNode, InFile, OutFile}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; +use solana::service::Service; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::streamer::default_window; use solana::thin_client::ThinClient; use std::fs::File; +use std::mem; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -65,9 +67,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { } assert!(converged); exit.store(true, Ordering::Relaxed); - for t in dr.thread_hdls.into_iter() { - t.join().unwrap(); - } + dr.join().unwrap(); rv } @@ -103,7 +103,7 @@ fn test_multi_node_validator_catchup_from_zero() { None, exit.clone(), ); - let mut threads = server.thread_hdls; + let mut threads = server.thread_hdls(); for _ in 0..N { let validator = TestNode::new(); let mut val = FullNode::new( @@ -114,7 +114,7 @@ fn test_multi_node_validator_catchup_from_zero() { None, exit.clone(), ); - threads.append(&mut val.thread_hdls); + threads.append(&mut val.thread_hdls()); } let servers = converge(&leader_data, N + 1); //contains the leader addr as well @@ -139,7 +139,7 @@ fn test_multi_node_validator_catchup_from_zero() { success = 0; // start up another validator, converge and then check everyone's balances - let mut val = FullNode::new( + let val = FullNode::new( TestNode::new(), false, InFile::Path(ledger_path.clone()), @@ -147,7 +147,7 @@ fn test_multi_node_validator_catchup_from_zero() { None, exit.clone(), ); - threads.append(&mut val.thread_hdls); + threads.append(&mut val.thread_hdls()); //contains the leader and new node let servers = converge(&leader_data, N + 2); @@ -204,7 +204,7 @@ fn test_multi_node_basic() { None, exit.clone(), ); - let threads = server.thread_hdls; + let threads = server.thread_hdls(); for _ in 0..N { let validator = TestNode::new(); FullNode::new( @@ -282,12 +282,8 @@ fn test_boot_validator_from_file() { assert!(getbal == Some(leader_balance)); exit.store(true, Ordering::Relaxed); - for t in leader_fullnode.thread_hdls { - t.join().unwrap(); - } - for t in val_fullnode.thread_hdls { - t.join().unwrap(); - } + leader_fullnode.join().unwrap(); + val_fullnode.join().unwrap(); std::fs::remove_file(ledger_path).unwrap(); } @@ -310,7 +306,7 @@ fn test_multi_node_dynamic_network() { Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); - let threads = server.thread_hdls; + let threads = server.thread_hdls(); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); assert_eq!(leader_balance, 500); @@ -375,18 +371,8 @@ fn test_multi_node_dynamic_network() { distance ); } - // this should be almost true, or at least validators.len() - 1 while the other node catches up - //assert!(success == validators.len()); - //kill a validator - validators[i].1.store(true, Ordering::Relaxed); - let mut ts = vec![]; - ts.append(&mut validators[i].2.thread_hdls); - for t in ts.into_iter() { - t.join().unwrap(); - } - info!("{:x} KILLED", validators[i].0.debug_id()); - //add a new one - validators[i] = { + + let val = { let exit = Arc::new(AtomicBool::new(false)); let validator = TestNode::new(); let rd = validator.data.clone(); @@ -401,12 +387,20 @@ fn test_multi_node_dynamic_network() { info!("{:x} ADDED", rd.debug_id()); (rd, exit, val) }; + + let old_val = mem::replace(&mut validators[i], val); + + // this should be almost true, or at least validators.len() - 1 while the other node catches up + //assert!(success == validators.len()); + //kill a validator + old_val.1.store(true, Ordering::Relaxed); + old_val.2.join().unwrap(); + info!("{:x} KILLED", old_val.0.debug_id()); + //add a new one } for (_, exit, val) in validators.into_iter() { exit.store(true, Ordering::Relaxed); - for t in val.thread_hdls { - t.join().unwrap(); - } + val.join().unwrap(); } exit.store(true, Ordering::Relaxed); for t in threads {