From 9dceb8ac749a51477c1e2ffa4dfbe8e4bfedc10a Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 8 Oct 2019 01:42:42 -0700 Subject: [PATCH] Broadcast/Shredding Metrics (#6270) automerge --- core/src/broadcast_stage/broadcast_utils.rs | 3 ++- .../broadcast_stage/standard_broadcast_run.rs | 16 +++++++++++++++- core/src/shred.rs | 11 +++-------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 0334f07efd..30cacb6813 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -27,8 +27,8 @@ const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8; pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result { let timer = Duration::new(1, 0); - let (mut bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?; let recv_start = Instant::now(); + let (mut bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?; let mut entries = vec![entry]; let mut slot = bank.slot(); @@ -41,6 +41,7 @@ pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result // If the bank changed, that implies the previous slot was interrupted and we do not have to // broadcast its entries. if try_bank.slot() != slot { + warn!("Broadcast for slot: {} interrupted", bank.slot()); entries.clear(); bank = try_bank; slot = bank.slot(); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 4724fbe3d1..a915701f84 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -2,6 +2,7 @@ use super::broadcast_utils; use super::*; use crate::shred::{Shredder, RECOMMENDED_FEC_RATE}; use solana_sdk::timing::duration_as_ms; +use std::time::Duration; #[derive(Default)] struct BroadcastStats { @@ -16,6 +17,8 @@ pub(super) struct StandardBroadcastRun { shredding_elapsed: u128, insertion_elapsed: u128, broadcast_elapsed: u128, + receive_elapsed: u128, + clone_and_seed_elapsed: u128, slot_broadcast_start: Option, } @@ -27,6 +30,8 @@ impl StandardBroadcastRun { shredding_elapsed: 0, insertion_elapsed: 0, broadcast_elapsed: 0, + receive_elapsed: 0, + clone_and_seed_elapsed: 0, slot_broadcast_start: None, } } @@ -80,7 +85,7 @@ impl BroadcastRun for StandardBroadcastRun { ) -> Result<()> { // 1) Pull entries from banking stage let receive_results = broadcast_utils::recv_slot_entries(receiver)?; - let receive_elapsed = receive_results.time_elapsed; + let mut receive_elapsed = receive_results.time_elapsed; let num_entries = receive_results.entries.len(); let bank = receive_results.bank.clone(); let last_tick = receive_results.last_tick; @@ -89,6 +94,7 @@ impl BroadcastRun for StandardBroadcastRun { if Some(bank.slot()) != self.current_slot { self.slot_broadcast_start = Some(Instant::now()); self.current_slot = Some(bank.slot()); + receive_elapsed = Duration::new(0, 0); } // 2) Convert entries to blobs + generate coding blobs @@ -122,6 +128,7 @@ impl BroadcastRun for StandardBroadcastRun { ); let to_shreds_elapsed = to_shreds_start.elapsed(); + let clone_and_seed_start = Instant::now(); let all_shreds = data_shreds .iter() .cloned() @@ -129,6 +136,7 @@ impl BroadcastRun for StandardBroadcastRun { .collect::>(); let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect(); let num_shreds = all_shreds.len(); + let clone_and_seed_elapsed = clone_and_seed_start.elapsed(); // Insert shreds into blocktree let insert_shreds_start = Instant::now(); @@ -161,6 +169,8 @@ impl BroadcastRun for StandardBroadcastRun { self.insertion_elapsed += insert_shreds_elapsed.as_millis(); self.shredding_elapsed += to_shreds_elapsed.as_millis(); self.broadcast_elapsed += broadcast_elapsed.as_millis(); + self.receive_elapsed += receive_elapsed.as_millis(); + self.clone_and_seed_elapsed += clone_and_seed_elapsed.as_millis(); if last_tick == bank.max_tick_height() { datapoint_info!( @@ -169,6 +179,8 @@ impl BroadcastRun for StandardBroadcastRun { ("shredding_time", self.shredding_elapsed as i64, i64), ("insertion_time", self.insertion_elapsed as i64, i64), ("broadcast_time", self.broadcast_elapsed as i64, i64), + ("receive_time", self.receive_elapsed as i64, i64), + ("clone_and_seed", self.clone_and_seed_elapsed as i64, i64), ("num_shreds", i64::from(latest_shred_index), i64), ( "slot_broadcast_time", @@ -179,6 +191,8 @@ impl BroadcastRun for StandardBroadcastRun { self.insertion_elapsed = 0; self.shredding_elapsed = 0; self.broadcast_elapsed = 0; + self.receive_elapsed = 0; + self.clone_and_seed_elapsed = 0; } self.update_broadcast_stats( diff --git a/core/src/shred.rs b/core/src/shred.rs index 8a9c63b604..44f8d610f0 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -330,6 +330,7 @@ impl Shredder { let now = Instant::now(); let serialized_shreds = bincode::serialize(entries).expect("Expect to serialize all entries"); + let serialize_time = now.elapsed().as_millis(); let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER; let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size; @@ -392,7 +393,6 @@ impl Shredder { }) }); - // TODO: pre-allocate this let elapsed = now.elapsed().as_millis(); datapoint_info!( @@ -400,13 +400,8 @@ impl Shredder { ("slot", self.slot as i64, i64), ("num_data_shreds", data_shreds.len() as i64, i64), ("num_coding_shreds", coding_shreds.len() as i64, i64), - // TODO: update signing_coding_time - ("signing_coding", self.signing_coding_time as i64, i64), - ( - "copying_serialzing", - (elapsed - self.signing_coding_time) as i64, - i64 - ), + ("signing_coding", (elapsed - serialize_time) as i64, i64), + ("serialzing", serialize_time as i64, i64), ); (data_shreds, coding_shreds, last_shred_index + 1)