From 77cb70dd80f12077fd58c64450fe5e2de0e6a9ca Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Tue, 19 Feb 2019 21:29:11 -0800 Subject: [PATCH] Remove dependency on Entry::tick_height --- src/broadcast_service.rs | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index f821528657..8682ad381c 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -33,6 +33,7 @@ pub enum BroadcastServiceReturnType { struct Broadcast { id: Pubkey, + tick_height: u64, max_tick_height: u64, blob_index: u64, @@ -41,6 +42,15 @@ struct Broadcast { } impl Broadcast { + fn count_ticks(entries: &[Entry]) -> u64 { + entries.iter().fold(0, |mut sum, e| { + if e.is_tick() { + sum += 1 + } + sum + }) + } + fn run( &mut self, broadcast_table: &[NodeInfo], @@ -54,19 +64,14 @@ impl Broadcast { let now = Instant::now(); let mut num_entries = entries.len(); let mut ventries = Vec::new(); + let mut ticks = Self::count_ticks(&entries); ventries.push(entries); while let Ok(entries) = receiver.try_recv() { num_entries += entries.len(); + ticks += Self::count_ticks(&entries); ventries.push(entries); } - let contains_last_tick = { - if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) { - last.tick_height == self.max_tick_height - } else { - false - } - }; inc_new_counter_info!("broadcast_service-entries_received", num_entries); @@ -74,7 +79,7 @@ impl Broadcast { // Generate the slot heights for all the entries inside ventries // this may span slots if this leader broadcasts for consecutive slots... - let slots = generate_slots(&ventries, leader_scheduler); + let slots = generate_slots(&ventries, self.tick_height + 1, leader_scheduler); let blobs: Vec<_> = ventries .into_par_iter() @@ -100,6 +105,10 @@ impl Broadcast { inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + assert!(self.tick_height + ticks <= self.max_tick_height); + self.tick_height += ticks; + let contains_last_tick = self.tick_height == self.max_tick_height; + if contains_last_tick { blobs.last().unwrap().write().unwrap().set_is_last_in_slot(); } @@ -144,6 +153,7 @@ impl Broadcast { fn generate_slots( ventries: &[Vec], + mut tick_height: u64, leader_scheduler: &Arc>, ) -> Vec { // Generate the slot heights for all the entries inside ventries @@ -154,12 +164,11 @@ fn generate_slots( let slot_heights: Vec = p .iter() .map(|e| { - let tick_height = if e.is_tick() { - e.tick_height - } else { - e.tick_height + 1 - }; - r_leader_scheduler.tick_height_to_slot(tick_height) + let slot = r_leader_scheduler.tick_height_to_slot(tick_height); + if e.is_tick() { + tick_height += 1; + } + slot }) .collect(); @@ -207,6 +216,7 @@ impl BroadcastService { let mut broadcast = Broadcast { id: me.id, + tick_height: bank.tick_height(), max_tick_height, blob_index, #[cfg(feature = "erasure")]