unifies cluster-nodes computation & caching across turbine stages (backport #18971) (#20231)

* sends slots (instead of stakes) through broadcast flow

The current broadcast code computes the stakes for each slot before
sending them down the channel:
https://github.com/solana-labs/solana/blob/049fb0417/core/src/broadcast_stage/standard_broadcast_run.rs#L208-L228
https://github.com/solana-labs/solana/blob/0cf52e206/core/src/broadcast_stage.rs#L342-L349

Since the stakes are a function of the epoch the slot belongs to (and so
do not necessarily change from one slot to another), forwarding the slot
itself allows better caching downstream.
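
As a minimal illustration (not the code in this change), a downstream
consumer that receives the slot can map it to its leader-schedule epoch
and memoize the stakes per epoch; the helper below is hypothetical and
only assumes the bank APIs already used in this codebase:

    use {
        lru::LruCache,
        solana_runtime::bank::Bank,
        solana_sdk::{
            clock::{Epoch, Slot},
            pubkey::Pubkey,
        },
        std::{collections::HashMap, sync::Arc},
    };

    // Hypothetical helper: stakes are looked up once per epoch rather
    // than once per slot, since they only change when the epoch does.
    fn staked_nodes_for_slot(
        cache: &mut LruCache<Epoch, Arc<HashMap<Pubkey, u64>>>,
        bank: &Bank,
        shred_slot: Slot,
    ) -> Arc<HashMap<Pubkey, u64>> {
        let epoch = bank.get_leader_schedule_epoch(shred_slot);
        if let Some(stakes) = cache.get(&epoch) {
            return Arc::clone(stakes); // Same epoch as a previous slot: reuse.
        }
        let stakes = Arc::new(bank.epoch_staked_nodes(epoch).unwrap_or_default());
        cache.put(epoch, Arc::clone(&stakes));
        stakes
    }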

In addition, we need to invalidate the cache if the epoch changes (which
the current code does not do), and that requires knowing which slot (and
so which epoch) the shreds currently being broadcast belong to:
https://github.com/solana-labs/solana/blob/19bd30262/core/src/broadcast_stage/standard_broadcast_run.rs#L332-L344

(cherry picked from commit 44b11154ca)

# Conflicts:
#	core/src/broadcast_stage/broadcast_duplicates_run.rs
#	core/src/broadcast_stage/standard_broadcast_run.rs

* implements cluster-nodes cache

Cluster nodes are cached keyed by the epoch from which the stakes are
obtained, so if the epoch changes the cluster-nodes are recomputed.

A time-to-live eviction policy is enforced to refresh entries in case
gossip contact-infos are updated.
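
In sketch form, the cache looks roughly like the CacheEntry and
ClusterNodesCache definitions in the cluster_nodes.rs hunk further down
(the cluster-nodes value is replaced by a generic V here for brevity):

    use {
        lru::LruCache,
        solana_sdk::clock::Epoch,
        std::{
            sync::{Arc, Mutex},
            time::{Duration, Instant},
        },
    };

    // Each entry records when it was computed, so entries older than
    // `ttl` get refreshed (e.g. after gossip contact-infos change).
    type CacheEntry<V> = Option<(/*as of:*/ Instant, Arc<V>)>;

    struct EpochCache<V> {
        // Keyed by epoch: a new epoch (and so new epoch stakes) gets a
        // new entry, while slots within the same epoch share one.
        cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<V>>>>>,
        ttl: Duration,
    }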

(cherry picked from commit ecc1c7957f)

* uses cluster-nodes cache in retransmit stage

The new cluster-nodes cache will:
  * ensure cluster-nodes are recalculated if the epoch (and so the epoch
    staked nodes) changes.
  * encapsulate time-to-live eviction policy.

(cherry picked from commit 30bec3921e)

* uses cluster-nodes cache in broadcast-stage

* The current caching mechanism does not update cluster-nodes when the
  epoch (and so the epoch staked nodes) changes:
  https://github.com/solana-labs/solana/blob/19bd30262/core/src/broadcast_stage/standard_broadcast_run.rs#L332-L344

* Additionally, the cache update has a concurrency bug in which the
  thread that wins the compare_and_swap may block while waiting for the
  write-lock on the cache, while other threads keep running ahead with
  the outdated cache (since the atomic timestamp has already been
  updated).

In the new ClusterNodesCache, entries are keyed by epoch, so if the
epoch changes the cluster-nodes are recalculated. The time-to-live
eviction policy is also encapsulated and strictly enforced.
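
In rough sketch form (mirroring the standard_broadcast_run.rs hunk
further down; a simplified helper, not the exact code), the transmit
side now resolves peers per shred batch through the cache:

    use {
        crate::{
            broadcast_stage::BroadcastStage,
            cluster_nodes::{ClusterNodes, ClusterNodesCache},
        },
        solana_gossip::cluster_info::ClusterInfo,
        solana_runtime::bank_forks::BankForks,
        solana_sdk::clock::Slot,
        std::sync::{Arc, RwLock},
    };

    // The shred slot (sent down the channel instead of stakes) picks the
    // epoch; cluster-nodes are recomputed only when the epoch changes or
    // the cache entry's TTL expires.
    fn broadcast_peers(
        cache: &ClusterNodesCache<BroadcastStage>,
        bank_forks: &RwLock<BankForks>,
        cluster_info: &ClusterInfo,
        shred_slot: Slot,
    ) -> Arc<ClusterNodes<BroadcastStage>> {
        let (root_bank, working_bank) = {
            let bank_forks = bank_forks.read().unwrap();
            (bank_forks.root_bank(), bank_forks.working_bank())
        };
        cache.get(shred_slot, &root_bank, &working_bank, cluster_info)
    }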

(cherry picked from commit aa32738dd5)

# Conflicts:
#	core/src/broadcast_stage/broadcast_duplicates_run.rs
#	core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
#	core/src/broadcast_stage/standard_broadcast_run.rs

* unifies cluster-nodes computation & caching across turbine stages

The broadcast stage uses epoch_staked_nodes based on the same slot that
the shreds belong to:
https://github.com/solana-labs/solana/blob/049fb0417/core/src/broadcast_stage/standard_broadcast_run.rs#L208-L228
https://github.com/solana-labs/solana/blob/0cf52e206/core/src/broadcast_stage.rs#L342-L349

But the retransmit stage uses the bank-epoch of the working bank:
https://github.com/solana-labs/solana/blob/19bd30262/core/src/retransmit_stage.rs#L272-L289

So the two are inconsistent at epoch boundaries, where some nodes may
have a working bank (or similarly a root bank) lagging behind other
nodes. As a result, the node which receives a packet may construct the
turbine broadcast tree inconsistently with its parent node in the tree,
and so some packets may fail to reach all nodes in the tree.
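
To make the mismatch concrete, here is a hedged sketch of the two epoch
computations described above (function names are illustrative only):

    use {
        solana_runtime::bank::Bank,
        solana_sdk::clock::{Epoch, Slot},
    };

    // Broadcast stage: epoch derived from the slot the shreds belong to.
    fn broadcast_epoch(bank: &Bank, shred_slot: Slot) -> Epoch {
        bank.get_leader_schedule_epoch(shred_slot)
    }

    // Retransmit stage: epoch derived from the working bank's own slot,
    // which can lag (or lead) the shred slot around an epoch boundary.
    fn retransmit_epoch(working_bank: &Bank) -> Epoch {
        working_bank.get_leader_schedule_epoch(working_bank.slot())
    }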

(cherry picked from commit 50d0e830c9)

* adds fallback & metric for when epoch staked-nodes are none

(cherry picked from commit fb69f45f14)

* allows only one thread to update cluster-nodes cache entry for an epoch

If two threads simultaneously call into ClusterNodesCache::get for the
same epoch while the cache entry is outdated, then both threads
recompute cluster-nodes for that epoch and redundantly overwrite each
other.

This commit wraps ClusterNodesCache entries in Arc<Mutex<...>>, so that
when an update is needed only one thread does the computation.
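
Continuing the EpochCache sketch from earlier, the lookup under the
per-entry mutex might look roughly like this (the real
ClusterNodesCache::get is in the cluster_nodes.rs hunk below):

    use {
        solana_sdk::clock::Epoch,
        std::{
            sync::{Arc, Mutex},
            time::Instant,
        },
    };

    impl<V> EpochCache<V> {
        // The outer cache lock is held only long enough to fetch the
        // per-epoch entry handle; the entry lock is then held across the
        // recomputation, so concurrent callers for the same epoch block
        // briefly and then reuse the freshly computed value.
        fn get_or_compute(&self, epoch: Epoch, compute: impl FnOnce() -> V) -> Arc<V> {
            let entry = {
                let mut cache = self.cache.lock().unwrap();
                match cache.get(&epoch) {
                    Some(entry) => Arc::clone(entry),
                    None => {
                        let entry = Arc::new(Mutex::new(None));
                        cache.put(epoch, Arc::clone(&entry));
                        entry
                    }
                }
            };
            let mut entry = entry.lock().unwrap();
            if let Some((asof, value)) = entry.as_ref() {
                if asof.elapsed() < self.ttl {
                    return Arc::clone(value);
                }
            }
            let value = Arc::new(compute());
            *entry = Some((Instant::now(), Arc::clone(&value)));
            value
        }
    }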

(cherry picked from commit eaf927cf49)

* falls back on working-bank if root-bank::epoch-staked-nodes is none

bank.get_leader_schedule_epoch(shred_slot)
is one epoch after epoch_schedule.get_epoch(shred_slot).

At epoch boundaries, the shred is already one epoch ahead of the root
slot, so we need epoch-stakes two epochs ahead of the root. But the root
bank only has epoch-stakes one epoch ahead, and as a result looking up
epoch staked-nodes from the root bank fails.

To remain backward compatible with the current master code, this commit
falls back on the working bank if the epoch staked-nodes obtained from
the root bank are none.
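
A sketch of that fallback (it mirrors the epoch_staked_nodes lookup in
the cluster_nodes.rs hunk below):

    use {
        solana_runtime::bank::Bank,
        solana_sdk::{clock::Slot, pubkey::Pubkey},
        std::collections::HashMap,
    };

    // The root bank may not yet have stakes for
    // get_leader_schedule_epoch(shred_slot) right at an epoch boundary,
    // so try the working bank before falling back to empty stakes.
    fn staked_nodes(
        root_bank: &Bank,
        working_bank: &Bank,
        shred_slot: Slot,
    ) -> HashMap<Pubkey, u64> {
        let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
        [root_bank, working_bank]
            .iter()
            .find_map(|bank| bank.epoch_staked_nodes(epoch))
            .unwrap_or_default()
    }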

(cherry picked from commit e4be00fece)

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>

core/src/broadcast_stage.rs

@ -43,6 +43,9 @@ pub(crate) mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
pub(crate) const NUM_INSERT_THREADS: usize = 2;
pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
@ -132,7 +135,7 @@ impl BroadcastStageType {
}
}
pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
type TransmitShreds = (Slot, Arc<Vec<Shred>>);
trait BroadcastRun {
fn run(
&mut self,
@ -336,27 +339,25 @@ impl BroadcastStage {
}
for (_, bank) in retransmit_slots.iter() {
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = bank.epoch_staked_nodes(bank_epoch);
let stakes = stakes.map(Arc::new);
let slot = bank.slot();
let data_shreds = Arc::new(
blockstore
.get_data_shreds_for_slot(bank.slot(), 0)
.get_data_shreds_for_slot(slot, 0)
.expect("My own shreds must be reconstructable"),
);
if !data_shreds.is_empty() {
socket_sender.send(((stakes.clone(), data_shreds), None))?;
socket_sender.send(((slot, data_shreds), None))?;
}
let coding_shreds = Arc::new(
blockstore
.get_coding_shreds_for_slot(bank.slot(), 0)
.get_coding_shreds_for_slot(slot, 0)
.expect("My own shreds must be reconstructable"),
);
if !coding_shreds.is_empty() {
socket_sender.send(((stakes.clone(), coding_shreds), None))?;
socket_sender.send(((slot, coding_shreds), None))?;
}
}
@ -461,10 +462,9 @@ pub mod test {
};
#[allow(clippy::implicit_hasher)]
pub fn make_transmit_shreds(
fn make_transmit_shreds(
slot: Slot,
num: u64,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
) -> (
Vec<Shred>,
Vec<Shred>,
@ -486,11 +486,11 @@ pub mod test {
coding_shreds.clone(),
data_shreds
.into_iter()
.map(|s| (stakes.clone(), Arc::new(vec![s])))
.map(|s| (slot, Arc::new(vec![s])))
.collect(),
coding_shreds
.into_iter()
.map(|s| (stakes.clone(), Arc::new(vec![s])))
.map(|s| (slot, Arc::new(vec![s])))
.collect(),
)
}
@ -534,7 +534,7 @@ pub mod test {
// Make some shreds
let updated_slot = 0;
let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) =
make_transmit_shreds(updated_slot, 10, None);
make_transmit_shreds(updated_slot, 10);
let num_data_shreds = all_data_shreds.len();
let num_coding_shreds = all_coding_shreds.len();
assert!(num_data_shreds >= 10);

core/src/broadcast_stage/broadcast_duplicates_run.rs

@ -1,16 +1,19 @@
use super::broadcast_utils::ReceiveResults;
use super::*;
use log::*;
use solana_ledger::entry::{create_ticks, Entry, EntrySlice};
use solana_ledger::shred::Shredder;
use solana_runtime::blockhash_queue::BlockhashQueue;
use solana_sdk::clock::Slot;
use solana_sdk::fee_calculator::FeeCalculator;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, Signer};
use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::Mutex;
use {
super::{broadcast_utils::ReceiveResults, *},
crate::cluster_nodes::ClusterNodesCache,
solana_ledger::{
entry::{create_ticks, Entry, EntrySlice},
shred::Shredder,
},
solana_runtime::blockhash_queue::BlockhashQueue,
solana_sdk::{
fee_calculator::FeeCalculator,
hash::Hash,
signature::{Keypair, Signer},
transaction::Transaction,
},
std::collections::VecDeque,
};
// Queue which facilitates delivering shreds with a delay
type DelayedQueue = VecDeque<(Option<Pubkey>, Option<Vec<Shred>>)>;
@ -29,6 +32,7 @@ pub(super) struct BroadcastDuplicatesRun {
next_shred_index: u32,
shred_version: u16,
keypair: Arc<Keypair>,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
}
impl BroadcastDuplicatesRun {
@ -37,6 +41,10 @@ impl BroadcastDuplicatesRun {
shred_version: u16,
config: BroadcastDuplicatesConfig,
) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
));
let mut delayed_queue = DelayedQueue::new();
delayed_queue.resize(config.duplicate_send_delay, (None, None));
Self {
@ -49,6 +57,7 @@ impl BroadcastDuplicatesRun {
last_duplicate_entry_hash: Hash::default(),
shred_version,
keypair,
cluster_nodes_cache,
}
}
@ -257,29 +266,17 @@ impl BroadcastRun for BroadcastDuplicatesRun {
}
}
let duplicate_recipients = Arc::new(duplicate_recipients);
let real_recipients = Arc::new(real_recipients);
let _duplicate_recipients = Arc::new(duplicate_recipients);
let _real_recipients = Arc::new(real_recipients);
let data_shreds = Arc::new(data_shreds);
blockstore_sender.send((data_shreds.clone(), None))?;
// 3) Start broadcast step
socket_sender.send((
(
Some(duplicate_recipients.clone()),
Arc::new(duplicate_data_shreds),
),
None,
))?;
socket_sender.send((
(
Some(duplicate_recipients),
Arc::new(duplicate_coding_shreds),
),
None,
))?;
socket_sender.send(((Some(real_recipients.clone()), data_shreds), None))?;
socket_sender.send(((Some(real_recipients), Arc::new(coding_shreds)), None))?;
socket_sender.send(((bank.slot(), Arc::new(duplicate_data_shreds)), None))?;
socket_sender.send(((bank.slot(), Arc::new(duplicate_coding_shreds)), None))?;
socket_sender.send(((bank.slot(), data_shreds), None))?;
socket_sender.send(((bank.slot(), Arc::new(coding_shreds)), None))?;
Ok(())
}
@ -288,7 +285,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
_bank_forks: &Arc<RwLock<BankForks>>,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
// Check the delay queue for shreds that are ready to be sent
let (delayed_recipient, delayed_shreds) = {
@ -300,8 +297,10 @@ impl BroadcastRun for BroadcastDuplicatesRun {
}
};
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
let stakes = stakes.unwrap();
let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
let root_bank = bank_forks.read().unwrap().root_bank();
let epoch = root_bank.get_leader_schedule_epoch(slot);
let stakes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
let socket_addr_space = cluster_info.socket_addr_space();
for peer in cluster_info.tvu_peers() {
// Forward shreds to circumvent gossip

core/src/broadcast_stage/broadcast_fake_shreds_run.rs

@ -84,19 +84,20 @@ impl BroadcastRun for BroadcastFakeShredsRun {
let data_shreds = Arc::new(data_shreds);
blockstore_sender.send((data_shreds.clone(), None))?;
let slot = bank.slot();
let batch_info = BroadcastShredBatchInfo {
slot,
num_expected_batches: None,
slot_start_ts: Instant::now(),
};
// 3) Start broadcast step
//some indicates fake shreds
socket_sender.send((
(Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
None,
))?;
socket_sender.send((
(Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
None,
))?;
let batch_info = Some(batch_info);
socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
//none indicates real shreds
socket_sender.send(((None, data_shreds), None))?;
socket_sender.send(((None, Arc::new(coding_shreds)), None))?;
socket_sender.send(((slot, data_shreds), None))?;
socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
Ok(())
}
@ -107,18 +108,15 @@ impl BroadcastRun for BroadcastFakeShredsRun {
sock: &UdpSocket,
_bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
let fake = batch_info.is_some();
let peers = cluster_info.tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
if i <= self.partition && stakes.is_some() {
if fake == (i <= self.partition) {
// Send fake shreds to the first N peers
data_shreds.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
});
} else if i > self.partition && stakes.is_none() {
data_shreds.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
});
}
});
}

core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs

@ -1,9 +1,10 @@
use super::*;
use crate::cluster_nodes::ClusterNodes;
use solana_ledger::shred::Shredder;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;
use std::{thread::sleep, time::Duration};
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
solana_ledger::shred::Shredder,
solana_sdk::{hash::Hash, signature::Keypair},
std::{thread::sleep, time::Duration},
};
pub const NUM_BAD_SLOTS: u64 = 10;
pub const SLOT_TO_RESOLVE: u64 = 32;
@ -15,16 +16,22 @@ pub(super) struct FailEntryVerificationBroadcastRun {
good_shreds: Vec<Shred>,
current_slot: Slot,
next_shred_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
}
impl FailEntryVerificationBroadcastRun {
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
));
Self {
shred_version,
keypair,
good_shreds: vec![],
current_slot: 0,
next_shred_index: 0,
cluster_nodes_cache,
}
}
}
@ -101,10 +108,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let data_shreds = Arc::new(data_shreds);
blockstore_sender.send((data_shreds.clone(), None))?;
// 4) Start broadcast step
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = bank.epoch_staked_nodes(bank_epoch);
let stakes = stakes.map(Arc::new);
socket_sender.send(((stakes.clone(), data_shreds), None))?;
socket_sender.send(((bank.slot(), data_shreds), None))?;
if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
// Stash away the good shred so we can rewrite them later
self.good_shreds.extend(good_last_data_shred.clone());
@ -123,7 +127,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
// Store the bad shred so we serve bad repairs to validators catching up
blockstore_sender.send((bad_last_data_shred.clone(), None))?;
// Send bad shreds to rest of network
socket_sender.send(((stakes, bad_last_data_shred), None))?;
socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
}
Ok(())
}
@ -134,12 +138,15 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
// Broadcast data
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.as_deref().unwrap_or(&HashMap::default()),
);
let cluster_nodes =
self.cluster_nodes_cache
.get(slot, &root_bank, &working_bank, cluster_info);
broadcast_shreds(
sock,
&shreds,

core/src/broadcast_stage/standard_broadcast_run.rs

@ -1,23 +1,26 @@
#![allow(clippy::rc_buffer)]
use super::{
broadcast_utils::{self, ReceiveResults},
*,
};
use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
use solana_ledger::{
entry::Entry,
shred::{
ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
SHRED_TICK_REFERENCE_MASK,
use {
super::{
broadcast_utils::{self, ReceiveResults},
*,
},
crate::{
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
},
solana_ledger::{
entry::Entry,
shred::{
ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
SHRED_TICK_REFERENCE_MASK,
},
},
solana_sdk::{
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
},
std::{ops::Deref, sync::RwLock, time::Duration},
};
use solana_sdk::{
pubkey::Pubkey,
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
};
use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration};
#[derive(Clone)]
pub struct StandardBroadcastRun {
@ -31,12 +34,16 @@ pub struct StandardBroadcastRun {
shred_version: u16,
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
last_peer_update: Arc<AtomicInterval>,
}
impl StandardBroadcastRun {
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
));
Self {
process_shreds_stats: ProcessShredsStats::default(),
transmit_shreds_stats: Arc::default(),
@ -48,7 +55,7 @@ impl StandardBroadcastRun {
shred_version,
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes: Arc::default(),
cluster_nodes_cache,
last_peer_update: Arc::new(AtomicInterval::default()),
}
}
@ -228,13 +235,11 @@ impl StandardBroadcastRun {
to_shreds_time.stop();
let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new);
// Broadcast the last shred of the interrupted slot if necessary
if !prev_slot_shreds.is_empty() {
let slot = prev_slot_shreds[0].slot();
let batch_info = Some(BroadcastShredBatchInfo {
slot: prev_slot_shreds[0].slot(),
slot,
num_expected_batches: Some(old_num_batches + 1),
slot_start_ts: old_broadcast_start.expect(
"Old broadcast start time for previous slot must exist if the previous slot
@ -242,7 +247,7 @@ impl StandardBroadcastRun {
),
});
let shreds = Arc::new(prev_slot_shreds);
socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?;
socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
blockstore_sender.send((shreds, batch_info))?;
}
@ -268,7 +273,7 @@ impl StandardBroadcastRun {
// Send data shreds
let data_shreds = Arc::new(data_shreds);
socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
blockstore_sender.send((data_shreds, batch_info.clone()))?;
// Create and send coding shreds
@ -279,7 +284,7 @@ impl StandardBroadcastRun {
&mut process_stats,
);
let coding_shreds = Arc::new(coding_shreds);
socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
blockstore_sender.send((coding_shreds, batch_info))?;
coding_send_time.stop();
@ -337,26 +342,22 @@ impl StandardBroadcastRun {
&mut self,
sock: &UdpSocket,
cluster_info: &ClusterInfo,
stakes: Option<&HashMap<Pubkey, u64>>,
slot: Slot,
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
trace!("Broadcasting {:?} shreds", shreds.len());
// Get the list of peers to broadcast to
let mut get_peers_time = Measure::start("broadcast::get_peers");
if self
.last_peer_update
.should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
{
*self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.unwrap_or(&HashMap::default()),
);
}
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let cluster_nodes =
self.cluster_nodes_cache
.get(slot, &root_bank, &working_bank, cluster_info);
get_peers_time.stop();
let cluster_nodes = self.cluster_nodes.read().unwrap();
let mut transmit_stats = TransmitShredsStats::default();
// Broadcast the shreds
@ -372,7 +373,6 @@ impl StandardBroadcastRun {
cluster_info.id(),
bank_forks,
)?;
drop(cluster_nodes);
transmit_time.stop();
transmit_stats.transmit_elapsed = transmit_time.as_us();
@ -477,15 +477,8 @@ impl BroadcastRun for StandardBroadcastRun {
sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
self.broadcast(
sock,
cluster_info,
stakes.as_deref(),
shreds,
slot_start_ts,
bank_forks,
)
let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
}
fn record(
&mut self,

core/src/cluster_nodes.rs

@ -1,14 +1,27 @@
use {
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
itertools::Itertools,
lru::LruCache,
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
contact_info::ContactInfo,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
weighted_shuffle::{weighted_best, weighted_shuffle},
},
solana_sdk::pubkey::Pubkey,
std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData},
solana_runtime::bank::Bank,
solana_sdk::{
clock::{Epoch, Slot},
pubkey::Pubkey,
},
std::{
any::TypeId,
cmp::Reverse,
collections::HashMap,
marker::PhantomData,
ops::Deref,
sync::{Arc, Mutex},
time::{Duration, Instant},
},
};
enum NodeId {
@ -35,6 +48,15 @@ pub struct ClusterNodes<T> {
_phantom: PhantomData<T>,
}
type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
pub(crate) struct ClusterNodesCache<T> {
// Cache entries are wrapped in Arc<Mutex<...>>, so that, when needed, only
// one thread does the computations to update the entry for the epoch.
cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
ttl: Duration, // Time to live.
}
impl Node {
#[inline]
fn pubkey(&self) -> Pubkey {
@ -54,12 +76,12 @@ impl Node {
}
impl<T> ClusterNodes<T> {
pub fn num_peers(&self) -> usize {
pub(crate) fn num_peers(&self) -> usize {
self.index.len()
}
// A peer is considered live if they generated their contact info recently.
pub fn num_peers_live(&self, now: u64) -> usize {
pub(crate) fn num_peers_live(&self, now: u64) -> usize {
self.index
.iter()
.filter_map(|(_, index)| self.nodes[*index].contact_info())
@ -82,7 +104,7 @@ impl ClusterNodes<BroadcastStage> {
/// Returns the root of turbine broadcast tree, which the leader sends the
/// shred to.
pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
if self.index.is_empty() {
None
} else {
@ -96,11 +118,7 @@ impl ClusterNodes<BroadcastStage> {
}
impl ClusterNodes<RetransmitStage> {
pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
new_cluster_nodes(cluster_info, stakes)
}
pub fn get_retransmit_peers(
pub(crate) fn get_retransmit_peers(
&self,
shred_seed: [u8; 32],
fanout: usize,
@ -211,6 +229,70 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}
impl<T> ClusterNodesCache<T> {
pub(crate) fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
cap: usize,
// A time-to-live eviction policy is enforced to refresh entries in
// case gossip contact-infos are updated.
ttl: Duration,
) -> Self {
Self {
cache: Mutex::new(LruCache::new(cap)),
ttl,
}
}
}
impl<T: 'static> ClusterNodesCache<T> {
fn get_cache_entry(&self, epoch: Epoch) -> Arc<Mutex<CacheEntry<T>>> {
let mut cache = self.cache.lock().unwrap();
match cache.get(&epoch) {
Some(entry) => Arc::clone(entry),
None => {
let entry = Arc::default();
cache.put(epoch, Arc::clone(&entry));
entry
}
}
}
pub(crate) fn get(
&self,
shred_slot: Slot,
root_bank: &Bank,
working_bank: &Bank,
cluster_info: &ClusterInfo,
) -> Arc<ClusterNodes<T>> {
let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
let entry = self.get_cache_entry(epoch);
// Hold the lock on the entry here so that, if needed, only
// one thread recomputes cluster-nodes for this epoch.
let mut entry = entry.lock().unwrap();
if let Some((asof, nodes)) = entry.deref() {
if asof.elapsed() < self.ttl {
return Arc::clone(nodes);
}
}
let epoch_staked_nodes = [root_bank, working_bank]
.iter()
.find_map(|bank| bank.epoch_staked_nodes(epoch));
if epoch_staked_nodes.is_none() {
inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes", 1);
if epoch != root_bank.get_leader_schedule_epoch(root_bank.slot()) {
return self.get(root_bank.slot(), root_bank, working_bank, cluster_info);
}
inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes_root", 1);
}
let nodes = Arc::new(new_cluster_nodes::<T>(
cluster_info,
&epoch_staked_nodes.unwrap_or_default(),
));
*entry = Some((Instant::now(), Arc::clone(&nodes)));
nodes
}
}
impl From<ContactInfo> for NodeId {
fn from(node: ContactInfo) -> Self {
NodeId::ContactInfo(node)
@ -319,7 +401,7 @@ mod tests {
let this_node = cluster_info.my_contact_info();
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
let cluster_nodes = ClusterNodes::<RetransmitStage>::new(&cluster_info, &stakes);
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
// All nodes with contact-info should be in the index.
assert_eq!(cluster_nodes.index.len(), nodes.len());
// Staked nodes with no contact-info should be included.

core/src/retransmit_stage.rs

@ -4,7 +4,7 @@
use {
crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_nodes::ClusterNodes,
cluster_nodes::ClusterNodesCache,
cluster_slots::ClusterSlots,
cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
completed_data_sets_service::CompletedDataSetsSender,
@ -33,7 +33,7 @@ use {
},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Epoch, Slot},
clock::Slot,
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::{timestamp, AtomicInterval},
@ -63,6 +63,9 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
// it doesn't pull up too much work.
const MAX_PACKET_BATCH_SIZE: usize = 100;
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
#[derive(Default)]
struct RetransmitStats {
total_packets: AtomicU64,
@ -290,36 +293,23 @@ fn check_if_first_shred_received(
}
}
fn maybe_update_peers_cache(
cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilterAndHasher>,
last_peer_update: &AtomicU64,
cluster_info: &ClusterInfo,
bank_epoch: Epoch,
working_bank: &Bank,
hasher_reset_ts: &AtomicU64,
) {
const UPDATE_INTERVAL_MS: u64 = 1000;
if timestamp().saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
return;
}
let now = timestamp();
let prev = hasher_reset_ts.load(Ordering::Acquire);
if now.saturating_sub(prev) > UPDATE_INTERVAL_MS
&& hasher_reset_ts
.compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
// Write-lock cluster-nodes here so that only one thread does the
// computations to update peers.
let mut cluster_nodes = cluster_nodes.write().unwrap();
let now = timestamp();
if now.saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
return; // Some other thread has already done the update.
}
let epoch_staked_nodes = working_bank
.epoch_staked_nodes(bank_epoch)
.unwrap_or_default();
*cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
last_peer_update.store(now, Ordering::Release);
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
cache.clear();
hasher.reset();
}
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
cache.clear();
hasher.reset();
}
#[allow(clippy::too_many_arguments)]
@ -331,8 +321,8 @@ fn retransmit(
sock: &UdpSocket,
id: u32,
stats: &RetransmitStats,
cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
last_peer_update: &AtomicU64,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &AtomicU64,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
@ -358,20 +348,10 @@ fn retransmit(
let bank_forks = bank_forks.read().unwrap();
(bank_forks.working_bank(), bank_forks.root_bank())
};
let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
epoch_fetch.stop();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
maybe_update_peers_cache(
cluster_nodes,
shreds_received,
last_peer_update,
cluster_info,
bank_epoch,
&working_bank,
);
let cluster_nodes = cluster_nodes.read().unwrap();
let peers_len = cluster_nodes.num_peers();
maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
epoch_cache_update.stop();
let my_id = cluster_info.id();
@ -418,6 +398,8 @@ fn retransmit(
let mut compute_turbine_peers = Measure::start("turbine_start");
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
// If the node is on the critical path (i.e. the first node in each
@ -471,6 +453,8 @@ fn retransmit(
retransmit_total,
id,
);
let cluster_nodes =
cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
update_retransmit_stats(
stats,
timer_start.as_us(),
@ -480,7 +464,7 @@ fn retransmit(
repair_total,
duplicate_retransmit,
compute_turbine_peers_total,
peers_len,
cluster_nodes.num_peers(),
packets_by_slot,
packets_by_source,
epoch_fetch.as_us(),
@ -508,8 +492,11 @@ pub fn retransmitter(
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Vec<JoinHandle<()>> {
let cluster_nodes = Arc::default();
let last_peer_update = Arc::default();
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
));
let hasher_reset_ts = Arc::default();
let stats = Arc::new(RetransmitStats::default());
let shreds_received = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
@ -524,8 +511,8 @@ pub fn retransmitter(
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let cluster_nodes = Arc::clone(&cluster_nodes);
let last_peer_update = Arc::clone(&last_peer_update);
let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
let first_shreds_received = first_shreds_received.clone();
@ -544,8 +531,8 @@ pub fn retransmitter(
&sockets[s],
s as u32,
&stats,
&cluster_nodes,
&last_peer_update,
&cluster_nodes_cache,
&hasher_reset_ts,
&shreds_received,
&max_slots,
&first_shreds_received,