Add Service trait

Added a consistent interface to all the microservices.
This commit is contained in:
Greg Fitzgerald
2018-07-03 22:14:08 -06:00
parent 44150b2e85
commit 77bf17064a
20 changed files with 268 additions and 75 deletions

View File

@ -9,11 +9,12 @@ use packet::{PacketRecycler, Packets, SharedPackets};
use rayon::prelude::*; use rayon::prelude::*;
use record_stage::Signal; use record_stage::Signal;
use result::Result; use result::Result;
use service::Service;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use timing; use timing;
@ -22,7 +23,7 @@ use transaction::Transaction;
/// Stores the stage's thread handle and output receiver. /// Stores the stage's thread handle and output receiver.
pub struct BankingStage { pub struct BankingStage {
/// Handle to the stage's thread. /// Handle to the stage's thread.
pub thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
} }
impl BankingStage { impl BankingStage {
@ -130,6 +131,16 @@ impl BankingStage {
} }
} }
impl Service for BankingStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
// TODO: When banking is pulled out of RequestStage, add this test back in. // TODO: When banking is pulled out of RequestStage, add this test back in.
//use bank::Bank; //use bank::Bank;

View File

@ -13,6 +13,7 @@ use solana::hash::Hash;
use solana::mint::Mint; use solana::mint::Mint;
use solana::nat::udp_public_bind; use solana::nat::udp_public_bind;
use solana::ncp::Ncp; use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window; use solana::streamer::default_window;
use solana::thin_client::ThinClient; use solana::thin_client::ThinClient;
@ -373,7 +374,7 @@ fn converge(
} }
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
threads.extend(ncp.thread_hdls.into_iter()); threads.extend(ncp.thread_hdls().into_iter());
rv rv
} }

View File

@ -9,6 +9,7 @@ use atty::{is, Stream};
use getopts::Options; use getopts::Options;
use solana::crdt::{ReplicatedData, TestNode}; use solana::crdt::{ReplicatedData, TestNode};
use solana::fullnode::{FullNode, InFile, OutFile}; use solana::fullnode::{FullNode, InFile, OutFile};
use solana::service::Service;
use std::env; use std::env;
use std::fs::File; use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -94,7 +95,5 @@ fn main() -> () {
}; };
FullNode::new(node, true, InFile::StdIn, None, Some(outfile), exit) FullNode::new(node, true, InFile::StdIn, None, Some(outfile), exit)
}; };
for t in fullnode.thread_hdls { fullnode.join().expect("join");
t.join().expect("join");
}
} }

View File

@ -1,15 +1,16 @@
//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel.
use packet::BlobRecycler; use packet::BlobRecycler;
use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use streamer::{self, BlobReceiver}; use streamer::{self, BlobReceiver};
pub struct BlobFetchStage { pub struct BlobFetchStage {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl BlobFetchStage { impl BlobFetchStage {
@ -41,3 +42,16 @@ impl BlobFetchStage {
(BlobFetchStage { thread_hdls }, blob_receiver) (BlobFetchStage { thread_hdls }, blob_receiver)
} }
} }
impl Service for BlobFetchStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -136,6 +136,7 @@ mod tests {
use fullnode::FullNode; use fullnode::FullNode;
use logger; use logger;
use mint::Mint; use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::io::sink; use std::io::sink;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
@ -304,8 +305,6 @@ mod tests {
assert_eq!(carlos_balance.unwrap(), TPS_BATCH); assert_eq!(carlos_balance.unwrap(), TPS_BATCH);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
for t in server.thread_hdls { server.join().unwrap();
t.join().unwrap();
}
} }
} }

View File

