Add back shredding broadcast stats (#13463)

This commit is contained in:
sakridge
2020-11-09 23:04:27 -08:00
committed by GitHub
parent fb815294b3
commit b4cf968e14
5 changed files with 104 additions and 59 deletions

View File

@ -11,7 +11,7 @@ use rayon::{
ThreadPool,
};
use serde::{Deserialize, Serialize};
use solana_metrics::datapoint_debug;
use solana_measure::measure::Measure;
use solana_perf::packet::Packet;
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::{
@ -21,10 +21,38 @@ use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
};
use std::{mem::size_of, sync::Arc, time::Instant};
use std::{mem::size_of, sync::Arc};
use thiserror::Error;
#[derive(Default, Clone)]
pub struct ProcessShredsStats {
// Per-slot elapsed time
pub shredding_elapsed: u64,
pub receive_elapsed: u64,
pub serialize_elapsed: u64,
pub gen_data_elapsed: u64,
pub gen_coding_elapsed: u64,
pub sign_coding_elapsed: u64,
pub coding_send_elapsed: u64,
pub get_leader_schedule_elapsed: u64,
}
impl ProcessShredsStats {
pub fn update(&mut self, new_stats: &ProcessShredsStats) {
self.shredding_elapsed += new_stats.shredding_elapsed;
self.receive_elapsed += new_stats.receive_elapsed;
self.serialize_elapsed += new_stats.serialize_elapsed;
self.gen_data_elapsed += new_stats.gen_data_elapsed;
self.gen_coding_elapsed += new_stats.gen_coding_elapsed;
self.sign_coding_elapsed += new_stats.sign_coding_elapsed;
self.coding_send_elapsed += new_stats.gen_coding_elapsed;
self.get_leader_schedule_elapsed += new_stats.get_leader_schedule_elapsed;
}
pub fn reset(&mut self) {
*self = Self::default();
}
}
pub type Nonce = u32;
/// The following constants are computed by hand, and hardcoded.
@ -477,9 +505,10 @@ impl Shredder {
is_last_in_slot: bool,
next_shred_index: u32,
) -> (Vec<Shred>, Vec<Shred>, u32) {
let mut stats = ProcessShredsStats::default();
let (data_shreds, last_shred_index) =
self.entries_to_data_shreds(entries, is_last_in_slot, next_shred_index);
let coding_shreds = self.data_shreds_to_coding_shreds(&data_shreds);
self.entries_to_data_shreds(entries, is_last_in_slot, next_shred_index, &mut stats);
let coding_shreds = self.data_shreds_to_coding_shreds(&data_shreds, &mut stats);
(data_shreds, coding_shreds, last_shred_index)
}
@ -488,13 +517,14 @@ impl Shredder {
entries: &[Entry],
is_last_in_slot: bool,
next_shred_index: u32,
process_stats: &mut ProcessShredsStats,
) -> (Vec<Shred>, u32) {
let now = Instant::now();
let mut serialize_time = Measure::start("shred_serialize");
let serialized_shreds =
bincode::serialize(entries).expect("Expect to serialize all entries");
let serialize_time = now.elapsed().as_millis();
serialize_time.stop();
let now = Instant::now();
let mut gen_data_time = Measure::start("shred_gen_data_time");
let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
@ -539,19 +569,20 @@ impl Shredder {
.collect()
})
});
let gen_data_time = now.elapsed().as_millis();
datapoint_debug!(
"shredding-stats",
("slot", self.slot as i64, i64),
("num_data_shreds", data_shreds.len() as i64, i64),
("serializing", serialize_time as i64, i64),
("gen_data", gen_data_time as i64, i64),
);
gen_data_time.stop();
process_stats.serialize_elapsed += serialize_time.as_us();
process_stats.gen_data_elapsed += gen_data_time.as_us();
(data_shreds, last_shred_index + 1)
}
pub fn data_shreds_to_coding_shreds(&self, data_shreds: &[Shred]) -> Vec<Shred> {
let now = Instant::now();
pub fn data_shreds_to_coding_shreds(
&self,
data_shreds: &[Shred],
process_stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
let mut gen_coding_time = Measure::start("gen_coding_shreds");
// 2) Generate coding shreds
let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
@ -568,9 +599,9 @@ impl Shredder {
.collect()
})
});
let gen_coding_time = now.elapsed().as_millis();
gen_coding_time.stop();
let now = Instant::now();
let mut sign_coding_time = Measure::start("sign_coding_shreds");
// 3) Sign coding shreds
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
@ -579,14 +610,10 @@ impl Shredder {
})
})
});
let sign_coding_time = now.elapsed().as_millis();
sign_coding_time.stop();
datapoint_debug!(
"shredding-stats",
("num_coding_shreds", coding_shreds.len() as i64, i64),
("gen_coding", gen_coding_time as i64, i64),
("sign_coding", sign_coding_time as i64, i64),
);
process_stats.gen_coding_elapsed += gen_coding_time.as_us();
process_stats.sign_coding_elapsed += sign_coding_time.as_us();
coding_shreds
}
@ -1614,19 +1641,22 @@ pub mod tests {
})
.collect();
let mut stats = ProcessShredsStats::default();
let start_index = 0x12;
let (data_shreds, _next_index) =
shredder.entries_to_data_shreds(&entries, true, start_index);
shredder.entries_to_data_shreds(&entries, true, start_index, &mut stats);
assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize);
(1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| {
let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[..count]);
let coding_shreds =
shredder.data_shreds_to_coding_shreds(&data_shreds[..count], &mut stats);
assert_eq!(coding_shreds.len(), count);
});
let coding_shreds = shredder.data_shreds_to_coding_shreds(
&data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
&mut stats,
);
assert_eq!(
coding_shreds.len(),