Rewrote service trait join() method to allow thread join handles to return values other than () (#1213)

This commit is contained in:
carllin
2018-09-13 14:00:17 -07:00
committed by GitHub
parent 1d7e87d430
commit 8706774ea7
19 changed files with 138 additions and 111 deletions

View File

@ -136,9 +136,7 @@ impl BankingStage {
} }
impl Service for BankingStage { impl Service for BankingStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join() self.thread_hdl.join()

View File

@ -32,7 +32,6 @@ use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::thread::Builder; use std::thread::Builder;
use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@ -512,8 +511,7 @@ fn main() {
let leader = poll_gossip_for_leader(network, None).expect("unable to find leader on network"); let leader = poll_gossip_for_leader(network, None).expect("unable to find leader on network");
let exit_signal = Arc::new(AtomicBool::new(false)); let exit_signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![]; let (nodes, leader, ncp) = converge(&leader, &exit_signal, num_nodes);
let (nodes, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
if nodes.len() < num_nodes { if nodes.len() < num_nodes {
println!( println!(
@ -693,17 +691,14 @@ fn main() {
); );
// join the crdt client threads // join the crdt client threads
for t in c_threads { ncp.join().unwrap();
t.join().unwrap();
}
} }
fn converge( fn converge(
leader: &NodeInfo, leader: &NodeInfo,
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
num_nodes: usize, num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>, ) -> (Vec<NodeInfo>, Option<NodeInfo>, Ncp) {
) -> (Vec<NodeInfo>, Option<NodeInfo>) {
//lets spy on the network //lets spy on the network
let (node, gossip_socket) = Crdt::spy_node(); let (node, gossip_socket) = Crdt::spy_node();
let mut spy_crdt = Crdt::new(node).expect("Crdt::new"); let mut spy_crdt = Crdt::new(node).expect("Crdt::new");
@ -749,9 +744,8 @@ fn converge(
} }
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
threads.extend(ncp.thread_hdls().into_iter());
let leader = spy_ref.read().unwrap().leader_data().cloned(); let leader = spy_ref.read().unwrap().leader_data().cloned();
(v, leader) (v, leader, ncp)
} }
#[cfg(test)] #[cfg(test)]

View File

@ -44,12 +44,10 @@ impl BlobFetchStage {
} }
impl Service for BlobFetchStage { impl Service for BlobFetchStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
Ok(()) Ok(())

View File

@ -191,9 +191,7 @@ impl BroadcastStage {
} }
impl Service for BroadcastStage { impl Service for BroadcastStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join() self.thread_hdl.join()

View File

@ -45,12 +45,10 @@ impl FetchStage {
} }
impl Service for FetchStage { impl Service for FetchStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
Ok(()) Ok(())

View File

@ -15,15 +15,60 @@ use signature::{Keypair, KeypairUtil};
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{JoinHandle, Result}; use std::thread::Result;
use tpu::Tpu; use tpu::Tpu;
use tvu::Tvu; use tvu::Tvu;
use untrusted::Input; use untrusted::Input;
use window; use window;
pub enum NodeRole {
Leader(LeaderServices),
Validator(ValidatorServices),
}
pub struct LeaderServices {
tpu: Tpu,
broadcast_stage: BroadcastStage,
}
impl LeaderServices {
fn new(tpu: Tpu, broadcast_stage: BroadcastStage) -> Self {
LeaderServices {
tpu,
broadcast_stage,
}
}
fn join(self) -> Result<()> {
self.tpu.join()?;
self.broadcast_stage.join()
}
}
pub struct ValidatorServices {
tvu: Tvu,
}
impl ValidatorServices {
fn new(tvu: Tvu) -> Self {
ValidatorServices { tvu }
}
fn join(self) -> Result<()> {
self.tvu.join()
}
}
pub enum FullNodeReturnType {
LeaderRotation,
}
pub struct Fullnode { pub struct Fullnode {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, rpu: Rpu,
rpc_service: JsonRpcService,
ncp: Ncp,
pub node_role: NodeRole,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -176,7 +221,6 @@ impl Fullnode {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank); let bank = Arc::new(bank);
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let mut thread_hdls = vec![];
let rpu = Rpu::new( let rpu = Rpu::new(
&bank, &bank,
@ -185,7 +229,6 @@ impl Fullnode {
&blob_recycler, &blob_recycler,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(rpu.thread_hdls());
// TODO: this code assumes this node is the leader // TODO: this code assumes this node is the leader
let mut drone_addr = node.info.contact_info.tpu; let mut drone_addr = node.info.contact_info.tpu;
@ -198,7 +241,6 @@ impl Fullnode {
rpc_addr, rpc_addr,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(rpc_service.thread_hdls());
let window = let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler);
@ -214,8 +256,8 @@ impl Fullnode {
node.sockets.gossip, node.sockets.gossip,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(ncp.thread_hdls());
let node_role;
match leader_info { match leader_info {
Some(leader_info) => { Some(leader_info) => {
// Start in validator mode. // Start in validator mode.
@ -234,7 +276,8 @@ impl Fullnode {
ledger_path, ledger_path,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(tvu.thread_hdls()); let validator_state = ValidatorServices::new(tvu);
node_role = NodeRole::Validator(validator_state);
} }
None => { None => {
// Start in leader mode. // Start in leader mode.
@ -254,7 +297,6 @@ impl Fullnode {
ledger_path, ledger_path,
sigverify_disabled, sigverify_disabled,
); );
thread_hdls.extend(tpu.thread_hdls());
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast, node.sockets.broadcast,
@ -264,33 +306,51 @@ impl Fullnode {
blob_recycler.clone(), blob_recycler.clone(),
blob_receiver, blob_receiver,
); );
thread_hdls.extend(broadcast_stage.thread_hdls()); let leader_state = LeaderServices::new(tpu, broadcast_stage);
node_role = NodeRole::Leader(leader_state);
} }
} }
Fullnode { exit, thread_hdls } Fullnode {
rpu,
ncp,
rpc_service,
node_role,
exit,
}
} }
//used for notifying many nodes in parallel to exit //used for notifying many nodes in parallel to exit
pub fn exit(&self) { pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
} }
pub fn close(self) -> Result<()> {
pub fn close(self) -> Result<(Option<FullNodeReturnType>)> {
self.exit(); self.exit();
self.join() self.join()
} }
} }
impl Service for Fullnode { impl Service for Fullnode {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = Option<FullNodeReturnType>;
self.thread_hdls
}
fn join(self) -> Result<()> { fn join(self) -> Result<Option<FullNodeReturnType>> {
for thread_hdl in self.thread_hdls() { self.rpu.join()?;
thread_hdl.join()?; self.ncp.join()?;
self.rpc_service.join()?;
match self.node_role {
NodeRole::Validator(validator_service) => {
validator_service.join()?;
}
NodeRole::Leader(leader_service) => {
leader_service.join()?;
}
} }
Ok(())
// TODO: Case on join values above to determine if
// a leader rotation was in order, and propagate up a
// signal to reflect that
Ok(None)
} }
} }

View File

@ -66,12 +66,10 @@ impl Ncp {
} }
impl Service for Ncp { impl Service for Ncp {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
Ok(()) Ok(())

View File

@ -127,9 +127,7 @@ impl RecordStage {
} }
impl Service for RecordStage { impl Service for RecordStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join() self.thread_hdl.join()

View File

@ -22,6 +22,7 @@ use vote_stage::VoteStage;
pub struct ReplicateStage { pub struct ReplicateStage {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
vote_stage: VoteStage,
} }
impl ReplicateStage { impl ReplicateStage {
@ -113,21 +114,23 @@ impl ReplicateStage {
}) })
.unwrap(); .unwrap();
let mut thread_hdls = vec![t_responder, t_replicate]; let thread_hdls = vec![t_responder, t_replicate];
thread_hdls.extend(vote_stage.thread_hdls());
ReplicateStage { thread_hdls } ReplicateStage {
thread_hdls,
vote_stage,
}
} }
} }
impl Service for ReplicateStage { impl Service for ReplicateStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
self.vote_stage.join()?;
Ok(()) Ok(())
} }
} }