@ -1,15 +1,16 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel. //! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use packet::PacketRecycler; use packet::PacketRecycler;
use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use streamer::{self, PacketReceiver}; use streamer::{self, PacketReceiver};
pub struct FetchStage { pub struct FetchStage {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl FetchStage { impl FetchStage {
@ -41,3 +42,16 @@ impl FetchStage {
(FetchStage { thread_hdls }, packet_receiver) (FetchStage { thread_hdls }, packet_receiver)
} }
} }
impl Service for FetchStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -6,13 +6,14 @@ use entry_writer;
use ncp::Ncp; use ncp::Ncp;
use packet::BlobRecycler; use packet::BlobRecycler;
use rpu::Rpu; use rpu::Rpu;
use service::Service;
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader}; use std::io::{sink, stdin, stdout, BufReader};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::{JoinHandle, Result};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use tpu::Tpu; use tpu::Tpu;
@ -20,7 +21,7 @@ use tvu::Tvu;
//use std::time::Duration; //use std::time::Duration;
pub struct FullNode { pub struct FullNode {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
pub enum InFile { pub enum InFile {
@ -152,7 +153,7 @@ impl FullNode {
node.sockets.respond, node.sockets.respond,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(rpu.thread_hdls); thread_hdls.extend(rpu.thread_hdls());
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let (tpu, blob_receiver) = Tpu::new( let (tpu, blob_receiver) = Tpu::new(
@ -163,7 +164,7 @@ impl FullNode {
exit.clone(), exit.clone(),
writer, writer,
); );
thread_hdls.extend(tpu.thread_hdls); thread_hdls.extend(tpu.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let window = streamer::default_window(); let window = streamer::default_window();
let ncp = Ncp::new( let ncp = Ncp::new(
@ -173,7 +174,7 @@ impl FullNode {
node.sockets.gossip_send, node.sockets.gossip_send,
exit.clone(), exit.clone(),
).expect("Ncp::new"); ).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls); thread_hdls.extend(ncp.thread_hdls());
let t_broadcast = streamer::broadcaster( let t_broadcast = streamer::broadcaster(
node.sockets.broadcast, node.sockets.broadcast,
@ -233,7 +234,7 @@ impl FullNode {
node.sockets.respond, node.sockets.respond,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(rpu.thread_hdls); thread_hdls.extend(rpu.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
crdt.write() crdt.write()
@ -258,11 +259,25 @@ impl FullNode {
node.sockets.retransmit, node.sockets.retransmit,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(tvu.thread_hdls); thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls); thread_hdls.extend(ncp.thread_hdls());
FullNode { thread_hdls } FullNode { thread_hdls }
} }
} }
impl Service for FullNode {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bank::Bank; use bank::Bank;

View File

@ -38,6 +38,7 @@ pub mod request_processor;
pub mod request_stage; pub mod request_stage;
pub mod result; pub mod result;
pub mod rpu; pub mod rpu;
pub mod service;
pub mod signature; pub mod signature;
pub mod sigverify; pub mod sigverify;
pub mod sigverify_stage; pub mod sigverify_stage;

View File

@ -3,15 +3,16 @@
use crdt::Crdt; use crdt::Crdt;
use packet::{BlobRecycler, SharedBlob}; use packet::{BlobRecycler, SharedBlob};
use result::Result; use result::Result;
use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use streamer; use streamer;
pub struct Ncp { pub struct Ncp {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl Ncp { impl Ncp {
@ -56,6 +57,19 @@ impl Ncp {
} }
} }
impl Service for Ncp {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{Crdt, TestNode}; use crdt::{Crdt, TestNode};

View File

@ -8,8 +8,9 @@
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use recorder::Recorder; use recorder::Recorder;
use service::Service;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::thread::{Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use transaction::Transaction; use transaction::Transaction;
@ -20,7 +21,7 @@ pub enum Signal {
} }
pub struct RecordStage { pub struct RecordStage {
pub thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
} }
impl RecordStage { impl RecordStage {
@ -124,6 +125,16 @@ impl RecordStage {
} }
} }
impl Service for RecordStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -3,14 +3,15 @@
use bank::Bank; use bank::Bank;
use ledger; use ledger;
use result::Result; use result::Result;
use service::Service;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver; use streamer::BlobReceiver;
pub struct ReplicateStage { pub struct ReplicateStage {
pub thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
} }
impl ReplicateStage { impl ReplicateStage {
@ -41,3 +42,13 @@ impl ReplicateStage {
ReplicateStage { thread_hdl } ReplicateStage { thread_hdl }
} }
} }
impl Service for ReplicateStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -6,17 +6,18 @@ use rayon::prelude::*;
use request::Request; use request::Request;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use result::Result; use result::Result;
use service::Service;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Instant; use std::time::Instant;
use streamer::{self, BlobReceiver, BlobSender}; use streamer::{self, BlobReceiver, BlobSender};
use timing; use timing;
pub struct RequestStage { pub struct RequestStage {
pub thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
pub request_processor: Arc<RequestProcessor>, pub request_processor: Arc<RequestProcessor>,
} }
@ -114,3 +115,13 @@ impl RequestStage {
) )
} }
} }
impl Service for RequestStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -27,15 +27,16 @@ use bank::Bank;
use packet::{BlobRecycler, PacketRecycler}; use packet::{BlobRecycler, PacketRecycler};
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use request_stage::RequestStage; use request_stage::RequestStage;
use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use streamer; use streamer;
pub struct Rpu { pub struct Rpu {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl Rpu { impl Rpu {
@ -71,7 +72,21 @@ impl Rpu {
blob_receiver, blob_receiver,
); );
let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; let mut thread_hdls = vec![t_receiver, t_responder];
thread_hdls.extend(request_stage.thread_hdls().into_iter());
Rpu { thread_hdls } Rpu { thread_hdls }
} }
} }
impl Service for Rpu {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

6
src/service.rs Normal file
View File

@ -0,0 +1,6 @@
use std::thread::{JoinHandle, Result};
pub trait Service {
fn thread_hdls(self) -> Vec<JoinHandle<()>>;
fn join(self) -> Result<()>;
}

View File

@ -8,17 +8,18 @@
use packet::SharedPackets; use packet::SharedPackets;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use result::Result; use result::Result;
use service::Service;
use sigverify; use sigverify;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle}; use std::thread::{self, spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
use streamer::{self, PacketReceiver}; use streamer::{self, PacketReceiver};
use timing; use timing;
pub struct SigVerifyStage { pub struct SigVerifyStage {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl SigVerifyStage { impl SigVerifyStage {
@ -98,3 +99,16 @@ impl SigVerifyStage {
.collect() .collect()
} }
} }
impl Service for SigVerifyStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -229,6 +229,7 @@ mod tests {
use fullnode::FullNode; use fullnode::FullNode;
use logger; use logger;
use mint::Mint; use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::io::sink; use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -274,9 +275,7 @@ mod tests {
let balance = client.poll_get_balance(&bob_pubkey); let balance = client.poll_get_balance(&bob_pubkey);
assert_eq!(balance.unwrap(), 500); assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
for t in server.thread_hdls { server.join().unwrap();
t.join().unwrap();
}
} }
#[test] #[test]
@ -328,9 +327,7 @@ mod tests {
let balance = client.poll_get_balance(&bob_pubkey); let balance = client.poll_get_balance(&bob_pubkey);
assert_eq!(balance.unwrap(), 500); assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
for t in server.thread_hdls { server.join().unwrap();
t.join().unwrap();
}
} }
#[test] #[test]
@ -372,8 +369,6 @@ mod tests {
assert!(client.check_signature(&sig)); assert!(client.check_signature(&sig));
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
for t in server.thread_hdls { server.join().unwrap();
t.join().unwrap();
}
} }
} }

