removes Slot from TransmitShreds (backport #19327) (#20260)

* removes Slot from TransmitShreds (#19327)

An earlier version of the code funneled stakes through the broadcast
pipeline along with the shreds to broadcast:
https://github.com/solana-labs/solana/blob/b67ffab37/core/src/broadcast_stage.rs#L127

This was changed to pass only the slot once the stakes computation was
pushed further down the pipeline in:
https://github.com/solana-labs/solana/pull/18971

However, shreds themselves embody which slot they belong to, so pairing
them with a slot is redundant and adds room for bugs should the two
become inconsistent.
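
To make the redundancy concrete, here is a minimal sketch of the idea
(the `Shred` struct below is a hypothetical stand-in for
`solana_ledger::shred::Shred`; only the `slot()` accessor mirrors the
real type):

    // Hypothetical stand-in for solana_ledger::shred::Shred.
    struct Shred {
        slot: u64,
    }

    impl Shred {
        fn slot(&self) -> u64 {
            self.slot
        }
    }

    // Before: the slot traveled alongside the batch, so the pair could
    // disagree if shreds from another slot ever ended up in the batch:
    //     type TransmitShreds = (u64 /*Slot*/, std::sync::Arc<Vec<Shred>>);
    //
    // After: only the batch is sent, and the slot is recovered from the
    // shreds themselves, as the debug_assert!/assert! checks added in
    // this diff do:
    fn slot_of(batch: &[Shred]) -> Option<u64> {
        let slot = batch.first()?.slot();
        debug_assert!(batch.iter().all(|shred| shred.slot() == slot));
        Some(slot)
    }

    fn main() {
        let batch = vec![Shred { slot: 42 }, Shred { slot: 42 }];
        assert_eq!(slot_of(&batch), Some(42));
    }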

(cherry picked from commit 1deb4add81)

# Conflicts:
#	core/benches/cluster_info.rs
#	core/src/broadcast_stage.rs
#	core/src/broadcast_stage/broadcast_duplicates_run.rs
#	core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
#	core/src/broadcast_stage/standard_broadcast_run.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
Commit: 1bf88556ee (parent: 4c4f183515)
Author: mergify[bot]
Date: 2021-09-28 12:55:01 +00:00 (committed by GitHub)
7 changed files with 159 additions and 157 deletions

core/benches/cluster_info.rs

@@ -2,32 +2,37 @@
 extern crate test;
-use rand::{thread_rng, Rng};
-use solana_core::{
-    broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage},
-    cluster_nodes::ClusterNodes,
-};
-use solana_gossip::{
-    cluster_info::{ClusterInfo, Node},
-    contact_info::ContactInfo,
-};
-use solana_ledger::{
-    genesis_utils::{create_genesis_config, GenesisConfigInfo},
-    shred::Shred,
-};
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{
-    pubkey,
-    signature::Keypair,
-    timing::{timestamp, AtomicInterval},
-};
-use solana_streamer::socket::SocketAddrSpace;
-use std::{
-    collections::HashMap,
-    net::UdpSocket,
-    sync::{Arc, RwLock},
-};
-use test::Bencher;
+use {
+    rand::{thread_rng, Rng},
+    solana_core::{
+        broadcast_stage::{
+            broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
+        },
+        cluster_nodes::ClusterNodesCache,
+    },
+    solana_gossip::{
+        cluster_info::{ClusterInfo, Node},
+        contact_info::ContactInfo,
+    },
+    solana_ledger::{
+        genesis_utils::{create_genesis_config, GenesisConfigInfo},
+        shred::Shred,
+    },
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{
+        pubkey,
+        signature::Keypair,
+        timing::{timestamp, AtomicInterval},
+    },
+    solana_streamer::socket::SocketAddrSpace,
+    std::{
+        collections::HashMap,
+        net::UdpSocket,
+        sync::{Arc, RwLock},
+        time::Duration,
+    },
+    test::Bencher,
+};

 #[bench]
 fn broadcast_shreds_bench(bencher: &mut Bencher) {
@@ -56,7 +61,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
     }
     let cluster_info = Arc::new(cluster_info);
-    let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
+    let cluster_nodes_cache = ClusterNodesCache::<BroadcastStage>::new(
+        8,                      // cap
+        Duration::from_secs(5), // ttl
+    );
     let shreds = Arc::new(shreds);
     let last_datapoint = Arc::new(AtomicInterval::default());
     bencher.iter(move || {
@@ -64,11 +72,11 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         broadcast_shreds(
             &socket,
             &shreds,
-            &cluster_nodes,
+            &cluster_nodes_cache,
             &last_datapoint,
             &mut TransmitShredsStats::default(),
             &SocketAddrSpace::Unspecified,
-            cluster_info.id(),
+            &cluster_info,
             &bank_forks,
         )
         .unwrap();

core/src/broadcast_stage.rs

@@ -1,32 +1,34 @@
 //! A stage to broadcast data from a leader node to validators
 #![allow(clippy::rc_buffer)]
-use self::{
-    broadcast_duplicates_run::BroadcastDuplicatesRun,
-    broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
-    fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
-    standard_broadcast_run::StandardBroadcastRun,
-};
-use crate::{
-    cluster_nodes::ClusterNodes,
-    result::{Error, Result},
-};
-use crossbeam_channel::{
-    Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
-    Sender as CrossbeamSender,
-};
-use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError};
-use solana_ledger::{blockstore::Blockstore, shred::Shred};
-use solana_measure::measure::Measure;
-use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
-use solana_poh::poh_recorder::WorkingBankEntry;
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::timing::{timestamp, AtomicInterval};
-use solana_sdk::{clock::Slot, pubkey::Pubkey};
-use solana_streamer::{
-    sendmmsg::{batch_send, SendPktsError},
-    socket::SocketAddrSpace,
-};
-use std::{
-    collections::HashMap,
-    net::UdpSocket,
-    sync::atomic::{AtomicBool, Ordering},
+use {
+    self::{
+        broadcast_duplicates_run::BroadcastDuplicatesRun,
+        broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
+        fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
+        standard_broadcast_run::StandardBroadcastRun,
+    },
+    crate::{
+        cluster_nodes::{ClusterNodes, ClusterNodesCache},
+        result::{Error, Result},
+    },
+    crossbeam_channel::{
+        Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
+        Sender as CrossbeamSender,
+    },
+    itertools::Itertools,
+    solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError},
+    solana_ledger::{blockstore::Blockstore, shred::Shred},
+    solana_measure::measure::Measure,
+    solana_metrics::{inc_new_counter_error, inc_new_counter_info},
+    solana_poh::poh_recorder::WorkingBankEntry,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::timing::{timestamp, AtomicInterval},
+    solana_sdk::{clock::Slot, pubkey::Pubkey},
+    solana_streamer::{
+        sendmmsg::{batch_send, SendPktsError},
+        socket::SocketAddrSpace,
+    },
+    std::{
+        collections::HashMap,
+        net::UdpSocket,
+        sync::atomic::{AtomicBool, Ordering},
@@ -34,6 +36,7 @@ use std::{
         sync::{Arc, Mutex, RwLock},
         thread::{self, Builder, JoinHandle},
         time::{Duration, Instant},
+    },
 };

 mod broadcast_duplicates_run;
@@ -50,7 +53,7 @@ pub(crate) const NUM_INSERT_THREADS: usize = 2;
 pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
 pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
 pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
-pub(crate) type TransmitReceiver = Receiver<(TransmitShreds, Option<BroadcastShredBatchInfo>)>;
+pub(crate) type TransmitReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;

 #[derive(Debug, PartialEq, Eq, Clone)]
 pub enum BroadcastStageReturnType {
@@ -135,13 +138,12 @@
     }
 }

-type TransmitShreds = (Slot, Arc<Vec<Shred>>);
 trait BroadcastRun {
     fn run(
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()>;
     fn transmit(
@@ -185,7 +187,7 @@ impl BroadcastStage {
     fn run(
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         mut broadcast_stage_run: impl BroadcastRun,
     ) -> BroadcastStageReturnType {
@@ -328,7 +330,7 @@ impl BroadcastStage {
     fn check_retransmit_signals(
         blockstore: &Blockstore,
         retransmit_slots_receiver: &RetransmitSlotsReceiver,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         let timer = Duration::from_millis(100);
@@ -345,9 +347,9 @@
                     .get_data_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
             );
+            debug_assert!(data_shreds.iter().all(|shred| shred.slot() == slot));
             if !data_shreds.is_empty() {
-                socket_sender.send(((slot, data_shreds), None))?;
+                socket_sender.send((data_shreds, None))?;
             }

             let coding_shreds = Arc::new(
@@ -356,8 +358,9 @@
                     .expect("My own shreds must be reconstructable"),
             );
+            debug_assert!(coding_shreds.iter().all(|shred| shred.slot() == slot));
             if !coding_shreds.is_empty() {
-                socket_sender.send(((slot, coding_shreds), None))?;
+                socket_sender.send((coding_shreds, None))?;
             }
         }
@@ -373,11 +376,13 @@
 }

 fn update_peer_stats(
-    num_live_peers: i64,
-    broadcast_len: i64,
+    cluster_nodes: &ClusterNodes<BroadcastStage>,
     last_datapoint_submit: &Arc<AtomicInterval>,
 ) {
     if last_datapoint_submit.should_update(1000) {
+        let now = timestamp();
+        let num_live_peers = cluster_nodes.num_peers_live(now);
+        let broadcast_len = cluster_nodes.num_peers() + 1;
         datapoint_info!(
             "cluster_info-num_nodes",
             ("live_count", num_live_peers, i64),
@@ -391,31 +396,37 @@ fn update_peer_stats(
 pub fn broadcast_shreds(
     s: &UdpSocket,
     shreds: &[Shred],
-    cluster_nodes: &ClusterNodes<BroadcastStage>,
+    cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
     last_datapoint_submit: &Arc<AtomicInterval>,
     transmit_stats: &mut TransmitShredsStats,
     socket_addr_space: &SocketAddrSpace,
-    self_pubkey: Pubkey,
+    cluster_info: &ClusterInfo,
     bank_forks: &Arc<RwLock<BankForks>>,
 ) -> Result<()> {
     let mut result = Ok(());
-    let broadcast_len = cluster_nodes.num_peers();
-    if broadcast_len == 0 {
-        update_peer_stats(1, 1, last_datapoint_submit);
-        return result;
-    }
     let mut shred_select = Measure::start("shred_select");
-    let root_bank = bank_forks.read().unwrap().root_bank();
+    // Only the leader broadcasts shreds.
+    let leader = Some(cluster_info.id());
+    let (root_bank, working_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.root_bank(), bank_forks.working_bank())
+    };
     let packets: Vec<_> = shreds
         .iter()
-        .filter_map(|shred| {
-            let seed = shred.seed(Some(self_pubkey), &root_bank);
-            let node = cluster_nodes.get_broadcast_peer(seed)?;
-            if socket_addr_space.check(&node.tvu) {
-                Some((&shred.payload, node.tvu))
-            } else {
-                None
-            }
+        .group_by(|shred| shred.slot())
+        .into_iter()
+        .flat_map(|(slot, shreds)| {
+            let cluster_nodes =
+                cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
+            update_peer_stats(&cluster_nodes, last_datapoint_submit);
+            let root_bank = root_bank.clone();
+            shreds.filter_map(move |shred| {
+                let seed = shred.seed(leader, &root_bank);
+                let node = cluster_nodes.get_broadcast_peer(seed)?;
+                socket_addr_space
+                    .check(&node.tvu)
+                    .then(|| (&shred.payload, node.tvu))
+            })
         })
         .collect();
     shred_select.stop();
@@ -429,13 +440,6 @@ pub fn broadcast_shreds(
     send_mmsg_time.stop();
     transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
     transmit_stats.total_packets += packets.len();
-    let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64;
-    update_peer_stats(
-        num_live_peers,
-        broadcast_len as i64 + 1,
-        last_datapoint_submit,
-    );

     result
 }
@@ -462,14 +466,15 @@ pub mod test {
     };

     #[allow(clippy::implicit_hasher)]
+    #[allow(clippy::type_complexity)]
     fn make_transmit_shreds(
         slot: Slot,
         num: u64,
     ) -> (
         Vec<Shred>,
         Vec<Shred>,
-        Vec<TransmitShreds>,
-        Vec<TransmitShreds>,
+        Vec<Arc<Vec<Shred>>>,
+        Vec<Arc<Vec<Shred>>>,
     ) {
         let num_entries = max_ticks_per_n_shreds(num, None);
         let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
@@ -486,11 +491,11 @@
             coding_shreds.clone(),
             data_shreds
                 .into_iter()
-                .map(|s| (slot, Arc::new(vec![s])))
+                .map(|shred| Arc::new(vec![shred]))
                 .collect(),
             coding_shreds
                 .into_iter()
-                .map(|s| (slot, Arc::new(vec![s])))
+                .map(|shred| Arc::new(vec![shred]))
                 .collect(),
         )
     }
@@ -502,15 +507,15 @@
         num_expected_data_shreds: u64,
         num_expected_coding_shreds: u64,
     ) {
-        while let Ok((new_retransmit_slots, _)) = transmit_receiver.try_recv() {
-            if new_retransmit_slots.1[0].is_data() {
-                for data_shred in new_retransmit_slots.1.iter() {
+        while let Ok((shreds, _)) = transmit_receiver.try_recv() {
+            if shreds[0].is_data() {
+                for data_shred in shreds.iter() {
                     assert_eq!(data_shred.index() as u64, data_index);
                     data_index += 1;
                 }
             } else {
-                assert_eq!(new_retransmit_slots.1[0].index() as u64, coding_index);
-                for coding_shred in new_retransmit_slots.1.iter() {
+                assert_eq!(shreds[0].index() as u64, coding_index);
+                for coding_shred in shreds.iter() {
                     assert_eq!(coding_shred.index() as u64, coding_index);
                     coding_index += 1;
                 }

core/src/broadcast_stage/broadcast_duplicates_run.rs

@@ -150,7 +150,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
@@ -273,10 +273,10 @@
         blockstore_sender.send((data_shreds.clone(), None))?;

         // 3) Start broadcast step
-        socket_sender.send(((bank.slot(), Arc::new(duplicate_data_shreds)), None))?;
-        socket_sender.send(((bank.slot(), Arc::new(duplicate_coding_shreds)), None))?;
-        socket_sender.send(((bank.slot(), data_shreds), None))?;
-        socket_sender.send(((bank.slot(), Arc::new(coding_shreds)), None))?;
+        socket_sender.send((Arc::new(duplicate_data_shreds), None))?;
+        socket_sender.send((Arc::new(duplicate_coding_shreds), None))?;
+        socket_sender.send((data_shreds, None))?;
+        socket_sender.send((Arc::new(coding_shreds), None))?;

         Ok(())
     }
@@ -297,7 +297,12 @@
             }
         };

-        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let (shreds, _) = receiver.lock().unwrap().recv()?;
+        if shreds.is_empty() {
+            return Ok(());
+        }
+        let slot = shreds.first().unwrap().slot();
+        assert!(shreds.iter().all(|shred| shred.slot() == slot));
         let root_bank = bank_forks.read().unwrap().root_bank();
         let epoch = root_bank.get_leader_schedule_epoch(slot);
         let stakes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();

core/src/broadcast_stage/broadcast_fake_shreds_run.rs

@@ -28,7 +28,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
@@ -93,11 +93,13 @@
         // 3) Start broadcast step
         //some indicates fake shreds
         let batch_info = Some(batch_info);
-        socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
-        socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
+        assert!(fake_data_shreds.iter().all(|shred| shred.slot() == slot));
+        assert!(fake_coding_shreds.iter().all(|shred| shred.slot() == slot));
+        socket_sender.send((Arc::new(fake_data_shreds), batch_info.clone()))?;
+        socket_sender.send((Arc::new(fake_coding_shreds), batch_info))?;
         //none indicates real shreds
-        socket_sender.send(((slot, data_shreds), None))?;
-        socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
+        socket_sender.send((data_shreds, None))?;
+        socket_sender.send((Arc::new(coding_shreds), None))?;

         Ok(())
     }
@@ -108,7 +110,7 @@
         sock: &UdpSocket,
         _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
+        for (data_shreds, batch_info) in receiver.lock().unwrap().iter() {
             let fake = batch_info.is_some();
             let peers = cluster_info.tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {

core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs

@@ -41,7 +41,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
@@ -108,7 +108,7 @@
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
         // 4) Start broadcast step
-        socket_sender.send(((bank.slot(), data_shreds), None))?;
+        socket_sender.send((data_shreds, None))?;
         if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
             // Stash away the good shred so we can rewrite them later
             self.good_shreds.extend(good_last_data_shred.clone());
@@ -127,7 +127,7 @@
             // Store the bad shred so we serve bad repairs to validators catching up
             blockstore_sender.send((bad_last_data_shred.clone(), None))?;
             // Send bad shreds to rest of network
-            socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
+            socket_sender.send((bad_last_data_shred, None))?;
         }
         Ok(())
     }
@@ -138,27 +138,17 @@
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
-        let (root_bank, working_bank) = {
-            let bank_forks = bank_forks.read().unwrap();
-            (bank_forks.root_bank(), bank_forks.working_bank())
-        };
-        // Broadcast data
-        let cluster_nodes =
-            self.cluster_nodes_cache
-                .get(slot, &root_bank, &working_bank, cluster_info);
+        let (shreds, _) = receiver.lock().unwrap().recv()?;
         broadcast_shreds(
             sock,
             &shreds,
-            &cluster_nodes,
+            &self.cluster_nodes_cache,
             &Arc::new(AtomicInterval::default()),
             &mut TransmitShredsStats::default(),
             cluster_info.socket_addr_space(),
-            cluster_info.id(),
+            cluster_info,
             bank_forks,
-        )?;
-        Ok(())
+        )
     }
     fn record(
         &mut self,

core/src/broadcast_stage/standard_broadcast_run.rs

@@ -183,7 +183,7 @@ impl StandardBroadcastRun {
     fn process_receive_results(
         &mut self,
         blockstore: &Arc<Blockstore>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         receive_results: ReceiveResults,
     ) -> Result<()> {
@@ -247,7 +247,8 @@
                 ),
             });
             let shreds = Arc::new(prev_slot_shreds);
-            socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
+            debug_assert!(shreds.iter().all(|shred| shred.slot() == slot));
+            socket_sender.send((shreds.clone(), batch_info.clone()))?;
             blockstore_sender.send((shreds, batch_info))?;
         }
@@ -273,7 +274,8 @@
         // Send data shreds
         let data_shreds = Arc::new(data_shreds);
-        socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
+        debug_assert!(data_shreds.iter().all(|shred| shred.slot() == bank.slot()));
+        socket_sender.send((data_shreds.clone(), batch_info.clone()))?;
         blockstore_sender.send((data_shreds, batch_info.clone()))?;

         // Create and send coding shreds
@@ -284,7 +286,10 @@
             &mut process_stats,
         );
         let coding_shreds = Arc::new(coding_shreds);
-        socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
+        debug_assert!(coding_shreds
+            .iter()
+            .all(|shred| shred.slot() == bank.slot()));
+        socket_sender.send((coding_shreds.clone(), batch_info.clone()))?;
         blockstore_sender.send((coding_shreds, batch_info))?;
         coding_send_time.stop();
@@ -342,23 +347,11 @@
         &mut self,
         sock: &UdpSocket,
         cluster_info: &ClusterInfo,
-        slot: Slot,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         trace!("Broadcasting {:?} shreds", shreds.len());
-        // Get the list of peers to broadcast to
-        let mut get_peers_time = Measure::start("broadcast::get_peers");
-        let (root_bank, working_bank) = {
-            let bank_forks = bank_forks.read().unwrap();
-            (bank_forks.root_bank(), bank_forks.working_bank())
-        };
-        let cluster_nodes =
-            self.cluster_nodes_cache
-                .get(slot, &root_bank, &working_bank, cluster_info);
-        get_peers_time.stop();
         let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
         let mut transmit_time = Measure::start("broadcast_shreds");
@@ -366,17 +359,16 @@
         broadcast_shreds(
             sock,
             &shreds,
-            &cluster_nodes,
+            &self.cluster_nodes_cache,
             &self.last_datapoint_submit,
             &mut transmit_stats,
             cluster_info.socket_addr_space(),
-            cluster_info.id(),
+            cluster_info,
             bank_forks,
         )?;
         transmit_time.stop();

         transmit_stats.transmit_elapsed = transmit_time.as_us();
-        transmit_stats.get_peers_elapsed = get_peers_time.as_us();
         transmit_stats.num_shreds = shreds.len();

         // Process metrics
@@ -457,7 +449,7 @@
         &mut self,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
-        socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
+        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
         blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
@@ -477,8 +469,8 @@
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
-        self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
+        let (shreds, batch_info) = receiver.lock().unwrap().recv()?;
+        self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks)
     }
     fn record(
         &mut self,

core/src/cluster_nodes.rs

@@ -50,7 +50,7 @@ pub struct ClusterNodes<T> {
 type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;

-pub(crate) struct ClusterNodesCache<T> {
+pub struct ClusterNodesCache<T> {
     // Cache entries are wrapped in Arc<Mutex<...>>, so that, when needed, only
     // one thread does the computations to update the entry for the epoch.
     cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
@@ -230,7 +230,7 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node>
 }

 impl<T> ClusterNodesCache<T> {
-    pub(crate) fn new(
+    pub fn new(
         // Capacity of underlying LRU-cache in terms of number of epochs.
         cap: usize,
         // A time-to-live eviction policy is enforced to refresh entries in