Broadcast/Shredding Metrics (#6270)

automerge
Author: carllin
Date: 2019-10-08 01:42:42 -07:00
Committed by: Grimes
Parent: ac2374e9a1
Commit: 9dceb8ac74
3 changed files with 20 additions and 10 deletions

broadcast_utils.rs

@@ -27,8 +27,8 @@ const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;
 pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
     let timer = Duration::new(1, 0);
-    let (mut bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?;
     let recv_start = Instant::now();
+    let (mut bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?;
     let mut entries = vec![entry];
     let mut slot = bank.slot();
@@ -41,6 +41,7 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
         // If the bank changed, that implies the previous slot was interrupted and we do not have to
         // broadcast its entries.
         if try_bank.slot() != slot {
+            warn!("Broadcast for slot: {} interrupted", bank.slot());
             entries.clear();
             bank = try_bank;
             slot = bank.slot();
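
The reordering in the first hunk starts the receive timer before the blocking recv_timeout, so the measured time now covers the wait for the first entry as well as the drain of any entries already queued behind it. A minimal, self-contained sketch of that pattern follows; recv_batch, the u64 payload, and main are illustrative stand-ins, not the crate's actual recv_slot_entries signature.

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

fn recv_batch(receiver: &Receiver<u64>) -> Result<(Vec<u64>, Duration), RecvTimeoutError> {
    // Start the clock before the blocking wait, mirroring the reordering above.
    let recv_start = Instant::now();
    let first = receiver.recv_timeout(Duration::new(1, 0))?;
    let mut batch = vec![first];
    // Drain whatever else is already queued without blocking again.
    while let Ok(next) = receiver.try_recv() {
        batch.push(next);
    }
    // The elapsed time covers both the initial wait and the drain.
    Ok((batch, recv_start.elapsed()))
}

fn main() {
    let (sender, receiver) = channel();
    for v in 0..3u64 {
        sender.send(v).unwrap();
    }
    let (batch, elapsed) = recv_batch(&receiver).unwrap();
    println!("received {:?} in {:?}", batch, elapsed);
}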

standard_broadcast_run.rs

@@ -2,6 +2,7 @@ use super::broadcast_utils;
 use super::*;
 use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
 use solana_sdk::timing::duration_as_ms;
+use std::time::Duration;
 
 #[derive(Default)]
 struct BroadcastStats {
@@ -16,6 +17,8 @@ pub(super) struct StandardBroadcastRun {
     shredding_elapsed: u128,
     insertion_elapsed: u128,
     broadcast_elapsed: u128,
+    receive_elapsed: u128,
+    clone_and_seed_elapsed: u128,
     slot_broadcast_start: Option<Instant>,
 }
@@ -27,6 +30,8 @@ impl StandardBroadcastRun {
             shredding_elapsed: 0,
             insertion_elapsed: 0,
             broadcast_elapsed: 0,
+            receive_elapsed: 0,
+            clone_and_seed_elapsed: 0,
             slot_broadcast_start: None,
         }
     }
@@ -80,7 +85,7 @@ impl BroadcastRun for StandardBroadcastRun {
     ) -> Result<()> {
         // 1) Pull entries from banking stage
         let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
-        let receive_elapsed = receive_results.time_elapsed;
+        let mut receive_elapsed = receive_results.time_elapsed;
         let num_entries = receive_results.entries.len();
         let bank = receive_results.bank.clone();
         let last_tick = receive_results.last_tick;
@@ -89,6 +94,7 @@ impl BroadcastRun for StandardBroadcastRun {
         if Some(bank.slot()) != self.current_slot {
             self.slot_broadcast_start = Some(Instant::now());
             self.current_slot = Some(bank.slot());
+            receive_elapsed = Duration::new(0, 0);
         }
 
         // 2) Convert entries to blobs + generate coding blobs
@@ -122,6 +128,7 @@ impl BroadcastRun for StandardBroadcastRun {
         );
         let to_shreds_elapsed = to_shreds_start.elapsed();
 
+        let clone_and_seed_start = Instant::now();
         let all_shreds = data_shreds
             .iter()
             .cloned()
@@ -129,6 +136,7 @@ impl BroadcastRun for StandardBroadcastRun {
             .collect::<Vec<_>>();
         let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
         let num_shreds = all_shreds.len();
+        let clone_and_seed_elapsed = clone_and_seed_start.elapsed();
 
         // Insert shreds into blocktree
         let insert_shreds_start = Instant::now();
@@ -161,6 +169,8 @@ impl BroadcastRun for StandardBroadcastRun {
         self.insertion_elapsed += insert_shreds_elapsed.as_millis();
         self.shredding_elapsed += to_shreds_elapsed.as_millis();
         self.broadcast_elapsed += broadcast_elapsed.as_millis();
+        self.receive_elapsed += receive_elapsed.as_millis();
+        self.clone_and_seed_elapsed += clone_and_seed_elapsed.as_millis();
 
         if last_tick == bank.max_tick_height() {
             datapoint_info!(
@@ -169,6 +179,8 @@ impl BroadcastRun for StandardBroadcastRun {
                 ("shredding_time", self.shredding_elapsed as i64, i64),
                 ("insertion_time", self.insertion_elapsed as i64, i64),
                 ("broadcast_time", self.broadcast_elapsed as i64, i64),
+                ("receive_time", self.receive_elapsed as i64, i64),
+                ("clone_and_seed", self.clone_and_seed_elapsed as i64, i64),
                 ("num_shreds", i64::from(latest_shred_index), i64),
                 (
                     "slot_broadcast_time",
@@ -179,6 +191,8 @@ impl BroadcastRun for StandardBroadcastRun {
             self.insertion_elapsed = 0;
             self.shredding_elapsed = 0;
             self.broadcast_elapsed = 0;
+            self.receive_elapsed = 0;
+            self.clone_and_seed_elapsed = 0;
         }
 
         self.update_broadcast_stats(
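
Taken together, these hunks extend an accumulate-then-flush pattern: each processed batch adds its receive, clone-and-seed, shredding, insertion, and broadcast timings to counters on the run, and the totals are reported via datapoint_info! and zeroed only once the slot's last tick is seen. The receive timer is also reset to Duration::new(0, 0) when a new slot begins, apparently so the idle wait before a slot's first entry is not charged to that slot. The following is a stripped-down sketch of the accumulate-then-flush shape with hypothetical names (SlotTimings, record_batch, flush) and println! standing in for the metrics macro.

use std::time::Duration;

#[derive(Default)]
struct SlotTimings {
    receive_elapsed: u128,
    clone_and_seed_elapsed: u128,
    shredding_elapsed: u128,
}

impl SlotTimings {
    // Called once per batch: fold this batch's timings into the running totals.
    fn record_batch(&mut self, receive: Duration, clone_and_seed: Duration, shredding: Duration) {
        self.receive_elapsed += receive.as_millis();
        self.clone_and_seed_elapsed += clone_and_seed.as_millis();
        self.shredding_elapsed += shredding.as_millis();
    }

    // Called when the batch carries the slot's last tick: report totals,
    // then reset so the next slot starts from zero.
    fn flush(&mut self, slot: u64) {
        println!(
            "slot {}: receive_time={}ms clone_and_seed={}ms shredding_time={}ms",
            slot, self.receive_elapsed, self.clone_and_seed_elapsed, self.shredding_elapsed
        );
        *self = SlotTimings::default();
    }
}

fn main() {
    let mut timings = SlotTimings::default();
    timings.record_batch(Duration::from_millis(3), Duration::from_millis(1), Duration::from_millis(7));
    timings.record_batch(Duration::from_millis(2), Duration::from_millis(1), Duration::from_millis(5));
    timings.flush(42); // pretend this second batch carried the slot's final tick
}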

shred.rs

@@ -330,6 +330,7 @@ impl Shredder {
         let now = Instant::now();
         let serialized_shreds =
             bincode::serialize(entries).expect("Expect to serialize all entries");
+        let serialize_time = now.elapsed().as_millis();
 
         let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
         let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
@@ -392,7 +393,6 @@ impl Shredder {
             })
         });
 
-        // TODO: pre-allocate this
         let elapsed = now.elapsed().as_millis();
 
         datapoint_info!(
@@ -400,13 +400,8 @@ impl Shredder {
             ("slot", self.slot as i64, i64),
             ("num_data_shreds", data_shreds.len() as i64, i64),
             ("num_coding_shreds", coding_shreds.len() as i64, i64),
-            // TODO: update signing_coding_time
-            ("signing_coding", self.signing_coding_time as i64, i64),
-            (
-                "copying_serialzing",
-                (elapsed - self.signing_coding_time) as i64,
-                i64
-            ),
+            ("signing_coding", (elapsed - serialize_time) as i64, i64),
+            ("serialzing", serialize_time as i64, i64),
         );
 
         (data_shreds, coding_shreds, last_shred_index + 1)
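
The shred.rs hunks read a single Instant twice so that serialization and the remaining signing/erasure-coding work can be reported as separate datapoint fields (the field name "serialzing" is kept here exactly as it appears in the diff). Below is a small sketch of that two-checkpoint split, with placeholder work standing in for bincode::serialize and the shredding pipeline.

use std::time::Instant;

fn main() {
    let now = Instant::now();

    // Phase 1: placeholder for bincode::serialize(entries).
    let serialized: Vec<u8> = (0u8..=255).collect();
    let serialize_time = now.elapsed().as_millis();

    // Phase 2: placeholder for shredding, signing, and erasure coding.
    let checksum: u64 = serialized.iter().map(|&b| b as u64).sum();

    let elapsed = now.elapsed().as_millis();
    // Mirrors ("serialzing", serialize_time) and
    // ("signing_coding", elapsed - serialize_time) in the datapoint above.
    println!(
        "serialzing={}ms signing_coding={}ms checksum={}",
        serialize_time,
        elapsed - serialize_time,
        checksum
    );
}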