View File

@ -30,18 +30,23 @@ use banking_stage::BankingStage;
use fetch_stage::FetchStage; use fetch_stage::FetchStage;
use packet::{BlobRecycler, PacketRecycler}; use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage; use record_stage::RecordStage;
use service::Service;
use sigverify_stage::SigVerifyStage; use sigverify_stage::SigVerifyStage;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver; use streamer::BlobReceiver;
use write_stage::WriteStage; use write_stage::WriteStage;
pub struct Tpu { pub struct Tpu {
pub thread_hdls: Vec<JoinHandle<()>>, fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage,
banking_stage: BankingStage,
record_stage: RecordStage,
write_stage: WriteStage,
} }
impl Tpu { impl Tpu {
@ -82,13 +87,33 @@ impl Tpu {
writer, writer,
entry_receiver, entry_receiver,
); );
let mut thread_hdls = vec![
banking_stage.thread_hdl, let tpu = Tpu {
record_stage.thread_hdl, fetch_stage,
write_stage.thread_hdl, sigverify_stage,
]; banking_stage,
thread_hdls.extend(fetch_stage.thread_hdls.into_iter()); record_stage,
thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); write_stage,
(Tpu { thread_hdls }, blob_receiver) };
(tpu, blob_receiver)
}
}
impl Service for Tpu {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
let mut thread_hdls = vec![];
thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter());
thread_hdls.extend(self.sigverify_stage.thread_hdls().into_iter());
thread_hdls.extend(self.banking_stage.thread_hdls().into_iter());
thread_hdls.extend(self.record_stage.thread_hdls().into_iter());
thread_hdls.extend(self.write_stage.thread_hdls().into_iter());
thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
} }
} }

