From d8c9655128b997e691305cd56e22c9bb6fbcb64b Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Mon, 16 Jul 2018 19:31:52 -0700 Subject: [PATCH] Dynamic test assert (#643) * log responder error to warn * log responder error to warn * fixup! * fixed assert * fixed bad ports issue * comments * test for dummy address in Crdt::new instaad of NodeInfo::new * return error if ContactInfo supplied to Crdt::new cannot be used to connect to network * comments --- src/bin/client-demo.rs | 2 +- src/crdt.rs | 131 +++++++++++++++++++++++++++++++-------- src/drone.rs | 2 +- src/fullnode.rs | 6 +- src/ncp.rs | 4 +- src/packet.rs | 2 +- src/streamer.rs | 6 +- src/thin_client.rs | 6 +- src/tvu.rs | 12 ++-- tests/data_replicator.rs | 4 +- tests/multinode.rs | 37 ++++++----- 11 files changed, 149 insertions(+), 63 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 8efe00116e..0c879ebaba 100755 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -386,7 +386,7 @@ fn converge( //lets spy on the network let daddr = "0.0.0.0:0".parse().unwrap(); let (spy, spy_gossip) = spy_node(); - let mut spy_crdt = Crdt::new(spy); + let mut spy_crdt = Crdt::new(spy).expect("Crdt::new"); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); diff --git a/src/crdt.rs b/src/crdt.rs index aef273b2d9..6e49449db0 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -48,6 +48,8 @@ const MIN_TABLE_SIZE: usize = 2; pub enum CrdtError { TooSmall, NoLeader, + BadContactInfo, + BadNodeInfo, } pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { @@ -254,8 +256,26 @@ enum Protocol { } impl Crdt { - pub fn new(me: NodeInfo) -> Crdt { - assert_eq!(me.version, 0); + pub fn new(me: NodeInfo) -> Result { + if me.version != 0 { + return Err(Error::CrdtError(CrdtError::BadNodeInfo)); + } + for addr in &[ + me.contact_info.ncp, + me.contact_info.tvu, + me.contact_info.rpu, + me.contact_info.tpu, + me.contact_info.tvu_window, + ] { + //dummy address is allowed, services will filter them + if addr.ip().is_unspecified() && addr.port() == 0 { + continue; + } + //if addr is not a dummy address, than it must be valid + if addr.ip().is_unspecified() || addr.port() == 0 || addr.ip().is_multicast() { + return Err(Error::CrdtError(CrdtError::BadContactInfo)); + } + } let mut g = Crdt { table: HashMap::new(), local: HashMap::new(), @@ -267,7 +287,7 @@ impl Crdt { }; g.local.insert(me.id, g.update_index); g.table.insert(me.id, me); - g + Ok(g) } pub fn debug_id(&self) -> u64 { make_debug_id(&self.me) @@ -1071,23 +1091,18 @@ pub struct TestNode { pub sockets: Sockets, } -impl Default for TestNode { - fn default() -> Self { - Self::new() - } -} - impl TestNode { - pub fn new() -> Self { + pub fn new_localhost() -> Self { let pubkey = KeyPair::new().pubkey(); - Self::new_with_pubkey(pubkey) + Self::new_localhost_with_pubkey(pubkey) } - pub fn new_with_pubkey(pubkey: PublicKey) -> Self { - let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let repair = UdpSocket::bind("0.0.0.0:0").unwrap(); + pub fn new_localhost_with_pubkey(pubkey: PublicKey) -> Self { + let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); + let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); + let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); + let requests = UdpSocket::bind("127.0.0.1:0").unwrap(); + let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); + let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -1190,6 +1205,68 @@ mod tests { let p3 = parse_port_or_addr(None); assert_eq!(p3.port(), 8000); } + #[test] + fn test_bad_address() { + let d1 = NodeInfo::new( + KeyPair::new().pubkey(), + "0.0.0.0:1234".parse().unwrap(), + "0.0.0.0:1235".parse().unwrap(), + "0.0.0.0:1236".parse().unwrap(), + "0.0.0.0:1237".parse().unwrap(), + "0.0.0.0:1238".parse().unwrap(), + ); + assert_matches!( + Crdt::new(d1).err(), + Some(Error::CrdtError(CrdtError::BadContactInfo)) + ); + let d2 = NodeInfo::new( + KeyPair::new().pubkey(), + "0.0.0.1:0".parse().unwrap(), + "0.0.0.1:0".parse().unwrap(), + "0.0.0.1:0".parse().unwrap(), + "0.0.0.1:0".parse().unwrap(), + "0.0.0.1:0".parse().unwrap(), + ); + assert_matches!( + Crdt::new(d2).err(), + Some(Error::CrdtError(CrdtError::BadContactInfo)) + ); + let d3 = NodeInfo::new( + KeyPair::new().pubkey(), + "0.0.0.0:0".parse().unwrap(), + "0.0.0.0:0".parse().unwrap(), + "0.0.0.0:0".parse().unwrap(), + "0.0.0.0:0".parse().unwrap(), + "0.0.0.0:0".parse().unwrap(), + ); + assert_eq!(Crdt::new(d3).is_ok(), true); + let d4 = NodeInfo::new( + KeyPair::new().pubkey(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + ); + assert_matches!( + Crdt::new(d4).err(), + Some(Error::CrdtError(CrdtError::BadContactInfo)) + ); + let mut d5 = NodeInfo::new( + KeyPair::new().pubkey(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + "255.255.255.255:0".parse().unwrap(), + ); + d5.version = 1; + assert_matches!( + Crdt::new(d5).err(), + Some(Error::CrdtError(CrdtError::BadNodeInfo)) + ); + } + #[test] fn insert_test() { let mut d = NodeInfo::new( @@ -1201,7 +1278,7 @@ mod tests { "127.0.0.1:1238".parse().unwrap(), ); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()); + let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); d.version = 2; crdt.insert(&d); @@ -1214,7 +1291,7 @@ mod tests { fn test_new_vote() { let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()); + let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); assert_ne!(d.id, leader.id); @@ -1241,7 +1318,7 @@ mod tests { fn test_insert_vote() { let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()); + let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); let vote_same_version = Vote { version: d.version, @@ -1273,7 +1350,7 @@ mod tests { // TODO: remove this test once leaders vote let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()); + let mut crdt = Crdt::new(d.clone()).unwrap(); let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); assert_ne!(d.id, leader.id); crdt.insert(&leader); @@ -1342,7 +1419,7 @@ mod tests { "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); - let mut crdt = Crdt::new(d1.clone()); + let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new"); let (key, ix, ups) = crdt.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 1); @@ -1363,7 +1440,7 @@ mod tests { sorted(&ups), sorted(&vec![d1.clone(), d2.clone(), d3.clone()]) ); - let mut crdt2 = Crdt::new(d2.clone()); + let mut crdt2 = Crdt::new(d2.clone()).expect("Crdt::new"); crdt2.apply_updates(key, ix, &ups, &vec![]); assert_eq!(crdt2.table.values().len(), 3); assert_eq!( @@ -1385,7 +1462,7 @@ mod tests { "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); - let mut crdt = Crdt::new(me.clone()); + let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt = NodeInfo::new( @@ -1447,7 +1524,7 @@ mod tests { "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); - let mut crdt = Crdt::new(me.clone()); + let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt1 = NodeInfo::new( @@ -1506,7 +1583,7 @@ mod tests { fn purge_test() { logger::setup(); let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let mut crdt = Crdt::new(me.clone()); + let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt.id); crdt.set_leader(me.id); @@ -1617,7 +1694,7 @@ mod tests { let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let mut crdt = Crdt::new(me.clone()); + let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); assert_eq!(crdt.top_leader(), None); crdt.set_leader(leader0.id); assert_eq!(crdt.top_leader().unwrap(), leader0.id); diff --git a/src/drone.rs b/src/drone.rs index 36e0ba653f..6f4fcc67f0 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -265,7 +265,7 @@ mod tests { const TPS_BATCH: i64 = 5_000_000; logger::setup(); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let alice = Mint::new(10_000_000); let bank = Bank::new(&alice); diff --git a/src/fullnode.rs b/src/fullnode.rs index 0d3e2340a8..d32d01066d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -203,7 +203,7 @@ impl FullNode { thread_hdls.extend(rpu.thread_hdls()); let blob_recycler = BlobRecycler::default(); - let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); let (tpu, blob_receiver) = Tpu::new( &bank.clone(), &crdt.clone(), @@ -285,7 +285,7 @@ impl FullNode { ); thread_hdls.extend(rpu.thread_hdls()); - let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); crdt.write() .expect("'crdt' write lock before insert() in pub fn replicate") .insert(&entry_point); @@ -349,7 +349,7 @@ mod tests { #[test] fn validator_exit() { let kp = KeyPair::new(); - let tn = TestNode::new_with_pubkey(kp.pubkey()); + let tn = TestNode::new_localhost_with_pubkey(kp.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); diff --git a/src/ncp.rs b/src/ncp.rs index e03b7b5df1..0907df973e 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -88,8 +88,8 @@ mod tests { // test that stage will exit when flag is set fn test_exit() { let exit = Arc::new(AtomicBool::new(false)); - let tn = TestNode::new(); - let crdt = Crdt::new(tn.data.clone()); + let tn = TestNode::new_localhost(); + let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new( diff --git a/src/packet.rs b/src/packet.rs index 50c67c069b..6a20fcebf2 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -421,7 +421,7 @@ impl Blob { let p = r.read().expect("'r' read lock in pub fn send_to"); let a = p.meta.addr(); if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { - info!( + warn!( "error sending {} byte packet to {:?}: {:?}", p.meta.size, a, e ); diff --git a/src/streamer.rs b/src/streamer.rs index 26f01d973e..e30f2ae400 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -116,7 +116,7 @@ pub fn responder( match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{} responder error: {:?}", name, e), + _ => warn!("{} responder error: {:?}", name, e), } } }) @@ -889,9 +889,9 @@ mod test { #[test] pub fn window_send_test() { logger::setup(); - let tn = TestNode::new(); + let tn = TestNode::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let mut crdt_me = Crdt::new(tn.data.clone()); + let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; crdt_me.set_leader(me_id); let subs = Arc::new(RwLock::new(crdt_me)); diff --git a/src/thin_client.rs b/src/thin_client.rs index 10d7b73739..26d5942775 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -274,7 +274,7 @@ mod tests { #[test] fn test_thin_client() { logger::setup(); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let leader_data = leader.data.clone(); let alice = Mint::new(10_000); @@ -317,7 +317,7 @@ mod tests { #[ignore] fn test_bad_sig() { logger::setup(); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); @@ -371,7 +371,7 @@ mod tests { #[test] fn test_client_check_signature() { logger::setup(); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); diff --git a/src/tvu.rs b/src/tvu.rs index cf1d79ce01..1740a471f7 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -168,21 +168,21 @@ pub mod tests { #[test] fn test_replicate() { logger::setup(); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let target1_kp = KeyPair::new(); - let target1 = TestNode::new_with_pubkey(target1_kp.pubkey()); - let target2 = TestNode::new(); + let target1 = TestNode::new_localhost_with_pubkey(target1_kp.pubkey()); + let target2 = TestNode::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader - let mut crdt_l = Crdt::new(leader.data.clone()); + let mut crdt_l = Crdt::new(leader.data.clone()).expect("Crdt::new"); crdt_l.set_leader(leader.data.id); let cref_l = Arc::new(RwLock::new(crdt_l)); let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap(); //start crdt2 - let mut crdt2 = Crdt::new(target2.data.clone()); + let mut crdt2 = Crdt::new(target2.data.clone()).expect("Crdt::new"); crdt2.insert(&leader.data); crdt2.set_leader(leader.data.id); let leader_id = leader.data.id; @@ -217,7 +217,7 @@ pub mod tests { let bank = Arc::new(Bank::new(&mint)); //start crdt1 - let mut crdt1 = Crdt::new(target1.data.clone()); + let mut crdt1 = Crdt::new(target1.data.clone()).expect("Crdt::new"); crdt1.insert(&leader.data); crdt1.set_leader(leader.data.id); let cref1 = Arc::new(RwLock::new(crdt1)); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 6dc58bac45..790e480ecb 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -16,8 +16,8 @@ use std::thread::sleep; use std::time::Duration; fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { - let tn = TestNode::new(); - let crdt = Crdt::new(tn.data.clone()); + let tn = TestNode::new_localhost(); + let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new( diff --git a/tests/multinode.rs b/tests/multinode.rs index 46790d8d81..678579ba9b 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -24,12 +24,12 @@ use std::time::Duration; fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { //lets spy on the network let exit = Arc::new(AtomicBool::new(false)); - let mut spy = TestNode::new(); + let mut spy = TestNode::new_localhost(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.data.id.clone(); spy.data.contact_info.tvu = daddr; spy.data.contact_info.rpu = daddr; - let mut spy_crdt = Crdt::new(spy.data); + let mut spy_crdt = Crdt::new(spy.data).expect("Crdt::new"); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); @@ -86,7 +86,7 @@ fn test_multi_node_validator_catchup_from_zero() { logger::setup(); const N: usize = 5; trace!("test_multi_node_validator_catchup_from_zero"); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); @@ -101,7 +101,7 @@ fn test_multi_node_validator_catchup_from_zero() { let mut nodes = vec![server]; for _ in 0..N { let keypair = KeyPair::new(); - let validator = TestNode::new_with_pubkey(keypair.pubkey()); + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let mut val = FullNode::new( validator, false, @@ -135,7 +135,7 @@ fn test_multi_node_validator_catchup_from_zero() { success = 0; // start up another validator, converge and then check everyone's balances let keypair = KeyPair::new(); - let validator = TestNode::new_with_pubkey(keypair.pubkey()); + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let val = FullNode::new( validator, false, @@ -186,7 +186,7 @@ fn test_multi_node_basic() { logger::setup(); const N: usize = 5; trace!("test_multi_node_basic"); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(10_000); @@ -200,7 +200,7 @@ fn test_multi_node_basic() { let mut nodes = vec![server]; for _ in 0..N { let keypair = KeyPair::new(); - let validator = TestNode::new_with_pubkey(keypair.pubkey()); + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let val = FullNode::new( validator, false, @@ -239,7 +239,7 @@ fn test_multi_node_basic() { #[test] fn test_boot_validator_from_file() { logger::setup(); - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(100_000); let leader_data = leader.data.clone(); @@ -258,7 +258,7 @@ fn test_boot_validator_from_file() { assert_eq!(leader_balance, 1000); let keypair = KeyPair::new(); - let validator = TestNode::new_with_pubkey(keypair.pubkey()); + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); let val_fullnode = FullNode::new( validator, @@ -277,7 +277,7 @@ fn test_boot_validator_from_file() { } fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) { - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let leader_data = leader.data.clone(); let leader_fullnode = FullNode::new( leader, @@ -328,7 +328,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { // start validator from old ledger let keypair = KeyPair::new(); - let validator = TestNode::new_with_pubkey(keypair.pubkey()); + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); let val_fullnode = FullNode::new( validator, @@ -369,7 +369,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { fn test_multi_node_dynamic_network() { logger::setup(); const N: usize = 60; - let leader = TestNode::new(); + let leader = TestNode::new_localhost(); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(100_000); let leader_data = leader.data.clone(); @@ -392,7 +392,7 @@ fn test_multi_node_dynamic_network() { .into_iter() .map(|n| { let keypair = KeyPair::new(); - let validator = TestNode::new_with_pubkey(keypair.pubkey()); + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let rd = validator.data.clone(); //send some tokens to the new validator let bal = @@ -410,6 +410,7 @@ fn test_multi_node_dynamic_network() { }) .collect(); + let mut consecutive_success = 0; for i in 0..N { //verify leader can do transfer let expected = ((i + 3) * 500) as i64; @@ -452,9 +453,17 @@ fn test_multi_node_dynamic_network() { validators.len(), distance ); - //assert_eq!(success, validators.len()); + if success == validators.len() && distance == 0 { + consecutive_success += 1; + } else { + consecutive_success = 0; + } + if consecutive_success == 10 { + break; + } } } + assert_eq!(consecutive_success, 10); for (_, node) in validators { node.close().unwrap(); }