From a67a88c8ef756900222c31056363a535d37025e4 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 27 Feb 2019 11:58:01 -0700 Subject: [PATCH] Hoist EntrySender in ReplayStage --- src/replay_stage.rs | 75 ++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 28 deletions(-) diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 45e4e2cf4f..1eaa59d31e 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -60,12 +60,9 @@ impl ReplayStage { bank: &Arc, cluster_info: &Arc>, voting_keypair: &Option>, - ledger_entry_sender: &EntrySender, current_blob_index: &mut u64, last_entry_id: &mut Hash, subscriptions: &Arc, - slot: u64, - parent_slot: Option, ) -> Result<()> { // Coalesce all the available entries into a single vote submit( @@ -89,16 +86,13 @@ impl ReplayStage { let num_ticks = bank.tick_height(); let slot_height = bank.slot_height(); - let leader_id = leader_schedule_utils::slot_leader(bank); + let mut num_ticks_to_next_vote = leader_schedule_utils::num_ticks_left_in_slot(bank, num_ticks); - let mut entry_tick_height = num_ticks; - let mut entries_with_meta = Vec::new(); for (i, entry) in entries.iter().enumerate() { inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize); if entry.is_tick() { - entry_tick_height += 1; if num_ticks_to_next_vote == 0 { num_ticks_to_next_vote = bank.ticks_per_slot(); } @@ -108,14 +102,6 @@ impl ReplayStage { "replicate-stage_tick-to-vote", num_ticks_to_next_vote as usize ); - entries_with_meta.push(EntryMeta { - tick_height: entry_tick_height, - slot, - slot_leader: leader_id, - num_ticks_left_in_slot: num_ticks_to_next_vote, - parent_slot, - entry: entry.clone(), - }); // If it's the last entry in the vector, i will be vec len - 1. // If we don't process the entry now, the for loop will exit and the entry // will be dropped. @@ -149,7 +135,6 @@ impl ReplayStage { // If leader rotation happened, only write the entries up to leader rotation. entries.truncate(num_entries_to_write); - entries_with_meta.truncate(num_entries_to_write); *last_entry_id = entries .last() .expect("Entries cannot be empty at this point") @@ -161,12 +146,6 @@ impl ReplayStage { ); let entries_len = entries.len() as u64; - // TODO: In line with previous behavior, this will write all the entries even if - // an error occurred processing one of the entries (causing the rest of the entries to - // not be processed). - if entries_len != 0 { - ledger_entry_sender.send(entries_with_meta)?; - } *current_blob_index += entries_len; res?; @@ -193,7 +172,7 @@ impl ReplayStage { where T: 'static + KeypairUtil + Send + Sync, { - let (ledger_entry_sender, ledger_entry_receiver) = channel(); + let (forward_entry_sender, forward_entry_receiver) = channel(); let exit_ = exit.clone(); let to_leader_sender = to_leader_sender.clone(); let subscriptions_ = subscriptions.clone(); @@ -319,20 +298,26 @@ impl ReplayStage { vec![] } }; - let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot); if !entries.is_empty() { + if let Err(e) = Self::forward_entries( + &entries, + slot, + current_leader_id, + &bank, + &blocktree, + &forward_entry_sender, + ) { + error!("{} forward_entries failed: {:?}", my_id, e); + } if let Err(e) = Self::process_entries( entries, &bank, &cluster_info, &voting_keypair, - &ledger_entry_sender, &mut current_blob_index, &mut last_entry_id, &subscriptions_, - slot, - parent_slot, ) { error!("{} process_entries failed: {:?}", my_id, e); } @@ -410,7 +395,7 @@ impl ReplayStage { }) .unwrap(); - (Self { t_replay, exit }, ledger_entry_receiver) + (Self { t_replay, exit }, forward_entry_receiver) } pub fn close(self) -> thread::Result<()> { @@ -450,6 +435,40 @@ impl ReplayStage { let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error"); next_slots.first().cloned() } + + fn forward_entries( + entries: &[Entry], + slot: u64, + slot_leader: Pubkey, + bank: &Arc, + blocktree: &Arc, + forward_entry_sender: &EntrySender, + ) -> Result<()> { + let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot); + let mut entry_tick_height = bank.tick_height(); + let mut num_ticks_left_in_slot = + bank.ticks_per_slot() - entry_tick_height % bank.ticks_per_slot() - 1; + let mut entries_with_meta = Vec::new(); + for entry in entries.iter() { + if entry.is_tick() { + entry_tick_height += 1; + if num_ticks_left_in_slot == 0 { + num_ticks_left_in_slot = bank.ticks_per_slot(); + } + num_ticks_left_in_slot -= 1; + } + entries_with_meta.push(EntryMeta { + tick_height: entry_tick_height, + slot, + slot_leader, + num_ticks_left_in_slot, + parent_slot, + entry: entry.clone(), + }); + } + forward_entry_sender.send(entries_with_meta)?; + Ok(()) + } } impl Service for ReplayStage {