diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index d1a29ddd3e..7af22e96cf 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -6,7 +6,7 @@ use rand::seq::SliceRandom;
 use raptorq::{Decoder, Encoder};
 use solana_ledger::entry::{create_ticks, Entry};
 use solana_ledger::shred::{
-    max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder,
+    max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
     MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, SHRED_PAYLOAD_SIZE,
     SIZE_OF_DATA_SHRED_IGNORED_TAIL, SIZE_OF_DATA_SHRED_PAYLOAD,
 };
@@ -40,7 +40,9 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
     let entries = make_large_unchained_entries(txs_per_entry, num_entries);
     let shredder =
         Shredder::new(1, 0, RECOMMENDED_FEC_RATE, Arc::new(Keypair::new()), 0, 0).unwrap();
-    let data_shreds = shredder.entries_to_data_shreds(&entries, true, 0).0;
+    let data_shreds = shredder
+        .entries_to_data_shreds(&entries, true, 0, &mut ProcessShredsStats::default())
+        .0;
     assert!(data_shreds.len() >= num_shreds);
     data_shreds
 }
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 04970cd702..b62feb27ff 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -445,7 +445,7 @@ pub mod test {
         entry::create_ticks,
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
         get_tmp_ledger_path,
-        shred::{max_ticks_per_n_shreds, Shredder, RECOMMENDED_FEC_RATE},
+        shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder, RECOMMENDED_FEC_RATE},
     };
     use solana_runtime::bank::Bank;
     use solana_sdk::{
@@ -474,7 +474,8 @@ pub mod test {
         let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0)
             .expect("Expected to create a new shredder");

-        let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..]);
+        let coding_shreds = shredder
+            .data_shreds_to_coding_shreds(&data_shreds[0..], &mut ProcessShredsStats::default());
         (
             data_shreds.clone(),
             coding_shreds.clone(),
diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs
index 8f58cfd4d4..64f6ef49e2 100644
--- a/core/src/broadcast_stage/broadcast_metrics.rs
+++ b/core/src/broadcast_stage/broadcast_metrics.rs
@@ -12,22 +12,6 @@ pub(crate) struct BroadcastShredBatchInfo {
     pub(crate) slot_start_ts: Instant,
 }

-#[derive(Default, Clone)]
-pub(crate) struct ProcessShredsStats {
-    // Per-slot elapsed time
-    pub(crate) shredding_elapsed: u64,
-    pub(crate) receive_elapsed: u64,
-}
-impl ProcessShredsStats {
-    pub(crate) fn update(&mut self, new_stats: &ProcessShredsStats) {
-        self.shredding_elapsed += new_stats.shredding_elapsed;
-        self.receive_elapsed += new_stats.receive_elapsed;
-    }
-    pub(crate) fn reset(&mut self) {
-        *self = Self::default();
-    }
-}
-
 #[derive(Default, Clone)]
 pub struct TransmitShredsStats {
     pub transmit_elapsed: u64,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 1228c38546..40756bbcd7 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -5,7 +5,7 @@ use super::{
 use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
 use solana_ledger::{
     entry::Entry,
-    shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK},
+    shred::{ProcessShredsStats, Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK},
 };
 use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
 use std::collections::HashMap;
@@ -115,9 +115,10 @@ impl StandardBroadcastRun {
         next_shred_index: u32,
         entries: &[Entry],
         is_slot_end: bool,
+        process_stats: &mut ProcessShredsStats,
     ) -> Vec<Shred> {
         let (data_shreds, new_next_shred_index) =
-            shredder.entries_to_data_shreds(entries, is_slot_end, next_shred_index);
+            shredder.entries_to_data_shreds(entries, is_slot_end, next_shred_index, process_stats);

         self.unfinished_slot = Some(UnfinishedSlotInfo {
             next_shred_index: new_next_shred_index,
@@ -176,7 +177,9 @@ impl StandardBroadcastRun {
             receive_elapsed = Duration::new(0, 0);
         }

-        let to_shreds_start = Instant::now();
+        let mut process_stats = ProcessShredsStats::default();
+
+        let mut to_shreds_time = Measure::start("broadcast_to_shreds");

         // 1) Check if slot was interrupted
         let last_unfinished_slot_shred =
@@ -193,6 +196,7 @@ impl StandardBroadcastRun {
             next_shred_index,
             &receive_results.entries,
             is_last_in_slot,
+            &mut process_stats,
         );
         // Insert the first shred so blockstore stores that the leader started this block
         // This must be done before the blocks are sent out over the wire.
@@ -203,8 +207,9 @@ impl StandardBroadcastRun {
                 .expect("Failed to insert shreds in blockstore");
         }
         let last_data_shred = data_shreds.len();
-        let to_shreds_elapsed = to_shreds_start.elapsed();
+        to_shreds_time.stop();

+        let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
         let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
         let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
         let stakes = stakes.map(Arc::new);
@@ -241,18 +246,31 @@ impl StandardBroadcastRun {
                 .clone()
                 .expect("Start timestamp must exist for a slot if we're broadcasting the slot"),
         });

+        get_leader_schedule_time.stop();
+        let mut coding_send_time = Measure::start("broadcast_coding_send");
+
+        // Send data shreds
         let data_shreds = Arc::new(data_shreds);
         socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((data_shreds.clone(), batch_info.clone()))?;
-        let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]);
+
+        // Create and send coding shreds
+        let coding_shreds = shredder
+            .data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred], &mut process_stats);
         let coding_shreds = Arc::new(coding_shreds);
         socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((coding_shreds, batch_info))?;
