From acc6bf1564afcf353939866128c248d4a173c770 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 6 Feb 2019 18:21:51 -0800 Subject: [PATCH] Don't over complicate the solution --- src/fullnode.rs | 62 ++++++++++++++++++++++++------------------------- src/tpu.rs | 1 - src/tvu.rs | 1 - 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index ea8f5be63c..bcc66cae66 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -23,10 +23,10 @@ use solana_sdk::timing::{duration_as_ms, timestamp}; use std::net::UdpSocket; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender}; use std::sync::{Arc, RwLock}; use std::thread::{spawn, Result}; -use std::time::Instant; +use std::time::{Duration, Instant}; pub type TvuRotationSender = Sender; pub type TvuRotationReceiver = Receiver; @@ -103,7 +103,6 @@ pub struct Fullnode { broadcast_socket: UdpSocket, pub node_services: NodeServices, pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver), - role_senders: (TvuRotationSender, TpuRotationSender), blob_sender: BlobSender, } @@ -253,7 +252,7 @@ impl Fullnode { sockets, db_ledger.clone(), config.storage_rotate_count, - to_leader_sender.clone(), + to_leader_sender, &storage_state, config.entry_stream.as_ref(), ledger_signal_sender, @@ -278,7 +277,7 @@ impl Fullnode { &last_entry_id, id, scheduled_leader == id, - &to_validator_sender.clone(), + &to_validator_sender, &blob_sender, ); @@ -297,7 +296,6 @@ impl Fullnode { tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, role_notifiers: (to_leader_receiver, to_validator_receiver), - role_senders: (to_leader_sender, to_validator_sender), blob_sender, } } @@ -405,25 +403,34 @@ impl Fullnode { } fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> { - if self.node_services.tpu.is_leader() { - let should_be_forwarder = self.role_notifiers.1.recv(); - match should_be_forwarder { - Ok(TpuReturnType::LeaderRotation(tick_height)) => { - Some((self.leader_to_validator(tick_height), tick_height + 1)) - } - _ => None, + let timeout = Duration::from_secs(1); + loop { + if self.exit.load(Ordering::Relaxed) { + return None; } - } else { - let should_be_leader = self.role_notifiers.0.recv(); - match should_be_leader { - Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => { - self.validator_to_leader(tick_height, entry_height, last_entry_id); - Some(( - FullnodeReturnType::ValidatorToLeaderRotation, - tick_height + 1, - )) + + if self.node_services.tpu.is_leader() { + let should_be_forwarder = self.role_notifiers.1.recv_timeout(timeout); + match should_be_forwarder { + Ok(TpuReturnType::LeaderRotation(tick_height)) => { + return Some((self.leader_to_validator(tick_height), tick_height + 1)); + } + Err(RecvTimeoutError::Timeout) => continue, + _ => return None, + } + } else { + let should_be_leader = self.role_notifiers.0.recv_timeout(timeout); + match should_be_leader { + Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => { + self.validator_to_leader(tick_height, entry_height, last_entry_id); + return Some(( + FullnodeReturnType::ValidatorToLeaderRotation, + tick_height + 1, + )); + } + Err(RecvTimeoutError::Timeout) => continue, + _ => return None, } - _ => None, } } } @@ -436,14 +443,7 @@ impl Fullnode { ) -> impl FnOnce() { let (sender, receiver) = channel(); let exit = self.exit.clone(); - let senders = (self.role_senders.0.clone(), self.role_senders.1.clone()); spawn(move || loop { - if self.exit.load(Ordering::Relaxed) { - debug!("node shutdown requested"); - self.close().expect("Unable to close node"); - sender.send(true).expect("Unable to signal exit"); - break; - } let status = self.handle_role_transition(); match status { None => { @@ -462,8 +462,6 @@ impl Fullnode { }); move || { exit.store(true, Ordering::Relaxed); - let _ = senders.0.send(TvuReturnType::Abort); - let _ = senders.1.send(TpuReturnType::Abort); receiver.recv().unwrap(); debug!("node shutdown complete"); } diff --git a/src/tpu.rs b/src/tpu.rs index 980bc8011a..b400919b64 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -23,7 +23,6 @@ use std::thread; pub enum TpuReturnType { LeaderRotation(u64), - Abort, } pub enum TpuMode { diff --git a/src/tvu.rs b/src/tvu.rs index deb1ded50d..051049f00a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -34,7 +34,6 @@ use std::thread; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { LeaderRotation(u64, u64, Hash), - Abort, } pub struct Tvu {