* fill in timing gaps in replay_stage
* add replay_stage bank_count metric
* formatting
* handle another gap
* clean up wait_receive_time to be more straightforward
(cherry picked from commit 935dfdf0f6)
Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
This commit is contained in:
@ -109,6 +109,7 @@ pub struct ReplayStageConfig {
|
|||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ReplayTiming {
|
pub struct ReplayTiming {
|
||||||
last_print: u64,
|
last_print: u64,
|
||||||
|
collect_frozen_banks_elapsed: u64,
|
||||||
compute_bank_stats_elapsed: u64,
|
compute_bank_stats_elapsed: u64,
|
||||||
select_vote_and_reset_forks_elapsed: u64,
|
select_vote_and_reset_forks_elapsed: u64,
|
||||||
start_leader_elapsed: u64,
|
start_leader_elapsed: u64,
|
||||||
@ -119,11 +120,15 @@ pub struct ReplayTiming {
|
|||||||
generate_new_bank_forks_elapsed: u64,
|
generate_new_bank_forks_elapsed: u64,
|
||||||
replay_active_banks_elapsed: u64,
|
replay_active_banks_elapsed: u64,
|
||||||
reset_duplicate_slots_elapsed: u64,
|
reset_duplicate_slots_elapsed: u64,
|
||||||
|
wait_receive_elapsed: u64,
|
||||||
|
heaviest_fork_failures_elapsed: u64,
|
||||||
|
bank_count: u64,
|
||||||
}
|
}
|
||||||
impl ReplayTiming {
|
impl ReplayTiming {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn update(
|
fn update(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
collect_frozen_banks_elapsed: u64,
|
||||||
compute_bank_stats_elapsed: u64,
|
compute_bank_stats_elapsed: u64,
|
||||||
select_vote_and_reset_forks_elapsed: u64,
|
select_vote_and_reset_forks_elapsed: u64,
|
||||||
start_leader_elapsed: u64,
|
start_leader_elapsed: u64,
|
||||||
@ -134,7 +139,11 @@ impl ReplayTiming {
|
|||||||
generate_new_bank_forks_elapsed: u64,
|
generate_new_bank_forks_elapsed: u64,
|
||||||
replay_active_banks_elapsed: u64,
|
replay_active_banks_elapsed: u64,
|
||||||
reset_duplicate_slots_elapsed: u64,
|
reset_duplicate_slots_elapsed: u64,
|
||||||
|
wait_receive_elapsed: u64,
|
||||||
|
heaviest_fork_failures_elapsed: u64,
|
||||||
|
bank_count: u64,
|
||||||
) {
|
) {
|
||||||
|
self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed;
|
||||||
self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
|
self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
|
||||||
self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed;
|
self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed;
|
||||||
self.start_leader_elapsed += start_leader_elapsed;
|
self.start_leader_elapsed += start_leader_elapsed;
|
||||||
@ -145,12 +154,20 @@ impl ReplayTiming {
|
|||||||
self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed;
|
self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed;
|
||||||
self.replay_active_banks_elapsed += replay_active_banks_elapsed;
|
self.replay_active_banks_elapsed += replay_active_banks_elapsed;
|
||||||
self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed;
|
self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed;
|
||||||
|
self.wait_receive_elapsed += wait_receive_elapsed;
|
||||||
|
self.heaviest_fork_failures_elapsed += heaviest_fork_failures_elapsed;
|
||||||
|
self.bank_count += bank_count;
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let elapsed_ms = now - self.last_print;
|
let elapsed_ms = now - self.last_print;
|
||||||
if elapsed_ms > 1000 {
|
if elapsed_ms > 1000 {
|
||||||
datapoint_info!(
|
datapoint_info!(
|
||||||
"replay-loop-timing-stats",
|
"replay-loop-timing-stats",
|
||||||
("total_elapsed_us", elapsed_ms * 1000, i64),
|
("total_elapsed_us", elapsed_ms * 1000, i64),
|
||||||
|
(
|
||||||
|
"collect_frozen_banks_elapsed",
|
||||||
|
self.collect_frozen_banks_elapsed as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"compute_bank_stats_elapsed",
|
"compute_bank_stats_elapsed",
|
||||||
self.compute_bank_stats_elapsed as i64,
|
self.compute_bank_stats_elapsed as i64,
|
||||||
@ -193,6 +210,17 @@ impl ReplayTiming {
|
|||||||
self.reset_duplicate_slots_elapsed as i64,
|
self.reset_duplicate_slots_elapsed as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"wait_receive_elapsed",
|
||||||
|
self.wait_receive_elapsed as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"heaviest_fork_failures_elapsed",
|
||||||
|
self.heaviest_fork_failures_elapsed as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
("bank_count", self.bank_count as i64, i64),
|
||||||
);
|
);
|
||||||
|
|
||||||
*self = ReplayTiming::default();
|
*self = ReplayTiming::default();
|
||||||
@ -396,6 +424,7 @@ impl ReplayStage {
|
|||||||
);
|
);
|
||||||
select_vote_and_reset_forks_time.stop();
|
select_vote_and_reset_forks_time.stop();
|
||||||
|
|
||||||
|
let mut heaviest_fork_failures_time = Measure::start("heaviest_fork_failures_time");
|
||||||
if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() {
|
if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() {
|
||||||
info!(
|
info!(
|
||||||
"Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}",
|
"Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}",
|
||||||
@ -413,6 +442,7 @@ impl ReplayStage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
heaviest_fork_failures_time.stop();
|
||||||
|
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
|
|
||||||
@ -555,7 +585,22 @@ impl ReplayStage {
|
|||||||
start_leader_time.stop();
|
start_leader_time.stop();
|
||||||
Self::report_memory(&allocated, "start_leader", start);
|
Self::report_memory(&allocated, "start_leader", start);
|
||||||
|
|
||||||
|
let mut wait_receive_time = Measure::start("wait_receive_time");
|
||||||
|
if !did_complete_bank {
|
||||||
|
// only wait for the signal if we did not just process a bank; maybe there are more slots available
|
||||||
|
|
||||||
|
let timer = Duration::from_millis(100);
|
||||||
|
let result = ledger_signal_receiver.recv_timeout(timer);
|
||||||
|
match result {
|
||||||
|
Err(RecvTimeoutError::Timeout) => (),
|
||||||
|
Err(_) => break,
|
||||||
|
Ok(_) => trace!("blockstore signal"),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
wait_receive_time.stop();
|
||||||
|
|
||||||
replay_timing.update(
|
replay_timing.update(
|
||||||
|
collect_frozen_banks_time.as_us(),
|
||||||
compute_bank_stats_time.as_us(),
|
compute_bank_stats_time.as_us(),
|
||||||
select_vote_and_reset_forks_time.as_us(),
|
select_vote_and_reset_forks_time.as_us(),
|
||||||
start_leader_time.as_us(),
|
start_leader_time.as_us(),
|
||||||
@ -566,19 +611,10 @@ impl ReplayStage {
|
|||||||
generate_new_bank_forks_time.as_us(),
|
generate_new_bank_forks_time.as_us(),
|
||||||
replay_active_banks_time.as_us(),
|
replay_active_banks_time.as_us(),
|
||||||
reset_duplicate_slots_time.as_us(),
|
reset_duplicate_slots_time.as_us(),
|
||||||
|
wait_receive_time.as_us(),
|
||||||
|
heaviest_fork_failures_time.as_us(),
|
||||||
|
if did_complete_bank {1} else {0},
|
||||||
);
|
);
|
||||||
|
|
||||||
if did_complete_bank {
|
|
||||||
//just processed a bank, skip the signal; maybe there's more slots available
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let timer = Duration::from_millis(100);
|
|
||||||
let result = ledger_signal_receiver.recv_timeout(timer);
|
|
||||||
match result {
|
|
||||||
Err(RecvTimeoutError::Timeout) => continue,
|
|
||||||
Err(_) => break,
|
|
||||||
Ok(_) => trace!("blockstore signal"),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
|
Reference in New Issue
Block a user