View File

@ -116,9 +116,7 @@ impl RequestStage {
} }
impl Service for RequestStage { impl Service for RequestStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join() self.thread_hdl.join()

View File

@ -113,12 +113,10 @@ impl RetransmitStage {
} }
impl Service for RetransmitStage { impl Service for RetransmitStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
Ok(()) Ok(())

View File

@ -68,9 +68,7 @@ impl JsonRpcService {
} }
impl Service for JsonRpcService { impl Service for JsonRpcService {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join() self.thread_hdl.join()

View File

@ -36,6 +36,7 @@ use std::thread::{self, JoinHandle};
use streamer; use streamer;
pub struct Rpu { pub struct Rpu {
request_stage: RequestStage,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
@ -71,21 +72,22 @@ impl Rpu {
blob_receiver, blob_receiver,
); );
let mut thread_hdls = vec![t_receiver, t_responder]; let thread_hdls = vec![t_receiver, t_responder];
thread_hdls.extend(request_stage.thread_hdls().into_iter()); Rpu {
Rpu { thread_hdls } thread_hdls,
request_stage,
}
} }
} }
impl Service for Rpu { impl Service for Rpu {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
self.request_stage.join()?;
Ok(()) Ok(())
} }
} }

View File

@ -1,6 +1,7 @@
use std::thread::{JoinHandle, Result}; use std::thread::Result;
pub trait Service { pub trait Service {
fn thread_hdls(self) -> Vec<JoinHandle<()>>; type JoinReturnType;
fn join(self) -> Result<()>;
fn join(self) -> Result<Self::JoinReturnType>;
} }

