Don't over complicate the solution

This commit is contained in:
Sagar Dhawan
2019-02-06 18:21:51 -08:00
committed by Michael Vines
parent db688207a5
commit acc6bf1564
3 changed files with 30 additions and 34 deletions

View File

@ -23,10 +23,10 @@ use solana_sdk::timing::{duration_as_ms, timestamp};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{spawn, Result}; use std::thread::{spawn, Result};
use std::time::Instant; use std::time::{Duration, Instant};
pub type TvuRotationSender = Sender<TvuReturnType>; pub type TvuRotationSender = Sender<TvuReturnType>;
pub type TvuRotationReceiver = Receiver<TvuReturnType>; pub type TvuRotationReceiver = Receiver<TvuReturnType>;
@ -103,7 +103,6 @@ pub struct Fullnode {
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
pub node_services: NodeServices, pub node_services: NodeServices,
pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver), pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver),
role_senders: (TvuRotationSender, TpuRotationSender),
blob_sender: BlobSender, blob_sender: BlobSender,
} }
@ -253,7 +252,7 @@ impl Fullnode {
sockets, sockets,
db_ledger.clone(), db_ledger.clone(),
config.storage_rotate_count, config.storage_rotate_count,
to_leader_sender.clone(), to_leader_sender,
&storage_state, &storage_state,
config.entry_stream.as_ref(), config.entry_stream.as_ref(),
ledger_signal_sender, ledger_signal_sender,
@ -278,7 +277,7 @@ impl Fullnode {
&last_entry_id, &last_entry_id,
id, id,
scheduled_leader == id, scheduled_leader == id,
&to_validator_sender.clone(), &to_validator_sender,
&blob_sender, &blob_sender,
); );
@ -297,7 +296,6 @@ impl Fullnode {
tpu_sockets: node.sockets.tpu, tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast, broadcast_socket: node.sockets.broadcast,
role_notifiers: (to_leader_receiver, to_validator_receiver), role_notifiers: (to_leader_receiver, to_validator_receiver),
role_senders: (to_leader_sender, to_validator_sender),
blob_sender, blob_sender,
} }
} }
@ -405,25 +403,34 @@ impl Fullnode {
} }
fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> { fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> {
if self.node_services.tpu.is_leader() { let timeout = Duration::from_secs(1);
let should_be_forwarder = self.role_notifiers.1.recv(); loop {
match should_be_forwarder { if self.exit.load(Ordering::Relaxed) {
Ok(TpuReturnType::LeaderRotation(tick_height)) => { return None;
Some((self.leader_to_validator(tick_height), tick_height + 1))
}
_ => None,
} }
} else {
let should_be_leader = self.role_notifiers.0.recv(); if self.node_services.tpu.is_leader() {
match should_be_leader { let should_be_forwarder = self.role_notifiers.1.recv_timeout(timeout);
Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => { match should_be_forwarder {
self.validator_to_leader(tick_height, entry_height, last_entry_id); Ok(TpuReturnType::LeaderRotation(tick_height)) => {
Some(( return Some((self.leader_to_validator(tick_height), tick_height + 1));
FullnodeReturnType::ValidatorToLeaderRotation, }
tick_height + 1, Err(RecvTimeoutError::Timeout) => continue,
)) _ => return None,
}
} else {
let should_be_leader = self.role_notifiers.0.recv_timeout(timeout);
match should_be_leader {
Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => {
self.validator_to_leader(tick_height, entry_height, last_entry_id);
return Some((
FullnodeReturnType::ValidatorToLeaderRotation,
tick_height + 1,
));
}
Err(RecvTimeoutError::Timeout) => continue,
_ => return None,
} }
_ => None,
} }
} }
} }
@ -436,14 +443,7 @@ impl Fullnode {
) -> impl FnOnce() { ) -> impl FnOnce() {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
let exit = self.exit.clone(); let exit = self.exit.clone();
let senders = (self.role_senders.0.clone(), self.role_senders.1.clone());
spawn(move || loop { spawn(move || loop {
if self.exit.load(Ordering::Relaxed) {
debug!("node shutdown requested");
self.close().expect("Unable to close node");
sender.send(true).expect("Unable to signal exit");
break;
}
let status = self.handle_role_transition(); let status = self.handle_role_transition();
match status { match status {
None => { None => {
@ -462,8 +462,6 @@ impl Fullnode {
}); });
move || { move || {
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
let _ = senders.0.send(TvuReturnType::Abort);
let _ = senders.1.send(TpuReturnType::Abort);
receiver.recv().unwrap(); receiver.recv().unwrap();
debug!("node shutdown complete"); debug!("node shutdown complete");
} }

View File

@ -23,7 +23,6 @@ use std::thread;
pub enum TpuReturnType { pub enum TpuReturnType {
LeaderRotation(u64), LeaderRotation(u64),
Abort,
} }
pub enum TpuMode { pub enum TpuMode {

View File

@ -34,7 +34,6 @@ use std::thread;
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType { pub enum TvuReturnType {
LeaderRotation(u64, u64, Hash), LeaderRotation(u64, u64, Hash),
Abort,
} }
pub struct Tvu { pub struct Tvu {