Fix broadcast metrics (#9461)
* Rework broadcast metrics to support multiple threads

* Update dashboards

Co-authored-by: Carl <carl@solana.com>
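In outline: every batch of shreds handed to the transmit and record threads now carries an `Option<BroadcastShredBatchInfo>` tag, and a new `broadcast_metrics` module folds each thread's per-batch stats into a per-slot counter. The last batch of a slot is stamped with the expected batch total; whichever thread applies the final update reports the aggregate and purges the slot's entry. A condensed, self-contained sketch of that accounting (simplified names and a single stand-in stat; the real implementation is the `broadcast_metrics` module further down):

use std::collections::HashMap;
use std::time::Instant;

type Slot = u64;

// Tag attached to every batch sent down the transmit/record channels.
#[derive(Clone)]
struct BatchInfo {
    slot: Slot,
    // Known only once the last batch of the slot has been produced.
    num_expected_batches: Option<usize>,
    slot_start: Instant,
}

#[derive(Default)]
struct Counter {
    num_batches: usize,
    num_expected_batches: Option<usize>,
    num_shreds: usize, // stand-in for the full stats struct
}

#[derive(Default)]
struct PerSlotStats(HashMap<Slot, Counter>);

impl PerSlotStats {
    // Any thread may fold a batch in; whichever one applies the final
    // batch reports the aggregate and purges the slot's entry.
    fn update(&mut self, num_shreds: usize, batch_info: &Option<BatchInfo>) {
        let info = match batch_info {
            Some(info) => info,
            None => return, // untagged batches are excluded from metrics
        };
        let counter = self.0.entry(info.slot).or_default();
        counter.num_shreds += num_shreds;
        counter.num_batches += 1;
        if let Some(expected) = info.num_expected_batches {
            counter.num_expected_batches = Some(expected);
        }
        if counter.num_expected_batches == Some(counter.num_batches) {
            println!(
                "slot {}: {} shreds, end-to-end {:?}",
                info.slot,
                counter.num_shreds,
                info.slot_start.elapsed()
            );
            self.0.remove(&info.slot);
        }
    }
}

fn main() {
    let mut stats = PerSlotStats::default();
    let slot_start = Instant::now();
    let tag = |num_expected_batches| {
        Some(BatchInfo {
            slot: 1,
            num_expected_batches,
            slot_start,
        })
    };
    stats.update(10, &tag(None)); // first batch; total still unknown
    stats.update(5, &tag(Some(2))); // last batch: 2 == 2, report and purge
    assert!(stats.0.is_empty());
}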
@@ -1,6 +1,6 @@
 //! A stage to broadcast data from a leader node to validators
 use self::{
-    broadcast_fake_shreds_run::BroadcastFakeShredsRun,
+    broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
     fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
     standard_broadcast_run::StandardBroadcastRun,
 };
@@ -35,13 +35,16 @@ use std::{
 };

 mod broadcast_fake_shreds_run;
+pub(crate) mod broadcast_metrics;
 pub(crate) mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
 mod standard_broadcast_run;

-pub const NUM_INSERT_THREADS: usize = 2;
-pub type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
-pub type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
+pub(crate) const NUM_INSERT_THREADS: usize = 2;
+pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
+pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
+pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
+pub(crate) type TransmitReceiver = Receiver<(TransmitShreds, Option<BroadcastShredBatchInfo>)>;

 #[derive(Debug, PartialEq, Eq, Clone)]
 pub enum BroadcastStageReturnType {
@@ -107,18 +110,18 @@ trait BroadcastRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<TransmitShreds>,
-        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()>;
     fn transmit(
         &mut self,
-        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
+        receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         sock: &UdpSocket,
     ) -> Result<()>;
     fn record(
-        &self,
-        receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
+        &mut self,
+        receiver: &Arc<Mutex<RecordReceiver>>,
         blockstore: &Arc<Blockstore>,
     ) -> Result<()>;
 }
@@ -150,8 +153,8 @@ impl BroadcastStage {
     fn run(
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<TransmitShreds>,
-        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         mut broadcast_stage_run: impl BroadcastRun,
     ) -> BroadcastStageReturnType {
         loop {
@@ -250,7 +253,7 @@ impl BroadcastStage {
         let blockstore_receiver = Arc::new(Mutex::new(blockstore_receiver));
         for _ in 0..NUM_INSERT_THREADS {
             let blockstore_receiver = blockstore_receiver.clone();
-            let bs_record = broadcast_stage_run.clone();
+            let mut bs_record = broadcast_stage_run.clone();
             let btree = blockstore.clone();
             let t = Builder::new()
                 .name("solana-broadcaster-record".to_string())
@@ -289,7 +292,7 @@ impl BroadcastStage {
     fn check_retransmit_signals(
         blockstore: &Blockstore,
         retransmit_slots_receiver: &RetransmitSlotsReceiver,
-        socket_sender: &Sender<TransmitShreds>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         let timer = Duration::from_millis(100);

@@ -310,7 +313,7 @@ impl BroadcastStage {
             );

             if !data_shreds.is_empty() {
-                socket_sender.send((stakes.clone(), data_shreds))?;
+                socket_sender.send(((stakes.clone(), data_shreds), None))?;
             }

             let coding_shreds = Arc::new(
@@ -320,7 +323,7 @@ impl BroadcastStage {
             );

             if !coding_shreds.is_empty() {
-                socket_sender.send((stakes.clone(), coding_shreds))?;
+                socket_sender.send(((stakes.clone(), coding_shreds), None))?;
             }
         }

@@ -478,13 +481,13 @@ pub mod test {
    }

    fn check_all_shreds_received(
-        transmit_receiver: &Receiver<TransmitShreds>,
+        transmit_receiver: &TransmitReceiver,
        mut data_index: u64,
        mut coding_index: u64,
        num_expected_data_shreds: u64,
        num_expected_coding_shreds: u64,
    ) {
-        while let Ok(new_retransmit_slots) = transmit_receiver.try_recv() {
+        while let Ok((new_retransmit_slots, _)) = transmit_receiver.try_recv() {
            if new_retransmit_slots.1[0].is_data() {
                for data_shred in new_retransmit_slots.1.iter() {
                    assert_eq!(data_shred.index() as u64, data_index);
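A note on the plumbing above: the blockstore receiver is wrapped in `Arc<Mutex<...>>` and handed to `NUM_INSERT_THREADS` cloned runs, so the typed aliases `RecordReceiver`/`TransmitReceiver` are consumed concurrently, and `record` now takes `&mut self` so each per-thread clone can fold into its (mutex-guarded, shared) stats. A minimal, self-contained sketch of the shared-receiver pattern (payload types simplified; only the constant's name is taken from the diff):

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

const NUM_INSERT_THREADS: usize = 2;

fn main() {
    // One producer; NUM_INSERT_THREADS consumers share a single receiver.
    let (sender, receiver) = mpsc::channel::<(Vec<u8>, Option<u64>)>();
    let receiver = Arc::new(Mutex::new(receiver));

    let handles: Vec<_> = (0..NUM_INSERT_THREADS)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The guard is dropped at the end of this statement, so the
                // lock is held only while popping one (batch, batch_info).
                let next = receiver.lock().unwrap().recv();
                match next {
                    Ok((batch, _batch_info)) => drop(batch), // stand-in for insert_shreds()
                    Err(_) => break, // channel closed: all senders are gone
                }
            })
        })
        .collect();

    for i in 0..4u8 {
        sender.send((vec![i], None)).unwrap();
    }
    drop(sender); // closing the channel lets the worker threads exit
    for handle in handles {
        handle.join().unwrap();
    }
}

The next hunks thread the same `(payload, Option<BroadcastShredBatchInfo>)` tuples through the `BroadcastFakeShredsRun` implementation.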
@@ -28,8 +28,8 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<TransmitShreds>,
-        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
         let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
@@ -83,25 +83,31 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         }

         let data_shreds = Arc::new(data_shreds);
-        blockstore_sender.send(data_shreds.clone())?;
+        blockstore_sender.send((data_shreds.clone(), None))?;

         // 3) Start broadcast step
         //some indicates fake shreds
-        socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)))?;
-        socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)))?;
+        socket_sender.send((
+            (Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
+            None,
+        ))?;
+        socket_sender.send((
+            (Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
+            None,
+        ))?;
         //none indicates real shreds
-        socket_sender.send((None, data_shreds))?;
-        socket_sender.send((None, Arc::new(coding_shreds)))?;
+        socket_sender.send(((None, data_shreds), None))?;
+        socket_sender.send(((None, Arc::new(coding_shreds)), None))?;

         Ok(())
     }
     fn transmit(
         &mut self,
-        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
+        receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         sock: &UdpSocket,
     ) -> Result<()> {
-        for (stakes, data_shreds) in receiver.lock().unwrap().iter() {
+        for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
             let peers = cluster_info.read().unwrap().tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {
                 if i <= self.partition && stakes.is_some() {
@@ -119,11 +125,11 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         Ok(())
     }
     fn record(
-        &self,
-        receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
+        &mut self,
+        receiver: &Arc<Mutex<RecordReceiver>>,
         blockstore: &Arc<Blockstore>,
     ) -> Result<()> {
-        for data_shreds in receiver.lock().unwrap().iter() {
+        for (data_shreds, _) in receiver.lock().unwrap().iter() {
             blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
         }
         Ok(())
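The heart of the change is the new `broadcast_metrics` module that follows: it defines the `BroadcastStats` trait (mergeable, reportable stats), `BroadcastShredBatchInfo` (the per-batch tag), the per-stage stat structs (`ProcessShredsStats`, `TransmitShredsStats`, `InsertShredsStats`), and a generic `SlotBroadcastStats<T>` map of per-slot `BatchCounter`s that folds in updates from every thread.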
core/src/broadcast_stage/broadcast_metrics.rs (new file, 288 lines)
@@ -0,0 +1,288 @@
+use super::*;
+
+pub(crate) trait BroadcastStats {
+    fn update(&mut self, new_stats: &Self);
+    fn report_stats(&mut self, slot: Slot, slot_start: Instant);
+}
+
+#[derive(Clone)]
+pub(crate) struct BroadcastShredBatchInfo {
+    pub(crate) slot: Slot,
+    pub(crate) num_expected_batches: Option<usize>,
+    pub(crate) slot_start_ts: Instant,
+}
+
+#[derive(Default, Clone)]
+pub(crate) struct ProcessShredsStats {
+    // Per-slot elapsed time
+    pub(crate) shredding_elapsed: u64,
+    pub(crate) receive_elapsed: u64,
+}
+impl ProcessShredsStats {
+    pub(crate) fn update(&mut self, new_stats: &ProcessShredsStats) {
+        self.shredding_elapsed += new_stats.shredding_elapsed;
+        self.receive_elapsed += new_stats.receive_elapsed;
+    }
+    pub(crate) fn reset(&mut self) {
+        *self = Self::default();
+    }
+}
+
+#[derive(Default, Clone)]
+pub(crate) struct TransmitShredsStats {
+    pub(crate) transmit_elapsed: u64,
+    pub(crate) send_mmsg_elapsed: u64,
+    pub(crate) get_peers_elapsed: u64,
+    pub(crate) num_shreds: usize,
+}
+
+impl BroadcastStats for TransmitShredsStats {
+    fn update(&mut self, new_stats: &TransmitShredsStats) {
+        self.transmit_elapsed += new_stats.transmit_elapsed;
+        self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed;
+        self.get_peers_elapsed += new_stats.get_peers_elapsed;
+        self.num_shreds += new_stats.num_shreds;
+    }
+    fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
+        datapoint_info!(
+            "broadcast-transmit-shreds-stats",
+            ("slot", slot as i64, i64),
+            (
+                "end_to_end_elapsed",
+                // `slot_start` signals when the first batch of shreds was
+                // received, used to measure duration of broadcast
+                slot_start.elapsed().as_micros() as i64,
+                i64
+            ),
+            ("transmit_elapsed", self.transmit_elapsed as i64, i64),
+            ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
+            ("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
+            ("num_shreds", self.num_shreds as i64, i64),
+        );
+    }
+}
+
+#[derive(Default, Clone)]
+pub(crate) struct InsertShredsStats {
+    pub(crate) insert_shreds_elapsed: u64,
+    pub(crate) num_shreds: usize,
+}
+impl BroadcastStats for InsertShredsStats {
+    fn update(&mut self, new_stats: &InsertShredsStats) {
+        self.insert_shreds_elapsed += new_stats.insert_shreds_elapsed;
+        self.num_shreds += new_stats.num_shreds;
+    }
+    fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
+        datapoint_info!(
+            "broadcast-insert-shreds-stats",
+            ("slot", slot as i64, i64),
+            (
+                "end_to_end_elapsed",
+                // `slot_start` signals when the first batch of shreds was
+                // received, used to measure duration of broadcast
+                slot_start.elapsed().as_micros() as i64,
+                i64
+            ),
+            (
+                "insert_shreds_elapsed",
+                self.insert_shreds_elapsed as i64,
+                i64
+            ),
+            ("num_shreds", self.num_shreds as i64, i64),
+        );
+    }
+}
+
+// Tracks metrics of type `T` across multiple threads
+#[derive(Default)]
+pub(crate) struct BatchCounter<T: BroadcastStats + Default> {
+    // The number of batches processed across all threads so far
+    num_batches: usize,
+    // Filled in when the last batch of shreds is received,
+    // signals how many batches of shreds to expect
+    num_expected_batches: Option<usize>,
+    broadcast_shred_stats: T,
+}
+
+impl<T: BroadcastStats + Default> BatchCounter<T> {
+    #[cfg(test)]
+    pub(crate) fn num_batches(&self) -> usize {
+        self.num_batches
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct SlotBroadcastStats<T: BroadcastStats + Default>(HashMap<Slot, BatchCounter<T>>);
+
+impl<T: BroadcastStats + Default> SlotBroadcastStats<T> {
+    #[cfg(test)]
+    pub(crate) fn get(&self, slot: Slot) -> Option<&BatchCounter<T>> {
+        self.0.get(&slot)
+    }
+    pub(crate) fn update(&mut self, new_stats: &T, batch_info: &Option<BroadcastShredBatchInfo>) {
+        if let Some(batch_info) = batch_info {
+            let mut should_delete = false;
+            {
+                let slot_batch_counter = self.0.entry(batch_info.slot).or_default();
+                slot_batch_counter.broadcast_shred_stats.update(new_stats);
+                // Only count the ones where `broadcast_shred_batch_info`.is_some(), because
+                // there could potentially be other `retransmit` slots inserted into the
+                // transmit pipeline (signaled by ReplayStage) that are not created by the
+                // main shredding/broadcast pipeline
+                slot_batch_counter.num_batches += 1;
+                if let Some(num_expected_batches) = batch_info.num_expected_batches {
+                    slot_batch_counter.num_expected_batches = Some(num_expected_batches);
+                }
+                if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches {
+                    if slot_batch_counter.num_batches == num_expected_batches {
+                        slot_batch_counter
+                            .broadcast_shred_stats
+                            .report_stats(batch_info.slot, batch_info.slot_start_ts);
+                        should_delete = true;
+                    }
+                }
+            }
+            if should_delete {
+                self.0
+                    .remove(&batch_info.slot)
+                    .expect("delete should be successful");
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[derive(Default)]
+    struct TestStats {
+        sender: Option<Sender<(usize, Slot, Instant)>>,
+        count: usize,
+    }
+
+    impl BroadcastStats for TestStats {
+        fn update(&mut self, new_stats: &TestStats) {
+            self.count += new_stats.count;
+            self.sender = new_stats.sender.clone();
+        }
+        fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
+            self.sender
+                .as_ref()
+                .unwrap()
+                .send((self.count, slot, slot_start))
+                .unwrap()
+        }
+    }
+
+    #[test]
+    fn test_update() {
+        let start = Instant::now();
+        let mut slot_broadcast_stats = SlotBroadcastStats::default();
+        slot_broadcast_stats.update(
+            &TransmitShredsStats {
+                transmit_elapsed: 1,
+                get_peers_elapsed: 1,
+                send_mmsg_elapsed: 1,
+                num_shreds: 1,
+            },
+            &Some(BroadcastShredBatchInfo {
+                slot: 0,
+                num_expected_batches: Some(2),
+                slot_start_ts: start.clone(),
+            }),
+        );
+
+        // Singular update
+        let slot_0_stats = slot_broadcast_stats.0.get(&0).unwrap();
+        assert_eq!(slot_0_stats.num_batches, 1);
+        assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
+
+        slot_broadcast_stats.update(
+            &TransmitShredsStats {
+                transmit_elapsed: 1,
+                get_peers_elapsed: 1,
+                send_mmsg_elapsed: 1,
+                num_shreds: 1,
+            },
+            &None,
+        );
+
+        // If BroadcastShredBatchInfo == None, then update should be ignored
+        let slot_0_stats = slot_broadcast_stats.0.get(&0).unwrap();
+        assert_eq!(slot_0_stats.num_batches, 1);
+        assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
+
+        // If another batch is given, then total number of batches == num_expected_batches == 2,
+        // so the batch should be purged from the HashMap
+        slot_broadcast_stats.update(
+            &TransmitShredsStats {
+                transmit_elapsed: 1,
+                get_peers_elapsed: 1,
+                send_mmsg_elapsed: 1,
+                num_shreds: 1,
+            },
+            &Some(BroadcastShredBatchInfo {
+                slot: 0,
+                num_expected_batches: None,
+                slot_start_ts: start.clone(),
+            }),
+        );
+
+        assert!(slot_broadcast_stats.0.get(&0).is_none());
+    }
+
+    #[test]
+    fn test_update_multi_threaded() {
+        for round in 0..50 {
+            let start = Instant::now();
+            let slot_broadcast_stats = Arc::new(Mutex::new(SlotBroadcastStats::default()));
+            let num_threads = 5;
+            let slot = 0;
+            let (sender, receiver) = channel();
+            let thread_handles: Vec<_> = (0..num_threads)
+                .into_iter()
+                .map(|i| {
+                    let slot_broadcast_stats = slot_broadcast_stats.clone();
+                    let sender = Some(sender.clone());
+                    let test_stats = TestStats { sender, count: 1 };
+                    let mut broadcast_batch_info = BroadcastShredBatchInfo {
+                        slot,
+                        num_expected_batches: None,
+                        slot_start_ts: start.clone(),
+                    };
+                    if i == round % num_threads {
+                        broadcast_batch_info.num_expected_batches = Some(num_threads);
+                    }
+                    Builder::new()
+                        .name("test_update_multi_threaded".to_string())
+                        .spawn(move || {
+                            slot_broadcast_stats
+                                .lock()
+                                .unwrap()
+                                .update(&test_stats, &Some(broadcast_batch_info))
+                        })
+                        .unwrap()
+                })
+                .collect();
+
+            for t in thread_handles {
+                t.join().unwrap();
+            }
+
+            assert!(slot_broadcast_stats.lock().unwrap().0.get(&slot).is_none());
+            let (returned_count, returned_slot, returned_instant) = receiver.recv().unwrap();
+            assert_eq!(returned_count, num_threads);
+            assert_eq!(returned_slot, slot);
+            assert_eq!(returned_instant, start);
+        }
+    }
+}
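To make the flow concrete, here is a usage sketch (not part of the diff) against the types above, using the `#[cfg(test)]` accessor the module's own tests use. A slot whose entries arrive over several receive iterations yields several batches, and only the last one carries the expected total:

let start = Instant::now();
let mut stats: SlotBroadcastStats<TransmitShredsStats> = SlotBroadcastStats::default();
for i in 0..3 {
    // Only the final batch knows (and stamps) the expected total.
    let num_expected_batches = if i == 2 { Some(3) } else { None };
    stats.update(
        &TransmitShredsStats {
            num_shreds: 10,
            ..TransmitShredsStats::default()
        },
        &Some(BroadcastShredBatchInfo {
            slot: 7,
            num_expected_batches,
            slot_start_ts: start,
        }),
    );
}
// The third update saw num_batches == num_expected_batches == 3, so it
// called report_stats() (emitting the datapoint) and purged slot 7.
assert!(stats.get(7).is_none());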
@@ -23,8 +23,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<TransmitShreds>,
-        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
         let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?;
@@ -61,23 +61,23 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         );

         let data_shreds = Arc::new(data_shreds);
-        blockstore_sender.send(data_shreds.clone())?;
+        blockstore_sender.send((data_shreds.clone(), None))?;
         // 3) Start broadcast step
         let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
         let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);

         let stakes = stakes.map(Arc::new);
-        socket_sender.send((stakes.clone(), data_shreds))?;
-        socket_sender.send((stakes, Arc::new(coding_shreds)))?;
+        socket_sender.send(((stakes.clone(), data_shreds), None))?;
+        socket_sender.send(((stakes, Arc::new(coding_shreds)), None))?;
         Ok(())
     }
     fn transmit(
         &mut self,
-        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
+        receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         sock: &UdpSocket,
     ) -> Result<()> {
-        let (stakes, shreds) = receiver.lock().unwrap().recv()?;
+        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
         // Broadcast data
         let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);

@@ -94,11 +94,11 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         Ok(())
     }
     fn record(
-        &self,
-        receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
+        &mut self,
+        receiver: &Arc<Mutex<RecordReceiver>>,
         blockstore: &Arc<Blockstore>,
     ) -> Result<()> {
-        let all_shreds = receiver.lock().unwrap().recv()?;
+        let (all_shreds, _) = receiver.lock().unwrap().recv()?;
         blockstore
             .insert_shreds(all_shreds.to_vec(), None, true)
             .expect("Failed to insert shreds in blockstore");
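Note that the two test-oriented runs, `BroadcastFakeShredsRun` above and `FailEntryVerificationBroadcastRun`, always tag their batches with `None`, so nothing they send enters the per-slot counters: `SlotBroadcastStats::update` is wrapped in `if let Some(batch_info) = batch_info { ... }` and is a no-op for untagged batches. The remaining hunks rework `StandardBroadcastRun`, the production path and the only run that constructs real `BroadcastShredBatchInfo` tags.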
@@ -1,5 +1,7 @@
-use super::broadcast_utils::{self, ReceiveResults};
-use super::*;
+use super::{
+    broadcast_utils::{self, ReceiveResults},
+    *,
+};
 use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
 use solana_ledger::{
     entry::Entry,
@@ -9,44 +11,33 @@ use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
 use std::collections::HashMap;
 use std::time::Duration;

-#[derive(Default)]
-struct BroadcastStats {
-    // Per-slot elapsed time
-    shredding_elapsed: u64,
-    insert_shreds_elapsed: u64,
-    broadcast_elapsed: u64,
-    receive_elapsed: u64,
-    seed_elapsed: u64,
-    send_mmsg_elapsed: u64,
-}
-
-impl BroadcastStats {
-    fn reset(&mut self) {
-        *self = Self::default();
-    }
-}
-
 #[derive(Clone)]
 pub struct StandardBroadcastRun {
-    stats: Arc<RwLock<BroadcastStats>>,
+    process_shreds_stats: ProcessShredsStats,
+    transmit_shreds_stats: Arc<Mutex<SlotBroadcastStats<TransmitShredsStats>>>,
+    insert_shreds_stats: Arc<Mutex<SlotBroadcastStats<InsertShredsStats>>>,
     unfinished_slot: Option<UnfinishedSlotInfo>,
     current_slot_and_parent: Option<(u64, u64)>,
     slot_broadcast_start: Option<Instant>,
     keypair: Arc<Keypair>,
     shred_version: u16,
     last_datapoint_submit: Arc<AtomicU64>,
+    num_batches: usize,
 }

 impl StandardBroadcastRun {
     pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
         Self {
-            stats: Arc::new(RwLock::new(BroadcastStats::default())),
+            process_shreds_stats: ProcessShredsStats::default(),
+            transmit_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())),
+            insert_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())),
             unfinished_slot: None,
             current_slot_and_parent: None,
             slot_broadcast_start: None,
             keypair,
             shred_version,
             last_datapoint_submit: Arc::new(AtomicU64::new(0)),
+            num_batches: 0,
         }
     }

@@ -141,6 +132,7 @@ impl StandardBroadcastRun {
         let brecv = Arc::new(Mutex::new(brecv));
         //data
         let _ = self.transmit(&srecv, cluster_info, sock);
+        let _ = self.record(&brecv, blockstore);
         //coding
         let _ = self.transmit(&srecv, cluster_info, sock);
         let _ = self.record(&brecv, blockstore);
@@ -150,8 +142,8 @@ impl StandardBroadcastRun {
     fn process_receive_results(
         &mut self,
         blockstore: &Arc<Blockstore>,
-        socket_sender: &Sender<TransmitShreds>,
-        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         receive_results: ReceiveResults,
     ) -> Result<()> {
         let mut receive_elapsed = receive_results.time_elapsed;
@@ -159,11 +151,13 @@ impl StandardBroadcastRun {
         let bank = receive_results.bank.clone();
         let last_tick_height = receive_results.last_tick_height;
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);
+        let old_broadcast_start = self.slot_broadcast_start;
+        let old_num_batches = self.num_batches;
         if self.current_slot_and_parent.is_none()
             || bank.slot() != self.current_slot_and_parent.unwrap().0
         {
             self.slot_broadcast_start = Some(Instant::now());
+            self.num_batches = 0;
             let slot = bank.slot();
             let parent_slot = bank.parent_slot();

@@ -178,16 +172,16 @@ impl StandardBroadcastRun {
         self.check_for_interrupted_slot(bank.ticks_per_slot() as u8);

         // 2) Convert entries to shreds and coding shreds

         let (shredder, next_shred_index) = self.init_shredder(
             blockstore,
             (bank.tick_height() % bank.ticks_per_slot()) as u8,
         );
-        let mut data_shreds = self.entries_to_data_shreds(
+        let is_last_in_slot = last_tick_height == bank.max_tick_height();
+        let data_shreds = self.entries_to_data_shreds(
             &shredder,
             next_shred_index,
             &receive_results.entries,
-            last_tick_height == bank.max_tick_height(),
+            is_last_in_slot,
         );
         // Insert the first shred so blockstore stores that the leader started this block
         // This must be done before the blocks are sent out over the wire.
@@ -198,27 +192,56 @@ impl StandardBroadcastRun {
                 .expect("Failed to insert shreds in blockstore");
         }
         let last_data_shred = data_shreds.len();
-        if let Some(last_shred) = last_unfinished_slot_shred {
-            data_shreds.push(last_shred);
-        }
         let to_shreds_elapsed = to_shreds_start.elapsed();

         let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
         let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
         let stakes = stakes.map(Arc::new);
-        let data_shreds = Arc::new(data_shreds);
-        socket_sender.send((stakes.clone(), data_shreds.clone()))?;
-        blockstore_sender.send(data_shreds.clone())?;
-        let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]);
-        let coding_shreds = Arc::new(coding_shreds);
-        socket_sender.send((stakes, coding_shreds.clone()))?;
-        blockstore_sender.send(coding_shreds)?;
-        self.update_broadcast_stats(BroadcastStats {
-            shredding_elapsed: duration_as_us(&to_shreds_elapsed),
-            receive_elapsed: duration_as_us(&receive_elapsed),
-            ..BroadcastStats::default()
-        });
+        // Broadcast the last shred of the interrupted slot if necessary
+        if let Some(last_shred) = last_unfinished_slot_shred {
+            let batch_info = Some(BroadcastShredBatchInfo {
+                slot: last_shred.slot(),
+                num_expected_batches: Some(old_num_batches + 1),
+                slot_start_ts: old_broadcast_start.expect(
+                    "Old broadcast start time for previous slot must exist if the previous slot
+                 was interrupted",
+                ),
+            });
+            let last_shred = Arc::new(vec![last_shred]);
+            socket_sender.send(((stakes.clone(), last_shred.clone()), batch_info.clone()))?;
+            blockstore_sender.send((last_shred, batch_info))?;
+        }
+
+        // Increment by two batches, one for the data batch, one for the coding batch.
+        self.num_batches += 2;
+        let num_expected_batches = {
+            if is_last_in_slot {
+                Some(self.num_batches)
+            } else {
+                None
+            }
+        };
+        let batch_info = Some(BroadcastShredBatchInfo {
+            slot: bank.slot(),
+            num_expected_batches,
+            slot_start_ts: self
+                .slot_broadcast_start
+                .clone()
+                .expect("Start timestamp must exist for a slot if we're broadcasting the slot"),
+        });
+
+        let data_shreds = Arc::new(data_shreds);
+        socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
+        blockstore_sender.send((data_shreds.clone(), batch_info.clone()))?;
+        let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]);
+        let coding_shreds = Arc::new(coding_shreds);
+        socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
+        blockstore_sender.send((coding_shreds, batch_info))?;
+        self.process_shreds_stats.update(&ProcessShredsStats {
+            shredding_elapsed: duration_as_us(&to_shreds_elapsed),
+            receive_elapsed: duration_as_us(&receive_elapsed),
+        });
         if last_tick_height == bank.max_tick_height() {
             self.report_and_reset_stats();
             self.unfinished_slot = None;
@@ -227,7 +250,12 @@ impl StandardBroadcastRun {
         Ok(())
     }

-    fn insert(&self, blockstore: &Arc<Blockstore>, shreds: Arc<Vec<Shred>>) -> Result<()> {
+    fn insert(
+        &mut self,
+        blockstore: &Arc<Blockstore>,
+        shreds: Arc<Vec<Shred>>,
+        broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
+    ) -> Result<()> {
         // Insert shreds into blockstore
         let insert_shreds_start = Instant::now();
         // The first shred is inserted synchronously
@@ -240,29 +268,39 @@ impl StandardBroadcastRun {
             .insert_shreds(data_shreds, None, true)
             .expect("Failed to insert shreds in blockstore");
         let insert_shreds_elapsed = insert_shreds_start.elapsed();
-        self.update_broadcast_stats(BroadcastStats {
+        let new_insert_shreds_stats = InsertShredsStats {
             insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed),
-            ..BroadcastStats::default()
-        });
+            num_shreds: shreds.len(),
+        };
+        self.update_insertion_metrics(&new_insert_shreds_stats, &broadcast_shred_batch_info);
         Ok(())
     }

+    fn update_insertion_metrics(
+        &mut self,
+        new_insertion_shreds_stats: &InsertShredsStats,
+        broadcast_shred_batch_info: &Option<BroadcastShredBatchInfo>,
+    ) {
+        let mut insert_shreds_stats = self.insert_shreds_stats.lock().unwrap();
+        insert_shreds_stats.update(new_insertion_shreds_stats, broadcast_shred_batch_info);
+    }
+
     fn broadcast(
         &mut self,
         sock: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         stakes: Option<Arc<HashMap<Pubkey, u64>>>,
         shreds: Arc<Vec<Shred>>,
+        broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
     ) -> Result<()> {
-        let seed_start = Instant::now();
-        let seed_elapsed = seed_start.elapsed();
+        trace!("Broadcasting {:?} shreds", shreds.len());
+        // Get the list of peers to broadcast to
+        let get_peers_start = Instant::now();
+        let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
+        let get_peers_elapsed = get_peers_start.elapsed();

         // Broadcast the shreds
-        let broadcast_start = Instant::now();
-        trace!("Broadcasting {:?} shreds", shreds.len());
-
-        let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
-
+        let transmit_start = Instant::now();
         let mut send_mmsg_total = 0;
         broadcast_shreds(
             sock,
@@ -272,42 +310,38 @@ impl StandardBroadcastRun {
             &self.last_datapoint_submit,
             &mut send_mmsg_total,
         )?;
-        let broadcast_elapsed = broadcast_start.elapsed();
-        self.update_broadcast_stats(BroadcastStats {
-            broadcast_elapsed: duration_as_us(&broadcast_elapsed),
-            seed_elapsed: duration_as_us(&seed_elapsed),
+        let transmit_elapsed = transmit_start.elapsed();
+        let new_transmit_shreds_stats = TransmitShredsStats {
+            transmit_elapsed: duration_as_us(&transmit_elapsed),
+            get_peers_elapsed: duration_as_us(&get_peers_elapsed),
             send_mmsg_elapsed: send_mmsg_total,
-            ..BroadcastStats::default()
-        });
+            num_shreds: shreds.len(),
+        };
+
+        // Process metrics
+        self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info);
         Ok(())
     }

-    fn update_broadcast_stats(&self, stats: BroadcastStats) {
-        let mut wstats = self.stats.write().unwrap();
-        wstats.receive_elapsed += stats.receive_elapsed;
-        wstats.shredding_elapsed += stats.shredding_elapsed;
-        wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
-        wstats.broadcast_elapsed += stats.broadcast_elapsed;
-        wstats.seed_elapsed += stats.seed_elapsed;
-        wstats.send_mmsg_elapsed += stats.send_mmsg_elapsed;
+    fn update_transmit_metrics(
+        &mut self,
+        new_transmit_shreds_stats: &TransmitShredsStats,
+        broadcast_shred_batch_info: &Option<BroadcastShredBatchInfo>,
+    ) {
+        let mut transmit_shreds_stats = self.transmit_shreds_stats.lock().unwrap();
+        transmit_shreds_stats.update(new_transmit_shreds_stats, broadcast_shred_batch_info);
     }

     fn report_and_reset_stats(&mut self) {
-        let stats = self.stats.read().unwrap();
+        let stats = &self.process_shreds_stats;
         assert!(self.unfinished_slot.is_some());
         datapoint_info!(
-            "broadcast-bank-stats",
+            "broadcast-process-shreds-stats",
             ("slot", self.unfinished_slot.unwrap().slot as i64, i64),
             ("shredding_time", stats.shredding_elapsed as i64, i64),
-            ("insertion_time", stats.insert_shreds_elapsed as i64, i64),
-            ("broadcast_time", stats.broadcast_elapsed as i64, i64),
             ("receive_time", stats.receive_elapsed as i64, i64),
-            ("send_mmsg", stats.send_mmsg_elapsed as i64, i64),
-            ("seed", stats.seed_elapsed as i64, i64),
             (
-                "num_shreds",
+                "num_data_shreds",
                 i64::from(self.unfinished_slot.unwrap().next_shred_index),
                 i64
             ),
@@ -317,8 +351,7 @@ impl StandardBroadcastRun {
                 i64
             ),
         );
-        drop(stats);
-        self.stats.write().unwrap().reset();
+        self.process_shreds_stats.reset();
     }
 }

@@ -327,8 +360,8 @@ impl BroadcastRun for StandardBroadcastRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<TransmitShreds>,
-        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
+        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
         self.process_receive_results(
@@ -340,20 +373,20 @@ impl BroadcastRun for StandardBroadcastRun {
     }
     fn transmit(
         &mut self,
-        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
+        receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         sock: &UdpSocket,
     ) -> Result<()> {
-        let (stakes, shreds) = receiver.lock().unwrap().recv()?;
-        self.broadcast(sock, cluster_info, stakes, shreds)
+        let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
+        self.broadcast(sock, cluster_info, stakes, shreds, slot_start_ts)
     }
     fn record(
-        &self,
-        receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
+        &mut self,
+        receiver: &Arc<Mutex<RecordReceiver>>,
         blockstore: &Arc<Blockstore>,
     ) -> Result<()> {
-        let shreds = receiver.lock().unwrap().recv()?;
-        self.insert(blockstore, shreds)
+        let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?;
+        self.insert(blockstore, shreds, slot_start_ts)
     }
 }

@@ -469,12 +502,29 @@ mod test {
         // Make sure the slot is not complete
         assert!(!blockstore.is_full(0));
         // Modify the stats, should reset later
-        standard_broadcast_run
-            .stats
-            .write()
-            .unwrap()
-            .receive_elapsed = 10;
+        standard_broadcast_run.process_shreds_stats.receive_elapsed = 10;
+        // Broadcast stats should exist, and 2 batches should have been sent,
+        // one for data, one for coding
+        assert_eq!(
+            standard_broadcast_run
+                .transmit_shreds_stats
+                .lock()
+                .unwrap()
+                .get(unfinished_slot.slot)
+                .unwrap()
+                .num_batches(),
+            2
+        );
+        assert_eq!(
+            standard_broadcast_run
+                .insert_shreds_stats
+                .lock()
+                .unwrap()
+                .get(unfinished_slot.slot)
+                .unwrap()
+                .num_batches(),
+            2
+        );
         // Try to fetch ticks from blockstore, nothing should break
         assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0);
         assert_eq!(
@@ -485,7 +535,7 @@ mod test {
         // Step 2: Make a transmission for another bank that interrupts the transmission for
         // slot 0
         let bank2 = Arc::new(Bank::new_from_parent(&bank0, &leader_keypair.pubkey(), 2));
+        let interrupted_slot = unfinished_slot.slot;
         // Interrupting the slot should cause the unfinished_slot and stats to reset
         let num_shreds = 1;
         assert!(num_shreds < num_shreds_per_slot);
@@ -509,10 +559,24 @@ mod test {

         // Check that the stats were reset as well
         assert_eq!(
-            standard_broadcast_run.stats.read().unwrap().receive_elapsed,
+            standard_broadcast_run.process_shreds_stats.receive_elapsed,
             0
         );

+        // Broadcast stats for interrupted slot should be cleared
+        assert!(standard_broadcast_run
+            .transmit_shreds_stats
+            .lock()
+            .unwrap()
+            .get(interrupted_slot)
+            .is_none());
+        assert!(standard_broadcast_run
+            .insert_shreds_stats
+            .lock()
+            .unwrap()
+            .get(interrupted_slot)
+            .is_none());
+
         // Try to fetch the incomplete ticks from blockstore, should succeed
         assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0);
         assert_eq!(
File diff suppressed because it is too large