Reduce broadcast prints (#4368)
This commit is contained in:
@ -29,9 +29,17 @@ pub enum BroadcastStageReturnType {
|
|||||||
ChannelDisconnected,
|
ChannelDisconnected,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct BroadcastStats {
|
||||||
|
num_entries: Vec<usize>,
|
||||||
|
run_elapsed: Vec<u64>,
|
||||||
|
to_blobs_elapsed: Vec<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
struct Broadcast {
|
struct Broadcast {
|
||||||
id: Pubkey,
|
id: Pubkey,
|
||||||
coding_generator: CodingGenerator,
|
coding_generator: CodingGenerator,
|
||||||
|
stats: BroadcastStats,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Broadcast {
|
impl Broadcast {
|
||||||
@ -47,7 +55,7 @@ impl Broadcast {
|
|||||||
let (mut bank, entries) = receiver.recv_timeout(timer)?;
|
let (mut bank, entries) = receiver.recv_timeout(timer)?;
|
||||||
let mut max_tick_height = bank.max_tick_height();
|
let mut max_tick_height = bank.max_tick_height();
|
||||||
|
|
||||||
let now = Instant::now();
|
let run_start = Instant::now();
|
||||||
let mut num_entries = entries.len();
|
let mut num_entries = entries.len();
|
||||||
let mut ventries = Vec::new();
|
let mut ventries = Vec::new();
|
||||||
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
|
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
|
||||||
@ -134,21 +142,42 @@ impl Broadcast {
|
|||||||
// send out erasures
|
// send out erasures
|
||||||
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
|
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
|
||||||
|
|
||||||
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
|
self.update_broadcast_stats(
|
||||||
|
duration_as_ms(&broadcast_start.elapsed()),
|
||||||
inc_new_counter_info!(
|
duration_as_ms(&run_start.elapsed()),
|
||||||
"broadcast_service-time_ms",
|
num_entries,
|
||||||
duration_as_ms(&now.elapsed()) as usize
|
to_blobs_elapsed,
|
||||||
|
blob_index,
|
||||||
);
|
);
|
||||||
info!(
|
|
||||||
"broadcast: {} entries, blob time {} broadcast time {}",
|
|
||||||
num_entries, to_blobs_elapsed, broadcast_elapsed
|
|
||||||
);
|
|
||||||
|
|
||||||
datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_broadcast_stats(
|
||||||
|
&mut self,
|
||||||
|
broadcast_elapsed: u64,
|
||||||
|
run_elapsed: u64,
|
||||||
|
num_entries: usize,
|
||||||
|
to_blobs_elapsed: u64,
|
||||||
|
blob_index: u64,
|
||||||
|
) {
|
||||||
|
inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
|
||||||
|
|
||||||
|
self.stats.num_entries.push(num_entries);
|
||||||
|
self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
|
||||||
|
self.stats.run_elapsed.push(run_elapsed);
|
||||||
|
if self.stats.num_entries.len() >= 16 {
|
||||||
|
info!(
|
||||||
|
"broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?}",
|
||||||
|
self.stats.num_entries, self.stats.to_blobs_elapsed, self.stats.run_elapsed
|
||||||
|
);
|
||||||
|
self.stats.num_entries.clear();
|
||||||
|
self.stats.to_blobs_elapsed.clear();
|
||||||
|
self.stats.run_elapsed.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement a destructor for the BroadcastStage thread to signal it exited
|
// Implement a destructor for the BroadcastStage thread to signal it exited
|
||||||
@ -188,6 +217,7 @@ impl BroadcastStage {
|
|||||||
let mut broadcast = Broadcast {
|
let mut broadcast = Broadcast {
|
||||||
id: me.id,
|
id: me.id,
|
||||||
coding_generator,
|
coding_generator,
|
||||||
|
stats: BroadcastStats::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
Reference in New Issue
Block a user