From f48236837ce1559773eeffe1d943376998b1ad9a Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 16 Feb 2021 09:53:08 +0000 Subject: [PATCH] fill in timing gaps in replay_stage (#14550) (#15197) * fill in timing gaps in replay_stage * add replay_stage bank_count metric * formatting * handle another gap * cleanup wait_receive_time to be more straightforward (cherry picked from commit 935dfdf0f65b500374dc51c5387708760df60c62) Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com> --- core/src/replay_stage.rs | 60 ++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f6c30e6f3c..1b00046935 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -109,6 +109,7 @@ pub struct ReplayStageConfig { #[derive(Default)] pub struct ReplayTiming { last_print: u64, + collect_frozen_banks_elapsed: u64, compute_bank_stats_elapsed: u64, select_vote_and_reset_forks_elapsed: u64, start_leader_elapsed: u64, @@ -119,11 +120,15 @@ pub struct ReplayTiming { generate_new_bank_forks_elapsed: u64, replay_active_banks_elapsed: u64, reset_duplicate_slots_elapsed: u64, + wait_receive_elapsed: u64, + heaviest_fork_failures_elapsed: u64, + bank_count: u64, } impl ReplayTiming { #[allow(clippy::too_many_arguments)] fn update( &mut self, + collect_frozen_banks_elapsed: u64, compute_bank_stats_elapsed: u64, select_vote_and_reset_forks_elapsed: u64, start_leader_elapsed: u64, @@ -134,7 +139,11 @@ impl ReplayTiming { generate_new_bank_forks_elapsed: u64, replay_active_banks_elapsed: u64, reset_duplicate_slots_elapsed: u64, + wait_receive_elapsed: u64, + heaviest_fork_failures_elapsed: u64, + bank_count: u64, ) { + self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed; self.compute_bank_stats_elapsed += compute_bank_stats_elapsed; self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed; self.start_leader_elapsed += start_leader_elapsed; @@ -145,12 +154,20 @@ impl ReplayTiming { self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed; self.replay_active_banks_elapsed += replay_active_banks_elapsed; self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed; + self.wait_receive_elapsed += wait_receive_elapsed; + self.heaviest_fork_failures_elapsed += heaviest_fork_failures_elapsed; + self.bank_count += bank_count; let now = timestamp(); let elapsed_ms = now - self.last_print; if elapsed_ms > 1000 { datapoint_info!( "replay-loop-timing-stats", ("total_elapsed_us", elapsed_ms * 1000, i64), + ( + "collect_frozen_banks_elapsed", + self.collect_frozen_banks_elapsed as i64, + i64 + ), ( "compute_bank_stats_elapsed", self.compute_bank_stats_elapsed as i64, @@ -193,6 +210,17 @@ impl ReplayTiming { self.reset_duplicate_slots_elapsed as i64, i64 ), + ( + "wait_receive_elapsed", + self.wait_receive_elapsed as i64, + i64 + ), + ( + "heaviest_fork_failures_elapsed", + self.heaviest_fork_failures_elapsed as i64, + i64 + ), + ("bank_count", self.bank_count as i64, i64), ); *self = ReplayTiming::default(); @@ -396,6 +424,7 @@ impl ReplayStage { ); select_vote_and_reset_forks_time.stop(); + let mut heaviest_fork_failures_time = Measure::start("heaviest_fork_failures_time"); if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() { info!( "Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}", @@ -413,6 +442,7 @@ impl ReplayStage { } } } + heaviest_fork_failures_time.stop(); let start = allocated.get(); @@ -555,7 +585,22 @@ impl ReplayStage { start_leader_time.stop(); Self::report_memory(&allocated, "start_leader", start); + let mut wait_receive_time = Measure::start("wait_receive_time"); + if !did_complete_bank { + // only wait for the signal if we did not just process a bank; maybe there are more slots available + + let timer = Duration::from_millis(100); + let result = ledger_signal_receiver.recv_timeout(timer); + match result { + Err(RecvTimeoutError::Timeout) => (), + Err(_) => break, + Ok(_) => trace!("blockstore signal"), + }; + } + wait_receive_time.stop(); + replay_timing.update( + collect_frozen_banks_time.as_us(), compute_bank_stats_time.as_us(), select_vote_and_reset_forks_time.as_us(), start_leader_time.as_us(), @@ -566,19 +611,10 @@ impl ReplayStage { generate_new_bank_forks_time.as_us(), replay_active_banks_time.as_us(), reset_duplicate_slots_time.as_us(), + wait_receive_time.as_us(), + heaviest_fork_failures_time.as_us(), + if did_complete_bank {1} else {0}, ); - - if did_complete_bank { - //just processed a bank, skip the signal; maybe there's more slots available - continue; - } - let timer = Duration::from_millis(100); - let result = ledger_signal_receiver.recv_timeout(timer); - match result { - Err(RecvTimeoutError::Timeout) => continue, - Err(_) => break, - Ok(_) => trace!("blockstore signal"), - }; } Ok(()) })