Merge leader_to_validator/validator_to_leader

This commit is contained in:
Michael Vines
2019-02-10 19:34:18 -08:00
parent 4ae1783b97
commit 095afdfe47
5 changed files with 99 additions and 165 deletions

View File

@ -13,8 +13,8 @@ use crate::rpc_pubsub::PubSubService;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::StorageState; use crate::storage_stage::StorageState;
use crate::streamer::BlobSender; use crate::streamer::BlobSender;
use crate::tpu::{Tpu, TpuReturnType, TpuRotationReceiver}; use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender};
use crate::tvu::{Sockets, Tvu, TvuReturnType, TvuRotationReceiver}; use crate::tvu::{Sockets, Tvu};
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
use log::Level; use log::Level;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
@ -105,8 +105,8 @@ pub struct Fullnode {
tpu_sockets: Vec<UdpSocket>, tpu_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
node_services: NodeServices, node_services: NodeServices,
to_leader_receiver: TvuRotationReceiver, rotation_sender: TpuRotationSender,
to_validator_receiver: TpuRotationReceiver, rotation_receiver: TpuRotationReceiver,
blob_sender: BlobSender, blob_sender: BlobSender,
} }
@ -250,9 +250,8 @@ impl Fullnode {
Some(Arc::new(voting_keypair)) Some(Arc::new(voting_keypair))
}; };
// Setup channels for rotation indications // Setup channel for rotation indications
let (to_leader_sender, to_leader_receiver) = channel(); let (rotation_sender, rotation_receiver) = channel();
let (to_validator_sender, to_validator_receiver) = channel();
let blob_index = Self::get_consumed_for_slot(&blocktree, slot_height); let blob_index = Self::get_consumed_for_slot(&blocktree, slot_height);
@ -266,7 +265,7 @@ impl Fullnode {
sockets, sockets,
blocktree.clone(), blocktree.clone(),
config.storage_rotate_count, config.storage_rotate_count,
to_leader_sender, &rotation_sender,
&storage_state, &storage_state,
config.entry_stream.as_ref(), config.entry_stream.as_ref(),
ledger_signal_sender, ledger_signal_sender,
@ -290,7 +289,7 @@ impl Fullnode {
blob_index, blob_index,
&last_entry_id, &last_entry_id,
id, id,
&to_validator_sender, &rotation_sender,
&blob_sender, &blob_sender,
scheduled_leader == id, scheduled_leader == id,
); );
@ -309,8 +308,8 @@ impl Fullnode {
exit, exit,
tpu_sockets: node.sockets.tpu, tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast, broadcast_socket: node.sockets.broadcast,
to_leader_receiver, rotation_sender,
to_validator_receiver, rotation_receiver,
blob_sender, blob_sender,
} }
} }
@ -360,21 +359,45 @@ impl Fullnode {
(scheduled_leader, max_tick_height) (scheduled_leader, max_tick_height)
} }
fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType { fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!( trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
"leader_to_validator({:?}): tick_height={}", let was_leader = self.node_services.tpu.is_leader();
self.id,
tick_height,
);
let (scheduled_leader, _max_tick_height) = self.get_next_leader(tick_height);
let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
if scheduled_leader == self.id { if scheduled_leader == self.id {
debug!("node is still the leader"); let transition = if was_leader {
debug!("{:?} remaining in leader role", self.id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
debug!("{:?} rotating to leader role", self.id);
FullnodeReturnType::ValidatorToLeaderRotation
};
let last_entry_id = self.bank.last_id(); let last_entry_id = self.bank.last_id();
self.validator_to_leader(tick_height, last_entry_id);
FullnodeReturnType::LeaderToLeaderRotation self.node_services.tpu.switch_to_leader(
&Arc::new(self.bank.copy_for_tpu()),
PohServiceConfig::default(),
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),
self.cluster_info.clone(),
self.sigverify_disabled,
max_tick_height,
0,
&last_entry_id,
self.id,
&self.rotation_sender,
&self.blob_sender,
);
transition
} else { } else {
debug!("{:?} rotating to validator role", self.id);
self.node_services.tpu.switch_to_forwarder( self.node_services.tpu.switch_to_forwarder(
self.tpu_sockets self.tpu_sockets
.iter() .iter()
@ -386,73 +409,6 @@ impl Fullnode {
} }
} }
pub fn validator_to_leader(&mut self, tick_height: u64, last_entry_id: Hash) {
trace!(
"validator_to_leader({:?}): tick_height={} last_entry_id={}",
self.id,
tick_height,
last_entry_id,
);
let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
assert_eq!(scheduled_leader, self.id, "node is not the leader");
let (to_validator_sender, to_validator_receiver) = channel();
self.to_validator_receiver = to_validator_receiver;
self.node_services.tpu.switch_to_leader(
&Arc::new(self.bank.copy_for_tpu()),
PohServiceConfig::default(),
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),
self.cluster_info.clone(),
self.sigverify_disabled,
max_tick_height,
0,
&last_entry_id,
self.id,
&to_validator_sender,
&self.blob_sender,
)
}
fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> {
let timeout = Duration::from_secs(1);
loop {
if self.exit.load(Ordering::Relaxed) {
return None;
}
if self.node_services.tpu.is_leader() {
let should_be_forwarder = self.to_validator_receiver.recv_timeout(timeout);
match should_be_forwarder {
Ok(TpuReturnType::LeaderRotation(tick_height)) => {
return Some((self.leader_to_validator(tick_height), tick_height + 1));
}
Err(RecvTimeoutError::Timeout) => continue,
_ => return None,
}
} else {
let should_be_leader = self.to_leader_receiver.recv_timeout(timeout);
match should_be_leader {
Ok(TvuReturnType::LeaderRotation(tick_height, last_entry_id)) => {
self.validator_to_leader(tick_height, last_entry_id);
return Some((
FullnodeReturnType::ValidatorToLeaderRotation,
tick_height + 1,
));
}
Err(RecvTimeoutError::Timeout) => continue,
_ => return None,
}
}
}
}
// Runs a thread to manage node role transitions. The returned closure can be used to signal the // Runs a thread to manage node role transitions. The returned closure can be used to signal the
// node to exit. // node to exit.
pub fn run( pub fn run(
@ -461,22 +417,28 @@ impl Fullnode {
) -> impl FnOnce() { ) -> impl FnOnce() {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
let exit = self.exit.clone(); let exit = self.exit.clone();
let timeout = Duration::from_secs(1);
spawn(move || loop { spawn(move || loop {
let status = self.handle_role_transition(); if self.exit.load(Ordering::Relaxed) {
match status { debug!("node shutdown requested");
None => { self.close().expect("Unable to close node");
debug!("node shutdown requested"); sender.send(true).expect("Unable to signal exit");
self.close().expect("Unable to close node"); break;
sender.send(true).expect("Unable to signal exit"); }
break;
} match self.rotation_receiver.recv_timeout(timeout) {
Some(transition) => { Ok(tick_height) => {
debug!("role_transition complete: {:?}", transition); let transition = self.rotate(tick_height);
debug!("role transition complete: {:?}", transition);
if let Some(ref rotation_notifier) = rotation_notifier { if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send(transition).unwrap(); rotation_notifier
.send((transition, tick_height + 1))
.unwrap();
} }
} }
}; Err(RecvTimeoutError::Timeout) => continue,
_ => (),
}
}); });
move || { move || {
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
@ -927,7 +889,7 @@ mod tests {
let voting_keypair = VotingKeypair::new_local(&leader_keypair); let voting_keypair = VotingKeypair::new_local(&leader_keypair);
info!("Start the bootstrap leader"); info!("Start the bootstrap leader");
let mut leader = Fullnode::new( let leader = Fullnode::new(
leader_node, leader_node,
&leader_keypair, &leader_keypair,
&leader_ledger_path, &leader_ledger_path,
@ -946,13 +908,13 @@ mod tests {
converge(&leader_node_info, 2); converge(&leader_node_info, 2);
info!("Wait for leader -> validator transition"); info!("Wait for leader -> validator transition");
let signal = leader let rotation_signal = leader
.to_validator_receiver .rotation_receiver
.recv() .recv()
.expect("signal for leader -> validator transition"); .expect("signal for leader -> validator transition");
let (rn_sender, rn_receiver) = channel(); debug!("received rotation signal: {:?}", rotation_signal);
rn_sender.send(signal).expect("send"); // Re-send the rotation signal, it'll be received again once the tvu is unpaused
leader.to_validator_receiver = rn_receiver; leader.rotation_sender.send(rotation_signal).expect("send");
info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)"); info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
{ {
@ -960,13 +922,13 @@ mod tests {
assert!(w_last_ids.tick_height < ticks_per_slot - 1); assert!(w_last_ids.tick_height < ticks_per_slot - 1);
} }
// Clear the blobs we've recieved so far. After this rotation, we should // Clear the blobs we've received so far. After this rotation, we should
// no longer receive blobs from slot 0 // no longer receive blobs from slot 0
while let Ok(_) = blob_fetch_receiver.try_recv() {} while let Ok(_) = blob_fetch_receiver.try_recv() {}
let leader_exit = leader.run(Some(rotation_sender)); let leader_exit = leader.run(Some(rotation_sender));
// Wait for leader_to_validator() function execution to trigger a leader to leader rotation // Wait for Tpu bank to progress while the Tvu bank is stuck
sleep(Duration::from_millis(1000)); sleep(Duration::from_millis(1000));
// Tvu bank lock is released here, so tvu should start making progress again and should signal a // Tvu bank lock is released here, so tvu should start making progress again and should signal a

View File

@ -5,7 +5,7 @@ use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error; use crate::result::Error;
use crate::result::Result; use crate::result::Result;
use crate::service::Service; use crate::service::Service;
use crate::tpu::{TpuReturnType, TpuRotationSender}; use crate::tpu::TpuRotationSender;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread::sleep; use std::thread::sleep;
@ -92,8 +92,7 @@ impl PohService {
let res = poh.hash(); let res = poh.hash();
if let Err(e) = res { if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
to_validator_sender to_validator_sender.send(max_tick_height)?;
.send(TpuReturnType::LeaderRotation(max_tick_height))?;
} }
return Err(e); return Err(e);
} }
@ -106,8 +105,7 @@ impl PohService {
let res = poh.tick(); let res = poh.tick();
if let Err(e) = res { if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
// Leader rotation should only happen if a max_tick_height was specified to_validator_sender.send(max_tick_height)?;
to_validator_sender.send(TpuReturnType::LeaderRotation(max_tick_height))?;
} }
return Err(e); return Err(e);
} }

View File

@ -13,7 +13,7 @@ use crate::entry_stream::MockEntryStream as EntryStream;
use crate::packet::BlobError; use crate::packet::BlobError;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::tvu::{TvuReturnType, TvuRotationSender}; use crate::tvu::TvuRotationSender;
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
use log::Level; use log::Level;
use solana_metrics::{influxdb, submit}; use solana_metrics::{influxdb, submit};
@ -189,7 +189,7 @@ impl ReplayStage {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
mut current_blob_index: u64, mut current_blob_index: u64,
last_entry_id: Arc<RwLock<Hash>>, last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: TvuRotationSender, to_leader_sender: &TvuRotationSender,
entry_stream: Option<&String>, entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>, ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
@ -203,6 +203,7 @@ impl ReplayStage {
(pause, pause_) (pause, pause_)
}; };
let exit_ = exit.clone(); let exit_ = exit.clone();
let to_leader_sender = to_leader_sender.clone();
let t_replay = Builder::new() let t_replay = Builder::new()
.name("solana-replay-stage".to_string()) .name("solana-replay-stage".to_string())
.spawn(move || { .spawn(move || {
@ -295,12 +296,7 @@ impl ReplayStage {
cluster_info.write().unwrap().set_leader(leader_id); cluster_info.write().unwrap().set_leader(leader_id);
if leader_id != last_leader_id && my_id == leader_id { if leader_id != last_leader_id && my_id == leader_id {
to_leader_sender to_leader_sender.send(current_tick_height).unwrap();
.send(TvuReturnType::LeaderRotation(
current_tick_height,
*last_entry_id.read().unwrap(),
))
.unwrap();
} }
// Check for any slots that chain to this one // Check for any slots that chain to this one
@ -386,7 +382,6 @@ mod test {
use crate::genesis_block::GenesisBlock; use crate::genesis_block::GenesisBlock;
use crate::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig}; use crate::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig};
use crate::replay_stage::ReplayStage; use crate::replay_stage::ReplayStage;
use crate::tvu::TvuReturnType;
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use serde_json::Value; use serde_json::Value;
@ -472,7 +467,7 @@ mod test {
exit.clone(), exit.clone(),
meta.consumed, meta.consumed,
Arc::new(RwLock::new(last_entry_id)), Arc::new(RwLock::new(last_entry_id)),
rotation_sender, &rotation_sender,
None, None,
l_sender, l_sender,
l_receiver, l_receiver,
@ -486,8 +481,6 @@ mod test {
entries_to_send.push(entry); entries_to_send.push(entry);
} }
let expected_last_id = entries_to_send.last().unwrap().id;
// Write the entries to the ledger, replay_stage should get notified of changes // Write the entries to the ledger, replay_stage should get notified of changes
blocktree blocktree
.write_entries(DEFAULT_SLOT_HEIGHT, meta.consumed, &entries_to_send) .write_entries(DEFAULT_SLOT_HEIGHT, meta.consumed, &entries_to_send)
@ -495,17 +488,10 @@ mod test {
info!("Wait for replay_stage to exit and check return value is correct"); info!("Wait for replay_stage to exit and check return value is correct");
assert_eq!( assert_eq!(
Some(TvuReturnType::LeaderRotation( 2 * ticks_per_slot - 1,
2 * ticks_per_slot - 1, rotation_receiver
expected_last_id, .recv()
)), .expect("should have signaled leader rotation"),
{
Some(
rotation_receiver
.recv()
.expect("should have signaled leader rotation"),
)
}
); );
info!("Check that the entries on the ledger writer channel are correct"); info!("Check that the entries on the ledger writer channel are correct");
@ -575,7 +561,7 @@ mod test {
exit.clone(), exit.clone(),
entry_height, entry_height,
Arc::new(RwLock::new(last_entry_id)), Arc::new(RwLock::new(last_entry_id)),
to_leader_sender, &to_leader_sender,
None, None,
l_sender, l_sender,
l_receiver, l_receiver,
@ -689,7 +675,7 @@ mod test {
exit.clone(), exit.clone(),
meta.consumed, meta.consumed,
Arc::new(RwLock::new(last_entry_id)), Arc::new(RwLock::new(last_entry_id)),
rotation_tx, &rotation_tx,
None, None,
l_sender, l_sender,
l_receiver, l_receiver,
@ -729,18 +715,12 @@ mod test {
// Wait for replay_stage to exit and check return value is correct // Wait for replay_stage to exit and check return value is correct
assert_eq!( assert_eq!(
Some(TvuReturnType::LeaderRotation( active_window_tick_length,
active_window_tick_length, rotation_rx
expected_last_id, .recv()
)), .expect("should have signaled leader rotation")
{
Some(
rotation_rx
.recv()
.expect("should have signaled leader rotation"),
)
}
); );
assert_ne!(expected_last_id, Hash::default()); assert_ne!(expected_last_id, Hash::default());
//replay stage should continue running even after rotation has happened (tvu never goes down) //replay stage should continue running even after rotation has happened (tvu never goes down)
replay_stage replay_stage

View File

@ -20,10 +20,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
pub enum TpuReturnType { pub type TpuReturnType = u64; // tick_height to initiate a rotation
LeaderRotation(u64),
}
pub type TpuRotationSender = Sender<TpuReturnType>; pub type TpuRotationSender = Sender<TpuReturnType>;
pub type TpuRotationReceiver = Receiver<TpuReturnType>; pub type TpuRotationReceiver = Receiver<TpuReturnType>;

View File

@ -21,22 +21,19 @@ use crate::retransmit_stage::RetransmitStage;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState}; use crate::storage_stage::{StorageStage, StorageState};
use crate::streamer::BlobSender; use crate::streamer::BlobSender;
use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender};
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::mpsc::{channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
#[derive(Debug, PartialEq, Eq, Clone)] pub type TvuReturnType = TpuReturnType;
pub enum TvuReturnType { pub type TvuRotationSender = TpuRotationSender;
LeaderRotation(u64, Hash), pub type TvuRotationReceiver = TpuRotationReceiver;
}
pub type TvuRotationSender = Sender<TvuReturnType>;
pub type TvuRotationReceiver = Receiver<TvuReturnType>;
pub struct Tvu { pub struct Tvu {
fetch_stage: BlobFetchStage, fetch_stage: BlobFetchStage,
@ -75,7 +72,7 @@ impl Tvu {
sockets: Sockets, sockets: Sockets,
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
storage_rotate_count: u64, storage_rotate_count: u64,
to_leader_sender: TvuRotationSender, to_leader_sender: &TvuRotationSender,
storage_state: &StorageState, storage_state: &StorageState,
entry_stream: Option<&String>, entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>, ledger_signal_sender: SyncSender<bool>,
@ -261,7 +258,7 @@ pub mod tests {
}, },
Arc::new(blocktree), Arc::new(blocktree),
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
sender, &sender,
&StorageState::default(), &StorageState::default(),
None, None,
l_sender, l_sender,
@ -348,7 +345,7 @@ pub mod tests {
}, },
Arc::new(blocktree), Arc::new(blocktree),
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
sender, &sender,
&StorageState::default(), &StorageState::default(),
None, None,
l_sender, l_sender,