more replay_stage grooming (#5163)

Rob Walker authored on 2019-07-18 14:54:27 -07:00, committed by GitHub
parent d47caf2af8
commit afa05acb32
5 changed files with 56 additions and 62 deletions

banking_stage.rs

@@ -239,10 +239,10 @@ impl BankingStage {
     }
     fn consume_or_forward_packets(
+        my_pubkey: &Pubkey,
         leader_pubkey: Option<Pubkey>,
         bank_is_available: bool,
         would_be_leader: bool,
-        my_pubkey: &Pubkey,
     ) -> BufferedPacketsDecision {
         leader_pubkey.map_or(
             // If leader is not known, return the buffered packets as is
@@ -275,18 +275,24 @@ impl BankingStage {
         enable_forwarding: bool,
         batch_limit: usize,
     ) -> Result<()> {
-        let decision = {
+        let (poh_next_slot_leader, poh_has_bank, would_be_leader) = {
             let poh = poh_recorder.lock().unwrap();
-            Self::consume_or_forward_packets(
+            (
                 poh.next_slot_leader(),
-                poh.bank().is_some(),
+                poh.has_bank(),
                 poh.would_be_leader(
                     (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
                 ),
-                my_pubkey,
             )
         };
+        let decision = Self::consume_or_forward_packets(
+            my_pubkey,
+            poh_next_slot_leader,
+            poh_has_bank,
+            would_be_leader,
+        );
         match decision {
             BufferedPacketsDecision::Consume => {
                 let mut unprocessed = Self::consume_buffered_packets(
@@ -300,9 +306,10 @@ impl BankingStage {
             }
             BufferedPacketsDecision::Forward => {
                 if enable_forwarding {
-                    let poh = poh_recorder.lock().unwrap();
-                    let next_leader =
-                        poh.leader_after_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
+                    let next_leader = poh_recorder
+                        .lock()
+                        .unwrap()
+                        .leader_after_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
                     next_leader.map_or(Ok(()), |leader_pubkey| {
                         let leader_addr = {
                             cluster_info
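
Note (not part of the diff): the two hunks above follow one pattern. The PohRecorder mutex is taken once, plain values are copied out, and the guard is dropped before the consume/forward decision is made; the decision function itself now takes only values, which is what lets the tests below call it directly. A minimal, hypothetical Rust sketch of that pattern, with made-up names (PohState, decide), not the actual BankingStage code:

use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum Decision {
    Consume,
    Forward,
    Hold,
}

struct PohState {
    has_bank: bool,
    would_be_leader: bool,
}

// Pure function over plain values: no lock is held while deciding, and it can
// be unit-tested directly, much like consume_or_forward_packets below.
fn decide(has_bank: bool, would_be_leader: bool) -> Decision {
    if has_bank {
        Decision::Consume
    } else if would_be_leader {
        Decision::Hold
    } else {
        Decision::Forward
    }
}

fn main() {
    let poh = Arc::new(Mutex::new(PohState {
        has_bank: false,
        would_be_leader: true,
    }));

    // One short critical section: copy the fields out, then drop the guard.
    let (has_bank, would_be_leader) = {
        let poh = poh.lock().unwrap();
        (poh.has_bank, poh.would_be_leader)
    };

    // The decision is computed with no lock held.
    assert_eq!(decide(has_bank, would_be_leader), Decision::Hold);
}
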
@@ -1507,60 +1514,60 @@ mod tests {
         let my_pubkey1 = Pubkey::new_rand();
         assert_eq!(
-            BankingStage::consume_or_forward_packets(None, true, false, &my_pubkey),
+            BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false,),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
-            BankingStage::consume_or_forward_packets(None, false, false, &my_pubkey),
+            BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
-            BankingStage::consume_or_forward_packets(None, false, false, &my_pubkey1),
+            BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
            BankingStage::consume_or_forward_packets(
+                &my_pubkey,
                 Some(my_pubkey1.clone()),
                 false,
                 false,
-                &my_pubkey
             ),
             BufferedPacketsDecision::Forward
         );
         assert_eq!(
             BankingStage::consume_or_forward_packets(
+                &my_pubkey,
                 Some(my_pubkey1.clone()),
                 false,
                 true,
-                &my_pubkey
             ),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
             BankingStage::consume_or_forward_packets(
+                &my_pubkey,
                 Some(my_pubkey1.clone()),
                 true,
                 false,
-                &my_pubkey
             ),
             BufferedPacketsDecision::Consume
         );
         assert_eq!(
             BankingStage::consume_or_forward_packets(
+                &my_pubkey1,
                 Some(my_pubkey1.clone()),
                 false,
                 false,
-                &my_pubkey1
             ),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
             BankingStage::consume_or_forward_packets(
+                &my_pubkey1,
                 Some(my_pubkey1.clone()),
                 true,
                 false,
-                &my_pubkey1
             ),
             BufferedPacketsDecision::Consume
         );

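Note (not part of the diff): the assertions above pin down a small decision table: no known next leader means Hold; holding a bank means Consume; another node leading while this node is not about to lead means Forward; everything else is Hold. Below is a sketch of logic consistent with those assertions; it is an illustration only, the real body in banking_stage.rs may be structured differently, and Pubkey is replaced by a plain byte array to keep it self-contained:

// Stand-in for solana_sdk::pubkey::Pubkey, only to keep the sketch buildable.
type Pubkey = [u8; 32];

#[derive(Debug, PartialEq)]
enum BufferedPacketsDecision {
    Consume,
    Forward,
    Hold,
}

// Consistent with the assertions above: unknown leader => Hold; a bank is
// available => Consume; another node leads and we are not about to lead =>
// Forward; everything else => Hold.
fn consume_or_forward_packets(
    my_pubkey: &Pubkey,
    leader_pubkey: Option<Pubkey>,
    bank_is_available: bool,
    would_be_leader: bool,
) -> BufferedPacketsDecision {
    match leader_pubkey {
        None => BufferedPacketsDecision::Hold,
        Some(_) if bank_is_available => BufferedPacketsDecision::Consume,
        Some(leader) if &leader != my_pubkey && !would_be_leader => {
            BufferedPacketsDecision::Forward
        }
        Some(_) => BufferedPacketsDecision::Hold,
    }
}

fn main() {
    let me = [1u8; 32];
    let other = [2u8; 32];
    // A few rows of the same table the test hunk exercises.
    assert_eq!(
        consume_or_forward_packets(&me, None, true, false),
        BufferedPacketsDecision::Hold
    );
    assert_eq!(
        consume_or_forward_packets(&me, Some(other), false, false),
        BufferedPacketsDecision::Forward
    );
    assert_eq!(
        consume_or_forward_packets(&me, Some(other), true, false),
        BufferedPacketsDecision::Consume
    );
    assert_eq!(
        consume_or_forward_packets(&me, Some(me), false, false),
        BufferedPacketsDecision::Hold
    );
}
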
cluster_info_vote_listener.rs

@@ -54,7 +54,7 @@ impl ClusterInfoVoteListener {
                return Ok(());
            }
            let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
-           if poh_recorder.lock().unwrap().bank().is_some() {
+           if poh_recorder.lock().unwrap().has_bank() {
                last_ts = new_ts;
                inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
                let msgs = packet::to_packets(&votes);

leader_schedule_utils.rs

@@ -33,10 +33,6 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
     bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1
 }
-
-pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
-    tick_height / ticks_per_slot
-}
 
 fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
     // Sort first by stake. If stakes are the same, sort by pubkey to ensure a
     // deterministic result.

poh_recorder.rs

@@ -101,6 +101,7 @@ impl PohRecorder {
         self.leader_schedule_cache
             .slot_leader_at(self.tick_height / self.ticks_per_slot + slots, None)
     }
+
     pub fn next_slot_leader(&self) -> Option<Pubkey> {
         self.leader_after_slots(1)
     }

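Note (not part of the diff): next_slot_leader() is shown above as a thin wrapper over leader_after_slots(1), which derives the current slot as tick_height / ticks_per_slot and asks the leader schedule cache about a slot further ahead. has_bank(), used by the banking, vote-listener and replay changes in this commit, is presumably an equally thin wrapper over the recorder's working-bank Option (the old call sites spelled it bank().is_some()). A self-contained, hypothetical sketch; every name and field here is a stand-in:

type Pubkey = [u8; 32];

struct LeaderScheduleCache;
impl LeaderScheduleCache {
    fn slot_leader_at(&self, _slot: u64) -> Option<Pubkey> {
        Some([0u8; 32]) // placeholder schedule: every slot has a leader
    }
}

struct PohRecorderSketch {
    tick_height: u64,
    ticks_per_slot: u64,
    working_bank: Option<()>, // stand-in for the real working-bank type
    leader_schedule_cache: LeaderScheduleCache,
}

impl PohRecorderSketch {
    // has_bank(): presumably a thin wrapper over the Option that the old call
    // sites reached via bank().is_some().
    fn has_bank(&self) -> bool {
        self.working_bank.is_some()
    }

    // Current slot is tick_height / ticks_per_slot; look `slots` slots ahead,
    // matching the context lines in the hunk above.
    fn leader_after_slots(&self, slots: u64) -> Option<Pubkey> {
        self.leader_schedule_cache
            .slot_leader_at(self.tick_height / self.ticks_per_slot + slots)
    }

    // next_slot_leader() is literally leader_after_slots(1) in the hunk above.
    fn next_slot_leader(&self) -> Option<Pubkey> {
        self.leader_after_slots(1)
    }
}

fn main() {
    let poh = PohRecorderSketch {
        tick_height: 130, // with 64 ticks per slot this is slot 2
        ticks_per_slot: 64,
        working_bank: None,
        leader_schedule_cache: LeaderScheduleCache,
    };
    assert!(!poh.has_bank());
    assert!(poh.next_slot_leader().is_some()); // leader of slot 3
}
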
replay_stage.rs

@@ -105,11 +105,13 @@ impl ReplayStage {
         let leader_schedule_cache = leader_schedule_cache.clone();
         let vote_account = *vote_account;
         let voting_keypair = voting_keypair.cloned();
+
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
                 let _exit = Finalizer::new(exit_.clone());
                 let mut progress = HashMap::new();
+
                 loop {
                     let now = Instant::now();
                     // Stop getting entries if we get exit signal
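
Note (not part of the diff): the two hunks that follow thread a tpu_has_bank flag through this loop. It is read from the PohRecorder once per iteration, cleared when the working bank has been handed back, and maybe_start_leader is only attempted when the TPU does not already hold a bank. A stripped-down, hypothetical sketch of that control flow (all names besides tpu_has_bank and maybe_start_leader are placeholders):

use std::sync::{Arc, Mutex};

struct Poh {
    has_bank: bool,
}

fn maybe_start_leader(poh: &Arc<Mutex<Poh>>) {
    // Only reached when the TPU holds no bank, so this assert cannot fire.
    assert!(!poh.lock().unwrap().has_bank);
    // ... decide whether to start a leader slot ...
}

fn main() {
    let poh = Arc::new(Mutex::new(Poh { has_bank: true }));

    for step in 0..3 {
        // Read the flag once per iteration instead of re-locking at each use.
        let mut tpu_has_bank = poh.lock().unwrap().has_bank;

        // Pretend the working bank completes and is handed back on step 1.
        if step == 1 {
            poh.lock().unwrap().has_bank = false;
            tpu_has_bank = false;
        }

        // Only try to start a leader slot when the TPU is not already leading.
        if !tpu_has_bank {
            maybe_start_leader(&poh);
        }
    }
}
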
@@ -123,6 +125,8 @@ impl ReplayStage {
                         &leader_schedule_cache,
                     );
 
+                    let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank();
+
                     let did_complete_bank = Self::replay_active_banks(
                         &blocktree,
                         &bank_forks,
@@ -156,15 +160,17 @@ impl ReplayStage {
                             &poh_recorder,
                             &leader_schedule_cache,
                         );
-                        assert!(!poh_recorder.lock().unwrap().has_bank());
+                        tpu_has_bank = false;
                     }
 
-                    Self::maybe_start_leader(
-                        &my_pubkey,
-                        &bank_forks,
-                        &poh_recorder,
-                        &leader_schedule_cache,
-                    );
+                    if !tpu_has_bank {
+                        Self::maybe_start_leader(
+                            &my_pubkey,
+                            &bank_forks,
+                            &poh_recorder,
+                            &leader_schedule_cache,
+                        );
+                    }
 
                     inc_new_counter_info!(
                         "replicate_stage-duration",
@@ -194,37 +200,19 @@ impl ReplayStage {
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
     ) {
-        let (grace_ticks, poh_slot, parent_slot) = {
-            let poh_recorder = poh_recorder.lock().unwrap();
+        // all the individual calls to poh_recorder.lock() are designed to
+        // increase granularity, decrease contention
 
-            // we're done
-            if poh_recorder.has_bank() {
-                trace!("{} poh_recorder already has a bank", my_pubkey);
-                return;
-            }
+        assert!(!poh_recorder.lock().unwrap().has_bank());
 
-            let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) =
-                poh_recorder.reached_leader_tick();
+        let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) =
+            poh_recorder.lock().unwrap().reached_leader_tick();
 
-            if !reached_leader_tick {
-                trace!("{} poh_recorder hasn't reached_leader_tick", my_pubkey);
-                return;
-            }
-            (grace_ticks, poh_slot, parent_slot)
-        };
-        trace!(
-            "{} reached_leader_tick, poh_slot: {} parent_slot: {}",
-            my_pubkey,
-            poh_slot,
-            parent_slot,
-        );
-        if bank_forks.read().unwrap().get(poh_slot).is_some() {
-            warn!("{} already have bank in forks at {}", my_pubkey, poh_slot);
+        if !reached_leader_tick {
+            trace!("{} poh_recorder hasn't reached_leader_tick", my_pubkey);
             return;
         }
+        trace!("{} reached_leader_tick", my_pubkey,);
 
         let parent = bank_forks
             .read()
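
Note (not part of the diff): the comment added above ("all the individual calls to poh_recorder.lock() are designed to increase granularity, decrease contention") relies on each lock() producing a temporary guard that is dropped at the end of its own statement, so the mutex is never held across the rest of the function; the cost is that state can change between two acquisitions, which is why the function re-checks with assert! and early returns. A generic illustration of the two lock scopes (plain std::sync::Mutex, unrelated to the Solana types):

use std::sync::{Arc, Mutex};

fn main() {
    let shared = Arc::new(Mutex::new(0u64));

    // Variant A: hold one guard across several operations (coarse).
    {
        let mut guard = shared.lock().unwrap();
        *guard += 1;
        // The lock stays held until `guard` goes out of scope here.
    }

    // Variant B: lock per expression, as maybe_start_leader now does.
    // Each temporary guard is dropped at the end of its own statement, so
    // other threads can take the lock between the two statements.
    *shared.lock().unwrap() += 1;
    let snapshot = *shared.lock().unwrap();

    assert_eq!(snapshot, 2);
}
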
@@ -233,16 +221,18 @@ impl ReplayStage {
             .expect("parent_slot doesn't exist in bank forks")
             .clone();
 
-        // the parent was still in poh_recorder last time we looked for votable banks
-        // break out and re-run the consensus loop above
-        if !parent.is_frozen() {
-            trace!(
-                "{} parent {} isn't frozen, must be re-considered",
-                my_pubkey,
-                parent.slot()
-            );
+        assert!(parent.is_frozen());
+
+        if bank_forks.read().unwrap().get(poh_slot).is_some() {
+            warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot);
             return;
         }
+        trace!(
+            "{} poh_slot {} parent_slot {}",
+            my_pubkey,
+            poh_slot,
+            parent_slot
+        );
 
         if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
             trace!(