From 9b43b00d5cc7a5065ecd8f19f61f2dcfab7da5f8 Mon Sep 17 00:00:00 2001
From: Rob Walker
Date: Tue, 6 Nov 2018 13:17:41 -0800
Subject: [PATCH] remove tick_count, leader_scheduler, from broadcast code (#1725)

---
 src/broadcast_stage.rs | 25 +------------------------
 src/cluster_info.rs    | 33 ---------------------------------
 src/fullnode.rs        |  4 ----
 src/tpu.rs             |  1 -
 src/window.rs          |  2 --
 5 files changed, 1 insertion(+), 64 deletions(-)

diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs
index 83da063d0f..8c35266f9f 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_stage.rs
@@ -6,7 +6,6 @@ use entry::Entry;
 #[cfg(feature = "erasure")]
 use erasure;
 use influx_db_client as influxdb;
-use leader_scheduler::LeaderScheduler;
 use ledger::Block;
 use log::Level;
 use metrics;
@@ -29,10 +28,7 @@ pub enum BroadcastStageReturnType {
     ChannelDisconnected,
 }
 
-#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
 fn broadcast(
-    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-    mut tick_height: u64,
     node_info: &NodeInfo,
     broadcast_table: &[NodeInfo],
     window: &SharedWindow,
@@ -50,9 +46,6 @@ fn broadcast(
     ventries.push(entries);
     while let Ok(entries) = receiver.try_recv() {
         num_entries += entries.len();
-        tick_height += entries
-            .iter()
-            .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
         ventries.push(entries);
     }
     inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
@@ -137,8 +130,6 @@ fn broadcast(
 
     // Send blobs out from the window
     ClusterInfo::broadcast(
-        &leader_scheduler,
-        tick_height,
         &node_info,
         &broadcast_table,
         &window,
@@ -198,8 +189,6 @@ impl BroadcastStage {
         window: &SharedWindow,
         entry_height: u64,
         receiver: &Receiver<Vec<Entry>>,
-        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-        tick_height: u64,
     ) -> BroadcastStageReturnType {
         let mut transmit_index = WindowIndex {
             data: entry_height,
@@ -210,8 +199,6 @@ impl BroadcastStage {
         loop {
             let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
             if let Err(e) = broadcast(
-                leader_scheduler,
-                tick_height,
                 &me,
                 &broadcast_table,
                 &window,
@@ -256,23 +243,13 @@ impl BroadcastStage {
         window: SharedWindow,
         entry_height: u64,
         receiver: Receiver<Vec<Entry>>,
-        leader_scheduler: Arc<RwLock<LeaderScheduler>>,
-        tick_height: u64,
         exit_sender: Arc<AtomicBool>,
     ) -> Self {
         let thread_hdl = Builder::new()
             .name("solana-broadcaster".to_string())
             .spawn(move || {
                 let _exit = Finalizer::new(exit_sender);
-                Self::run(
-                    &sock,
-                    &cluster_info,
-                    &window,
-                    entry_height,
-                    &receiver,
-                    &leader_scheduler,
-                    tick_height,
-                )
+                Self::run(&sock, &cluster_info, &window, entry_height, &receiver)
             }).unwrap();
 
         BroadcastStage { thread_hdl }
diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index 60a2e2f07e..c7978ddb11 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -16,7 +16,6 @@ use bincode::{deserialize, serialize, serialized_size};
 use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
 use counter::Counter;
 use hash::Hash;
-use leader_scheduler::LeaderScheduler;
 use ledger::LedgerWindow;
 use log::Level;
 use netutil::{bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range};
@@ -461,10 +460,7 @@ impl ClusterInfo {
     /// broadcast messages from the leader to layer 1 nodes
     /// # Remarks
     /// We need to avoid having obj locked while doing any io, such as the `send_to`
-    #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
     pub fn broadcast(
-        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-        tick_height: u64,
         me: &NodeInfo,
         broadcast_table: &[NodeInfo],
         window: &SharedWindow,
@@ -506,35 +502,6 @@ impl ClusterInfo {
                 br_idx
             );
 
-            // Make sure the next leader in line knows about the entries before his slot in the leader
-            // rotation so they can initiate repairs if necessary
-            {
-                let ls_lock = leader_scheduler.read().unwrap();
-                let next_leader_height = ls_lock.max_height_for_leader(tick_height);
-                let next_leader_id =
-                    next_leader_height.map(|nlh| ls_lock.get_scheduled_leader(nlh));
-                // In the case the next scheduled leader is None, then the write_stage moved
-                // the schedule too far ahead and we no longer are in the known window
-                // (will happen during calculation of the next set of slots every epoch or
-                // seed_rotation_interval heights when we move the window forward in the
-                // LeaderScheduler). For correctness, this is fine write_stage will never send
-                // blobs past the point of when this node should stop being leader, so we just
-                // continue broadcasting until we catch up to write_stage. The downside is we
-                // can't guarantee the current leader will broadcast the last entry to the next
-                // scheduled leader, so the next leader will have to rely on avalanche/repairs
-                // to get this last blob, which could cause slowdowns during leader handoffs.
-                // See corresponding issue for repairs in repair() function in window.rs.
-                if let Some(Some(next_leader_id)) = next_leader_id {
-                    if next_leader_id == me.id {
-                        break;
-                    }
-                    let info_result = broadcast_table.iter().position(|n| n.id == next_leader_id);
-                    if let Some(index) = info_result {
-                        orders.push((window_l[w_idx].data.clone(), &broadcast_table[index]));
-                    }
-                }
-            }
-
             orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
             br_idx += 1;
             br_idx %= broadcast_table.len();
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 1c730911b6..9887e6e753 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -346,8 +346,6 @@ impl Fullnode {
                 shared_window.clone(),
                 entry_height,
                 entry_receiver,
-                bank.leader_scheduler.clone(),
-                bank.tick_height(),
                 tpu_exit,
             );
             let leader_state = LeaderServices::new(tpu, broadcast_stage);
@@ -498,8 +496,6 @@ impl Fullnode {
                 self.shared_window.clone(),
                 entry_height,
                 blob_receiver,
-                self.bank.leader_scheduler.clone(),
-                tick_height,
                 tpu_exit,
             );
             let leader_state = LeaderServices::new(tpu, broadcast_stage);
diff --git a/src/tpu.rs b/src/tpu.rs
index 534a51315a..1b3c680ec3 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -54,7 +54,6 @@ pub struct Tpu {
 }
 
 impl Tpu {
-    #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
     pub fn new(
         bank: &Arc<Bank>,
         tick_duration: Config,
diff --git a/src/window.rs b/src/window.rs
index bff981ee9d..9a9c994aa8 100644
--- a/src/window.rs
+++ b/src/window.rs
@@ -52,7 +52,6 @@ pub trait WindowUtil {
 
     fn window_size(&self) -> u64;
 
-    #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
     fn repair(
         &mut self,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -67,7 +66,6 @@
 
     fn print(&self, id: &Pubkey, consumed: u64) -> String;
 
-    #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
     fn process_blob(
         &mut self,
         id: &Pubkey,
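Note on the resulting code path: after this patch, broadcast() simply drains whatever entry batches are queued on the channel and fans the resulting blobs out round-robin across the broadcast table; there is no per-entry tick counting and no special-casing of the next scheduled leader, which instead relies on the normal avalanche/repair path to catch up. The following is a minimal, self-contained sketch of that drain-and-round-robin pattern, not the actual code: Entry, NodeInfo, drain_entries, and assign_round_robin are simplified stand-ins for the real types and functions in src/broadcast_stage.rs and src/cluster_info.rs.

use std::sync::mpsc::{channel, Receiver};

// Hypothetical stand-ins for the real Entry and NodeInfo types in
// entry.rs and cluster_info.rs; they exist only so the sketch
// compiles on its own.
#[derive(Clone, Debug)]
struct Entry(u64);

#[derive(Clone, Debug)]
struct NodeInfo {
    id: u64,
}

// Mirror of the try_recv loop left in broadcast(): drain every entry
// batch currently queued on the channel. There is no per-entry tick
// counting after this patch.
fn drain_entries(receiver: &Receiver<Vec<Entry>>) -> Vec<Vec<Entry>> {
    let mut ventries = Vec::new();
    while let Ok(entries) = receiver.try_recv() {
        ventries.push(entries);
    }
    ventries
}

// Round-robin the drained entries over the broadcast table, which is
// all the broadcast side needs once the next-leader special case is
// gone.
fn assign_round_robin<'a>(
    ventries: &'a [Vec<Entry>],
    broadcast_table: &'a [NodeInfo],
) -> Vec<(&'a Entry, &'a NodeInfo)> {
    let mut br_idx = 0;
    let mut orders = Vec::new();
    for entries in ventries {
        for entry in entries {
            orders.push((entry, &broadcast_table[br_idx]));
            br_idx = (br_idx + 1) % broadcast_table.len();
        }
    }
    orders
}

fn main() {
    let (sender, receiver) = channel();
    sender.send(vec![Entry(1), Entry(2)]).unwrap();
    sender.send(vec![Entry(3)]).unwrap();

    let broadcast_table = vec![NodeInfo { id: 10 }, NodeInfo { id: 20 }];
    let ventries = drain_entries(&receiver);
    for (entry, node) in assign_round_robin(&ventries, &broadcast_table) {
        println!("send {:?} to node {}", entry, node.id);
    }
}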