diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 8fa9bfb0c8..de28cbd024 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -136,9 +136,7 @@ impl BankingStage { } impl Service for BankingStage { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { self.thread_hdl.join() diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 6b7ff72069..bec6d44e72 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -32,7 +32,6 @@ use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::thread::Builder; -use std::thread::JoinHandle; use std::time::Duration; use std::time::Instant; @@ -512,8 +511,7 @@ fn main() { let leader = poll_gossip_for_leader(network, None).expect("unable to find leader on network"); let exit_signal = Arc::new(AtomicBool::new(false)); - let mut c_threads = vec![]; - let (nodes, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads); + let (nodes, leader, ncp) = converge(&leader, &exit_signal, num_nodes); if nodes.len() < num_nodes { println!( @@ -693,17 +691,14 @@ fn main() { ); // join the crdt client threads - for t in c_threads { - t.join().unwrap(); - } + ncp.join().unwrap(); } fn converge( leader: &NodeInfo, exit_signal: &Arc, num_nodes: usize, - threads: &mut Vec>, -) -> (Vec, Option) { +) -> (Vec, Option, Ncp) { //lets spy on the network let (node, gossip_socket) = Crdt::spy_node(); let mut spy_crdt = Crdt::new(node).expect("Crdt::new"); @@ -749,9 +744,8 @@ fn converge( } sleep(Duration::new(1, 0)); } - threads.extend(ncp.thread_hdls().into_iter()); let leader = spy_ref.read().unwrap().leader_data().cloned(); - (v, leader) + (v, leader, ncp) } #[cfg(test)] diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index 0625c8e02f..2b10a8a3f9 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -44,12 +44,10 @@ impl BlobFetchStage { } impl Service for BlobFetchStage { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } Ok(()) diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 6af68bd931..9810aa791b 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -191,9 +191,7 @@ impl BroadcastStage { } impl Service for BroadcastStage { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { self.thread_hdl.join() diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index d445101bb0..98f6a55292 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -45,12 +45,10 @@ impl FetchStage { } impl Service for FetchStage { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } Ok(()) diff --git a/src/fullnode.rs b/src/fullnode.rs index 669da25258..5c94c6d73a 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -15,15 +15,60 @@ use signature::{Keypair, KeypairUtil}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; -use std::thread::{JoinHandle, Result}; +use std::thread::Result; use tpu::Tpu; use tvu::Tvu; use untrusted::Input; use window; +pub enum NodeRole { + Leader(LeaderServices), + Validator(ValidatorServices), +} + +pub struct LeaderServices { + tpu: Tpu, + broadcast_stage: BroadcastStage, +} + +impl LeaderServices { + fn new(tpu: Tpu, broadcast_stage: BroadcastStage) -> Self { + LeaderServices { + tpu, + broadcast_stage, + } + } + + fn join(self) -> Result<()> { + self.tpu.join()?; + self.broadcast_stage.join() + } +} + +pub struct ValidatorServices { + tvu: Tvu, +} + +impl ValidatorServices { + fn new(tvu: Tvu) -> Self { + ValidatorServices { tvu } + } + + fn join(self) -> Result<()> { + self.tvu.join() + } +} + +pub enum FullNodeReturnType { + LeaderRotation, +} + pub struct Fullnode { exit: Arc, - thread_hdls: Vec>, + rpu: Rpu, + rpc_service: JsonRpcService, + ncp: Ncp, + pub node_role: NodeRole, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -176,7 +221,6 @@ impl Fullnode { let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); let blob_recycler = BlobRecycler::default(); - let mut thread_hdls = vec![]; let rpu = Rpu::new( &bank, @@ -185,7 +229,6 @@ impl Fullnode { &blob_recycler, exit.clone(), ); - thread_hdls.extend(rpu.thread_hdls()); // TODO: this code assumes this node is the leader let mut drone_addr = node.info.contact_info.tpu; @@ -198,7 +241,6 @@ impl Fullnode { rpc_addr, exit.clone(), ); - thread_hdls.extend(rpc_service.thread_hdls()); let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); @@ -214,8 +256,8 @@ impl Fullnode { node.sockets.gossip, exit.clone(), ); - thread_hdls.extend(ncp.thread_hdls()); + let node_role; match leader_info { Some(leader_info) => { // Start in validator mode. @@ -234,7 +276,8 @@ impl Fullnode { ledger_path, exit.clone(), ); - thread_hdls.extend(tvu.thread_hdls()); + let validator_state = ValidatorServices::new(tvu); + node_role = NodeRole::Validator(validator_state); } None => { // Start in leader mode. @@ -254,7 +297,6 @@ impl Fullnode { ledger_path, sigverify_disabled, ); - thread_hdls.extend(tpu.thread_hdls()); let broadcast_stage = BroadcastStage::new( node.sockets.broadcast, @@ -264,33 +306,51 @@ impl Fullnode { blob_recycler.clone(), blob_receiver, ); - thread_hdls.extend(broadcast_stage.thread_hdls()); + let leader_state = LeaderServices::new(tpu, broadcast_stage); + node_role = NodeRole::Leader(leader_state); } } - Fullnode { exit, thread_hdls } + Fullnode { + rpu, + ncp, + rpc_service, + node_role, + exit, + } } //used for notifying many nodes in parallel to exit pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); } - pub fn close(self) -> Result<()> { + + pub fn close(self) -> Result<(Option)> { self.exit(); self.join() } } impl Service for Fullnode { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = Option; - fn join(self) -> Result<()> { - for thread_hdl in self.thread_hdls() { - thread_hdl.join()?; + fn join(self) -> Result> { + self.rpu.join()?; + self.ncp.join()?; + self.rpc_service.join()?; + match self.node_role { + NodeRole::Validator(validator_service) => { + validator_service.join()?; + } + NodeRole::Leader(leader_service) => { + leader_service.join()?; + } } - Ok(()) + + // TODO: Case on join values above to determine if + // a leader rotation was in order, and propagate up a + // signal to reflect that + Ok(None) } } diff --git a/src/ncp.rs b/src/ncp.rs index 7f6b793db3..664efb72df 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -66,12 +66,10 @@ impl Ncp { } impl Service for Ncp { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } Ok(()) diff --git a/src/record_stage.rs b/src/record_stage.rs index dc7cee0fc8..43891c15c4 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -127,9 +127,7 @@ impl RecordStage { } impl Service for RecordStage { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { self.thread_hdl.join() diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 3e33ce40c7..5e4795c9bc 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -22,6 +22,7 @@ use vote_stage::VoteStage; pub struct ReplicateStage { thread_hdls: Vec>, + vote_stage: VoteStage, } impl ReplicateStage { @@ -113,21 +114,23 @@ impl ReplicateStage { }) .unwrap(); - let mut thread_hdls = vec![t_responder, t_replicate]; - thread_hdls.extend(vote_stage.thread_hdls()); + let thread_hdls = vec![t_responder, t_replicate]; - ReplicateStage { thread_hdls } + ReplicateStage { + thread_hdls, + vote_stage, + } } } impl Service for ReplicateStage { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); + fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } + self.vote_stage.join()?; Ok(()) } } diff --git a/src/request_stage.rs b/src/request_stage.rs index 893613b4d8..77abd12c18 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -116,9 +116,7 @@ impl RequestStage { } impl Service for RequestStage { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { self.thread_hdl.join() diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index b0f0071f3c..3a0270fda5 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -113,12 +113,10 @@ impl RetransmitStage { } impl Service for RetransmitStage { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } Ok(()) diff --git a/src/rpc.rs b/src/rpc.rs index ee9a368f46..ed58a5a59b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -68,9 +68,7 @@ impl JsonRpcService { } impl Service for JsonRpcService { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { self.thread_hdl.join() diff --git a/src/rpu.rs b/src/rpu.rs index e67db168d9..d3cc33621c 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -36,6 +36,7 @@ use std::thread::{self, JoinHandle}; use streamer; pub struct Rpu { + request_stage: RequestStage, thread_hdls: Vec>, } @@ -71,21 +72,22 @@ impl Rpu { blob_receiver, ); - let mut thread_hdls = vec![t_receiver, t_responder]; - thread_hdls.extend(request_stage.thread_hdls().into_iter()); - Rpu { thread_hdls } + let thread_hdls = vec![t_receiver, t_responder]; + Rpu { + thread_hdls, + request_stage, + } } } impl Service for Rpu { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } + self.request_stage.join()?; Ok(()) } } diff --git a/src/service.rs b/src/service.rs index b00e48577d..2ac911ec5a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,6 +1,7 @@ -use std::thread::{JoinHandle, Result}; +use std::thread::Result; pub trait Service { - fn thread_hdls(self) -> Vec>; - fn join(self) -> Result<()>; + type JoinReturnType; + + fn join(self) -> Result; } diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index d64543b6fc..93b9b4037b 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -126,12 +126,10 @@ impl SigVerifyStage { } impl Service for SigVerifyStage { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } Ok(()) diff --git a/src/tpu.rs b/src/tpu.rs index 157094a554..7b0c98866a 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -37,7 +37,7 @@ use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; -use std::thread::{self, JoinHandle}; +use std::thread; use std::time::Duration; use streamer::BlobReceiver; use write_stage::WriteStage; @@ -106,20 +106,15 @@ impl Tpu { } impl Service for Tpu { - fn thread_hdls(self) -> Vec> { - let mut thread_hdls = vec![]; - thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter()); - thread_hdls.extend(self.sigverify_stage.thread_hdls().into_iter()); - thread_hdls.extend(self.banking_stage.thread_hdls().into_iter()); - thread_hdls.extend(self.record_stage.thread_hdls().into_iter()); - thread_hdls.extend(self.write_stage.thread_hdls().into_iter()); - thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { - thread_hdl.join()?; - } + self.fetch_stage.join()?; + self.sigverify_stage.join()?; + self.banking_stage.join()?; + self.record_stage.join()?; + self.write_stage.join()?; + Ok(()) } } diff --git a/src/tvu.rs b/src/tvu.rs index 31e2b7f4b7..fef15b46fa 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -47,7 +47,7 @@ use signature::Keypair; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; -use std::thread::{self, JoinHandle}; +use std::thread; use window::SharedWindow; pub struct Tvu { @@ -125,18 +125,13 @@ impl Tvu { } impl Service for Tvu { - fn thread_hdls(self) -> Vec> { - let mut thread_hdls = vec![]; - thread_hdls.extend(self.replicate_stage.thread_hdls().into_iter()); - thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter()); - thread_hdls.extend(self.retransmit_stage.thread_hdls().into_iter()); - thread_hdls - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { - thread_hdl.join()?; - } + self.replicate_stage.join()?; + self.fetch_stage.join()?; + self.retransmit_stage.join()?; + Ok(()) } } diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 784a5360ef..05c4e32ce2 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -213,9 +213,7 @@ impl VoteStage { } impl Service for VoteStage { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] - } + type JoinReturnType = (); fn join(self) -> thread::Result<()> { self.thread_hdl.join()?; diff --git a/src/write_stage.rs b/src/write_stage.rs index 7f3efc439f..1f8a546906 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -135,11 +135,10 @@ impl WriteStage { } impl Service for WriteStage { - fn thread_hdls(self) -> Vec> { - self.thread_hdls - } + type JoinReturnType = (); + fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls() { + for thread_hdl in self.thread_hdls { thread_hdl.join()?; } Ok(())