Separate out interrupted slots broadcast metrics (#20537)
This commit is contained in:
		@@ -89,6 +89,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
 | 
			
		||||
            slot,
 | 
			
		||||
            num_expected_batches: None,
 | 
			
		||||
            slot_start_ts: Instant::now(),
 | 
			
		||||
            was_interrupted: false,
 | 
			
		||||
        };
 | 
			
		||||
        // 3) Start broadcast step
 | 
			
		||||
        //some indicates fake shreds
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,7 @@ use super::*;
 | 
			
		||||
 | 
			
		||||
pub(crate) trait BroadcastStats {
 | 
			
		||||
    fn update(&mut self, new_stats: &Self);
 | 
			
		||||
    fn report_stats(&mut self, slot: Slot, slot_start: Instant);
 | 
			
		||||
    fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
@@ -10,6 +10,7 @@ pub(crate) struct BroadcastShredBatchInfo {
 | 
			
		||||
    pub(crate) slot: Slot,
 | 
			
		||||
    pub(crate) num_expected_batches: Option<usize>,
 | 
			
		||||
    pub(crate) slot_start_ts: Instant,
 | 
			
		||||
    pub(crate) was_interrupted: bool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Default, Clone)]
 | 
			
		||||
@@ -33,25 +34,39 @@ impl BroadcastStats for TransmitShredsStats {
 | 
			
		||||
        self.total_packets += new_stats.total_packets;
 | 
			
		||||
        self.dropped_packets += new_stats.dropped_packets;
 | 
			
		||||
    }
 | 
			
		||||
    fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
 | 
			
		||||
        datapoint_info!(
 | 
			
		||||
            "broadcast-transmit-shreds-stats",
 | 
			
		||||
            ("slot", slot as i64, i64),
 | 
			
		||||
            (
 | 
			
		||||
                "end_to_end_elapsed",
 | 
			
		||||
                // `slot_start` signals when the first batch of shreds was
 | 
			
		||||
                // received, used to measure duration of broadcast
 | 
			
		||||
                slot_start.elapsed().as_micros() as i64,
 | 
			
		||||
                i64
 | 
			
		||||
            ),
 | 
			
		||||
            ("transmit_elapsed", self.transmit_elapsed as i64, i64),
 | 
			
		||||
            ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
 | 
			
		||||
            ("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
 | 
			
		||||
            ("num_shreds", self.num_shreds as i64, i64),
 | 
			
		||||
            ("shred_select", self.shred_select as i64, i64),
 | 
			
		||||
            ("total_packets", self.total_packets as i64, i64),
 | 
			
		||||
            ("dropped_packets", self.dropped_packets as i64, i64),
 | 
			
		||||
        );
 | 
			
		||||
    fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
 | 
			
		||||
        if was_interrupted {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-transmit-shreds-interrupted-stats",
 | 
			
		||||
                ("slot", slot as i64, i64),
 | 
			
		||||
                ("transmit_elapsed", self.transmit_elapsed as i64, i64),
 | 
			
		||||
                ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
 | 
			
		||||
                ("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
 | 
			
		||||
                ("num_shreds", self.num_shreds as i64, i64),
 | 
			
		||||
                ("shred_select", self.shred_select as i64, i64),
 | 
			
		||||
                ("total_packets", self.total_packets as i64, i64),
 | 
			
		||||
                ("dropped_packets", self.dropped_packets as i64, i64),
 | 
			
		||||
            );
 | 
			
		||||
        } else {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-transmit-shreds-stats",
 | 
			
		||||
                ("slot", slot as i64, i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "end_to_end_elapsed",
 | 
			
		||||
                    // `slot_start` signals when the first batch of shreds was
 | 
			
		||||
                    // received, used to measure duration of broadcast
 | 
			
		||||
                    slot_start.elapsed().as_micros() as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                ("transmit_elapsed", self.transmit_elapsed as i64, i64),
 | 
			
		||||
                ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
 | 
			
		||||
                ("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
 | 
			
		||||
                ("num_shreds", self.num_shreds as i64, i64),
 | 
			
		||||
                ("shred_select", self.shred_select as i64, i64),
 | 
			
		||||
                ("total_packets", self.total_packets as i64, i64),
 | 
			
		||||
                ("dropped_packets", self.dropped_packets as i64, i64),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -65,24 +80,37 @@ impl BroadcastStats for InsertShredsStats {
 | 
			
		||||
        self.insert_shreds_elapsed += new_stats.insert_shreds_elapsed;
 | 
			
		||||
        self.num_shreds += new_stats.num_shreds;
 | 
			
		||||
    }
 | 
			
		||||
    fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
 | 
			
		||||
        datapoint_info!(
 | 
			
		||||
            "broadcast-insert-shreds-stats",
 | 
			
		||||
            ("slot", slot as i64, i64),
 | 
			
		||||
            (
 | 
			
		||||
                "end_to_end_elapsed",
 | 
			
		||||
                // `slot_start` signals when the first batch of shreds was
 | 
			
		||||
                // received, used to measure duration of broadcast
 | 
			
		||||
                slot_start.elapsed().as_micros() as i64,
 | 
			
		||||
                i64
 | 
			
		||||
            ),
 | 
			
		||||
            (
 | 
			
		||||
                "insert_shreds_elapsed",
 | 
			
		||||
                self.insert_shreds_elapsed as i64,
 | 
			
		||||
                i64
 | 
			
		||||
            ),
 | 
			
		||||
            ("num_shreds", self.num_shreds as i64, i64),
 | 
			
		||||
        );
 | 
			
		||||
    fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
 | 
			
		||||
        if was_interrupted {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-insert-shreds-interrupted-stats",
 | 
			
		||||
                ("slot", slot as i64, i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "insert_shreds_elapsed",
 | 
			
		||||
                    self.insert_shreds_elapsed as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                ("num_shreds", self.num_shreds as i64, i64),
 | 
			
		||||
            );
 | 
			
		||||
        } else {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-insert-shreds-stats",
 | 
			
		||||
                ("slot", slot as i64, i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "end_to_end_elapsed",
 | 
			
		||||
                    // `slot_start` signals when the first batch of shreds was
 | 
			
		||||
                    // received, used to measure duration of broadcast
 | 
			
		||||
                    slot_start.elapsed().as_micros() as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                (
 | 
			
		||||
                    "insert_shreds_elapsed",
 | 
			
		||||
                    self.insert_shreds_elapsed as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                ("num_shreds", self.num_shreds as i64, i64),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -128,9 +156,11 @@ impl<T: BroadcastStats + Default> SlotBroadcastStats<T> {
 | 
			
		||||
                }
 | 
			
		||||
                if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches {
 | 
			
		||||
                    if slot_batch_counter.num_batches == num_expected_batches {
 | 
			
		||||
                        slot_batch_counter
 | 
			
		||||
                            .broadcast_shred_stats
 | 
			
		||||
                            .report_stats(batch_info.slot, batch_info.slot_start_ts);
 | 
			
		||||
                        slot_batch_counter.broadcast_shred_stats.report_stats(
 | 
			
		||||
                            batch_info.slot,
 | 
			
		||||
                            batch_info.slot_start_ts,
 | 
			
		||||
                            batch_info.was_interrupted,
 | 
			
		||||
                        );
 | 
			
		||||
                        should_delete = true;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
@@ -159,7 +189,7 @@ mod test {
 | 
			
		||||
            self.count += new_stats.count;
 | 
			
		||||
            self.sender = new_stats.sender.clone();
 | 
			
		||||
        }
 | 
			
		||||
        fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
 | 
			
		||||
        fn report_stats(&mut self, slot: Slot, slot_start: Instant, _was_interrupted: bool) {
 | 
			
		||||
            self.sender
 | 
			
		||||
                .as_ref()
 | 
			
		||||
                .unwrap()
 | 
			
		||||
@@ -186,6 +216,7 @@ mod test {
 | 
			
		||||
                slot: 0,
 | 
			
		||||
                num_expected_batches: Some(2),
 | 
			
		||||
                slot_start_ts: start,
 | 
			
		||||
                was_interrupted: false,
 | 
			
		||||
            }),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
@@ -242,6 +273,7 @@ mod test {
 | 
			
		||||
                slot: 0,
 | 
			
		||||
                num_expected_batches: None,
 | 
			
		||||
                slot_start_ts: start,
 | 
			
		||||
                was_interrupted: false,
 | 
			
		||||
            }),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
@@ -265,6 +297,7 @@ mod test {
 | 
			
		||||
                        slot,
 | 
			
		||||
                        num_expected_batches: None,
 | 
			
		||||
                        slot_start_ts: start,
 | 
			
		||||
                        was_interrupted: false,
 | 
			
		||||
                    };
 | 
			
		||||
                    if i == round % num_threads {
 | 
			
		||||
                        broadcast_batch_info.num_expected_batches = Some(num_threads);
 | 
			
		||||
 
 | 
			
		||||
@@ -92,7 +92,7 @@ impl StandardBroadcastRun {
 | 
			
		||||
                    stats,
 | 
			
		||||
                );
 | 
			
		||||
                shreds.insert(0, shred);
 | 
			
		||||
                self.report_and_reset_stats();
 | 
			
		||||
                self.report_and_reset_stats(true);
 | 
			
		||||
                self.unfinished_slot = None;
 | 
			
		||||
                shreds
 | 
			
		||||
            }
 | 
			
		||||
@@ -240,6 +240,7 @@ impl StandardBroadcastRun {
 | 
			
		||||
                    "Old broadcast start time for previous slot must exist if the previous slot
 | 
			
		||||
                 was interrupted",
 | 
			
		||||
                ),
 | 
			
		||||
                was_interrupted: true,
 | 
			
		||||
            });
 | 
			
		||||
            let shreds = Arc::new(prev_slot_shreds);
 | 
			
		||||
            debug_assert!(shreds.iter().all(|shred| shred.slot() == slot));
 | 
			
		||||
@@ -262,6 +263,7 @@ impl StandardBroadcastRun {
 | 
			
		||||
            slot_start_ts: self
 | 
			
		||||
                .slot_broadcast_start
 | 
			
		||||
                .expect("Start timestamp must exist for a slot if we're broadcasting the slot"),
 | 
			
		||||
            was_interrupted: false,
 | 
			
		||||
        });
 | 
			
		||||
        get_leader_schedule_time.stop();
 | 
			
		||||
 | 
			
		||||
@@ -297,7 +299,7 @@ impl StandardBroadcastRun {
 | 
			
		||||
        self.process_shreds_stats.update(&process_stats);
 | 
			
		||||
 | 
			
		||||
        if last_tick_height == bank.max_tick_height() {
 | 
			
		||||
            self.report_and_reset_stats();
 | 
			
		||||
            self.report_and_reset_stats(false);
 | 
			
		||||
            self.unfinished_slot = None;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -380,35 +382,59 @@ impl StandardBroadcastRun {
 | 
			
		||||
        transmit_shreds_stats.update(new_transmit_shreds_stats, broadcast_shred_batch_info);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn report_and_reset_stats(&mut self) {
 | 
			
		||||
    fn report_and_reset_stats(&mut self, was_interrupted: bool) {
 | 
			
		||||
        let stats = &self.process_shreds_stats;
 | 
			
		||||
        let unfinished_slot = self.unfinished_slot.as_ref().unwrap();
 | 
			
		||||
        datapoint_info!(
 | 
			
		||||
            "broadcast-process-shreds-stats",
 | 
			
		||||
            ("slot", unfinished_slot.slot as i64, i64),
 | 
			
		||||
            ("shredding_time", stats.shredding_elapsed, i64),
 | 
			
		||||
            ("receive_time", stats.receive_elapsed, i64),
 | 
			
		||||
            (
 | 
			
		||||
                "num_data_shreds",
 | 
			
		||||
                unfinished_slot.next_shred_index as i64,
 | 
			
		||||
                i64
 | 
			
		||||
            ),
 | 
			
		||||
            (
 | 
			
		||||
                "slot_broadcast_time",
 | 
			
		||||
                self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
 | 
			
		||||
                i64
 | 
			
		||||
            ),
 | 
			
		||||
            (
 | 
			
		||||
                "get_leader_schedule_time",
 | 
			
		||||
                stats.get_leader_schedule_elapsed,
 | 
			
		||||
                i64
 | 
			
		||||
            ),
 | 
			
		||||
            ("serialize_shreds_time", stats.serialize_elapsed, i64),
 | 
			
		||||
            ("gen_data_time", stats.gen_data_elapsed, i64),
 | 
			
		||||
            ("gen_coding_time", stats.gen_coding_elapsed, i64),
 | 
			
		||||
            ("sign_coding_time", stats.sign_coding_elapsed, i64),
 | 
			
		||||
            ("coding_send_time", stats.coding_send_elapsed, i64),
 | 
			
		||||
        );
 | 
			
		||||
        if was_interrupted {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-process-shreds-interrupted-stats",
 | 
			
		||||
                ("slot", unfinished_slot.slot as i64, i64),
 | 
			
		||||
                ("shredding_time", stats.shredding_elapsed, i64),
 | 
			
		||||
                ("receive_time", stats.receive_elapsed, i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "num_data_shreds",
 | 
			
		||||
                    unfinished_slot.next_shred_index as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                (
 | 
			
		||||
                    "get_leader_schedule_time",
 | 
			
		||||
                    stats.get_leader_schedule_elapsed,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                ("serialize_shreds_time", stats.serialize_elapsed, i64),
 | 
			
		||||
                ("gen_data_time", stats.gen_data_elapsed, i64),
 | 
			
		||||
                ("gen_coding_time", stats.gen_coding_elapsed, i64),
 | 
			
		||||
                ("sign_coding_time", stats.sign_coding_elapsed, i64),
 | 
			
		||||
                ("coding_send_time", stats.coding_send_elapsed, i64),
 | 
			
		||||
            );
 | 
			
		||||
        } else {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-process-shreds-stats",
 | 
			
		||||
                ("slot", unfinished_slot.slot as i64, i64),
 | 
			
		||||
                ("shredding_time", stats.shredding_elapsed, i64),
 | 
			
		||||
                ("receive_time", stats.receive_elapsed, i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "num_data_shreds",
 | 
			
		||||
                    unfinished_slot.next_shred_index as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                (
 | 
			
		||||
                    "slot_broadcast_time",
 | 
			
		||||
                    self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                (
 | 
			
		||||
                    "get_leader_schedule_time",
 | 
			
		||||
                    stats.get_leader_schedule_elapsed,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                ("serialize_shreds_time", stats.serialize_elapsed, i64),
 | 
			
		||||
                ("gen_data_time", stats.gen_data_elapsed, i64),
 | 
			
		||||
                ("gen_coding_time", stats.gen_coding_elapsed, i64),
 | 
			
		||||
                ("sign_coding_time", stats.sign_coding_elapsed, i64),
 | 
			
		||||
                ("coding_send_time", stats.coding_send_elapsed, i64),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
        self.process_shreds_stats.reset();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user