-        self.process_shreds_stats.update(&ProcessShredsStats {
-            shredding_elapsed: duration_as_us(&to_shreds_elapsed),
-            receive_elapsed: duration_as_us(&receive_elapsed),
-        });
+
+        coding_send_time.stop();
+
+        process_stats.shredding_elapsed = to_shreds_time.as_us();
+        process_stats.get_leader_schedule_elapsed = get_leader_schedule_time.as_us();
+        process_stats.receive_elapsed = duration_as_us(&receive_elapsed);
+        process_stats.coding_send_elapsed = coding_send_time.as_us();
+
+        self.process_shreds_stats.update(&process_stats);
+
         if last_tick_height == bank.max_tick_height() {
             self.report_and_reset_stats();
             self.unfinished_slot = None;
@@ -362,8 +380,8 @@ impl StandardBroadcastRun {
         datapoint_info!(
             "broadcast-process-shreds-stats",
             ("slot", self.unfinished_slot.unwrap().slot as i64, i64),
-            ("shredding_time", stats.shredding_elapsed as i64, i64),
-            ("receive_time", stats.receive_elapsed as i64, i64),
+            ("shredding_time", stats.shredding_elapsed, i64),
+            ("receive_time", stats.receive_elapsed, i64),
             (
                 "num_data_shreds",
                 i64::from(self.unfinished_slot.unwrap().next_shred_index),
@@ -374,6 +392,16 @@ impl StandardBroadcastRun {
                 self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
                 i64
             ),
+            (
+                "get_leader_schedule_time",
+                stats.get_leader_schedule_elapsed,
+                i64
+            ),
+            ("serialize_shreds_time", stats.serialize_elapsed, i64),
+            ("gen_data_time", stats.gen_data_elapsed, i64),
+            ("gen_coding_time", stats.gen_coding_elapsed, i64),
+            ("sign_coding_time", stats.sign_coding_elapsed, i64),
+            ("coding_send_time", stats.coding_send_elapsed, i64),
         );
         self.process_shreds_stats.reset();
     }
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 3f41b29bb5..497297ccd7 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -11,7 +11,7 @@ use rayon::{
     ThreadPool,
 };
 use serde::{Deserialize, Serialize};
-use solana_metrics::datapoint_debug;
+use solana_measure::measure::Measure;
 use solana_perf::packet::Packet;
 use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::{
@@ -21,10 +21,38 @@ use solana_sdk::{
     pubkey::Pubkey,
     signature::{Keypair, Signature, Signer},
 };
-use std::{mem::size_of, sync::Arc, time::Instant};
+use std::{mem::size_of, sync::Arc};
 use thiserror::Error;

+#[derive(Default, Clone)]
+pub struct ProcessShredsStats {
+    // Per-slot elapsed time
+    pub shredding_elapsed: u64,
+    pub receive_elapsed: u64,
+    pub serialize_elapsed: u64,
+    pub gen_data_elapsed: u64,
+    pub gen_coding_elapsed: u64,
+    pub sign_coding_elapsed: u64,
+    pub coding_send_elapsed: u64,
+    pub get_leader_schedule_elapsed: u64,
+}
+impl ProcessShredsStats {
+    pub fn update(&mut self, new_stats: &ProcessShredsStats) {
+        self.shredding_elapsed += new_stats.shredding_elapsed;
+        self.receive_elapsed += new_stats.receive_elapsed;
+        self.serialize_elapsed += new_stats.serialize_elapsed;
+        self.gen_data_elapsed += new_stats.gen_data_elapsed;
+        self.gen_coding_elapsed += new_stats.gen_coding_elapsed;
+        self.sign_coding_elapsed += new_stats.sign_coding_elapsed;
+        self.coding_send_elapsed += new_stats.coding_send_elapsed;
+        self.get_leader_schedule_elapsed += new_stats.get_leader_schedule_elapsed;
+    }
+    pub fn reset(&mut self) {
+        *self = Self::default();
+    }
+}
+
 pub type Nonce = u32;

 /// The following constants are computed by hand, and hardcoded.
@@ -477,9 +505,10 @@ impl Shredder {
         is_last_in_slot: bool,
         next_shred_index: u32,
     ) -> (Vec<Shred>, Vec<Shred>, u32) {
+        let mut stats = ProcessShredsStats::default();
         let (data_shreds, last_shred_index) =
-            self.entries_to_data_shreds(entries, is_last_in_slot, next_shred_index);
-        let coding_shreds = self.data_shreds_to_coding_shreds(&data_shreds);
+            self.entries_to_data_shreds(entries, is_last_in_slot, next_shred_index, &mut stats);
+        let coding_shreds = self.data_shreds_to_coding_shreds(&data_shreds, &mut stats);
         (data_shreds, coding_shreds, last_shred_index)
     }

@@ -488,13 +517,14 @@ impl Shredder {
         entries: &[Entry],
         is_last_in_slot: bool,
         next_shred_index: u32,
+        process_stats: &mut ProcessShredsStats,
     ) -> (Vec<Shred>, u32) {
-        let now = Instant::now();
+        let mut serialize_time = Measure::start("shred_serialize");
         let serialized_shreds =
             bincode::serialize(entries).expect("Expect to serialize all entries");
-        let serialize_time = now.elapsed().as_millis();
+        serialize_time.stop();

-        let now = Instant::now();
+        let mut gen_data_time = Measure::start("shred_gen_data_time");
         let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
         let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
@@ -539,19 +569,20 @@ impl Shredder {
                     .collect()
             })
         });
-        let gen_data_time = now.elapsed().as_millis();
-        datapoint_debug!(
-            "shredding-stats",
-            ("slot", self.slot as i64, i64),
-            ("num_data_shreds", data_shreds.len() as i64, i64),
-            ("serializing", serialize_time as i64, i64),
-            ("gen_data", gen_data_time as i64, i64),
-        );
+        gen_data_time.stop();
+
+        process_stats.serialize_elapsed += serialize_time.as_us();
+        process_stats.gen_data_elapsed += gen_data_time.as_us();
+
         (data_shreds, last_shred_index + 1)
     }

