diff --git a/src/banking_stage.rs b/src/banking_stage.rs index a2c8c7b8ce..871df48432 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -9,11 +9,12 @@ use packet::{PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use record_stage::Signal; use result::Result; +use service::Service; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; use timing; @@ -22,7 +23,7 @@ use transaction::Transaction; /// Stores the stage's thread handle and output receiver. pub struct BankingStage { /// Handle to the stage's thread. - pub thread_hdl: JoinHandle<()>, + thread_hdl: JoinHandle<()>, } impl BankingStage { @@ -130,6 +131,16 @@ impl BankingStage { } } +impl Service for BankingStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + // TODO: When banking is pulled out of RequestStage, add this test back in. //use bank::Bank; diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 92bc03e06e..a366f268c9 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -13,6 +13,7 @@ use solana::hash::Hash; use solana::mint::Mint; use solana::nat::udp_public_bind; use solana::ncp::Ncp; +use solana::service::Service; use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::streamer::default_window; use solana::thin_client::ThinClient; @@ -373,7 +374,7 @@ fn converge( } sleep(Duration::new(1, 0)); } - threads.extend(ncp.thread_hdls.into_iter()); + threads.extend(ncp.thread_hdls().into_iter()); rv } diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 8adb99554d..facd1c2116 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -9,6 +9,7 @@ use atty::{is, Stream}; use getopts::Options; use solana::crdt::{ReplicatedData, TestNode}; use solana::fullnode::{FullNode, InFile, OutFile}; +use solana::service::Service; use std::env; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -94,7 +95,5 @@ fn main() -> () { }; FullNode::new(node, true, InFile::StdIn, None, Some(outfile), exit) }; - for t in fullnode.thread_hdls { - t.join().expect("join"); - } + fullnode.join().expect("join"); } diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index 550780f207..168adeb273 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -1,15 +1,16 @@ //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. use packet::BlobRecycler; +use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use streamer::{self, BlobReceiver}; pub struct BlobFetchStage { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } impl BlobFetchStage { @@ -41,3 +42,16 @@ impl BlobFetchStage { (BlobFetchStage { thread_hdls }, blob_receiver) } } + +impl Service for BlobFetchStage { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/drone.rs b/src/drone.rs index be5d1e12dd..2bae58d128 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -136,6 +136,7 @@ mod tests { use fullnode::FullNode; use logger; use mint::Mint; + use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::net::{SocketAddr, UdpSocket}; @@ -304,8 +305,6 @@ mod tests { assert_eq!(carlos_balance.unwrap(), TPS_BATCH); exit.store(true, Ordering::Relaxed); - for t in server.thread_hdls { - t.join().unwrap(); - } + server.join().unwrap(); } } diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index bf117c1e38..1cf3274533 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -1,15 +1,16 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use packet::PacketRecycler; +use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use streamer::{self, PacketReceiver}; pub struct FetchStage { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } impl FetchStage { @@ -41,3 +42,16 @@ impl FetchStage { (FetchStage { thread_hdls }, packet_receiver) } } + +impl Service for FetchStage { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/fullnode.rs b/src/fullnode.rs index 2c1bdb811d..b901473784 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -6,13 +6,14 @@ use entry_writer; use ncp::Ncp; use packet::BlobRecycler; use rpu::Rpu; +use service::Service; use std::fs::{File, OpenOptions}; use std::io::{sink, stdin, stdout, BufReader}; use std::io::{Read, Write}; use std::net::SocketAddr; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; +use std::thread::{JoinHandle, Result}; use std::time::Duration; use streamer; use tpu::Tpu; @@ -20,7 +21,7 @@ use tvu::Tvu; //use std::time::Duration; pub struct FullNode { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } pub enum InFile { @@ -152,7 +153,7 @@ impl FullNode { node.sockets.respond, exit.clone(), ); - thread_hdls.extend(rpu.thread_hdls); + thread_hdls.extend(rpu.thread_hdls()); let blob_recycler = BlobRecycler::default(); let (tpu, blob_receiver) = Tpu::new( @@ -163,7 +164,7 @@ impl FullNode { exit.clone(), writer, ); - thread_hdls.extend(tpu.thread_hdls); + thread_hdls.extend(tpu.thread_hdls()); let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); let window = streamer::default_window(); let ncp = Ncp::new( @@ -173,7 +174,7 @@ impl FullNode { node.sockets.gossip_send, exit.clone(), ).expect("Ncp::new"); - thread_hdls.extend(ncp.thread_hdls); + thread_hdls.extend(ncp.thread_hdls()); let t_broadcast = streamer::broadcaster( node.sockets.broadcast, @@ -233,7 +234,7 @@ impl FullNode { node.sockets.respond, exit.clone(), ); - thread_hdls.extend(rpu.thread_hdls); + thread_hdls.extend(rpu.thread_hdls()); let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); crdt.write() @@ -258,11 +259,25 @@ impl FullNode { node.sockets.retransmit, exit.clone(), ); - thread_hdls.extend(tvu.thread_hdls); - thread_hdls.extend(ncp.thread_hdls); + thread_hdls.extend(tvu.thread_hdls()); + thread_hdls.extend(ncp.thread_hdls()); FullNode { thread_hdls } } } + +impl Service for FullNode { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} + #[cfg(test)] mod tests { use bank::Bank; diff --git a/src/lib.rs b/src/lib.rs index b7206934aa..d91604f0bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ pub mod request_processor; pub mod request_stage; pub mod result; pub mod rpu; +pub mod service; pub mod signature; pub mod sigverify; pub mod sigverify_stage; diff --git a/src/ncp.rs b/src/ncp.rs index 883f1e17a0..bbf7bd6ad9 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -3,15 +3,16 @@ use crdt::Crdt; use packet::{BlobRecycler, SharedBlob}; use result::Result; +use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use streamer; pub struct Ncp { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } impl Ncp { @@ -56,6 +57,19 @@ impl Ncp { } } +impl Service for Ncp { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} + #[cfg(test)] mod tests { use crdt::{Crdt, TestNode}; diff --git a/src/record_stage.rs b/src/record_stage.rs index 4d50f258c4..4f8651a7b2 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -8,8 +8,9 @@ use entry::Entry; use hash::Hash; use recorder::Recorder; +use service::Service; use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -20,7 +21,7 @@ pub enum Signal { } pub struct RecordStage { - pub thread_hdl: JoinHandle<()>, + thread_hdl: JoinHandle<()>, } impl RecordStage { @@ -124,6 +125,16 @@ impl RecordStage { } } +impl Service for RecordStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index b51481727a..661edf60e5 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -3,14 +3,15 @@ use bank::Bank; use ledger; use result::Result; +use service::Service; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; pub struct ReplicateStage { - pub thread_hdl: JoinHandle<()>, + thread_hdl: JoinHandle<()>, } impl ReplicateStage { @@ -41,3 +42,13 @@ impl ReplicateStage { ReplicateStage { thread_hdl } } } + +impl Service for ReplicateStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/src/request_stage.rs b/src/request_stage.rs index c047a6da22..de8304b013 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -6,17 +6,18 @@ use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; use result::Result; +use service::Service; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::Arc; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; use streamer::{self, BlobReceiver, BlobSender}; use timing; pub struct RequestStage { - pub thread_hdl: JoinHandle<()>, + thread_hdl: JoinHandle<()>, pub request_processor: Arc, } @@ -114,3 +115,13 @@ impl RequestStage { ) } } + +impl Service for RequestStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/src/rpu.rs b/src/rpu.rs index 03f876226a..4447bbe8fb 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -27,15 +27,16 @@ use bank::Bank; use packet::{BlobRecycler, PacketRecycler}; use request_processor::RequestProcessor; use request_stage::RequestStage; +use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use streamer; pub struct Rpu { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } impl Rpu { @@ -71,7 +72,21 @@ impl Rpu { blob_receiver, ); - let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; + let mut thread_hdls = vec![t_receiver, t_responder]; + thread_hdls.extend(request_stage.thread_hdls().into_iter()); Rpu { thread_hdls } } } + +impl Service for Rpu { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000000..b00e48577d --- /dev/null +++ b/src/service.rs @@ -0,0 +1,6 @@ +use std::thread::{JoinHandle, Result}; + +pub trait Service { + fn thread_hdls(self) -> Vec>; + fn join(self) -> Result<()>; +} diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index 7e2a74cb43..a6eeda8a2e 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -8,17 +8,18 @@ use packet::SharedPackets; use rand::{thread_rng, Rng}; use result::Result; +use service::Service; use sigverify; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; -use std::thread::{spawn, JoinHandle}; +use std::thread::{self, spawn, JoinHandle}; use std::time::Instant; use streamer::{self, PacketReceiver}; use timing; pub struct SigVerifyStage { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } impl SigVerifyStage { @@ -98,3 +99,16 @@ impl SigVerifyStage { .collect() } } + +impl Service for SigVerifyStage { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/thin_client.rs b/src/thin_client.rs index cd1866f0c9..164e519e8c 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -229,6 +229,7 @@ mod tests { use fullnode::FullNode; use logger; use mint::Mint; + use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; @@ -274,9 +275,7 @@ mod tests { let balance = client.poll_get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in server.thread_hdls { - t.join().unwrap(); - } + server.join().unwrap(); } #[test] @@ -328,9 +327,7 @@ mod tests { let balance = client.poll_get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in server.thread_hdls { - t.join().unwrap(); - } + server.join().unwrap(); } #[test] @@ -372,8 +369,6 @@ mod tests { assert!(client.check_signature(&sig)); exit.store(true, Ordering::Relaxed); - for t in server.thread_hdls { - t.join().unwrap(); - } + server.join().unwrap(); } } diff --git a/src/tpu.rs b/src/tpu.rs index 0d1d35e11d..8ce230aa9d 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -30,18 +30,23 @@ use banking_stage::BankingStage; use fetch_stage::FetchStage; use packet::{BlobRecycler, PacketRecycler}; use record_stage::RecordStage; +use service::Service; use sigverify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; use write_stage::WriteStage; pub struct Tpu { - pub thread_hdls: Vec>, + fetch_stage: FetchStage, + sigverify_stage: SigVerifyStage, + banking_stage: BankingStage, + record_stage: RecordStage, + write_stage: WriteStage, } impl Tpu { @@ -82,13 +87,33 @@ impl Tpu { writer, entry_receiver, ); - let mut thread_hdls = vec![ - banking_stage.thread_hdl, - record_stage.thread_hdl, - write_stage.thread_hdl, - ]; - thread_hdls.extend(fetch_stage.thread_hdls.into_iter()); - thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); - (Tpu { thread_hdls }, blob_receiver) + + let tpu = Tpu { + fetch_stage, + sigverify_stage, + banking_stage, + record_stage, + write_stage, + }; + (tpu, blob_receiver) + } +} + +impl Service for Tpu { + fn thread_hdls(self) -> Vec> { + let mut thread_hdls = vec![]; + thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter()); + thread_hdls.extend(self.sigverify_stage.thread_hdls().into_iter()); + thread_hdls.extend(self.banking_stage.thread_hdls().into_iter()); + thread_hdls.extend(self.record_stage.thread_hdls().into_iter()); + thread_hdls.extend(self.write_stage.thread_hdls().into_iter()); + thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) } } diff --git a/src/tvu.rs b/src/tvu.rs index d28f0d2c21..aaf87f1e39 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -39,15 +39,18 @@ use blob_fetch_stage::BlobFetchStage; use crdt::Crdt; use packet::BlobRecycler; use replicate_stage::ReplicateStage; +use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use streamer::Window; use window_stage::WindowStage; pub struct Tvu { - pub thread_hdls: Vec>, + replicate_stage: ReplicateStage, + fetch_stage: BlobFetchStage, + window_stage: WindowStage, } impl Tvu { @@ -93,15 +96,31 @@ impl Tvu { let replicate_stage = ReplicateStage::new(bank, exit, blob_receiver); - let mut threads = vec![replicate_stage.thread_hdl]; - threads.extend(fetch_stage.thread_hdls.into_iter()); - threads.extend(window_stage.thread_hdls.into_iter()); Tvu { - thread_hdls: threads, + replicate_stage, + fetch_stage, + window_stage, } } } +impl Service for Tvu { + fn thread_hdls(self) -> Vec> { + let mut thread_hdls = vec![]; + thread_hdls.extend(self.replicate_stage.thread_hdls().into_iter()); + thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter()); + thread_hdls.extend(self.window_stage.thread_hdls().into_iter()); + thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} + #[cfg(test)] pub mod tests { use bank::Bank; @@ -114,6 +133,7 @@ pub mod tests { use ncp::Ncp; use packet::BlobRecycler; use result::Result; + use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::net::UdpSocket; @@ -264,18 +284,10 @@ pub mod tests { assert_eq!(bob_balance, starting_balance - alice_ref_balance); exit.store(true, Ordering::Relaxed); - for t in tvu.thread_hdls { - t.join().expect("join"); - } - for t in dr_l.0.thread_hdls { - t.join().expect("join"); - } - for t in dr_2.0.thread_hdls { - t.join().expect("join"); - } - for t in dr_1.0.thread_hdls { - t.join().expect("join"); - } + tvu.join().expect("join"); + dr_l.0.join().expect("join"); + dr_2.0.join().expect("join"); + dr_1.0.join().expect("join"); t_receiver.join().expect("join"); t_responder.join().expect("join"); } diff --git a/src/window_stage.rs b/src/window_stage.rs index 9529f540e8..d5eaa6a278 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -2,15 +2,16 @@ use crdt::Crdt; use packet::BlobRecycler; +use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; +use std::thread::{self, JoinHandle}; use streamer::{self, BlobReceiver, Window}; pub struct WindowStage { - pub thread_hdls: Vec>, + thread_hdls: Vec>, } impl WindowStage { @@ -48,3 +49,16 @@ impl WindowStage { (WindowStage { thread_hdls }, blob_receiver) } } + +impl Service for WindowStage { + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/write_stage.rs b/src/write_stage.rs index f5ef96256a..04c315e1e3 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -8,17 +8,18 @@ use entry_writer::EntryWriter; use ledger::Block; use packet::BlobRecycler; use result::Result; +use service::Service; use std::collections::VecDeque; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::Arc; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender}; pub struct WriteStage { - pub thread_hdl: JoinHandle<()>, + thread_hdl: JoinHandle<()>, } impl WriteStage { @@ -73,3 +74,13 @@ impl WriteStage { (WriteStage { thread_hdl }, blob_receiver) } } + +impl Service for WriteStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +}