Remove dependency on Entry::tick_height

This commit is contained in:
Michael Vines
2019-02-19 21:29:11 -08:00
parent 8daba3e563
commit 77cb70dd80

View File

@ -33,6 +33,7 @@ pub enum BroadcastServiceReturnType {
struct Broadcast { struct Broadcast {
id: Pubkey, id: Pubkey,
tick_height: u64,
max_tick_height: u64, max_tick_height: u64,
blob_index: u64, blob_index: u64,
@ -41,6 +42,15 @@ struct Broadcast {
} }
impl Broadcast { impl Broadcast {
fn count_ticks(entries: &[Entry]) -> u64 {
entries.iter().fold(0, |mut sum, e| {
if e.is_tick() {
sum += 1
}
sum
})
}
fn run( fn run(
&mut self, &mut self,
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
@ -54,19 +64,14 @@ impl Broadcast {
let now = Instant::now(); let now = Instant::now();
let mut num_entries = entries.len(); let mut num_entries = entries.len();
let mut ventries = Vec::new(); let mut ventries = Vec::new();
let mut ticks = Self::count_ticks(&entries);
ventries.push(entries); ventries.push(entries);
while let Ok(entries) = receiver.try_recv() { while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len(); num_entries += entries.len();
ticks += Self::count_ticks(&entries);
ventries.push(entries); ventries.push(entries);
} }
let contains_last_tick = {
if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
last.tick_height == self.max_tick_height
} else {
false
}
};
inc_new_counter_info!("broadcast_service-entries_received", num_entries); inc_new_counter_info!("broadcast_service-entries_received", num_entries);
@ -74,7 +79,7 @@ impl Broadcast {
// Generate the slot heights for all the entries inside ventries // Generate the slot heights for all the entries inside ventries
// this may span slots if this leader broadcasts for consecutive slots... // this may span slots if this leader broadcasts for consecutive slots...
let slots = generate_slots(&ventries, leader_scheduler); let slots = generate_slots(&ventries, self.tick_height + 1, leader_scheduler);
let blobs: Vec<_> = ventries let blobs: Vec<_> = ventries
.into_par_iter() .into_par_iter()
@ -100,6 +105,10 @@ impl Broadcast {
inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
assert!(self.tick_height + ticks <= self.max_tick_height);
self.tick_height += ticks;
let contains_last_tick = self.tick_height == self.max_tick_height;
if contains_last_tick { if contains_last_tick {
blobs.last().unwrap().write().unwrap().set_is_last_in_slot(); blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
} }
@ -144,6 +153,7 @@ impl Broadcast {
fn generate_slots( fn generate_slots(
ventries: &[Vec<Entry>], ventries: &[Vec<Entry>],
mut tick_height: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<u64> { ) -> Vec<u64> {
// Generate the slot heights for all the entries inside ventries // Generate the slot heights for all the entries inside ventries
@ -154,12 +164,11 @@ fn generate_slots(
let slot_heights: Vec<u64> = p let slot_heights: Vec<u64> = p
.iter() .iter()
.map(|e| { .map(|e| {
let tick_height = if e.is_tick() { let slot = r_leader_scheduler.tick_height_to_slot(tick_height);
e.tick_height if e.is_tick() {
} else { tick_height += 1;
e.tick_height + 1 }
}; slot
r_leader_scheduler.tick_height_to_slot(tick_height)
}) })
.collect(); .collect();
@ -207,6 +216,7 @@ impl BroadcastService {
let mut broadcast = Broadcast { let mut broadcast = Broadcast {
id: me.id, id: me.id,
tick_height: bank.tick_height(),
max_tick_height, max_tick_height,
blob_index, blob_index,
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]