View File

@ -126,12 +126,10 @@ impl SigVerifyStage {
} }
impl Service for SigVerifyStage { impl Service for SigVerifyStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
Ok(()) Ok(())

View File

@ -37,7 +37,7 @@ use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle}; use std::thread;
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver; use streamer::BlobReceiver;
use write_stage::WriteStage; use write_stage::WriteStage;
@ -106,20 +106,15 @@ impl Tpu {
} }
impl Service for Tpu { impl Service for Tpu {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
let mut thread_hdls = vec![];
thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter());
thread_hdls.extend(self.sigverify_stage.thread_hdls().into_iter());
thread_hdls.extend(self.banking_stage.thread_hdls().into_iter());
thread_hdls.extend(self.record_stage.thread_hdls().into_iter());
thread_hdls.extend(self.write_stage.thread_hdls().into_iter());
thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { self.fetch_stage.join()?;
thread_hdl.join()?; self.sigverify_stage.join()?;
} self.banking_stage.join()?;
self.record_stage.join()?;
self.write_stage.join()?;
Ok(()) Ok(())
} }
} }

View File

@ -47,7 +47,7 @@ use signature::Keypair;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle}; use std::thread;
use window::SharedWindow; use window::SharedWindow;
pub struct Tvu { pub struct Tvu {
@ -125,18 +125,13 @@ impl Tvu {
} }
impl Service for Tvu { impl Service for Tvu {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
let mut thread_hdls = vec![];
thread_hdls.extend(self.replicate_stage.thread_hdls().into_iter());
thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter());
thread_hdls.extend(self.retransmit_stage.thread_hdls().into_iter());
thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { self.replicate_stage.join()?;
thread_hdl.join()?; self.fetch_stage.join()?;
} self.retransmit_stage.join()?;
Ok(()) Ok(())
} }
} }

View File

@ -213,9 +213,7 @@ impl VoteStage {
} }
impl Service for VoteStage { impl Service for VoteStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join()?; self.thread_hdl.join()?;

View File

@ -135,11 +135,10 @@ impl WriteStage {
} }
impl Service for WriteStage { impl Service for WriteStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> { type JoinReturnType = ();
self.thread_hdls
}
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
Ok(()) Ok(())