-    pub fn data_shreds_to_coding_shreds(&self, data_shreds: &[Shred]) -> Vec<Shred> {
-        let now = Instant::now();
+    pub fn data_shreds_to_coding_shreds(
+        &self,
+        data_shreds: &[Shred],
+        process_stats: &mut ProcessShredsStats,
+    ) -> Vec<Shred> {
+        let mut gen_coding_time = Measure::start("gen_coding_shreds");
         // 2) Generate coding shreds
         let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
@@ -568,9 +599,9 @@ impl Shredder {
                     .collect()
             })
         });
-        let gen_coding_time = now.elapsed().as_millis();
+        gen_coding_time.stop();

-        let now = Instant::now();
+        let mut sign_coding_time = Measure::start("sign_coding_shreds");
         // 3) Sign coding shreds
         PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
@@ -579,14 +610,10 @@ impl Shredder {
                 })
             })
         });
-        let sign_coding_time = now.elapsed().as_millis();
+        sign_coding_time.stop();

-        datapoint_debug!(
-            "shredding-stats",
-            ("num_coding_shreds", coding_shreds.len() as i64, i64),
-            ("gen_coding", gen_coding_time as i64, i64),
-            ("sign_coding", sign_coding_time as i64, i64),
-        );
+        process_stats.gen_coding_elapsed += gen_coding_time.as_us();
+        process_stats.sign_coding_elapsed += sign_coding_time.as_us();

         coding_shreds
     }
@@ -1614,19 +1641,22 @@ pub mod tests {
             })
             .collect();

+        let mut stats = ProcessShredsStats::default();
         let start_index = 0x12;
         let (data_shreds, _next_index) =
-            shredder.entries_to_data_shreds(&entries, true, start_index);
+            shredder.entries_to_data_shreds(&entries, true, start_index, &mut stats);
         assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize);

         (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| {
-            let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[..count]);
+            let coding_shreds =
+                shredder.data_shreds_to_coding_shreds(&data_shreds[..count], &mut stats);
             assert_eq!(coding_shreds.len(), count);
         });

         let coding_shreds = shredder.data_shreds_to_coding_shreds(
             &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
+            &mut stats,
         );
         assert_eq!(
             coding_shreds.len(),