View File

@ -39,15 +39,18 @@ use blob_fetch_stage::BlobFetchStage;
use crdt::Crdt; use crdt::Crdt;
use packet::BlobRecycler; use packet::BlobRecycler;
use replicate_stage::ReplicateStage; use replicate_stage::ReplicateStage;
use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use streamer::Window; use streamer::Window;
use window_stage::WindowStage; use window_stage::WindowStage;
pub struct Tvu { pub struct Tvu {
pub thread_hdls: Vec<JoinHandle<()>>, replicate_stage: ReplicateStage,
fetch_stage: BlobFetchStage,
window_stage: WindowStage,
} }
impl Tvu { impl Tvu {
@ -93,15 +96,31 @@ impl Tvu {
let replicate_stage = ReplicateStage::new(bank, exit, blob_receiver); let replicate_stage = ReplicateStage::new(bank, exit, blob_receiver);
let mut threads = vec![replicate_stage.thread_hdl];
threads.extend(fetch_stage.thread_hdls.into_iter());
threads.extend(window_stage.thread_hdls.into_iter());
Tvu { Tvu {
thread_hdls: threads, replicate_stage,
fetch_stage,
window_stage,
} }
} }
} }
impl Service for Tvu {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
let mut thread_hdls = vec![];
thread_hdls.extend(self.replicate_stage.thread_hdls().into_iter());
thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter());
thread_hdls.extend(self.window_stage.thread_hdls().into_iter());
thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use bank::Bank; use bank::Bank;
@ -114,6 +133,7 @@ pub mod tests {
use ncp::Ncp; use ncp::Ncp;
use packet::BlobRecycler; use packet::BlobRecycler;
use result::Result; use result::Result;
use service::Service;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -264,18 +284,10 @@ pub mod tests {
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_balance - alice_ref_balance);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
for t in tvu.thread_hdls { tvu.join().expect("join");
t.join().expect("join"); dr_l.0.join().expect("join");
} dr_2.0.join().expect("join");
for t in dr_l.0.thread_hdls { dr_1.0.join().expect("join");
t.join().expect("join");
}
for t in dr_2.0.thread_hdls {
t.join().expect("join");
}
for t in dr_1.0.thread_hdls {
t.join().expect("join");
}
t_receiver.join().expect("join"); t_receiver.join().expect("join");
t_responder.join().expect("join"); t_responder.join().expect("join");
} }

View File

@ -2,15 +2,16 @@
use crdt::Crdt; use crdt::Crdt;
use packet::BlobRecycler; use packet::BlobRecycler;
use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::{self, JoinHandle};
use streamer::{self, BlobReceiver, Window}; use streamer::{self, BlobReceiver, Window};
pub struct WindowStage { pub struct WindowStage {
pub thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl WindowStage { impl WindowStage {
@ -48,3 +49,16 @@ impl WindowStage {
(WindowStage { thread_hdls }, blob_receiver) (WindowStage { thread_hdls }, blob_receiver)
} }
} }
impl Service for WindowStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -8,17 +8,18 @@ use entry_writer::EntryWriter;
use ledger::Block; use ledger::Block;
use packet::BlobRecycler; use packet::BlobRecycler;
use result::Result; use result::Result;
use service::Service;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::Write; use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::{BlobReceiver, BlobSender}; use streamer::{BlobReceiver, BlobSender};
pub struct WriteStage { pub struct WriteStage {
pub thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
} }
impl WriteStage { impl WriteStage {
@ -73,3 +74,13 @@ impl WriteStage {
(WriteStage { thread_hdl }, blob_receiver) (WriteStage { thread_hdl }, blob_receiver)
} }
} }
impl Service for WriteStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}