added jointypes to the stages in the tpu involved in leader rotation

This commit is contained in:
Carl
2018-09-14 00:17:40 -07:00
committed by Greg Fitzgerald
parent 062f654fe0
commit 1fb1c0a681
4 changed files with 76 additions and 88 deletions

View File

@ -11,12 +11,12 @@ use clap::{App, Arg};
use solana::client::mk_client; use solana::client::mk_client;
use solana::crdt::Node; use solana::crdt::Node;
use solana::drone::DRONE_PORT; use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, Fullnode}; use solana::fullnode::{Config, Fullnode, NodeRole};
use solana::logger; use solana::logger;
use solana::metrics::set_panic_hook; use solana::metrics::set_panic_hook;
use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil}; use solana::signature::{Keypair, KeypairUtil};
use solana::thin_client::poll_gossip_for_leader; use solana::thin_client::poll_gossip_for_leader;
use solana::tpu::TpuReturnType;
use solana::wallet::request_airdrop; use solana::wallet::request_airdrop;
use std::fs::File; use std::fs::File;
use std::net::{Ipv4Addr, SocketAddr}; use std::net::{Ipv4Addr, SocketAddr};
@ -84,7 +84,7 @@ fn main() -> () {
let node_info = node.info.clone(); let node_info = node.info.clone();
let pubkey = keypair.pubkey(); let pubkey = keypair.pubkey();
let fullnode = Fullnode::new(node, ledger_path, keypair, network, false); let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false);
// airdrop stuff, probably goes away at some point // airdrop stuff, probably goes away at some point
let leader = match network { let leader = match network {
@ -124,18 +124,24 @@ fn main() -> () {
} }
} }
/*loop { loop {
match fullnode.node_role { let node_role = fullnode.node_role.take();
NodeRole::Leader(leader_services) => { match node_role {
// TODO: return an exit code that signals we should do a role switch Some(NodeRole::Leader(leader_services)) => {
leader_services.join(); match leader_services.join() {
Ok(Some(TpuReturnType::LeaderRotation)) => (),
//fullnode.start_tvu(); //fullnode.start_tvu();
}, Err(e) => {
NodeRole::Validator(validator_services) => { eprintln!("Leader returned error: {:?}", e);
validator_services.join(); exit(1);
}
_ => (),
}
}
Some(NodeRole::Validator(validator_services)) => {
let _ = validator_services.join();
}
_ => (),
} }
} }
}*/
let _ = fullnode.join();
} }

View File

