Add Fullnode::run() to optionally manage node role transitions automatically

This commit is contained in:
Michael Vines 2019-02-01 18:09:38 -08:00
parent 5f565c92c9
commit f90d96367d
3 changed files with 68 additions and 50 deletions

View File

@ -17,6 +17,7 @@ use std::fs::File;
use std::io::{Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, SocketAddr}; use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit; use std::process::exit;
use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use std::thread::sleep; use std::thread::sleep;
@ -262,7 +263,7 @@ fn main() {
info!("New vote account ID is {:?}", vote_account_id); info!("New vote account ID is {:?}", vote_account_id);
let gossip_addr = node.info.gossip; let gossip_addr = node.info.gossip;
let mut fullnode = Fullnode::new( let fullnode = Fullnode::new(
node, node,
&keypair, &keypair,
ledger_path, ledger_path,
@ -274,6 +275,9 @@ fn main() {
&fullnode_config, &fullnode_config,
); );
let (rotation_sender, rotation_receiver) = channel();
fullnode.run(Some(rotation_sender));
if !no_signer { if !no_signer {
let leader_node_info = loop { let leader_node_info = loop {
info!("Looking for leader..."); info!("Looking for leader...");
@ -299,17 +303,9 @@ fn main() {
} }
info!("Node initialized"); info!("Node initialized");
loop { loop {
let status = fullnode.handle_role_transition(); info!(
match status { "Node rotation event: {:?}",
Ok(Some(transition)) => { rotation_receiver.recv().unwrap()
info!("role_transition complete: {:?}", transition);
}
_ => {
panic!(
"Fullnode TPU/TVU exited for some unexpected reason: {:?}",
status
); );
} }
};
}
} }

View File

@ -23,11 +23,9 @@ use solana_sdk::timing::{duration_as_ms, timestamp};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::{sleep, spawn, Result};
use std::thread::Result;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@ -67,6 +65,7 @@ impl NodeServices {
pub enum FullnodeReturnType { pub enum FullnodeReturnType {
LeaderToValidatorRotation, LeaderToValidatorRotation,
ValidatorToLeaderRotation, ValidatorToLeaderRotation,
LeaderToLeaderRotation,
} }
pub struct FullnodeConfig { pub struct FullnodeConfig {
@ -286,8 +285,8 @@ impl Fullnode {
} }
} }
pub fn leader_to_validator(&mut self, tick_height: u64) -> Result<()> { pub fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!("leader_to_validator"); trace!("leader_to_validator: tick_height={}", tick_height);
while self.bank.tick_height() < tick_height { while self.bank.tick_height() < tick_height {
sleep(Duration::from_millis(10)); sleep(Duration::from_millis(10));
@ -305,14 +304,11 @@ impl Fullnode {
.write() .write()
.unwrap() .unwrap()
.set_leader(scheduled_leader); .set_leader(scheduled_leader);
// In the rare case that the leader exited on a multiple of seed_rotation_interval
// when the new leader schedule was being generated, and there are no other validators
// in the active set, then the leader scheduler will pick the same leader again, so
// check for that
if scheduled_leader == self.id { if scheduled_leader == self.id {
let (last_entry_id, entry_height) = self.node_services.tvu.get_state(); let (last_entry_id, entry_height) = self.node_services.tvu.get_state();
self.validator_to_leader(tick_height, entry_height, last_entry_id); self.validator_to_leader(tick_height, entry_height, last_entry_id);
Ok(()) FullnodeReturnType::LeaderToLeaderRotation
} else { } else {
self.node_services.tpu.switch_to_forwarder( self.node_services.tpu.switch_to_forwarder(
self.tpu_sockets self.tpu_sockets
@ -321,7 +317,7 @@ impl Fullnode {
.collect(), .collect(),
self.cluster_info.clone(), self.cluster_info.clone(),
); );
Ok(()) FullnodeReturnType::LeaderToValidatorRotation
} }
} }
@ -357,22 +353,21 @@ impl Fullnode {
) )
} }
pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> { pub fn handle_role_transition(&mut self) -> Option<FullnodeReturnType> {
loop { loop {
if self.exit.load(Ordering::Relaxed) { if self.exit.load(Ordering::Relaxed) {
return Ok(None); return None;
} }
let should_be_forwarder = self.role_notifiers.1.try_recv(); let should_be_forwarder = self.role_notifiers.1.try_recv();
let should_be_leader = self.role_notifiers.0.try_recv(); let should_be_leader = self.role_notifiers.0.try_recv();
match should_be_leader { match should_be_leader {
Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => { Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => {
self.validator_to_leader(tick_height, entry_height, last_entry_id); self.validator_to_leader(tick_height, entry_height, last_entry_id);
return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)); return Some(FullnodeReturnType::ValidatorToLeaderRotation);
} }
_ => match should_be_forwarder { _ => match should_be_forwarder {
Ok(TpuReturnType::LeaderRotation(tick_height)) => { Ok(TpuReturnType::LeaderRotation(tick_height)) => {
self.leader_to_validator(tick_height)?; return Some(self.leader_to_validator(tick_height))
return Ok(Some(FullnodeReturnType::LeaderToValidatorRotation));
} }
_ => { _ => {
continue; continue;
@ -382,6 +377,35 @@ impl Fullnode {
} }
} }
// Runs a thread to manage node role transitions. The returned closure can be used to signal the
// node to exit.
pub fn run(mut self, rotation_notifier: Option<Sender<FullnodeReturnType>>) -> impl FnOnce() {
let (sender, receiver) = channel();
let exit = self.exit.clone();
spawn(move || loop {
let status = self.handle_role_transition();
match status {
None => {
debug!("node shutdown requested");
self.close().expect("Unable to close node");
sender.send(true).expect("Unable to signal exit");
break;
}
Some(transition) => {
debug!("role_transition complete: {:?}", transition);
if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send(transition).unwrap();
}
}
};
});
move || {
exit.store(true, Ordering::Relaxed);
receiver.recv().unwrap();
debug!("node shutdown complete");
}
}
// Used for notifying many nodes in parallel to exit // Used for notifying many nodes in parallel to exit
pub fn exit(&self) { pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
@ -597,7 +621,7 @@ mod tests {
let bootstrap_leader_keypair = Arc::new(bootstrap_leader_keypair); let bootstrap_leader_keypair = Arc::new(bootstrap_leader_keypair);
let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair); let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair);
// Start up the leader // Start up the leader
let mut bootstrap_leader = Fullnode::new( let bootstrap_leader = Fullnode::new(
bootstrap_leader_node, bootstrap_leader_node,
&bootstrap_leader_keypair, &bootstrap_leader_keypair,
&bootstrap_leader_ledger_path, &bootstrap_leader_ledger_path,
@ -607,16 +631,16 @@ mod tests {
&FullnodeConfig::default(), &FullnodeConfig::default(),
); );
// Wait for the leader to transition, ticks should cause the leader to let (rotation_sender, rotation_receiver) = channel();
// reach the height for leader rotation let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
match bootstrap_leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (), // Wait for the bootstrap leader to transition. Since there are no other nodes in the
_ => { // cluster it will continue to be the leader
panic!("Expected a leader transition"); assert_eq!(
} rotation_receiver.recv().unwrap(),
} FullnodeReturnType::LeaderToLeaderRotation
assert!(bootstrap_leader.node_services.tpu.is_leader()); );
bootstrap_leader.close().unwrap(); bootstrap_leader_exit();
} }
#[test] #[test]
@ -860,7 +884,7 @@ mod tests {
// Release tvu bank lock, tvu should start making progress again and // Release tvu bank lock, tvu should start making progress again and
// handle_role_transition should successfully rotate the leader to a validator // handle_role_transition should successfully rotate the leader to a validator
assert_eq!( assert_eq!(
leader.handle_role_transition().unwrap().unwrap(), leader.handle_role_transition().unwrap(),
FullnodeReturnType::LeaderToValidatorRotation FullnodeReturnType::LeaderToValidatorRotation
); );
assert_eq!( assert_eq!(

View File

@ -1064,7 +1064,7 @@ fn test_leader_to_validator_transition() {
} }
// Wait for leader to shut down tpu and restart tvu // Wait for leader to shut down tpu and restart tvu
match leader.handle_role_transition().unwrap() { match leader.handle_role_transition() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (), Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"), _ => panic!("Expected reason for exit to be leader rotation"),
} }
@ -1201,13 +1201,13 @@ fn test_leader_validator_basic() {
} }
// Wait for validator to shut down tvu and restart tpu // Wait for validator to shut down tvu and restart tpu
match validator.handle_role_transition().unwrap() { match validator.handle_role_transition() {
Some(FullnodeReturnType::ValidatorToLeaderRotation) => (), Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"), _ => panic!("Expected reason for exit to be leader rotation"),
} }
// Wait for the leader to shut down tpu and restart tvu // Wait for the leader to shut down tpu and restart tvu
match leader.handle_role_transition().unwrap() { match leader.handle_role_transition() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (), Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"), _ => panic!("Expected reason for exit to be leader rotation"),
} }
@ -1267,9 +1267,7 @@ fn run_node(id: Pubkey, mut fullnode: Fullnode, should_exit: Arc<AtomicBool>) ->
} }
Err(_) => match should_be_fwdr { Err(_) => match should_be_fwdr {
Ok(TpuReturnType::LeaderRotation(tick_height)) => { Ok(TpuReturnType::LeaderRotation(tick_height)) => {
fullnode fullnode.leader_to_validator(tick_height);
.leader_to_validator(tick_height)
.expect("failed when transitioning to validator");
} }
Err(_) => { Err(_) => {
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
@ -1394,7 +1392,7 @@ fn test_dropped_handoff_recovery() {
assert_eq!(num_converged, N); assert_eq!(num_converged, N);
info!("Wait for bootstrap_leader to transition to a validator",); info!("Wait for bootstrap_leader to transition to a validator",);
match nodes[0].handle_role_transition().unwrap() { match nodes[0].handle_role_transition() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (), Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"), _ => panic!("Expected reason for exit to be leader rotation"),
} }
@ -1416,7 +1414,7 @@ fn test_dropped_handoff_recovery() {
error!("TODO: FIX https://github.com/solana-labs/solana/issues/2482"); error!("TODO: FIX https://github.com/solana-labs/solana/issues/2482");
// TODO: Once fixed restore the commented out code below // TODO: Once fixed restore the commented out code below
/* /*
match next_leader.handle_role_transition().unwrap() { match next_leader.handle_role_transition() {
Some(FullnodeReturnType::ValidatorToLeaderRotation) => (), Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"), _ => panic!("Expected reason for exit to be leader rotation"),
} }