From db688207a587f1da27803ab37b2b3b3c3a576c1d Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 6 Feb 2019 16:38:19 -0800 Subject: [PATCH] Add abort signals to tvu/tpu receivers --- src/fullnode.rs | 13 +++++++++---- src/tpu.rs | 1 + src/tvu.rs | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index 0d6aaa0b41..ea8f5be63c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -103,6 +103,7 @@ pub struct Fullnode { broadcast_socket: UdpSocket, pub node_services: NodeServices, pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver), + role_senders: (TvuRotationSender, TpuRotationSender), blob_sender: BlobSender, } @@ -252,7 +253,7 @@ impl Fullnode { sockets, db_ledger.clone(), config.storage_rotate_count, - to_leader_sender, + to_leader_sender.clone(), &storage_state, config.entry_stream.as_ref(), ledger_signal_sender, @@ -277,7 +278,7 @@ impl Fullnode { &last_entry_id, id, scheduled_leader == id, - &to_validator_sender, + &to_validator_sender.clone(), &blob_sender, ); @@ -296,6 +297,7 @@ impl Fullnode { tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, role_notifiers: (to_leader_receiver, to_validator_receiver), + role_senders: (to_leader_sender, to_validator_sender), blob_sender, } } @@ -409,7 +411,7 @@ impl Fullnode { Ok(TpuReturnType::LeaderRotation(tick_height)) => { Some((self.leader_to_validator(tick_height), tick_height + 1)) } - Err(_) => None, + _ => None, } } else { let should_be_leader = self.role_notifiers.0.recv(); @@ -421,7 +423,7 @@ impl Fullnode { tick_height + 1, )) } - Err(_) => None, + _ => None, } } } @@ -434,6 +436,7 @@ impl Fullnode { ) -> impl FnOnce() { let (sender, receiver) = channel(); let exit = self.exit.clone(); + let senders = (self.role_senders.0.clone(), self.role_senders.1.clone()); spawn(move || loop { if self.exit.load(Ordering::Relaxed) { debug!("node shutdown requested"); @@ -459,6 +462,8 @@ impl Fullnode { }); move || { exit.store(true, Ordering::Relaxed); + let _ = senders.0.send(TvuReturnType::Abort); + let _ = senders.1.send(TpuReturnType::Abort); receiver.recv().unwrap(); debug!("node shutdown complete"); } diff --git a/src/tpu.rs b/src/tpu.rs index b400919b64..980bc8011a 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -23,6 +23,7 @@ use std::thread; pub enum TpuReturnType { LeaderRotation(u64), + Abort, } pub enum TpuMode { diff --git a/src/tvu.rs b/src/tvu.rs index 051049f00a..deb1ded50d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -34,6 +34,7 @@ use std::thread; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { LeaderRotation(u64, u64, Hash), + Abort, } pub struct Tvu {