diff --git a/src/blocktree.rs b/src/blocktree.rs index 409efac30e..8ffd48d59e 100644 --- a/src/blocktree.rs +++ b/src/blocktree.rs @@ -1483,7 +1483,7 @@ pub mod tests { fn test_read_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = 0; - index_blobs(&shared_blobs, &mut 0, &[slot; 10]); + index_blobs(&shared_blobs, &mut 0, slot); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index e5b0fdab85..0274f707f2 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -6,7 +6,6 @@ use crate::entry::Entry; use crate::entry::EntrySlice; #[cfg(feature = "erasure")] use crate::erasure::CodingGenerator; -use crate::leader_scheduler::LeaderScheduler; use crate::packet::index_blobs; use crate::result::{Error, Result}; use crate::service::Service; @@ -33,7 +32,6 @@ pub enum BroadcastServiceReturnType { struct Broadcast { id: Pubkey, - max_tick_height: u64, blob_index: u64, #[cfg(feature = "erasure")] @@ -43,10 +41,11 @@ struct Broadcast { impl Broadcast { fn run( &mut self, + slot_height: u64, + max_tick_height: u64, broadcast_table: &[NodeInfo], receiver: &Receiver>, sock: &UdpSocket, - leader_scheduler: &Arc>, blocktree: &Arc, ) -> Result<()> { let timer = Duration::new(1, 0); @@ -67,10 +66,6 @@ impl Broadcast { let to_blobs_start = Instant::now(); - // Generate the slot heights for all the entries inside ventries - // this may span slots if this leader broadcasts for consecutive slots... - let slots = generate_slots(&ventries, leader_scheduler); - let blobs: Vec<_> = ventries .into_par_iter() .flat_map(|p| { @@ -80,12 +75,12 @@ impl Broadcast { .collect(); // TODO: blob_index should be slot-relative... - index_blobs(&blobs, &mut self.blob_index, &slots); + index_blobs(&blobs, &mut self.blob_index, slot_height); let parent = { - if slots[0] == 0 { + if slot_height == 0 { 0 } else { - slots[0] - 1 + slot_height - 1 } }; for b in blobs.iter() { @@ -98,8 +93,8 @@ impl Broadcast { inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - assert!(last_tick <= self.max_tick_height); - let contains_last_tick = last_tick == self.max_tick_height; + assert!(last_tick <= max_tick_height); + let contains_last_tick = last_tick == max_tick_height; if contains_last_tick { blobs.last().unwrap().write().unwrap().set_is_last_in_slot(); @@ -143,25 +138,6 @@ impl Broadcast { } } -fn generate_slots( - ventries: &[Vec<(Entry, u64)>], - leader_scheduler: &Arc>, -) -> Vec { - // Generate the slot heights for all the entries inside ventries - let r_leader_scheduler = leader_scheduler.read().unwrap(); - ventries - .iter() - .flat_map(|p| { - let slot_heights: Vec = p - .iter() - .map(|(_, tick_height)| r_leader_scheduler.tick_height_to_slot(*tick_height)) - .collect(); - - slot_heights - }) - .collect() -} - // Implement a destructor for the BroadcastService3 thread to signal it exited // even on panics struct Finalizer { @@ -187,13 +163,12 @@ pub struct BroadcastService { impl BroadcastService { #[allow(clippy::too_many_arguments)] fn run( + slot_height: u64, bank: &Arc, sock: &UdpSocket, cluster_info: &Arc>, blob_index: u64, - leader_scheduler: &Arc>, receiver: &Receiver>, - max_tick_height: u64, exit_signal: &Arc, blocktree: &Arc, ) -> BroadcastServiceReturnType { @@ -201,12 +176,13 @@ impl BroadcastService { let mut broadcast = Broadcast { id: me.id, - max_tick_height, blob_index, #[cfg(feature = "erasure")] coding_generator: CodingGenerator::new(), }; + let max_tick_height = (slot_height + 1) * bank.ticks_per_slot() - 1; + loop { if exit_signal.load(Ordering::Relaxed) { return BroadcastServiceReturnType::ExitSignal; @@ -219,10 +195,11 @@ impl BroadcastService { broadcast_table.truncate(DATA_PLANE_FANOUT); inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); if let Err(e) = broadcast.run( + slot_height, + max_tick_height, &broadcast_table, receiver, sock, - leader_scheduler, blocktree, ) { match e { @@ -257,13 +234,12 @@ impl BroadcastService { /// completing the cycle. #[allow(clippy::too_many_arguments)] pub fn new( + slot_height: u64, bank: Arc, sock: UdpSocket, cluster_info: Arc>, blob_index: u64, - leader_scheduler: Arc>, receiver: Receiver>, - max_tick_height: u64, exit_sender: Arc, blocktree: &Arc, ) -> Self { @@ -274,13 +250,12 @@ impl BroadcastService { .spawn(move || { let _exit = Finalizer::new(exit_sender); Self::run( + slot_height, &bank, &sock, &cluster_info, blob_index, - &leader_scheduler, &receiver, - max_tick_height, &exit_signal, &blocktree, ) @@ -306,6 +281,7 @@ mod test { use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::create_ticks; + use crate::leader_scheduler::LeaderScheduler; use crate::service::Service; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -322,12 +298,11 @@ mod test { } fn setup_dummy_broadcast_service( + slot_height: u64, leader_pubkey: Pubkey, ledger_path: &str, - leader_scheduler: Arc>, entry_receiver: Receiver>, blob_index: u64, - max_tick_height: u64, ) -> MockBroadcastService { // Make the database ledger let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap()); @@ -349,13 +324,12 @@ mod test { // Start up the broadcast stage let broadcast_service = BroadcastService::new( + slot_height, bank.clone(), leader_info.sockets.broadcast, cluster_info, blob_index, - leader_scheduler, entry_receiver, - max_tick_height, exit_sender, &blocktree, ); @@ -381,15 +355,13 @@ mod test { let start_tick_height = 0; let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT; - let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let (entry_sender, entry_receiver) = channel(); let broadcast_service = setup_dummy_broadcast_service( + 0, leader_keypair.pubkey(), &ledger_path, - leader_scheduler.clone(), entry_receiver, 0, - max_tick_height, ); let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); @@ -403,10 +375,7 @@ mod test { let blocktree = broadcast_service.blocktree; let mut blob_index = 0; for i in 0..max_tick_height - start_tick_height { - let slot = leader_scheduler - .read() - .unwrap() - .tick_height_to_slot(start_tick_height + i + 1); + let slot = leader_scheduler.tick_height_to_slot(start_tick_height + i + 1); let result = blocktree.get_data_blob(slot, blob_index).unwrap(); diff --git a/src/db_window.rs b/src/db_window.rs index 13eb68e549..e068eee18a 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -450,7 +450,7 @@ mod test { let num_entries = 10; let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); - index_blobs(&shared_blobs, &mut 0, &vec![slot; num_entries]); + index_blobs(&shared_blobs, &mut 0, slot); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); @@ -545,7 +545,7 @@ mod test { let original_entries = make_tiny_test_entries(num_entries); let shared_blobs = original_entries.clone().to_shared_blobs(); - index_blobs(&shared_blobs, &mut 0, &vec![0; num_entries]); + index_blobs(&shared_blobs, &mut 0, 0); for blob in shared_blobs.iter().rev() { process_blob(&leader_scheduler, &blocktree, blob) diff --git a/src/erasure.rs b/src/erasure.rs index f39aca048a..a60f00c5b6 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -889,7 +889,7 @@ pub mod test { } // Make some dummy slots - index_blobs(&blobs, &mut (offset as u64), &vec![slot; blobs.len()]); + index_blobs(&blobs, &mut (offset as u64), slot); for b in blobs { let idx = b.read().unwrap().index() as usize % WINDOW_SIZE; @@ -902,7 +902,7 @@ pub mod test { fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs(); - index_blobs(&blobs, &mut (offset as u64), &vec![0; blobs.len()]); + index_blobs(&blobs, &mut (offset as u64), 0); blobs } diff --git a/src/fullnode.rs b/src/fullnode.rs index 6c0f2b55a6..45453e53ba 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -101,7 +101,6 @@ pub struct Fullnode { node_services: NodeServices, rotation_receiver: TvuRotationReceiver, blocktree: Arc, - leader_scheduler: Arc>, bank_forks: Arc>, } @@ -252,7 +251,6 @@ impl Fullnode { broadcast_socket: node.sockets.broadcast, rotation_receiver, blocktree, - leader_scheduler, bank_forks, } } @@ -306,7 +304,6 @@ impl Fullnode { rotation_info.slot, rotation_info.last_entry_id, &self.blocktree, - &self.leader_scheduler, ); transition } else { diff --git a/src/packet.rs b/src/packet.rs index e9ac5b24c5..1ccfbbb0b0 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -452,13 +452,13 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slots: &[u64]) { +pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slot: u64) { // enumerate all the blobs, those are the indices - for (blob, slot) in blobs.iter().zip(slots) { + for blob in blobs.iter() { let mut blob = blob.write().unwrap(); blob.set_index(*blob_index); - blob.set_slot(*slot); + blob.set_slot(slot); blob.forward(true); *blob_index += 1; } diff --git a/src/tpu.rs b/src/tpu.rs index e8278bc84b..5f5b543972 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -7,7 +7,6 @@ use crate::broadcast_service::BroadcastService; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; -use crate::leader_scheduler::LeaderScheduler; use crate::poh_service::PohServiceConfig; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; @@ -199,7 +198,6 @@ impl Tpu { slot: u64, last_entry_id: Hash, blocktree: &Arc, - leader_scheduler: &Arc>, ) { self.close_and_forward_unprocessed_packets(); @@ -240,13 +238,12 @@ impl Tpu { ); let broadcast_service = BroadcastService::new( + slot, bank, broadcast_socket, self.cluster_info.clone(), blob_index, - leader_scheduler.clone(), entry_receiver, - max_tick_height, self.exit.clone(), blocktree, ); diff --git a/tests/tvu.rs b/tests/tvu.rs index 153b12c8ec..8fd18710bd 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -159,7 +159,7 @@ fn test_replay() { let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; let blobs = entries.to_shared_blobs(); - index_blobs(&blobs, &mut blob_idx, &vec![0; blobs.len()]); + index_blobs(&blobs, &mut blob_idx, 0); blobs .iter() .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));