adds counters for errors in window-service run_insert (#20670)
This commit is contained in:
@ -50,6 +50,12 @@ struct WindowServiceMetrics {
|
|||||||
num_shreds_received: u64,
|
num_shreds_received: u64,
|
||||||
shred_receiver_elapsed_us: u64,
|
shred_receiver_elapsed_us: u64,
|
||||||
prune_shreds_elapsed_us: u64,
|
prune_shreds_elapsed_us: u64,
|
||||||
|
num_shreds_pruned_invalid_repair: usize,
|
||||||
|
num_errors: u64,
|
||||||
|
num_errors_blockstore: u64,
|
||||||
|
num_errors_cross_beam_recv_timeout: u64,
|
||||||
|
num_errors_other: u64,
|
||||||
|
num_errors_try_crossbeam_send: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WindowServiceMetrics {
|
impl WindowServiceMetrics {
|
||||||
@ -68,8 +74,39 @@ impl WindowServiceMetrics {
|
|||||||
self.prune_shreds_elapsed_us as i64,
|
self.prune_shreds_elapsed_us as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"num_shreds_pruned_invalid_repair",
|
||||||
|
self.num_shreds_pruned_invalid_repair,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
("num_errors", self.num_errors, i64),
|
||||||
|
("num_errors_blockstore", self.num_errors_blockstore, i64),
|
||||||
|
("num_errors_other", self.num_errors_other, i64),
|
||||||
|
(
|
||||||
|
"num_errors_try_crossbeam_send",
|
||||||
|
self.num_errors_try_crossbeam_send,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"num_errors_cross_beam_recv_timeout",
|
||||||
|
self.num_errors_cross_beam_recv_timeout,
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn record_error(&mut self, err: &Error) {
|
||||||
|
self.num_errors += 1;
|
||||||
|
match err {
|
||||||
|
Error::TryCrossbeamSend => self.num_errors_try_crossbeam_send += 1,
|
||||||
|
Error::CrossbeamRecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
|
||||||
|
Error::Blockstore(err) => {
|
||||||
|
self.num_errors_blockstore += 1;
|
||||||
|
error!("blockstore error: {}", err);
|
||||||
|
}
|
||||||
|
_ => self.num_errors_other += 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@ -269,6 +306,7 @@ fn run_insert<F>(
|
|||||||
where
|
where
|
||||||
F: Fn(Shred),
|
F: Fn(Shred),
|
||||||
{
|
{
|
||||||
|
ws_metrics.run_insert_count += 1;
|
||||||
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
|
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
|
||||||
let timer = Duration::from_millis(200);
|
let timer = Duration::from_millis(200);
|
||||||
let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
|
let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
|
||||||
@ -277,15 +315,19 @@ where
|
|||||||
repair_infos.extend(more_repair_infos);
|
repair_infos.extend(more_repair_infos);
|
||||||
}
|
}
|
||||||
shred_receiver_elapsed.stop();
|
shred_receiver_elapsed.stop();
|
||||||
|
ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
|
||||||
ws_metrics.num_shreds_received += shreds.len() as u64;
|
ws_metrics.num_shreds_received += shreds.len() as u64;
|
||||||
|
|
||||||
let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
|
let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
|
||||||
|
let num_shreds = shreds.len();
|
||||||
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
|
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
|
||||||
|
ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
|
||||||
let repairs: Vec<_> = repair_infos
|
let repairs: Vec<_> = repair_infos
|
||||||
.iter()
|
.iter()
|
||||||
.map(|repair_info| repair_info.is_some())
|
.map(|repair_info| repair_info.is_some())
|
||||||
.collect();
|
.collect();
|
||||||
prune_shreds_elapsed.stop();
|
prune_shreds_elapsed.stop();
|
||||||
|
ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();
|
||||||
|
|
||||||
let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
|
let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
|
||||||
shreds,
|
shreds,
|
||||||
@ -303,11 +345,6 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
completed_data_sets_sender.try_send(completed_data_sets)?;
|
completed_data_sets_sender.try_send(completed_data_sets)?;
|
||||||
|
|
||||||
ws_metrics.run_insert_count += 1;
|
|
||||||
ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
|
|
||||||
ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -567,6 +604,7 @@ impl WindowService {
|
|||||||
&retransmit_sender,
|
&retransmit_sender,
|
||||||
&outstanding_requests,
|
&outstanding_requests,
|
||||||
) {
|
) {
|
||||||
|
ws_metrics.record_error(&e);
|
||||||
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
|
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user