From 1fb1c0a681e48495827f02b7eaa5ebf5ff7f650d Mon Sep 17 00:00:00 2001 From: Carl Date: Fri, 14 Sep 2018 00:17:40 -0700 Subject: [PATCH] added jointypes to the stages in the tpu involved in leader rotation --- src/bin/fullnode.rs | 36 ++++++++++------- src/broadcast_stage.rs | 89 +++++++++++++++--------------------------- src/fullnode.rs | 20 +++++----- src/tpu.rs | 19 +++++---- 4 files changed, 76 insertions(+), 88 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index b6dd80820c..4c93e2ec4e 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -11,12 +11,12 @@ use clap::{App, Arg}; use solana::client::mk_client; use solana::crdt::Node; use solana::drone::DRONE_PORT; -use solana::fullnode::{Config, Fullnode}; +use solana::fullnode::{Config, Fullnode, NodeRole}; use solana::logger; use solana::metrics::set_panic_hook; -use solana::service::Service; use solana::signature::{Keypair, KeypairUtil}; use solana::thin_client::poll_gossip_for_leader; +use solana::tpu::TpuReturnType; use solana::wallet::request_airdrop; use std::fs::File; use std::net::{Ipv4Addr, SocketAddr}; @@ -84,7 +84,7 @@ fn main() -> () { let node_info = node.info.clone(); let pubkey = keypair.pubkey(); - let fullnode = Fullnode::new(node, ledger_path, keypair, network, false); + let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false); // airdrop stuff, probably goes away at some point let leader = match network { @@ -124,18 +124,24 @@ fn main() -> () { } } - /*loop { - match fullnode.node_role { - NodeRole::Leader(leader_services) => { - // TODO: return an exit code that signals we should do a role switch - leader_services.join(); - //fullnode.start_tvu(); - }, - NodeRole::Validator(validator_services) => { - validator_services.join(); + loop { + let node_role = fullnode.node_role.take(); + match node_role { + Some(NodeRole::Leader(leader_services)) => { + match leader_services.join() { + Ok(Some(TpuReturnType::LeaderRotation)) => (), + //fullnode.start_tvu(); + Err(e) => { + eprintln!("Leader returned error: {:?}", e); + exit(1); + } + _ => (), + } } + Some(NodeRole::Validator(validator_services)) => { + let _ = validator_services.join(); + } + _ => (), } - }*/ - - let _ = fullnode.join(); + } } diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index f5ccdee9d9..98817a8faf 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -20,6 +20,12 @@ use std::time::{Duration, Instant}; use timing::duration_as_ms; use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum BroadcastStageReturnType { + LeaderRotation, + ChannelDisconnected, +} + fn broadcast( node_info: &NodeInfo, broadcast_table: &[NodeInfo], @@ -143,7 +149,7 @@ fn broadcast( } pub struct BroadcastStage { - thread_hdl: JoinHandle<()>, + thread_hdl: JoinHandle, } impl BroadcastStage { @@ -154,7 +160,7 @@ impl BroadcastStage { entry_height: u64, recycler: &BlobRecycler, receiver: &Receiver>, - ) { + ) -> BroadcastStageReturnType { let mut transmit_index = WindowIndex { data: entry_height, coding: entry_height, @@ -170,7 +176,7 @@ impl BroadcastStage { // If the leader stays in power for the next // round as well, then we don't exit. Otherwise, exit. _ => { - return; + return BroadcastStageReturnType::LeaderRotation; } } } @@ -187,7 +193,9 @@ impl BroadcastStage { &mut receive_index, ) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { + return BroadcastStageReturnType::ChannelDisconnected + } Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? _ => { @@ -218,17 +226,7 @@ impl BroadcastStage { ) -> Self { let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) - .spawn(move || { - Self::run( - &sock, - &crdt, - &window, - entry_height, - &recycler, - &receiver, - exit_sender, - ); - }) + .spawn(move || Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver)) .unwrap(); BroadcastStage { thread_hdl } @@ -236,32 +234,26 @@ impl BroadcastStage { } impl Service for BroadcastStage { - type JoinReturnType = (); + type JoinReturnType = BroadcastStageReturnType; - fn join(self) -> thread::Result<()> { - self.thread_hdl.join()?; - Ok(()) + fn join(self) -> thread::Result { + self.thread_hdl.join() } } #[cfg(test)] mod tests { - use broadcast_stage::BroadcastStage; + use broadcast_stage::{BroadcastStage, BroadcastStageReturnType}; use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL}; use entry::Entry; - use hash::Hash; - use ledger::Block; use mint::Mint; use packet::BlobRecycler; use recorder::Recorder; use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; use std::cmp; - use std::sync::mpsc::{channel, Receiver}; + use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; - use streamer::BlobSender; use window::{new_window_from_entries, SharedWindow}; fn setup_dummy_broadcast_stage() -> ( @@ -269,11 +261,9 @@ mod tests { Pubkey, BroadcastStage, SharedWindow, - BlobSender, - BlobRecycler, + Sender>, Arc>, Vec, - Receiver, ) { // Setup dummy leader info let leader_keypair = Keypair::new(); @@ -302,8 +292,7 @@ mod tests { let shared_window = Arc::new(RwLock::new(window)); - let (blob_sender, blob_receiver) = channel(); - let (exit_sender, exit_receiver) = channel(); + let (entry_sender, entry_receiver) = channel(); // Start up the broadcast stage let broadcast_stage = BroadcastStage::new( @@ -312,8 +301,7 @@ mod tests { shared_window.clone(), entry_height, blob_recycler.clone(), - blob_receiver, - exit_sender, + entry_receiver, ); ( @@ -321,11 +309,9 @@ mod tests { buddy_id, broadcast_stage, shared_window, - blob_sender, - blob_recycler, + entry_sender, crdt, entries, - exit_receiver, ) } @@ -342,17 +328,8 @@ mod tests { #[test] fn test_broadcast_stage_leader_rotation_exit() { - let ( - id, - buddy_id, - broadcast_stage, - shared_window, - blob_sender, - blob_recycler, - crdt, - entries, - exit_receiver, - ) = setup_dummy_broadcast_stage(); + let (id, buddy_id, broadcast_stage, shared_window, entry_sender, crdt, entries) = + setup_dummy_broadcast_stage(); { let mut wcrdt = crdt.write().unwrap(); // Set leader to myself @@ -371,8 +348,7 @@ mod tests { for _ in genesis_len..LEADER_ROTATION_INTERVAL { let new_entry = recorder.record(vec![]); - let blob = new_entry.to_blobs(&blob_recycler); - blob_sender.send(blob).unwrap(); + entry_sender.send(new_entry).unwrap(); } // Set the scheduled next leader in the crdt to the other buddy on the network @@ -385,23 +361,22 @@ mod tests { // it's no longer the leader after checking the crdt, and exit for _ in 0..LEADER_ROTATION_INTERVAL { let new_entry = recorder.record(vec![]); - let blob = new_entry.to_blobs(&blob_recycler); - match blob_sender.send(blob) { + match entry_sender.send(new_entry) { // We disconnected, break out of loop and check the results Err(_) => break, _ => (), }; } - match exit_receiver.recv() { - Ok(x) if x == false => panic!("Unexpected value on exit channel for Broadcast stage"), - _ => (), - } - let highest_index = find_highest_window_index(&shared_window); + // TODO: 2 * LEADER_ROTATION_INTERVAL - 1 due to the same bug in + // index_blobs() as mentioned above assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1); // Make sure the threads closed cleanly - broadcast_stage.join().unwrap(); + assert_eq!( + broadcast_stage.join().unwrap(), + BroadcastStageReturnType::LeaderRotation + ); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 38d6e4f21d..4774e615bf 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -16,7 +16,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::Result; -use tpu::Tpu; +use tpu::{Tpu, TpuReturnType}; use tvu::Tvu; use untrusted::Input; use window; @@ -39,9 +39,9 @@ impl LeaderServices { } } - pub fn join(self) -> Result<()> { - self.tpu.join()?; - self.broadcast_stage.join() + pub fn join(self) -> Result> { + self.broadcast_stage.join()?; + self.tpu.join() } } @@ -59,7 +59,7 @@ impl ValidatorServices { } } -pub enum FullNodeReturnType { +pub enum FullnodeReturnType { LeaderRotation, } @@ -327,16 +327,16 @@ impl Fullnode { self.exit.store(true, Ordering::Relaxed); } - pub fn close(self) -> Result<(Option)> { + pub fn close(self) -> Result<(Option)> { self.exit(); self.join() } } impl Service for Fullnode { - type JoinReturnType = Option; + type JoinReturnType = Option; - fn join(self) -> Result> { + fn join(self) -> Result> { self.rpu.join()?; self.ncp.join()?; self.rpc_service.join()?; @@ -346,7 +346,9 @@ impl Service for Fullnode { validator_service.join()?; } Some(NodeRole::Leader(leader_service)) => { - leader_service.join()?; + if let Some(TpuReturnType::LeaderRotation) = leader_service.join()? { + return Ok(Some(FullnodeReturnType::LeaderRotation)); + } } _ => (), } diff --git a/src/tpu.rs b/src/tpu.rs index 122b56a7aa..32ae4ad6ee 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -41,7 +41,11 @@ use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -use write_stage::WriteStage; +use write_stage::{WriteStage, WriteStageReturnType}; + +pub enum TpuReturnType { + LeaderRotation, +} pub struct Tpu { fetch_stage: FetchStage, @@ -103,22 +107,23 @@ impl Tpu { (tpu, entry_forwarder) } - pub fn close(self) -> thread::Result<()> { + pub fn close(self) -> thread::Result> { self.fetch_stage.close(); self.join() } } impl Service for Tpu { - type JoinReturnType = (); + type JoinReturnType = Option; - fn join(self) -> thread::Result<()> { + fn join(self) -> thread::Result<(Option)> { self.fetch_stage.join()?; self.sigverify_stage.join()?; self.banking_stage.join()?; self.record_stage.join()?; - self.write_stage.join()?; - - Ok(()) + match self.write_stage.join()? { + WriteStageReturnType::LeaderRotation => Ok(Some(TpuReturnType::LeaderRotation)), + _ => Ok(None), + } } }