From e04d2379df51742a902f70d6c2ddbe68094addda Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 28 Feb 2019 00:08:50 -0700 Subject: [PATCH] Remove bank dependency from forward_entries --- src/replay_stage.rs | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/src/replay_stage.rs b/src/replay_stage.rs index cfba53ed1d..e5a77cb845 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -173,6 +173,7 @@ impl ReplayStage { T: 'static + KeypairUtil + Send + Sync, { let (forward_entry_sender, forward_entry_receiver) = channel(); + // let (_slot_full_sender, _slot_full_receiver) = channel(); let exit_ = exit.clone(); let to_leader_sender = to_leader_sender.clone(); let subscriptions_ = subscriptions.clone(); @@ -300,17 +301,17 @@ impl ReplayStage { }; if !entries.is_empty() { - if let Err(e) = Self::forward_entries( - entries.clone(), - slot, - current_leader_id, - bank.ticks_per_slot(), - bank.tick_height(), - &blocktree, - &forward_entry_sender, - ) { - error!("{} forward_entries failed: {:?}", my_id, e); - } + // if let Err(e) = Self::forward_entries( + // entries.clone(), + // slot, + // current_leader_id, + // bank.ticks_per_slot(), + // bank.tick_height(), + // &blocktree, + // &forward_entry_sender, + // ) { + // error!("{} forward_entries failed: {:?}", my_id, e); + // } if let Err(e) = Self::process_entries( entries, &bank, @@ -329,6 +330,17 @@ impl ReplayStage { // We've reached the end of a slot, reset our state and check // for leader rotation if max_tick_height_for_slot == current_tick_height { + let entries_to_stream = blocktree.get_slot_entries(slot, 0, None).unwrap(); + if let Err(e) = Self::forward_entries( + entries_to_stream, + slot, + current_leader_id, + &blocktree, + &forward_entry_sender, + ) { + error!("{} forward_entries failed: {:?}", my_id, e); + } + // Check for leader rotation let (leader_id, next_slot) = { let slot = (current_tick_height + 1) / bank.ticks_per_slot(); @@ -441,8 +453,6 @@ impl ReplayStage { entries: Vec, slot: u64, slot_leader: Pubkey, - ticks_per_slot: u64, - bank_tick_height: u64, blocktree: &Arc, forward_entry_sender: &EntrySender, ) -> Result<()> { @@ -452,13 +462,17 @@ impl ReplayStage { } else { Some(blocktree_meta.parent_slot) }; - let mut tick_height = bank_tick_height; + let ticks_per_slot = entries + .iter() + .filter(|entry| entry.is_tick()) + .fold(0, |acc, _| acc + 1); + let mut tick_height = ticks_per_slot * slot - 1; let mut entries_with_meta = Vec::new(); for entry in entries.into_iter() { if entry.is_tick() { tick_height += 1; } - let is_end_of_slot = (tick_height + 1 % ticks_per_slot) == 0; + let is_end_of_slot = (tick_height + 1) % ticks_per_slot == 0; let entry_meta = EntryMeta { tick_height, slot,