Broadcast Metrics (#6166)
* Add timing logigng to broadcast * Shred metrics * Fixes
This commit is contained in:
		@@ -109,8 +109,10 @@ pub(super) fn entries_to_shreds(
 | 
			
		||||
    )
 | 
			
		||||
    .expect("Expected to create a new shredder");
 | 
			
		||||
 | 
			
		||||
    let now = Instant::now();
 | 
			
		||||
    bincode::serialize_into(&mut shredder, &entries)
 | 
			
		||||
        .expect("Expect to write all entries to shreds");
 | 
			
		||||
    let elapsed = now.elapsed().as_millis();
 | 
			
		||||
 | 
			
		||||
    let unfinished_slot = if last_tick == bank_max_tick {
 | 
			
		||||
        shredder.finalize_slot();
 | 
			
		||||
@@ -124,9 +126,20 @@ pub(super) fn entries_to_shreds(
 | 
			
		||||
        })
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let num_shreds = shredder.shreds.len();
 | 
			
		||||
    shreds.append(&mut shredder.shreds);
 | 
			
		||||
 | 
			
		||||
    trace!("Inserting {:?} shreds in blocktree", shreds.len());
 | 
			
		||||
    datapoint_info!(
 | 
			
		||||
        "shredding-stats",
 | 
			
		||||
        ("slot", slot as i64, i64),
 | 
			
		||||
        ("num_shreds", num_shreds as i64, i64),
 | 
			
		||||
        ("signing_coding", shredder.signing_coding_time as i64, i64),
 | 
			
		||||
        (
 | 
			
		||||
            "copying_serializing",
 | 
			
		||||
            (elapsed - shredder.signing_coding_time) as i64,
 | 
			
		||||
            i64
 | 
			
		||||
        ),
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    (shreds, unfinished_slot)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -13,6 +13,11 @@ struct BroadcastStats {
 | 
			
		||||
pub(super) struct StandardBroadcastRun {
 | 
			
		||||
    stats: BroadcastStats,
 | 
			
		||||
    unfinished_slot: Option<UnfinishedSlotInfo>,
 | 
			
		||||
    current_slot: Option<u64>,
 | 
			
		||||
    shredding_elapsed: u128,
 | 
			
		||||
    insertion_elapsed: u128,
 | 
			
		||||
    broadcast_elapsed: u128,
 | 
			
		||||
    slot_broadcast_start: Option<Instant>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl StandardBroadcastRun {
 | 
			
		||||
@@ -20,6 +25,11 @@ impl StandardBroadcastRun {
 | 
			
		||||
        Self {
 | 
			
		||||
            stats: BroadcastStats::default(),
 | 
			
		||||
            unfinished_slot: None,
 | 
			
		||||
            current_slot: None,
 | 
			
		||||
            shredding_elapsed: 0,
 | 
			
		||||
            insertion_elapsed: 0,
 | 
			
		||||
            broadcast_elapsed: 0,
 | 
			
		||||
            slot_broadcast_start: None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -78,6 +88,11 @@ impl BroadcastRun for StandardBroadcastRun {
 | 
			
		||||
        let last_tick = receive_results.last_tick;
 | 
			
		||||
        inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 | 
			
		||||
 | 
			
		||||
        if Some(bank.slot()) != self.current_slot {
 | 
			
		||||
            self.slot_broadcast_start = Some(Instant::now());
 | 
			
		||||
            self.current_slot = Some(bank.slot());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // 2) Convert entries to blobs + generate coding blobs
 | 
			
		||||
        let keypair = &cluster_info.read().unwrap().keypair.clone();
 | 
			
		||||
        let latest_shred_index = blocktree
 | 
			
		||||
@@ -121,6 +136,7 @@ impl BroadcastRun for StandardBroadcastRun {
 | 
			
		||||
 | 
			
		||||
        let all_shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.payload).collect();
 | 
			
		||||
        trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
 | 
			
		||||
 | 
			
		||||
        cluster_info.read().unwrap().broadcast_shreds(
 | 
			
		||||
            sock,
 | 
			
		||||
            &all_shred_bufs,
 | 
			
		||||
@@ -136,6 +152,30 @@ impl BroadcastRun for StandardBroadcastRun {
 | 
			
		||||
                .map(|meta| meta.consumed)
 | 
			
		||||
                .unwrap_or(0)
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        self.insertion_elapsed += insert_shreds_elapsed.as_millis();
 | 
			
		||||
        self.shredding_elapsed += to_shreds_elapsed.as_millis();
 | 
			
		||||
        self.broadcast_elapsed += broadcast_elapsed.as_millis();
 | 
			
		||||
 | 
			
		||||
        if last_tick == bank.max_tick_height() {
 | 
			
		||||
            datapoint_info!(
 | 
			
		||||
                "broadcast-bank-stats",
 | 
			
		||||
                ("slot", bank.slot() as i64, i64),
 | 
			
		||||
                ("shredding_time", self.shredding_elapsed as i64, i64),
 | 
			
		||||
                ("insertion_time", self.insertion_elapsed as i64, i64),
 | 
			
		||||
                ("broadcast_time", self.broadcast_elapsed as i64, i64),
 | 
			
		||||
                ("num_shreds", latest_shred_index as i64, i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "slot_broadcast_time",
 | 
			
		||||
                    self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
            );
 | 
			
		||||
            self.insertion_elapsed = 0;
 | 
			
		||||
            self.shredding_elapsed = 0;
 | 
			
		||||
            self.broadcast_elapsed = 0;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.update_broadcast_stats(
 | 
			
		||||
            duration_as_ms(&receive_elapsed),
 | 
			
		||||
            duration_as_ms(&to_shreds_elapsed),
 | 
			
		||||
 
 | 
			
		||||
@@ -15,6 +15,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::io::{Error as IOError, ErrorKind, Write};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
 | 
			
		||||
lazy_static! {
 | 
			
		||||
    static ref SIZE_OF_CODING_SHRED_HEADER: usize =
 | 
			
		||||
@@ -261,6 +262,7 @@ pub struct Shredder {
 | 
			
		||||
    active_shred: Vec<u8>,
 | 
			
		||||
    active_shred_header: DataShredHeader,
 | 
			
		||||
    active_offset: usize,
 | 
			
		||||
    pub signing_coding_time: u128,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Write for Shredder {
 | 
			
		||||
@@ -277,7 +279,9 @@ impl Write for Shredder {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK {
 | 
			
		||||
            let now = Instant::now();
 | 
			
		||||
            self.sign_unsigned_shreds_and_generate_codes();
 | 
			
		||||
            self.signing_coding_time += now.elapsed().as_millis();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(slice_len)
 | 
			
		||||
@@ -330,6 +334,7 @@ impl Shredder {
 | 
			
		||||
                active_shred,
 | 
			
		||||
                active_shred_header: header,
 | 
			
		||||
                active_offset: 0,
 | 
			
		||||
                signing_coding_time: 0,
 | 
			
		||||
            })
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user