diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 5069e1f581..07dfb809c7 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -89,6 +89,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { slot, num_expected_batches: None, slot_start_ts: Instant::now(), + was_interrupted: false, }; // 3) Start broadcast step //some indicates fake shreds diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs index 364da3d79a..a05a3d9384 100644 --- a/core/src/broadcast_stage/broadcast_metrics.rs +++ b/core/src/broadcast_stage/broadcast_metrics.rs @@ -2,7 +2,7 @@ use super::*; pub(crate) trait BroadcastStats { fn update(&mut self, new_stats: &Self); - fn report_stats(&mut self, slot: Slot, slot_start: Instant); + fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool); } #[derive(Clone)] @@ -10,6 +10,7 @@ pub(crate) struct BroadcastShredBatchInfo { pub(crate) slot: Slot, pub(crate) num_expected_batches: Option, pub(crate) slot_start_ts: Instant, + pub(crate) was_interrupted: bool, } #[derive(Default, Clone)] @@ -33,25 +34,39 @@ impl BroadcastStats for TransmitShredsStats { self.total_packets += new_stats.total_packets; self.dropped_packets += new_stats.dropped_packets; } - fn report_stats(&mut self, slot: Slot, slot_start: Instant) { - datapoint_info!( - "broadcast-transmit-shreds-stats", - ("slot", slot as i64, i64), - ( - "end_to_end_elapsed", - // `slot_start` signals when the first batch of shreds was - // received, used to measure duration of broadcast - slot_start.elapsed().as_micros() as i64, - i64 - ), - ("transmit_elapsed", self.transmit_elapsed as i64, i64), - ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64), - ("get_peers_elapsed", self.get_peers_elapsed as i64, i64), - ("num_shreds", self.num_shreds as i64, i64), - ("shred_select", self.shred_select as i64, i64), - ("total_packets", self.total_packets as i64, i64), - ("dropped_packets", self.dropped_packets as i64, i64), - ); + fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) { + if was_interrupted { + datapoint_info!( + "broadcast-transmit-shreds-interrupted-stats", + ("slot", slot as i64, i64), + ("transmit_elapsed", self.transmit_elapsed as i64, i64), + ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64), + ("get_peers_elapsed", self.get_peers_elapsed as i64, i64), + ("num_shreds", self.num_shreds as i64, i64), + ("shred_select", self.shred_select as i64, i64), + ("total_packets", self.total_packets as i64, i64), + ("dropped_packets", self.dropped_packets as i64, i64), + ); + } else { + datapoint_info!( + "broadcast-transmit-shreds-stats", + ("slot", slot as i64, i64), + ( + "end_to_end_elapsed", + // `slot_start` signals when the first batch of shreds was + // received, used to measure duration of broadcast + slot_start.elapsed().as_micros() as i64, + i64 + ), + ("transmit_elapsed", self.transmit_elapsed as i64, i64), + ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64), + ("get_peers_elapsed", self.get_peers_elapsed as i64, i64), + ("num_shreds", self.num_shreds as i64, i64), + ("shred_select", self.shred_select as i64, i64), + ("total_packets", self.total_packets as i64, i64), + ("dropped_packets", self.dropped_packets as i64, i64), + ); + } } } @@ -65,24 +80,37 @@ impl BroadcastStats for InsertShredsStats { self.insert_shreds_elapsed += new_stats.insert_shreds_elapsed; self.num_shreds += new_stats.num_shreds; } - fn report_stats(&mut self, slot: Slot, slot_start: Instant) { - datapoint_info!( - "broadcast-insert-shreds-stats", - ("slot", slot as i64, i64), - ( - "end_to_end_elapsed", - // `slot_start` signals when the first batch of shreds was - // received, used to measure duration of broadcast - slot_start.elapsed().as_micros() as i64, - i64 - ), - ( - "insert_shreds_elapsed", - self.insert_shreds_elapsed as i64, - i64 - ), - ("num_shreds", self.num_shreds as i64, i64), - ); + fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) { + if was_interrupted { + datapoint_info!( + "broadcast-insert-shreds-interrupted-stats", + ("slot", slot as i64, i64), + ( + "insert_shreds_elapsed", + self.insert_shreds_elapsed as i64, + i64 + ), + ("num_shreds", self.num_shreds as i64, i64), + ); + } else { + datapoint_info!( + "broadcast-insert-shreds-stats", + ("slot", slot as i64, i64), + ( + "end_to_end_elapsed", + // `slot_start` signals when the first batch of shreds was + // received, used to measure duration of broadcast + slot_start.elapsed().as_micros() as i64, + i64 + ), + ( + "insert_shreds_elapsed", + self.insert_shreds_elapsed as i64, + i64 + ), + ("num_shreds", self.num_shreds as i64, i64), + ); + } } } @@ -128,9 +156,11 @@ impl SlotBroadcastStats { } if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches { if slot_batch_counter.num_batches == num_expected_batches { - slot_batch_counter - .broadcast_shred_stats - .report_stats(batch_info.slot, batch_info.slot_start_ts); + slot_batch_counter.broadcast_shred_stats.report_stats( + batch_info.slot, + batch_info.slot_start_ts, + batch_info.was_interrupted, + ); should_delete = true; } } @@ -159,7 +189,7 @@ mod test { self.count += new_stats.count; self.sender = new_stats.sender.clone(); } - fn report_stats(&mut self, slot: Slot, slot_start: Instant) { + fn report_stats(&mut self, slot: Slot, slot_start: Instant, _was_interrupted: bool) { self.sender .as_ref() .unwrap() @@ -186,6 +216,7 @@ mod test { slot: 0, num_expected_batches: Some(2), slot_start_ts: start, + was_interrupted: false, }), ); @@ -242,6 +273,7 @@ mod test { slot: 0, num_expected_batches: None, slot_start_ts: start, + was_interrupted: false, }), ); @@ -265,6 +297,7 @@ mod test { slot, num_expected_batches: None, slot_start_ts: start, + was_interrupted: false, }; if i == round % num_threads { broadcast_batch_info.num_expected_batches = Some(num_threads); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index c8dc2b35ee..bfd488f13e 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -92,7 +92,7 @@ impl StandardBroadcastRun { stats, ); shreds.insert(0, shred); - self.report_and_reset_stats(); + self.report_and_reset_stats(true); self.unfinished_slot = None; shreds } @@ -240,6 +240,7 @@ impl StandardBroadcastRun { "Old broadcast start time for previous slot must exist if the previous slot was interrupted", ), + was_interrupted: true, }); let shreds = Arc::new(prev_slot_shreds); debug_assert!(shreds.iter().all(|shred| shred.slot() == slot)); @@ -262,6 +263,7 @@ impl StandardBroadcastRun { slot_start_ts: self .slot_broadcast_start .expect("Start timestamp must exist for a slot if we're broadcasting the slot"), + was_interrupted: false, }); get_leader_schedule_time.stop(); @@ -297,7 +299,7 @@ impl StandardBroadcastRun { self.process_shreds_stats.update(&process_stats); if last_tick_height == bank.max_tick_height() { - self.report_and_reset_stats(); + self.report_and_reset_stats(false); self.unfinished_slot = None; } @@ -380,35 +382,59 @@ impl StandardBroadcastRun { transmit_shreds_stats.update(new_transmit_shreds_stats, broadcast_shred_batch_info); } - fn report_and_reset_stats(&mut self) { + fn report_and_reset_stats(&mut self, was_interrupted: bool) { let stats = &self.process_shreds_stats; let unfinished_slot = self.unfinished_slot.as_ref().unwrap(); - datapoint_info!( - "broadcast-process-shreds-stats", - ("slot", unfinished_slot.slot as i64, i64), - ("shredding_time", stats.shredding_elapsed, i64), - ("receive_time", stats.receive_elapsed, i64), - ( - "num_data_shreds", - unfinished_slot.next_shred_index as i64, - i64 - ), - ( - "slot_broadcast_time", - self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64, - i64 - ), - ( - "get_leader_schedule_time", - stats.get_leader_schedule_elapsed, - i64 - ), - ("serialize_shreds_time", stats.serialize_elapsed, i64), - ("gen_data_time", stats.gen_data_elapsed, i64), - ("gen_coding_time", stats.gen_coding_elapsed, i64), - ("sign_coding_time", stats.sign_coding_elapsed, i64), - ("coding_send_time", stats.coding_send_elapsed, i64), - ); + if was_interrupted { + datapoint_info!( + "broadcast-process-shreds-interrupted-stats", + ("slot", unfinished_slot.slot as i64, i64), + ("shredding_time", stats.shredding_elapsed, i64), + ("receive_time", stats.receive_elapsed, i64), + ( + "num_data_shreds", + unfinished_slot.next_shred_index as i64, + i64 + ), + ( + "get_leader_schedule_time", + stats.get_leader_schedule_elapsed, + i64 + ), + ("serialize_shreds_time", stats.serialize_elapsed, i64), + ("gen_data_time", stats.gen_data_elapsed, i64), + ("gen_coding_time", stats.gen_coding_elapsed, i64), + ("sign_coding_time", stats.sign_coding_elapsed, i64), + ("coding_send_time", stats.coding_send_elapsed, i64), + ); + } else { + datapoint_info!( + "broadcast-process-shreds-stats", + ("slot", unfinished_slot.slot as i64, i64), + ("shredding_time", stats.shredding_elapsed, i64), + ("receive_time", stats.receive_elapsed, i64), + ( + "num_data_shreds", + unfinished_slot.next_shred_index as i64, + i64 + ), + ( + "slot_broadcast_time", + self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64, + i64 + ), + ( + "get_leader_schedule_time", + stats.get_leader_schedule_elapsed, + i64 + ), + ("serialize_shreds_time", stats.serialize_elapsed, i64), + ("gen_data_time", stats.gen_data_elapsed, i64), + ("gen_coding_time", stats.gen_coding_elapsed, i64), + ("sign_coding_time", stats.sign_coding_elapsed, i64), + ("coding_send_time", stats.coding_send_elapsed, i64), + ); + } self.process_shreds_stats.reset(); } }