@ -20,6 +20,12 @@ use std::time::{Duration, Instant};
use timing::duration_as_ms; use timing::duration_as_ms;
use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStageReturnType {
LeaderRotation,
ChannelDisconnected,
}
fn broadcast( fn broadcast(
node_info: &NodeInfo, node_info: &NodeInfo,
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
@ -143,7 +149,7 @@ fn broadcast(
} }
pub struct BroadcastStage { pub struct BroadcastStage {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<BroadcastStageReturnType>,
} }
impl BroadcastStage { impl BroadcastStage {
@ -154,7 +160,7 @@ impl BroadcastStage {
entry_height: u64, entry_height: u64,
recycler: &BlobRecycler, recycler: &BlobRecycler,
receiver: &Receiver<Vec<Entry>>, receiver: &Receiver<Vec<Entry>>,
) { ) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex { let mut transmit_index = WindowIndex {
data: entry_height, data: entry_height,
coding: entry_height, coding: entry_height,
@ -170,7 +176,7 @@ impl BroadcastStage {
// If the leader stays in power for the next // If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit. // round as well, then we don't exit. Otherwise, exit.
_ => { _ => {
return; return BroadcastStageReturnType::LeaderRotation;
} }
} }
} }
@ -187,7 +193,9 @@ impl BroadcastStage {
&mut receive_index, &mut receive_index,
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
return BroadcastStageReturnType::ChannelDisconnected
}
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => { _ => {
@ -218,17 +226,7 @@ impl BroadcastStage {
) -> Self { ) -> Self {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver))
Self::run(
&sock,
&crdt,
&window,
entry_height,
&recycler,
&receiver,
exit_sender,
);
})
.unwrap(); .unwrap();
BroadcastStage { thread_hdl } BroadcastStage { thread_hdl }
@ -236,32 +234,26 @@ impl BroadcastStage {
} }
impl Service for BroadcastStage { impl Service for BroadcastStage {
type JoinReturnType = (); type JoinReturnType = BroadcastStageReturnType;
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<BroadcastStageReturnType> {
self.thread_hdl.join()?; self.thread_hdl.join()
Ok(())
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use broadcast_stage::BroadcastStage; use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL}; use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL};
use entry::Entry; use entry::Entry;
use hash::Hash;
use ledger::Block;
use mint::Mint; use mint::Mint;
use packet::BlobRecycler; use packet::BlobRecycler;
use recorder::Recorder; use recorder::Recorder;
use service::Service; use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey}; use signature::{Keypair, KeypairUtil, Pubkey};
use std::cmp; use std::cmp;
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::BlobSender;
use window::{new_window_from_entries, SharedWindow}; use window::{new_window_from_entries, SharedWindow};
fn setup_dummy_broadcast_stage() -> ( fn setup_dummy_broadcast_stage() -> (
@ -269,11 +261,9 @@ mod tests {
Pubkey, Pubkey,
BroadcastStage, BroadcastStage,
SharedWindow, SharedWindow,
BlobSender, Sender<Vec<Entry>>,
BlobRecycler,
Arc<RwLock<Crdt>>, Arc<RwLock<Crdt>>,
Vec<Entry>, Vec<Entry>,
Receiver<bool>,
) { ) {
// Setup dummy leader info // Setup dummy leader info
let leader_keypair = Keypair::new(); let leader_keypair = Keypair::new();
@ -302,8 +292,7 @@ mod tests {
let shared_window = Arc::new(RwLock::new(window)); let shared_window = Arc::new(RwLock::new(window));
let (blob_sender, blob_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let (exit_sender, exit_receiver) = channel();
// Start up the broadcast stage // Start up the broadcast stage
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
@ -312,8 +301,7 @@ mod tests {
shared_window.clone(), shared_window.clone(),
entry_height, entry_height,
blob_recycler.clone(), blob_recycler.clone(),
blob_receiver, entry_receiver,
exit_sender,
); );
( (
@ -321,11 +309,9 @@ mod tests {
buddy_id, buddy_id,
broadcast_stage, broadcast_stage,
shared_window, shared_window,
blob_sender, entry_sender,
blob_recycler,
crdt, crdt,
entries, entries,
exit_receiver,
) )
} }
@ -342,17 +328,8 @@ mod tests {
#[test] #[test]
fn test_broadcast_stage_leader_rotation_exit() { fn test_broadcast_stage_leader_rotation_exit() {
let ( let (id, buddy_id, broadcast_stage, shared_window, entry_sender, crdt, entries) =
id, setup_dummy_broadcast_stage();
buddy_id,
broadcast_stage,
shared_window,
blob_sender,
blob_recycler,
crdt,
entries,
exit_receiver,
) = setup_dummy_broadcast_stage();
{ {
let mut wcrdt = crdt.write().unwrap(); let mut wcrdt = crdt.write().unwrap();
// Set leader to myself // Set leader to myself
@ -371,8 +348,7 @@ mod tests {
for _ in genesis_len..LEADER_ROTATION_INTERVAL { for _ in genesis_len..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
let blob = new_entry.to_blobs(&blob_recycler); entry_sender.send(new_entry).unwrap();
blob_sender.send(blob).unwrap();
} }
// Set the scheduled next leader in the crdt to the other buddy on the network // Set the scheduled next leader in the crdt to the other buddy on the network
@ -385,23 +361,22 @@ mod tests {
// it's no longer the leader after checking the crdt, and exit // it's no longer the leader after checking the crdt, and exit
for _ in 0..LEADER_ROTATION_INTERVAL { for _ in 0..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
let blob = new_entry.to_blobs(&blob_recycler); match entry_sender.send(new_entry) {
match blob_sender.send(blob) {
// We disconnected, break out of loop and check the results // We disconnected, break out of loop and check the results
Err(_) => break, Err(_) => break,
_ => (), _ => (),
}; };
} }
match exit_receiver.recv() {
Ok(x) if x == false => panic!("Unexpected value on exit channel for Broadcast stage"),
_ => (),
}
let highest_index = find_highest_window_index(&shared_window); let highest_index = find_highest_window_index(&shared_window);
// TODO: 2 * LEADER_ROTATION_INTERVAL - 1 due to the same bug in
// index_blobs() as mentioned above
assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1); assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
// Make sure the threads closed cleanly // Make sure the threads closed cleanly
broadcast_stage.join().unwrap(); assert_eq!(
broadcast_stage.join().unwrap(),
BroadcastStageReturnType::LeaderRotation
);
} }
} }

View File

@ -16,7 +16,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::Result; use std::thread::Result;
use tpu::Tpu; use tpu::{Tpu, TpuReturnType};
use tvu::Tvu; use tvu::Tvu;
use untrusted::Input; use untrusted::Input;
use window; use window;
@ -39,9 +39,9 @@ impl LeaderServices {
} }
} }
pub fn join(self) -> Result<()> { pub fn join(self) -> Result<Option<TpuReturnType>> {
self.tpu.join()?; self.broadcast_stage.join()?;
self.broadcast_stage.join() self.tpu.join()
} }
} }
@ -59,7 +59,7 @@ impl ValidatorServices {
} }
} }
pub enum FullNodeReturnType { pub enum FullnodeReturnType {
LeaderRotation, LeaderRotation,
} }
@ -327,16 +327,16 @@ impl Fullnode {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
} }
pub fn close(self) -> Result<(Option<FullNodeReturnType>)> { pub fn close(self) -> Result<(Option<FullnodeReturnType>)> {
self.exit(); self.exit();
self.join() self.join()
} }
} }
impl Service for Fullnode { impl Service for Fullnode {
type JoinReturnType = Option<FullNodeReturnType>; type JoinReturnType = Option<FullnodeReturnType>;
fn join(self) -> Result<Option<FullNodeReturnType>> { fn join(self) -> Result<Option<FullnodeReturnType>> {
self.rpu.join()?; self.rpu.join()?;
self.ncp.join()?; self.ncp.join()?;
self.rpc_service.join()?; self.rpc_service.join()?;
@ -346,7 +346,9 @@ impl Service for Fullnode {
validator_service.join()?; validator_service.join()?;
} }
Some(NodeRole::Leader(leader_service)) => { Some(NodeRole::Leader(leader_service)) => {
leader_service.join()?; if let Some(TpuReturnType::LeaderRotation) = leader_service.join()? {
return Ok(Some(FullnodeReturnType::LeaderRotation));
}
} }
_ => (), _ => (),
} }

View File

@ -41,7 +41,11 @@ use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use write_stage::WriteStage; use write_stage::{WriteStage, WriteStageReturnType};
pub enum TpuReturnType {
LeaderRotation,
}
pub struct Tpu { pub struct Tpu {
fetch_stage: FetchStage, fetch_stage: FetchStage,
@ -103,22 +107,23 @@ impl Tpu {
(tpu, entry_forwarder) (tpu, entry_forwarder)
} }
pub fn close(self) -> thread::Result<()> { pub fn close(self) -> thread::Result<Option<TpuReturnType>> {
self.fetch_stage.close(); self.fetch_stage.close();
self.join() self.join()
} }
} }
impl Service for Tpu { impl Service for Tpu {
type JoinReturnType = (); type JoinReturnType = Option<TpuReturnType>;
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<(Option<TpuReturnType>)> {
self.fetch_stage.join()?; self.fetch_stage.join()?;
self.sigverify_stage.join()?; self.sigverify_stage.join()?;
self.banking_stage.join()?; self.banking_stage.join()?;
self.record_stage.join()?; self.record_stage.join()?;
self.write_stage.join()?; match self.write_stage.join()? {
WriteStageReturnType::LeaderRotation => Ok(Some(TpuReturnType::LeaderRotation)),
Ok(()) _ => Ok(None),
}
} }
} }