fill in timing gaps in replay_stage (#14550)

* fill in timing gaps in replay_stage

* add replay_stage bank_count metric

* formatting

* handle another gap

* clean up wait_receive_time to be more straightforward
This commit is contained in:
Jeff Washington (jwash)
2021-01-13 10:08:53 -06:00
committed by GitHub
parent e95ebcf864
commit 935dfdf0f6

View File

@ -111,6 +111,7 @@ pub struct ReplayStageConfig {
#[derive(Default)] #[derive(Default)]
pub struct ReplayTiming { pub struct ReplayTiming {
last_print: u64, last_print: u64,
collect_frozen_banks_elapsed: u64,
compute_bank_stats_elapsed: u64, compute_bank_stats_elapsed: u64,
select_vote_and_reset_forks_elapsed: u64, select_vote_and_reset_forks_elapsed: u64,
start_leader_elapsed: u64, start_leader_elapsed: u64,
@ -121,11 +122,15 @@ pub struct ReplayTiming {
generate_new_bank_forks_elapsed: u64, generate_new_bank_forks_elapsed: u64,
replay_active_banks_elapsed: u64, replay_active_banks_elapsed: u64,
reset_duplicate_slots_elapsed: u64, reset_duplicate_slots_elapsed: u64,
wait_receive_elapsed: u64,
heaviest_fork_failures_elapsed: u64,
bank_count: u64,
} }
impl ReplayTiming { impl ReplayTiming {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn update( fn update(
&mut self, &mut self,
collect_frozen_banks_elapsed: u64,
compute_bank_stats_elapsed: u64, compute_bank_stats_elapsed: u64,
select_vote_and_reset_forks_elapsed: u64, select_vote_and_reset_forks_elapsed: u64,
start_leader_elapsed: u64, start_leader_elapsed: u64,
@ -136,7 +141,11 @@ impl ReplayTiming {
generate_new_bank_forks_elapsed: u64, generate_new_bank_forks_elapsed: u64,
replay_active_banks_elapsed: u64, replay_active_banks_elapsed: u64,
reset_duplicate_slots_elapsed: u64, reset_duplicate_slots_elapsed: u64,
wait_receive_elapsed: u64,
heaviest_fork_failures_elapsed: u64,
bank_count: u64,
) { ) {
self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed;
self.compute_bank_stats_elapsed += compute_bank_stats_elapsed; self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed; self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed;
self.start_leader_elapsed += start_leader_elapsed; self.start_leader_elapsed += start_leader_elapsed;
@ -147,12 +156,20 @@ impl ReplayTiming {
self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed; self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed;
self.replay_active_banks_elapsed += replay_active_banks_elapsed; self.replay_active_banks_elapsed += replay_active_banks_elapsed;
self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed; self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed;
self.wait_receive_elapsed += wait_receive_elapsed;
self.heaviest_fork_failures_elapsed += heaviest_fork_failures_elapsed;
self.bank_count += bank_count;
let now = timestamp(); let now = timestamp();
let elapsed_ms = now - self.last_print; let elapsed_ms = now - self.last_print;
if elapsed_ms > 1000 { if elapsed_ms > 1000 {
datapoint_info!( datapoint_info!(
"replay-loop-timing-stats", "replay-loop-timing-stats",
("total_elapsed_us", elapsed_ms * 1000, i64), ("total_elapsed_us", elapsed_ms * 1000, i64),
(
"collect_frozen_banks_elapsed",
self.collect_frozen_banks_elapsed as i64,
i64
),
( (
"compute_bank_stats_elapsed", "compute_bank_stats_elapsed",
self.compute_bank_stats_elapsed as i64, self.compute_bank_stats_elapsed as i64,
@ -195,6 +212,17 @@ impl ReplayTiming {
self.reset_duplicate_slots_elapsed as i64, self.reset_duplicate_slots_elapsed as i64,
i64 i64
), ),
(
"wait_receive_elapsed",
self.wait_receive_elapsed as i64,
i64
),
(
"heaviest_fork_failures_elapsed",
self.heaviest_fork_failures_elapsed as i64,
i64
),
("bank_count", self.bank_count as i64, i64),
); );
*self = ReplayTiming::default(); *self = ReplayTiming::default();
@ -401,6 +429,7 @@ impl ReplayStage {
); );
select_vote_and_reset_forks_time.stop(); select_vote_and_reset_forks_time.stop();
let mut heaviest_fork_failures_time = Measure::start("heaviest_fork_failures_time");
if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() { if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() {
info!( info!(
"Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}", "Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}",
@ -418,6 +447,7 @@ impl ReplayStage {
} }
} }
} }
heaviest_fork_failures_time.stop();
let start = allocated.get(); let start = allocated.get();
@ -561,7 +591,22 @@ impl ReplayStage {
start_leader_time.stop(); start_leader_time.stop();
Self::report_memory(&allocated, "start_leader", start); Self::report_memory(&allocated, "start_leader", start);
let mut wait_receive_time = Measure::start("wait_receive_time");
if !did_complete_bank {
// only wait for the signal if we did not just process a bank; maybe there are more slots available
let timer = Duration::from_millis(100);
let result = ledger_signal_receiver.recv_timeout(timer);
match result {
Err(RecvTimeoutError::Timeout) => (),
Err(_) => break,
Ok(_) => trace!("blockstore signal"),
};
}
wait_receive_time.stop();
replay_timing.update( replay_timing.update(
collect_frozen_banks_time.as_us(),
compute_bank_stats_time.as_us(), compute_bank_stats_time.as_us(),
select_vote_and_reset_forks_time.as_us(), select_vote_and_reset_forks_time.as_us(),
start_leader_time.as_us(), start_leader_time.as_us(),
@ -572,19 +617,10 @@ impl ReplayStage {
generate_new_bank_forks_time.as_us(), generate_new_bank_forks_time.as_us(),
replay_active_banks_time.as_us(), replay_active_banks_time.as_us(),
reset_duplicate_slots_time.as_us(), reset_duplicate_slots_time.as_us(),
wait_receive_time.as_us(),
heaviest_fork_failures_time.as_us(),
if did_complete_bank {1} else {0},
); );
if did_complete_bank {
//just processed a bank, skip the signal; maybe there's more slots available
continue;
}
let timer = Duration::from_millis(100);
let result = ledger_signal_receiver.recv_timeout(timer);
match result {
Err(RecvTimeoutError::Timeout) => continue,
Err(_) => break,
Ok(_) => trace!("blockstore signal"),
};
} }
Ok(()) Ok(())
}) })