diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index e4876798db..2c4433b0cb 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -109,8 +109,10 @@ pub(super) fn entries_to_shreds( ) .expect("Expected to create a new shredder"); + let now = Instant::now(); bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); + let elapsed = now.elapsed().as_millis(); let unfinished_slot = if last_tick == bank_max_tick { shredder.finalize_slot(); @@ -124,9 +126,20 @@ pub(super) fn entries_to_shreds( }) }; + let num_shreds = shredder.shreds.len(); shreds.append(&mut shredder.shreds); - trace!("Inserting {:?} shreds in blocktree", shreds.len()); + datapoint_info!( + "shredding-stats", + ("slot", slot as i64, i64), + ("num_shreds", num_shreds as i64, i64), + ("signing_coding", shredder.signing_coding_time as i64, i64), + ( + "copying_serializing", + (elapsed - shredder.signing_coding_time) as i64, + i64 + ), + ); (shreds, unfinished_slot) } diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index fa599b4bbb..03e9c3c441 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -13,6 +13,11 @@ struct BroadcastStats { pub(super) struct StandardBroadcastRun { stats: BroadcastStats, unfinished_slot: Option, + current_slot: Option, + shredding_elapsed: u128, + insertion_elapsed: u128, + broadcast_elapsed: u128, + slot_broadcast_start: Option, } impl StandardBroadcastRun { @@ -20,6 +25,11 @@ impl StandardBroadcastRun { Self { stats: BroadcastStats::default(), unfinished_slot: None, + current_slot: None, + shredding_elapsed: 0, + insertion_elapsed: 0, + broadcast_elapsed: 0, + slot_broadcast_start: None, } } @@ -78,6 +88,11 @@ impl BroadcastRun for StandardBroadcastRun { let last_tick = receive_results.last_tick; inc_new_counter_info!("broadcast_service-entries_received", num_entries); + if Some(bank.slot()) != self.current_slot { + self.slot_broadcast_start = Some(Instant::now()); + self.current_slot = Some(bank.slot()); + } + // 2) Convert entries to blobs + generate coding blobs let keypair = &cluster_info.read().unwrap().keypair.clone(); let latest_shred_index = blocktree @@ -121,6 +136,7 @@ impl BroadcastRun for StandardBroadcastRun { let all_shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.payload).collect(); trace!("Broadcasting {:?} shreds", all_shred_bufs.len()); + cluster_info.read().unwrap().broadcast_shreds( sock, &all_shred_bufs, @@ -136,6 +152,30 @@ impl BroadcastRun for StandardBroadcastRun { .map(|meta| meta.consumed) .unwrap_or(0) }); + + self.insertion_elapsed += insert_shreds_elapsed.as_millis(); + self.shredding_elapsed += to_shreds_elapsed.as_millis(); + self.broadcast_elapsed += broadcast_elapsed.as_millis(); + + if last_tick == bank.max_tick_height() { + datapoint_info!( + "broadcast-bank-stats", + ("slot", bank.slot() as i64, i64), + ("shredding_time", self.shredding_elapsed as i64, i64), + ("insertion_time", self.insertion_elapsed as i64, i64), + ("broadcast_time", self.broadcast_elapsed as i64, i64), + ("num_shreds", latest_shred_index as i64, i64), + ( + "slot_broadcast_time", + self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64, + i64 + ), + ); + self.insertion_elapsed = 0; + self.shredding_elapsed = 0; + self.broadcast_elapsed = 0; + } + self.update_broadcast_stats( duration_as_ms(&receive_elapsed), duration_as_ms(&to_shreds_elapsed), diff --git a/core/src/shred.rs b/core/src/shred.rs index 179572da12..9f1d4e8e0b 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -15,6 +15,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use std::io; use std::io::{Error as IOError, ErrorKind, Write}; use std::sync::Arc; +use std::time::Instant; lazy_static! { static ref SIZE_OF_CODING_SHRED_HEADER: usize = @@ -261,6 +262,7 @@ pub struct Shredder { active_shred: Vec, active_shred_header: DataShredHeader, active_offset: usize, + pub signing_coding_time: u128, } impl Write for Shredder { @@ -277,7 +279,9 @@ impl Write for Shredder { } if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK { + let now = Instant::now(); self.sign_unsigned_shreds_and_generate_codes(); + self.signing_coding_time += now.elapsed().as_millis(); } Ok(slice_len) @@ -330,6 +334,7 @@ impl Shredder { active_shred, active_shred_header: header, active_offset: 0, + signing_coding_time: 0, }) } }