Hoist EntrySender in ReplayStage

Tyera Eulberg
2019-02-27 11:58:01 -07:00
committed by Grimes
parent 6d1b43f1b1
commit a67a88c8ef

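In short, the EntrySender is hoisted out of process_entries: the replay loop now calls a dedicated forward_entries helper that builds the per-entry EntryMeta records and sends them down the channel, and process_entries loses its ledger_entry_sender, slot, and parent_slot parameters. Below is a minimal standalone sketch of that shape using only std types; Entry, EntryMeta, and the helper bodies are simplified stand-ins for illustration, not the Solana definitions.

    use std::sync::mpsc::{channel, Sender};

    // Simplified stand-ins for the real types (not the Solana definitions);
    // names follow the diff, but most fields are omitted.
    #[derive(Debug, Clone)]
    struct Entry(u64);

    #[derive(Debug)]
    struct EntryMeta {
        slot: u64,
        entry: Entry,
    }

    type EntrySender = Sender<Vec<EntryMeta>>;

    // After the hoist, forwarding owns the channel and the metadata bookkeeping...
    fn forward_entries(entries: &[Entry], slot: u64, sender: &EntrySender) {
        let entries_with_meta = entries
            .iter()
            .map(|entry| EntryMeta { slot, entry: entry.clone() })
            .collect();
        sender.send(entries_with_meta).expect("receiver dropped");
    }

    // ...while process_entries keeps only replay concerns: no sender, slot,
    // or parent_slot parameters.
    fn process_entries(entries: &[Entry]) -> u64 {
        entries.iter().map(|e| e.0).sum()
    }

    fn main() {
        let (forward_entry_sender, forward_entry_receiver) = channel();
        let entries = vec![Entry(1), Entry(2)];
        if !entries.is_empty() {
            forward_entries(&entries, 7, &forward_entry_sender);
            let _sum = process_entries(&entries);
        }
        println!("{:?}", forward_entry_receiver.recv().unwrap());
    }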

@@ -60,12 +60,9 @@ impl ReplayStage {
         bank: &Arc<Bank>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         voting_keypair: &Option<Arc<T>>,
-        ledger_entry_sender: &EntrySender,
         current_blob_index: &mut u64,
         last_entry_id: &mut Hash,
         subscriptions: &Arc<RpcSubscriptions>,
-        slot: u64,
-        parent_slot: Option<u64>,
     ) -> Result<()> {
         // Coalesce all the available entries into a single vote
         submit(
@@ -89,16 +86,13 @@ impl ReplayStage {
         let num_ticks = bank.tick_height();
         let slot_height = bank.slot_height();
-        let leader_id = leader_schedule_utils::slot_leader(bank);
         let mut num_ticks_to_next_vote =
             leader_schedule_utils::num_ticks_left_in_slot(bank, num_ticks);
-        let mut entry_tick_height = num_ticks;
-        let mut entries_with_meta = Vec::new();
         for (i, entry) in entries.iter().enumerate() {
             inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize);
             if entry.is_tick() {
-                entry_tick_height += 1;
                 if num_ticks_to_next_vote == 0 {
                     num_ticks_to_next_vote = bank.ticks_per_slot();
                 }
@@ -108,14 +102,6 @@ impl ReplayStage {
                 "replicate-stage_tick-to-vote",
                 num_ticks_to_next_vote as usize
             );
-            entries_with_meta.push(EntryMeta {
-                tick_height: entry_tick_height,
-                slot,
-                slot_leader: leader_id,
-                num_ticks_left_in_slot: num_ticks_to_next_vote,
-                parent_slot,
-                entry: entry.clone(),
-            });
             // If it's the last entry in the vector, i will be vec len - 1.
             // If we don't process the entry now, the for loop will exit and the entry
             // will be dropped.
@@ -149,7 +135,6 @@ impl ReplayStage {
         // If leader rotation happened, only write the entries up to leader rotation.
         entries.truncate(num_entries_to_write);
-        entries_with_meta.truncate(num_entries_to_write);
         *last_entry_id = entries
             .last()
             .expect("Entries cannot be empty at this point")
@@ -161,12 +146,6 @@ impl ReplayStage {
         );
         let entries_len = entries.len() as u64;
-        // TODO: In line with previous behavior, this will write all the entries even if
-        // an error occurred processing one of the entries (causing the rest of the entries to
-        // not be processed).
-        if entries_len != 0 {
-            ledger_entry_sender.send(entries_with_meta)?;
-        }
         *current_blob_index += entries_len;

         res?;
@@ -193,7 +172,7 @@ impl ReplayStage {
     where
         T: 'static + KeypairUtil + Send + Sync,
     {
-        let (ledger_entry_sender, ledger_entry_receiver) = channel();
+        let (forward_entry_sender, forward_entry_receiver) = channel();
         let exit_ = exit.clone();
         let to_leader_sender = to_leader_sender.clone();
         let subscriptions_ = subscriptions.clone();
@@ -319,20 +298,26 @@ impl ReplayStage {
                             vec![]
                         }
                     };
-                    let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot);

                     if !entries.is_empty() {
+                        if let Err(e) = Self::forward_entries(
+                            &entries,
+                            slot,
+                            current_leader_id,
+                            &bank,
+                            &blocktree,
+                            &forward_entry_sender,
+                        ) {
+                            error!("{} forward_entries failed: {:?}", my_id, e);
+                        }
                         if let Err(e) = Self::process_entries(
                             entries,
                             &bank,
                             &cluster_info,
                             &voting_keypair,
-                            &ledger_entry_sender,
                             &mut current_blob_index,
                             &mut last_entry_id,
                             &subscriptions_,
-                            slot,
-                            parent_slot,
                         ) {
                             error!("{} process_entries failed: {:?}", my_id, e);
                         }
@@ -410,7 +395,7 @@ impl ReplayStage {
             })
             .unwrap();

-        (Self { t_replay, exit }, ledger_entry_receiver)
+        (Self { t_replay, exit }, forward_entry_receiver)
     }

     pub fn close(self) -> thread::Result<()> {
@@ -450,6 +435,40 @@ impl ReplayStage {
         let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
         next_slots.first().cloned()
     }
+
+    fn forward_entries(
+        entries: &[Entry],
+        slot: u64,
+        slot_leader: Pubkey,
+        bank: &Arc<Bank>,
+        blocktree: &Arc<Blocktree>,
+        forward_entry_sender: &EntrySender,
+    ) -> Result<()> {
+        let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot);
+        let mut entry_tick_height = bank.tick_height();
+        let mut num_ticks_left_in_slot =
+            bank.ticks_per_slot() - entry_tick_height % bank.ticks_per_slot() - 1;
+        let mut entries_with_meta = Vec::new();
+        for entry in entries.iter() {
+            if entry.is_tick() {
+                entry_tick_height += 1;
+                if num_ticks_left_in_slot == 0 {
+                    num_ticks_left_in_slot = bank.ticks_per_slot();
+                }
+                num_ticks_left_in_slot -= 1;
+            }
+            entries_with_meta.push(EntryMeta {
+                tick_height: entry_tick_height,
+                slot,
+                slot_leader,
+                num_ticks_left_in_slot,
+                parent_slot,
+                entry: entry.clone(),
+            });
+        }
+        forward_entry_sender.send(entries_with_meta)?;
+        Ok(())
+    }
 }

 impl Service for ReplayStage {
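
For reference, the num_ticks_left_in_slot bookkeeping that forward_entries seeds from the bank can be traced on its own: the counter starts at ticks_per_slot - tick_height % ticks_per_slot - 1 and, once it reaches zero, wraps to a fresh slot's worth on the next tick. A self-contained sketch with hypothetical numbers (plain integers in place of the Bank and Entry types; the loop mirrors the diff above):

    // Hypothetical standalone version of the tick bookkeeping in forward_entries.
    // `is_tick` flags replace Entry::is_tick(); returns (tick_height,
    // num_ticks_left_in_slot) per entry, as recorded in each EntryMeta.
    fn entry_tick_meta(is_tick: &[bool], tick_height: u64, ticks_per_slot: u64) -> Vec<(u64, u64)> {
        let mut entry_tick_height = tick_height;
        // Same seed as the diff.
        let mut num_ticks_left_in_slot = ticks_per_slot - tick_height % ticks_per_slot - 1;
        let mut out = Vec::new();
        for &tick in is_tick {
            if tick {
                entry_tick_height += 1;
                if num_ticks_left_in_slot == 0 {
                    // Counter ran out on the previous tick; start a fresh slot's worth.
                    num_ticks_left_in_slot = ticks_per_slot;
                }
                num_ticks_left_in_slot -= 1;
            }
            out.push((entry_tick_height, num_ticks_left_in_slot));
        }
        out
    }

    fn main() {
        // ticks_per_slot = 4 and the bank already at tick_height = 2:
        // seed = 4 - 2 % 4 - 1 = 1, then the counter wraps after it hits zero.
        assert_eq!(
            entry_tick_meta(&[true, true, true], 2, 4),
            vec![(3, 0), (4, 3), (5, 2)]
        );
        println!("ok");
    }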