@ -77,7 +77,6 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
|
|||||||
}
|
}
|
||||||
|
|
||||||
blocktree.insert_shreds(data_shreds.clone(), None)?;
|
blocktree.insert_shreds(data_shreds.clone(), None)?;
|
||||||
blocktree.insert_shreds(coding_shreds.clone(), None)?;
|
|
||||||
|
|
||||||
// 3) Start broadcast step
|
// 3) Start broadcast step
|
||||||
let peers = cluster_info.read().unwrap().tvu_peers();
|
let peers = cluster_info.read().unwrap().tvu_peers();
|
||||||
|
@ -3,8 +3,10 @@ use super::*;
|
|||||||
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
|
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
|
||||||
use solana_ledger::entry::Entry;
|
use solana_ledger::entry::Entry;
|
||||||
use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK};
|
use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK};
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::signature::Keypair;
|
use solana_sdk::signature::Keypair;
|
||||||
use solana_sdk::timing::duration_as_us;
|
use solana_sdk::timing::duration_as_us;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@ -14,7 +16,7 @@ struct BroadcastStats {
|
|||||||
insert_shreds_elapsed: u64,
|
insert_shreds_elapsed: u64,
|
||||||
broadcast_elapsed: u64,
|
broadcast_elapsed: u64,
|
||||||
receive_elapsed: u64,
|
receive_elapsed: u64,
|
||||||
clone_and_seed_elapsed: u64,
|
seed_elapsed: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BroadcastStats {
|
impl BroadcastStats {
|
||||||
@ -23,7 +25,7 @@ impl BroadcastStats {
|
|||||||
self.shredding_elapsed = 0;
|
self.shredding_elapsed = 0;
|
||||||
self.broadcast_elapsed = 0;
|
self.broadcast_elapsed = 0;
|
||||||
self.receive_elapsed = 0;
|
self.receive_elapsed = 0;
|
||||||
self.clone_and_seed_elapsed = 0;
|
self.seed_elapsed = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,27 +79,6 @@ impl StandardBroadcastRun {
|
|||||||
last_unfinished_slot_shred
|
last_unfinished_slot_shred
|
||||||
}
|
}
|
||||||
|
|
||||||
fn coalesce_shreds(
|
|
||||||
data_shreds: Vec<Shred>,
|
|
||||||
coding_shreds: Vec<Shred>,
|
|
||||||
last_unfinished_slot_shred: Option<Shred>,
|
|
||||||
) -> Vec<Shred> {
|
|
||||||
if let Some(shred) = last_unfinished_slot_shred {
|
|
||||||
data_shreds
|
|
||||||
.iter()
|
|
||||||
.chain(coding_shreds.iter())
|
|
||||||
.cloned()
|
|
||||||
.chain(std::iter::once(shred))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
} else {
|
|
||||||
data_shreds
|
|
||||||
.iter()
|
|
||||||
.chain(coding_shreds.iter())
|
|
||||||
.cloned()
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn entries_to_shreds(
|
fn entries_to_shreds(
|
||||||
&mut self,
|
&mut self,
|
||||||
blocktree: &Blocktree,
|
blocktree: &Blocktree,
|
||||||
@ -175,80 +156,100 @@ impl StandardBroadcastRun {
|
|||||||
self.check_for_interrupted_slot(bank.ticks_per_slot() as u8);
|
self.check_for_interrupted_slot(bank.ticks_per_slot() as u8);
|
||||||
|
|
||||||
// 2) Convert entries to shreds and coding shreds
|
// 2) Convert entries to shreds and coding shreds
|
||||||
let (data_shreds, coding_shreds) = self.entries_to_shreds(
|
let (mut data_shreds, coding_shreds) = self.entries_to_shreds(
|
||||||
blocktree,
|
blocktree,
|
||||||
&receive_results.entries,
|
&receive_results.entries,
|
||||||
last_tick_height == bank.max_tick_height(),
|
last_tick_height == bank.max_tick_height(),
|
||||||
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
||||||
);
|
);
|
||||||
|
if let Some(last_shred) = last_unfinished_slot_shred {
|
||||||
|
data_shreds.push(last_shred);
|
||||||
|
}
|
||||||
let to_shreds_elapsed = to_shreds_start.elapsed();
|
let to_shreds_elapsed = to_shreds_start.elapsed();
|
||||||
|
|
||||||
let clone_and_seed_start = Instant::now();
|
|
||||||
let all_shreds =
|
|
||||||
Self::coalesce_shreds(data_shreds, coding_shreds, last_unfinished_slot_shred);
|
|
||||||
let all_shreds_ = all_shreds.clone();
|
|
||||||
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
|
|
||||||
let clone_and_seed_elapsed = clone_and_seed_start.elapsed();
|
|
||||||
|
|
||||||
// 3) Insert shreds into blocktree
|
|
||||||
let insert_shreds_start = Instant::now();
|
|
||||||
blocktree
|
|
||||||
.insert_shreds(all_shreds_, None)
|
|
||||||
.expect("Failed to insert shreds in blocktree");
|
|
||||||
let insert_shreds_elapsed = insert_shreds_start.elapsed();
|
|
||||||
|
|
||||||
// 4) Broadcast the shreds
|
|
||||||
let broadcast_start = Instant::now();
|
|
||||||
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
||||||
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
||||||
|
|
||||||
let all_shred_bufs: Vec<Vec<u8>> = all_shreds.into_iter().map(|s| s.payload).collect();
|
self.maybe_insert_and_broadcast(
|
||||||
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
|
data_shreds,
|
||||||
|
true,
|
||||||
cluster_info.read().unwrap().broadcast_shreds(
|
blocktree,
|
||||||
sock,
|
cluster_info,
|
||||||
all_shred_bufs,
|
|
||||||
&all_seeds,
|
|
||||||
stakes.as_ref(),
|
stakes.as_ref(),
|
||||||
|
sock,
|
||||||
|
)?;
|
||||||
|
self.maybe_insert_and_broadcast(
|
||||||
|
coding_shreds,
|
||||||
|
false,
|
||||||
|
blocktree,
|
||||||
|
cluster_info,
|
||||||
|
stakes.as_ref(),
|
||||||
|
sock,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let broadcast_elapsed = broadcast_start.elapsed();
|
self.update_broadcast_stats(BroadcastStats {
|
||||||
|
shredding_elapsed: duration_as_us(&to_shreds_elapsed),
|
||||||
self.update_broadcast_stats(
|
receive_elapsed: duration_as_us(&receive_elapsed),
|
||||||
duration_as_us(&receive_elapsed),
|
..BroadcastStats::default()
|
||||||
duration_as_us(&to_shreds_elapsed),
|
});
|
||||||
duration_as_us(&insert_shreds_elapsed),
|
|
||||||
duration_as_us(&broadcast_elapsed),
|
|
||||||
duration_as_us(&clone_and_seed_elapsed),
|
|
||||||
last_tick_height == bank.max_tick_height(),
|
|
||||||
);
|
|
||||||
|
|
||||||
if last_tick_height == bank.max_tick_height() {
|
if last_tick_height == bank.max_tick_height() {
|
||||||
|
self.report_and_reset_stats();
|
||||||
self.unfinished_slot = None;
|
self.unfinished_slot = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
fn maybe_insert_and_broadcast(
|
||||||
fn update_broadcast_stats(
|
|
||||||
&mut self,
|
&mut self,
|
||||||
receive_entries_elapsed: u64,
|
shreds: Vec<Shred>,
|
||||||
shredding_elapsed: u64,
|
insert: bool,
|
||||||
insert_shreds_elapsed: u64,
|
blocktree: &Arc<Blocktree>,
|
||||||
broadcast_elapsed: u64,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
clone_and_seed_elapsed: u64,
|
stakes: Option<&HashMap<Pubkey, u64>>,
|
||||||
slot_ended: bool,
|
sock: &UdpSocket,
|
||||||
) {
|
) -> Result<()> {
|
||||||
self.stats.receive_elapsed += receive_entries_elapsed;
|
let seed_start = Instant::now();
|
||||||
self.stats.shredding_elapsed += shredding_elapsed;
|
let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
|
||||||
self.stats.insert_shreds_elapsed += insert_shreds_elapsed;
|
let seed_elapsed = seed_start.elapsed();
|
||||||
self.stats.broadcast_elapsed += broadcast_elapsed;
|
|
||||||
self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed;
|
|
||||||
|
|
||||||
if slot_ended {
|
// Insert shreds into blocktree
|
||||||
self.report_and_reset_stats()
|
let insert_shreds_start = Instant::now();
|
||||||
|
if insert {
|
||||||
|
blocktree
|
||||||
|
.insert_shreds(shreds.clone(), None)
|
||||||
|
.expect("Failed to insert shreds in blocktree");
|
||||||
}
|
}
|
||||||
|
let insert_shreds_elapsed = insert_shreds_start.elapsed();
|
||||||
|
|
||||||
|
// Broadcast the shreds
|
||||||
|
let broadcast_start = Instant::now();
|
||||||
|
let shred_bufs: Vec<Vec<u8>> = shreds.into_iter().map(|s| s.payload).collect();
|
||||||
|
trace!("Broadcasting {:?} shreds", shred_bufs.len());
|
||||||
|
|
||||||
|
cluster_info
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.broadcast_shreds(sock, shred_bufs, &seeds, stakes)?;
|
||||||
|
|
||||||
|
let broadcast_elapsed = broadcast_start.elapsed();
|
||||||
|
|
||||||
|
self.update_broadcast_stats(BroadcastStats {
|
||||||
|
insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed),
|
||||||
|
broadcast_elapsed: duration_as_us(&broadcast_elapsed),
|
||||||
|
seed_elapsed: duration_as_us(&seed_elapsed),
|
||||||
|
..BroadcastStats::default()
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_broadcast_stats(&mut self, stats: BroadcastStats) {
|
||||||
|
self.stats.receive_elapsed += stats.receive_elapsed;
|
||||||
|
self.stats.shredding_elapsed += stats.shredding_elapsed;
|
||||||
|
self.stats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
|
||||||
|
self.stats.broadcast_elapsed += stats.broadcast_elapsed;
|
||||||
|
self.stats.seed_elapsed += stats.seed_elapsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn report_and_reset_stats(&mut self) {
|
fn report_and_reset_stats(&mut self) {
|
||||||
@ -264,11 +265,7 @@ impl StandardBroadcastRun {
|
|||||||
),
|
),
|
||||||
("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
|
("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
|
||||||
("receive_time", self.stats.receive_elapsed as i64, i64),
|
("receive_time", self.stats.receive_elapsed as i64, i64),
|
||||||
(
|
("seed", self.stats.seed_elapsed as i64, i64),
|
||||||
"clone_and_seed",
|
|
||||||
self.stats.clone_and_seed_elapsed as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
(
|
||||||
"num_shreds",
|
"num_shreds",
|
||||||
i64::from(self.unfinished_slot.unwrap().next_shred_index),
|
i64::from(self.unfinished_slot.unwrap().next_shred_index),
|
||||||
|
Reference in New Issue
Block a user