diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 04f465e12d..dd6b99e31f 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -29,9 +29,17 @@ pub enum BroadcastStageReturnType { ChannelDisconnected, } +#[derive(Default)] +struct BroadcastStats { + num_entries: Vec, + run_elapsed: Vec, + to_blobs_elapsed: Vec, +} + struct Broadcast { id: Pubkey, coding_generator: CodingGenerator, + stats: BroadcastStats, } impl Broadcast { @@ -47,7 +55,7 @@ impl Broadcast { let (mut bank, entries) = receiver.recv_timeout(timer)?; let mut max_tick_height = bank.max_tick_height(); - let now = Instant::now(); + let run_start = Instant::now(); let mut num_entries = entries.len(); let mut ventries = Vec::new(); let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0); @@ -134,21 +142,42 @@ impl Broadcast { // send out erasures ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; - let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); - - inc_new_counter_info!( - "broadcast_service-time_ms", - duration_as_ms(&now.elapsed()) as usize + self.update_broadcast_stats( + duration_as_ms(&broadcast_start.elapsed()), + duration_as_ms(&run_start.elapsed()), + num_entries, + to_blobs_elapsed, + blob_index, ); - info!( - "broadcast: {} entries, blob time {} broadcast time {}", - num_entries, to_blobs_elapsed, broadcast_elapsed - ); - - datapoint!("broadcast-service", ("transmit-index", blob_index, i64)); Ok(()) } + + fn update_broadcast_stats( + &mut self, + broadcast_elapsed: u64, + run_elapsed: u64, + num_entries: usize, + to_blobs_elapsed: u64, + blob_index: u64, + ) { + inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize); + + self.stats.num_entries.push(num_entries); + self.stats.to_blobs_elapsed.push(to_blobs_elapsed); + self.stats.run_elapsed.push(run_elapsed); + if self.stats.num_entries.len() >= 16 { + info!( + "broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?}", + self.stats.num_entries, self.stats.to_blobs_elapsed, self.stats.run_elapsed + ); + self.stats.num_entries.clear(); + self.stats.to_blobs_elapsed.clear(); + self.stats.run_elapsed.clear(); + } + + datapoint!("broadcast-service", ("transmit-index", blob_index, i64)); + } } // Implement a destructor for the BroadcastStage thread to signal it exited @@ -188,6 +217,7 @@ impl BroadcastStage { let mut broadcast = Broadcast { id: me.id, coding_generator, + stats: BroadcastStats::default(), }; loop {