From f90d96367d858335ad576f81d420445f77a6dd2c Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 1 Feb 2019 18:09:38 -0800
Subject: [PATCH] Add Fullnode::run() to optionally manage node role transitions automatically

---
 fullnode/src/main.rs | 22 +++++-------
 src/fullnode.rs      | 82 ++++++++++++++++++++++++++++----------------
 tests/multinode.rs   | 14 ++++----
 3 files changed, 68 insertions(+), 50 deletions(-)

diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs
index 64a1867bf9..80b1cc6c0a 100644
--- a/fullnode/src/main.rs
+++ b/fullnode/src/main.rs
@@ -17,6 +17,7 @@ use std::fs::File;
 use std::io::{Error, ErrorKind, Result};
 use std::net::{Ipv4Addr, SocketAddr};
 use std::process::exit;
+use std::sync::mpsc::channel;
 use std::sync::Arc;
 use std::sync::RwLock;
 use std::thread::sleep;
@@ -262,7 +263,7 @@ fn main() {
     info!("New vote account ID is {:?}", vote_account_id);
 
     let gossip_addr = node.info.gossip;
-    let mut fullnode = Fullnode::new(
+    let fullnode = Fullnode::new(
         node,
         &keypair,
         ledger_path,
@@ -274,6 +275,9 @@ fn main() {
         &fullnode_config,
     );
 
+    let (rotation_sender, rotation_receiver) = channel();
+    fullnode.run(Some(rotation_sender));
+
     if !no_signer {
         let leader_node_info = loop {
             info!("Looking for leader...");
@@ -299,17 +303,9 @@ fn main() {
     }
     info!("Node initialized");
     loop {
-        let status = fullnode.handle_role_transition();
-        match status {
-            Ok(Some(transition)) => {
-                info!("role_transition complete: {:?}", transition);
-            }
-            _ => {
-                panic!(
-                    "Fullnode TPU/TVU exited for some unexpected reason: {:?}",
-                    status
-                );
-            }
-        };
+        info!(
+            "Node rotation event: {:?}",
+            rotation_receiver.recv().unwrap()
+        );
     }
 }
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 0be4e59fec..e15aefad0b 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -23,11 +23,9 @@ use solana_sdk::timing::{duration_as_ms, timestamp};
 use std::net::UdpSocket;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::mpsc::channel;
-use std::sync::mpsc::{Receiver, Sender, SyncSender};
+use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
 use std::sync::{Arc, RwLock};
-use std::thread::sleep;
-use std::thread::Result;
+use std::thread::{sleep, spawn, Result};
 use std::time::Duration;
 use std::time::Instant;
 
@@ -67,6 +65,7 @@ impl NodeServices {
 pub enum FullnodeReturnType {
     LeaderToValidatorRotation,
     ValidatorToLeaderRotation,
+    LeaderToLeaderRotation,
 }
 
 pub struct FullnodeConfig {
@@ -286,8 +285,8 @@ impl Fullnode {
         }
     }
 
-    pub fn leader_to_validator(&mut self, tick_height: u64) -> Result<()> {
-        trace!("leader_to_validator");
+    pub fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
+        trace!("leader_to_validator: tick_height={}", tick_height);
         while self.bank.tick_height() < tick_height {
             sleep(Duration::from_millis(10));
         }
@@ -305,14 +304,11 @@ impl Fullnode {
             .write()
             .unwrap()
             .set_leader(scheduled_leader);
-        // In the rare case that the leader exited on a multiple of seed_rotation_interval
-        // when the new leader schedule was being generated, and there are no other validators
-        // in the active set, then the leader scheduler will pick the same leader again, so
-        // check for that
+
         if scheduled_leader == self.id {
             let (last_entry_id, entry_height) = self.node_services.tvu.get_state();
             self.validator_to_leader(tick_height, entry_height, last_entry_id);
-            Ok(())
+            FullnodeReturnType::LeaderToLeaderRotation
         } else {
             self.node_services.tpu.switch_to_forwarder(
                 self.tpu_sockets
@@ -321,7 +317,7 @@ impl Fullnode {
                     .collect(),
                 self.cluster_info.clone(),
             );
-            Ok(())
+            FullnodeReturnType::LeaderToValidatorRotation
         }
     }
 
@@ -357,22 +353,21 @@ impl Fullnode {
         )
     }
 
-    pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> {
+    pub fn handle_role_transition(&mut self) -> Option<FullnodeReturnType> {
         loop {
             if self.exit.load(Ordering::Relaxed) {
-                return Ok(None);
+                return None;
             }
             let should_be_forwarder = self.role_notifiers.1.try_recv();
             let should_be_leader = self.role_notifiers.0.try_recv();
             match should_be_leader {
                 Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => {
                     self.validator_to_leader(tick_height, entry_height, last_entry_id);
-                    return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation));
+                    return Some(FullnodeReturnType::ValidatorToLeaderRotation);
                 }
                 _ => match should_be_forwarder {
                     Ok(TpuReturnType::LeaderRotation(tick_height)) => {
-                        self.leader_to_validator(tick_height)?;
-                        return Ok(Some(FullnodeReturnType::LeaderToValidatorRotation));
+                        return Some(self.leader_to_validator(tick_height))
                     }
                     _ => {
                         continue;
@@ -382,6 +377,35 @@ impl Fullnode {
         }
     }
 
+    // Runs a thread to manage node role transitions. The returned closure can be used to signal the
+    // node to exit.
+    pub fn run(mut self, rotation_notifier: Option<Sender<FullnodeReturnType>>) -> impl FnOnce() {
+        let (sender, receiver) = channel();
+        let exit = self.exit.clone();
+        spawn(move || loop {
+            let status = self.handle_role_transition();
+            match status {
+                None => {
+                    debug!("node shutdown requested");
+                    self.close().expect("Unable to close node");
+                    sender.send(true).expect("Unable to signal exit");
+                    break;
+                }
+                Some(transition) => {
+                    debug!("role_transition complete: {:?}", transition);
+                    if let Some(ref rotation_notifier) = rotation_notifier {
+                        rotation_notifier.send(transition).unwrap();
+                    }
+                }
+            };
+        });
+        move || {
+            exit.store(true, Ordering::Relaxed);
+            receiver.recv().unwrap();
+            debug!("node shutdown complete");
+        }
+    }
+
     // Used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
@@ -597,7 +621,7 @@ mod tests {
         let bootstrap_leader_keypair = Arc::new(bootstrap_leader_keypair);
         let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair);
         // Start up the leader
-        let mut bootstrap_leader = Fullnode::new(
+        let bootstrap_leader = Fullnode::new(
             bootstrap_leader_node,
             &bootstrap_leader_keypair,
             &bootstrap_leader_ledger_path,
@@ -607,16 +631,16 @@ mod tests {
             &FullnodeConfig::default(),
         );
 
-        // Wait for the leader to transition, ticks should cause the leader to
-        // reach the height for leader rotation
-        match bootstrap_leader.handle_role_transition().unwrap() {
-            Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
-            _ => {
-                panic!("Expected a leader transition");
-            }
-        }
-        assert!(bootstrap_leader.node_services.tpu.is_leader());
-        bootstrap_leader.close().unwrap();
+        let (rotation_sender, rotation_receiver) = channel();
+        let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
+
+        // Wait for the bootstrap leader to transition. Since there are no other nodes in the
+        // cluster it will continue to be the leader
+        assert_eq!(
+            rotation_receiver.recv().unwrap(),
+            FullnodeReturnType::LeaderToLeaderRotation
+        );
+        bootstrap_leader_exit();
     }
 
     #[test]
@@ -860,7 +884,7 @@ mod tests {
         // Release tvu bank lock, tvu should start making progress again and
         // handle_role_transition should successfully rotate the leader to a validator
        assert_eq!(
-            leader.handle_role_transition().unwrap().unwrap(),
+            leader.handle_role_transition().unwrap(),
             FullnodeReturnType::LeaderToValidatorRotation
         );
         assert_eq!(
diff --git a/tests/multinode.rs b/tests/multinode.rs
index b4f4c3a544..e070b801a8 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -1064,7 +1064,7 @@ fn test_leader_to_validator_transition() {
     }
 
     // Wait for leader to shut down tpu and restart tvu
-    match leader.handle_role_transition().unwrap() {
+    match leader.handle_role_transition() {
         Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
         _ => panic!("Expected reason for exit to be leader rotation"),
     }
@@ -1201,13 +1201,13 @@ fn test_leader_validator_basic() {
     }
 
     // Wait for validator to shut down tvu and restart tpu
-    match validator.handle_role_transition().unwrap() {
+    match validator.handle_role_transition() {
         Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
         _ => panic!("Expected reason for exit to be leader rotation"),
     }
 
     // Wait for the leader to shut down tpu and restart tvu
-    match leader.handle_role_transition().unwrap() {
+    match leader.handle_role_transition() {
         Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
         _ => panic!("Expected reason for exit to be leader rotation"),
     }
@@ -1267,9 +1267,7 @@ fn run_node(id: Pubkey, mut fullnode: Fullnode, should_exit: Arc<AtomicBool>) -> JoinHandle<()> {
                 }
                 Err(_) => match should_be_fwdr {
                     Ok(TpuReturnType::LeaderRotation(tick_height)) => {
-                        fullnode
-                            .leader_to_validator(tick_height)
-                            .expect("failed when transitioning to validator");
+                        fullnode.leader_to_validator(tick_height);
                     }
                     Err(_) => {
                         sleep(Duration::new(1, 0));
@@ -1394,7 +1392,7 @@ fn test_dropped_handoff_recovery() {
     assert_eq!(num_converged, N);
 
     info!("Wait for bootstrap_leader to transition to a validator",);
-    match nodes[0].handle_role_transition().unwrap() {
+    match nodes[0].handle_role_transition() {
         Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
         _ => panic!("Expected reason for exit to be leader rotation"),
     }
@@ -1416,7 +1414,7 @@ fn test_dropped_handoff_recovery() {
     error!("TODO: FIX https://github.com/solana-labs/solana/issues/2482");
     // TODO: Once fixed restore the commented out code below
     /*
-    match next_leader.handle_role_transition().unwrap() {
+    match next_leader.handle_role_transition() {
         Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
         _ => panic!("Expected reason for exit to be leader rotation"),
     }