Collect and re-forward packets received while TpuForwarder is shutting down

This commit is contained in:
Michael Vines
2019-02-14 14:12:19 -08:00
parent 5333bda234
commit 626a381ddc
5 changed files with 145 additions and 141 deletions
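
The diff below touches Fullnode, ReplayStage, Tpu, and TpuForwarder. The core idea: when the TPU switches roles, the forwarder thread is signaled to exit, keeps draining its packet channel instead of dropping whatever is still queued, and returns the leftover batches from the thread so the Tpu can re-send them to the current leader. A condensed, self-contained sketch of that pattern (simplified types and illustrative names, not the actual solana-core API):

    // Condensed sketch of the shutdown-drain pattern; Vec<u8> stands in for Solana's
    // packet batches and `Forwarder` for TpuForwarder. Illustrative names only.
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::{channel, Sender};
    use std::sync::Arc;
    use std::thread::{self, JoinHandle};

    struct Forwarder {
        exit: Arc<AtomicBool>,
        sender: Option<Sender<Vec<u8>>>,          // dropped on close to end the recv loop
        worker: Option<JoinHandle<Vec<Vec<u8>>>>, // returns whatever was not forwarded
    }

    impl Forwarder {
        fn new() -> Self {
            let exit = Arc::new(AtomicBool::new(false));
            let (sender, receiver) = channel::<Vec<u8>>();
            let thread_exit = exit.clone();
            let worker = thread::spawn(move || {
                let mut unprocessed = vec![];
                // Keep draining the channel; once exit is signaled, stop forwarding and
                // collect the remaining packets instead of dropping them on the floor.
                while let Ok(pkt) = receiver.recv() {
                    if thread_exit.load(Ordering::Relaxed) {
                        unprocessed.push(pkt);
                        continue;
                    }
                    // ...forward `pkt` to the current leader's TPU address here...
                }
                unprocessed // handed back to whoever joins the thread
            });
            Self {
                exit,
                sender: Some(sender),
                worker: Some(worker),
            }
        }

        fn send(&self, pkt: Vec<u8>) {
            self.sender.as_ref().unwrap().send(pkt).unwrap();
        }

        // Analogous to close() followed by join_and_collect_unprocessed_packets().
        fn close_and_collect(&mut self) -> Vec<Vec<u8>> {
            self.exit.store(true, Ordering::Relaxed);
            drop(self.sender.take()); // closing the channel makes recv() return Err, ending the loop
            self.worker.take().unwrap().join().unwrap_or_default()
        }
    }

    fn main() {
        let mut forwarder = Forwarder::new();
        forwarder.send(vec![1, 2, 3]);
        forwarder.exit.store(true, Ordering::Relaxed); // simulate shutdown racing with arrivals
        forwarder.send(vec![4, 5, 6]);
        let leftover = forwarder.close_and_collect();
        println!("collected {} unprocessed packet batch(es)", leftover.len());
    }

In the real diff, close() flips the exit flag and the streamer receivers drop their channel senders, which is what ends the loop; join_and_collect_unprocessed_packets() plays the role of close_and_collect here.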

View File

@@ -98,7 +98,6 @@ pub struct Fullnode {
     rpc_pubsub_service: Option<PubSubService>,
     gossip_service: GossipService,
     bank: Arc<Bank>,
-    cluster_info: Arc<RwLock<ClusterInfo>>,
     sigverify_disabled: bool,
     tpu_sockets: Vec<UdpSocket>,
     broadcast_socket: UdpSocket,
@@ -228,8 +227,6 @@ impl Fullnode {
            (scheduled_leader, max_tick_height, blob_index)
        };

-       cluster_info.write().unwrap().set_leader(scheduled_leader);
-
        let sockets = Sockets {
            repair: node
                .sockets
@@ -274,34 +271,10 @@ impl Fullnode {
            ledger_signal_sender,
            ledger_signal_receiver,
        );

-       let tpu = Tpu::new(
-           &Arc::new(bank.copy_for_tpu()),
-           PohServiceConfig::default(),
-           node.sockets
-               .tpu
-               .iter()
-               .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
-               .collect(),
-           node.sockets
-               .broadcast
-               .try_clone()
-               .expect("Failed to clone broadcast socket"),
-           &cluster_info,
-           config.sigverify_disabled,
-           max_tick_height,
-           blob_index,
-           &last_entry_id,
-           id,
-           &rotation_sender,
-           &blocktree,
-           scheduled_leader == id,
-       );
+       let tpu = Tpu::new(id, &cluster_info);

-       inc_new_counter_info!("fullnode-new", 1);
-
-       Self {
+       let mut fullnode = Self {
            id,
-           cluster_info,
            bank,
            sigverify_disabled: config.sigverify_disabled,
            gossip_service,
@@ -314,7 +287,16 @@ impl Fullnode {
            rotation_sender,
            rotation_receiver,
            blocktree,
-       }
+       };
+
+       fullnode.rotate(
+           scheduled_leader,
+           max_tick_height,
+           blob_index,
+           &last_entry_id,
+       );
+
+       inc_new_counter_info!("fullnode-new", 1);
+       fullnode
    }

    fn get_next_leader(&self, tick_height: u64) -> (Pubkey, u64) {
@@ -355,30 +337,30 @@ impl Fullnode {
            max_tick_height
        );

-       self.cluster_info
-           .write()
-           .unwrap()
-           .set_leader(scheduled_leader);
-
        (scheduled_leader, max_tick_height)
    }

-   fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
-       trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
-       let was_leader = self.node_services.tpu.is_leader();
-
-       let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
-       if scheduled_leader == self.id {
-           let transition = if was_leader {
-               debug!("{:?} remaining in leader role", self.id);
-               FullnodeReturnType::LeaderToLeaderRotation
-           } else {
-               debug!("{:?} rotating to leader role", self.id);
-               FullnodeReturnType::ValidatorToLeaderRotation
-           };
-           let last_entry_id = self.bank.last_id();
-
+   fn rotate(
+       &mut self,
+       next_leader: Pubkey,
+       max_tick_height: u64,
+       blob_index: u64,
+       last_entry_id: &Hash,
+   ) -> FullnodeReturnType {
+       if next_leader == self.id {
+           let transition = match self.node_services.tpu.is_leader() {
+               Some(was_leader) => {
+                   if was_leader {
+                       debug!("{:?} remaining in leader role", self.id);
+                       FullnodeReturnType::LeaderToLeaderRotation
+                   } else {
+                       debug!("{:?} rotating to leader role", self.id);
+                       FullnodeReturnType::ValidatorToLeaderRotation
+                   }
+               }
+               None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here...
+           };
+
            self.node_services.tpu.switch_to_leader(
                &Arc::new(self.bank.copy_for_tpu()),
                PohServiceConfig::default(),
@@ -391,17 +373,16 @@ impl Fullnode {
                    .expect("Failed to clone broadcast socket"),
                self.sigverify_disabled,
                max_tick_height,
-               0,
-               &last_entry_id,
-               self.id,
+               blob_index,
+               last_entry_id,
                &self.rotation_sender,
                &self.blocktree,
            );
            transition
        } else {
            debug!("{:?} rotating to validator role", self.id);
            self.node_services.tpu.switch_to_forwarder(
+               next_leader,
                self.tpu_sockets
                    .iter()
                    .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
@@ -430,7 +411,10 @@ impl Fullnode {
            match self.rotation_receiver.recv_timeout(timeout) {
                Ok(tick_height) => {
-                   let transition = self.rotate(tick_height);
+                   trace!("{:?}: rotate at tick_height={}", self.id, tick_height);
+                   let (next_leader, max_tick_height) = self.get_next_leader(tick_height);
+                   let transition =
+                       self.rotate(next_leader, max_tick_height, 0, &self.bank.last_id());
                    debug!("role transition complete: {:?}", transition);
                    if let Some(ref rotation_notifier) = rotation_notifier {
                        rotation_notifier
@@ -732,7 +716,7 @@ mod tests {
            &fullnode_config,
        );
-       assert!(!bootstrap_leader.node_services.tpu.is_leader());
+       assert!(!bootstrap_leader.node_services.tpu.is_leader().unwrap());

        // Test that a node knows to transition to a leader based on parsing the ledger
        let validator = Fullnode::new(
@@ -744,7 +728,7 @@ mod tests {
            &fullnode_config,
        );
-       assert!(validator.node_services.tpu.is_leader());
+       assert!(validator.node_services.tpu.is_leader().unwrap());

        validator.close().expect("Expected leader node to close");
        bootstrap_leader
            .close()
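
Two related changes run through the Fullnode diff above: the struct no longer holds its own cluster_info handle for leader bookkeeping (the Tpu now owns that), and Fullnode::new no longer assembles a fully-configured leader Tpu up front. The node is built in a neutral state and the same rotate() path used for runtime transitions assigns the initial role. A tiny sketch of that constructor-then-rotate shape, with illustrative names (not the real solana types):

    // Sketch only: build the object in a neutral state, then reuse the normal
    // rotation path for the initial role instead of duplicating it in the constructor.
    #[derive(Debug, PartialEq)]
    enum Role {
        Leader,
        Validator,
    }

    struct Node {
        id: u64,
        role: Option<Role>, // None until the first rotation, like Tpu::tpu_mode
    }

    impl Node {
        fn new(id: u64, scheduled_leader: u64) -> Self {
            let mut node = Node { id, role: None };
            node.rotate(scheduled_leader); // same code path as later rotations
            node
        }

        fn rotate(&mut self, next_leader: u64) {
            self.role = Some(if next_leader == self.id {
                Role::Leader
            } else {
                Role::Validator
            });
        }
    }

    fn main() {
        let node = Node::new(42, 42);
        assert_eq!(node.role, Some(Role::Leader));
        println!("initial role: {:?}", node.role);
    }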

View File

@@ -278,11 +278,13 @@ impl ReplayStage {
            // Check for leader rotation
            let leader_id = Self::get_leader_for_next_tick(&bank);

-           if leader_id != last_leader_id {
-               if my_id == leader_id {
-                   to_leader_sender.send(current_tick_height).unwrap();
-               } else {
-                   // TODO: Remove this soon once we boot the leader from ClusterInfo
-                   cluster_info.write().unwrap().set_leader(leader_id);
-               }
+           // TODO: Remove this soon once we boot the leader from ClusterInfo
+           cluster_info.write().unwrap().set_leader(leader_id);
+
+           if leader_id != last_leader_id && my_id == leader_id {
+               to_leader_sender.send(current_tick_height).unwrap();
            }

            // Check for any slots that chain to this one

View File

@@ -68,51 +68,18 @@ impl ForwarderServices {
 pub struct Tpu {
     tpu_mode: Option<TpuMode>,
     exit: Arc<AtomicBool>,
+    id: Pubkey,
     cluster_info: Arc<RwLock<ClusterInfo>>,
 }

 impl Tpu {
-    #[allow(clippy::too_many_arguments)]
-    pub fn new(
-        bank: &Arc<Bank>,
-        tick_duration: PohServiceConfig,
-        transactions_sockets: Vec<UdpSocket>,
-        broadcast_socket: UdpSocket,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
-        sigverify_disabled: bool,
-        max_tick_height: u64,
-        blob_index: u64,
-        last_entry_id: &Hash,
-        leader_id: Pubkey,
-        to_validator_sender: &TpuRotationSender,
-        blocktree: &Arc<Blocktree>,
-        is_leader: bool,
-    ) -> Self {
-        let mut tpu = Self {
+    pub fn new(id: Pubkey, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Self {
+        Self {
             tpu_mode: None,
             exit: Arc::new(AtomicBool::new(false)),
+            id,
             cluster_info: cluster_info.clone(),
-        };
-
-        if is_leader {
-            tpu.switch_to_leader(
-                bank,
-                tick_duration,
-                transactions_sockets,
-                broadcast_socket,
-                sigverify_disabled,
-                max_tick_height,
-                blob_index,
-                last_entry_id,
-                leader_id,
-                to_validator_sender,
-                blocktree,
-            );
-        } else {
-            tpu.switch_to_forwarder(transactions_sockets);
         }
-        tpu
     }

     fn mode_close(&self) {
@@ -144,8 +111,15 @@ impl Tpu {
     fn close_and_forward_unprocessed_packets(&mut self) {
         self.mode_close();

-        if let Some(TpuMode::Leader(svcs)) = self.tpu_mode.take().as_mut() {
-            let unprocessed_packets = svcs.banking_stage.join_and_collect_unprocessed_packets();
+        let unprocessed_packets = match self.tpu_mode.take().as_mut() {
+            Some(TpuMode::Leader(svcs)) => {
+                svcs.banking_stage.join_and_collect_unprocessed_packets()
+            }
+            Some(TpuMode::Forwarder(svcs)) => {
+                svcs.tpu_forwarder.join_and_collect_unprocessed_packets()
+            }
+            None => vec![],
+        };

         if !unprocessed_packets.is_empty() {
             let tpu = self.cluster_info.read().unwrap().leader_data().unwrap().tpu;
@@ -155,11 +129,12 @@ impl Tpu {
             });
         }
     }
-    }

-    pub fn switch_to_forwarder(&mut self, transactions_sockets: Vec<UdpSocket>) {
+    pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec<UdpSocket>) {
         self.close_and_forward_unprocessed_packets();
+        self.cluster_info.write().unwrap().set_leader(leader_id);

         let tpu_forwarder = TpuForwarder::new(transactions_sockets, self.cluster_info.clone());
         self.tpu_mode = Some(TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder)));
     }
@@ -175,12 +150,13 @@ impl Tpu {
         max_tick_height: u64,
         blob_index: u64,
         last_entry_id: &Hash,
-        leader_id: Pubkey,
         to_validator_sender: &TpuRotationSender,
         blocktree: &Arc<Blocktree>,
     ) {
         self.close_and_forward_unprocessed_packets();
+        self.cluster_info.write().unwrap().set_leader(self.id);
+
         self.exit = Arc::new(AtomicBool::new(false));

         let (packet_sender, packet_receiver) = channel();
         let fetch_stage = FetchStage::new_with_sender(
@@ -203,7 +179,7 @@ impl Tpu {
             tick_duration,
             last_entry_id,
             max_tick_height,
-            leader_id,
+            self.id,
             &to_validator_sender,
         );
@@ -229,10 +205,11 @@ impl Tpu {
         self.tpu_mode = Some(TpuMode::Leader(svcs));
     }

-    pub fn is_leader(&self) -> bool {
+    pub fn is_leader(&self) -> Option<bool> {
         match self.tpu_mode {
-            Some(TpuMode::Leader(_)) => true,
-            _ => false,
+            Some(TpuMode::Leader(_)) => Some(true),
+            Some(TpuMode::Forwarder(_)) => Some(false),
+            None => None,
         }
     }
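
One consequence of the new constructor: a Tpu can now be in no mode at all (tpu_mode is None between construction and the first switch), so is_leader() reports three states instead of folding "no mode" into false. A reduced sketch of the query and how callers treat it (unit variants stand in for the real service-carrying variants):

    // Sketch only: three-state mode query, simplified from the Tpu in this diff.
    enum TpuMode {
        Leader,    // stands in for TpuMode::Leader(LeaderServices)
        Forwarder, // stands in for TpuMode::Forwarder(ForwarderServices)
    }

    struct Tpu {
        tpu_mode: Option<TpuMode>,
    }

    impl Tpu {
        fn is_leader(&self) -> Option<bool> {
            match self.tpu_mode {
                Some(TpuMode::Leader) => Some(true),
                Some(TpuMode::Forwarder) => Some(false),
                None => None,
            }
        }
    }

    fn main() {
        // A freshly constructed Tpu has no mode yet, so there is no meaningful answer.
        let fresh = Tpu { tpu_mode: None };
        assert_eq!(fresh.is_leader(), None);

        // Once a mode is switched in, callers that require an answer unwrap,
        // as the updated fullnode tests do: tpu.is_leader().unwrap()
        assert_eq!(Tpu { tpu_mode: Some(TpuMode::Leader) }.is_leader(), Some(true));
        assert_eq!(Tpu { tpu_mode: Some(TpuMode::Forwarder) }.is_leader(), Some(false));
        println!("ok");
    }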

View File

@@ -2,10 +2,10 @@
 //! transaction processing unit responsibility, which
 //! forwards received packets to the current leader

+use crate::banking_stage::UnprocessedPackets;
 use crate::cluster_info::ClusterInfo;
 use crate::contact_info::ContactInfo;
 use crate::counter::Counter;
-use crate::result::Result;
 use crate::service::Service;
 use crate::streamer::{self, PacketReceiver};
 use log::Level;
@@ -18,9 +18,11 @@ use std::thread::{self, Builder, JoinHandle};

 fn get_forwarding_addr(leader_data: Option<&ContactInfo>, my_id: &Pubkey) -> Option<SocketAddr> {
     let leader_data = leader_data?;
-    if leader_data.id == *my_id || !ContactInfo::is_valid_address(&leader_data.tpu) {
-        // weird cases, but we don't want to broadcast, send to ANY, or
-        // induce an infinite loop, but this shouldn't happen, or shouldn't be true for long...
+    if leader_data.id == *my_id {
+        info!("I may be stuck in a loop"); // Should never try to forward to ourselves
+        return None;
+    }
+    if !ContactInfo::is_valid_address(&leader_data.tpu) {
         return None;
     }
     Some(leader_data.tpu)
@@ -29,36 +31,65 @@ fn get_forwarding_addr(leader_data: Option<&ContactInfo>, my_id: &Pubkey) -> Option<SocketAddr> {
 pub struct TpuForwarder {
     exit: Arc<AtomicBool>,
     thread_hdls: Vec<JoinHandle<()>>,
+    forwarder_thread: Option<JoinHandle<UnprocessedPackets>>,
 }

 impl TpuForwarder {
-    fn forward(receiver: &PacketReceiver, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<()> {
-        let socket = UdpSocket::bind("0.0.0.0:0")?;
+    fn forward(
+        receiver: &PacketReceiver,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        exit: &Arc<AtomicBool>,
+    ) -> UnprocessedPackets {
+        let socket = UdpSocket::bind("0.0.0.0:0").expect("Unable to bind");

         let my_id = cluster_info.read().unwrap().id();
+        let mut unprocessed_packets = vec![];

         loop {
-            let msgs = receiver.recv()?;
-
-            inc_new_counter_info!(
-                "tpu_forwarder-msgs_received",
-                msgs.read().unwrap().packets.len()
-            );
-            let send_addr = get_forwarding_addr(cluster_info.read().unwrap().leader_data(), &my_id);
-
-            if let Some(send_addr) = send_addr {
-                msgs.write().unwrap().set_addr(&send_addr);
-                msgs.read().unwrap().send_to(&socket)?;
+            match receiver.recv() {
+                Ok(msgs) => {
+                    inc_new_counter_info!(
+                        "tpu_forwarder-msgs_received",
+                        msgs.read().unwrap().packets.len()
+                    );
+
+                    if exit.load(Ordering::Relaxed) {
+                        // Collect all remaining packets on exit signaled
+                        unprocessed_packets.push((msgs, 0));
+                        continue;
+                    }
+
+                    match get_forwarding_addr(cluster_info.read().unwrap().leader_data(), &my_id) {
+                        Some(send_addr) => {
+                            msgs.write().unwrap().set_addr(&send_addr);
+                            msgs.read().unwrap().send_to(&socket).unwrap_or_else(|err| {
+                                info!("Failed to forward packet to {:?}: {:?}", send_addr, err)
+                            });
+                        }
+                        None => warn!("Packets dropped due to no forwarding address"),
+                    }
+                }
+                Err(err) => {
+                    trace!("Exiting forwarder due to {:?}", err);
+                    break;
+                }
             }
         }
+        unprocessed_packets
+    }
+
+    pub fn join_and_collect_unprocessed_packets(&mut self) -> UnprocessedPackets {
+        let forwarder_thread = self.forwarder_thread.take().unwrap();
+        forwarder_thread.join().unwrap_or_else(|err| {
+            warn!("forwarder_thread join failed: {:?}", err);
+            vec![]
+        })
     }

     pub fn new(sockets: Vec<UdpSocket>, cluster_info: Arc<RwLock<ClusterInfo>>) -> Self {
         let exit = Arc::new(AtomicBool::new(false));
         let (sender, receiver) = channel();
-        let mut thread_hdls: Vec<_> = sockets
+        let thread_hdls: Vec<_> = sockets
             .into_iter()
             .map(|socket| {
                 streamer::receiver(
@@ -70,16 +101,19 @@ impl TpuForwarder {
             })
             .collect();

-        let thread_hdl = Builder::new()
-            .name("solana-tpu_forwarder".to_string())
-            .spawn(move || {
-                let _ignored = Self::forward(&receiver, &cluster_info);
-            })
-            .unwrap();
-
-        thread_hdls.push(thread_hdl);
-
-        TpuForwarder { exit, thread_hdls }
+        let thread_exit = exit.clone();
+        let forwarder_thread = Some(
+            Builder::new()
+                .name("solana-tpu_forwarder".to_string())
+                .spawn(move || Self::forward(&receiver, &cluster_info, &thread_exit))
+                .unwrap(),
+        );
+
+        TpuForwarder {
+            exit,
+            thread_hdls,
+            forwarder_thread,
+        }
     }

     pub fn close(&self) {
@@ -95,6 +129,7 @@ impl Service for TpuForwarder {
         for thread_hdl in self.thread_hdls {
             thread_hdl.join()?;
         }
+        self.forwarder_thread.unwrap().join()?;
         Ok(())
     }
 }
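
On the receiving end of that return value, Tpu::close_and_forward_unprocessed_packets() (in the tpu.rs diff above) looks up the current leader's TPU address in ClusterInfo and re-sends whatever the joined thread handed back. A rough, self-contained sketch of that re-send step, with simplified stand-ins for the packet types (send_batch_to and forward_unprocessed are illustrative, not the real Packets/Tpu API; the usize mirrors the 0 pushed alongside each batch in the diff):

    // Sketch only: re-forward leftover packet batches to the current leader's TPU port.
    use std::net::{SocketAddr, UdpSocket};

    type PacketBatch = Vec<Vec<u8>>; // stand-in for a shared packet batch
    type UnprocessedPackets = Vec<(PacketBatch, usize)>; // (batch, offset), as pushed in the diff

    fn send_batch_to(socket: &UdpSocket, addr: SocketAddr, batch: &PacketBatch) -> std::io::Result<()> {
        for pkt in batch {
            socket.send_to(pkt, addr)?;
        }
        Ok(())
    }

    fn forward_unprocessed(leader_tpu: SocketAddr, unprocessed: UnprocessedPackets) {
        if unprocessed.is_empty() {
            return;
        }
        let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
        for (batch, _offset) in unprocessed {
            send_batch_to(&socket, leader_tpu, &batch).unwrap_or_else(|err| {
                eprintln!("failed to re-forward packets to {}: {}", leader_tpu, err);
            });
        }
    }

    fn main() {
        // One leftover batch containing a single 8-byte packet, re-sent to a leader address.
        let leftover: UnprocessedPackets = vec![(vec![vec![0u8; 8]], 0)];
        forward_unprocessed("127.0.0.1:8003".parse().unwrap(), leftover);
    }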

View File

@@ -1797,6 +1797,13 @@ fn test_fullnode_rotate(
     let mut fullnode_config = FullnodeConfig::default();
     fullnode_config.leader_scheduler_config.ticks_per_slot = ticks_per_slot;
     fullnode_config.leader_scheduler_config.slots_per_epoch = slots_per_epoch;
+
+    // Note: when debugging failures in this test, disabling voting can help keep the log noise
+    // down by removing the extra vote transactions
+    /*
+    fullnode_config.voting_disabled = true;
+    */
+
     let blocktree_config = fullnode_config.ledger_config();
     fullnode_config
         .leader_scheduler_config
@@ -2084,7 +2091,6 @@ fn test_one_fullnode_rotate_every_tick_with_transactions() {
 }

 #[test]
-#[ignore]
 fn test_two_fullnodes_rotate_every_tick_with_transactions() {
     test_fullnode_rotate(1, 1, true, true);
 }