Add abort signals to tvu/tpu receivers

This commit is contained in:
Sagar Dhawan
2019-02-06 16:38:19 -08:00
committed by Michael Vines
parent 9681c4d468
commit db688207a5
3 changed files with 11 additions and 4 deletions

View File

@ -103,6 +103,7 @@ pub struct Fullnode {
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
pub node_services: NodeServices, pub node_services: NodeServices,
pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver), pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver),
role_senders: (TvuRotationSender, TpuRotationSender),
blob_sender: BlobSender, blob_sender: BlobSender,
} }
@ -252,7 +253,7 @@ impl Fullnode {
sockets, sockets,
db_ledger.clone(), db_ledger.clone(),
config.storage_rotate_count, config.storage_rotate_count,
to_leader_sender, to_leader_sender.clone(),
&storage_state, &storage_state,
config.entry_stream.as_ref(), config.entry_stream.as_ref(),
ledger_signal_sender, ledger_signal_sender,
@ -277,7 +278,7 @@ impl Fullnode {
&last_entry_id, &last_entry_id,
id, id,
scheduled_leader == id, scheduled_leader == id,
&to_validator_sender, &to_validator_sender.clone(),
&blob_sender, &blob_sender,
); );
@ -296,6 +297,7 @@ impl Fullnode {
tpu_sockets: node.sockets.tpu, tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast, broadcast_socket: node.sockets.broadcast,
role_notifiers: (to_leader_receiver, to_validator_receiver), role_notifiers: (to_leader_receiver, to_validator_receiver),
role_senders: (to_leader_sender, to_validator_sender),
blob_sender, blob_sender,
} }
} }
@ -409,7 +411,7 @@ impl Fullnode {
Ok(TpuReturnType::LeaderRotation(tick_height)) => { Ok(TpuReturnType::LeaderRotation(tick_height)) => {
Some((self.leader_to_validator(tick_height), tick_height + 1)) Some((self.leader_to_validator(tick_height), tick_height + 1))
} }
Err(_) => None, _ => None,
} }
} else { } else {
let should_be_leader = self.role_notifiers.0.recv(); let should_be_leader = self.role_notifiers.0.recv();
@ -421,7 +423,7 @@ impl Fullnode {
tick_height + 1, tick_height + 1,
)) ))
} }
Err(_) => None, _ => None,
} }
} }
} }
@ -434,6 +436,7 @@ impl Fullnode {
) -> impl FnOnce() { ) -> impl FnOnce() {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
let exit = self.exit.clone(); let exit = self.exit.clone();
let senders = (self.role_senders.0.clone(), self.role_senders.1.clone());
spawn(move || loop { spawn(move || loop {
if self.exit.load(Ordering::Relaxed) { if self.exit.load(Ordering::Relaxed) {
debug!("node shutdown requested"); debug!("node shutdown requested");
@ -459,6 +462,8 @@ impl Fullnode {
}); });
move || { move || {
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
let _ = senders.0.send(TvuReturnType::Abort);
let _ = senders.1.send(TpuReturnType::Abort);
receiver.recv().unwrap(); receiver.recv().unwrap();
debug!("node shutdown complete"); debug!("node shutdown complete");
} }

View File

@ -23,6 +23,7 @@ use std::thread;
pub enum TpuReturnType { pub enum TpuReturnType {
LeaderRotation(u64), LeaderRotation(u64),
Abort,
} }
pub enum TpuMode { pub enum TpuMode {

View File

@ -34,6 +34,7 @@ use std::thread;
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType { pub enum TvuReturnType {
LeaderRotation(u64, u64, Hash), LeaderRotation(u64, u64, Hash),
Abort,
} }
pub struct Tvu { pub struct Tvu {