uses cluster-nodes cache in retransmit stage

The new cluster-nodes cache will:
  * ensure cluster-nodes are recalculated if the epoch (and therefore the
    epoch's staked nodes) changes.
  * encapsulate the time-to-live eviction policy.
behzad nouri
2021-07-28 11:52:39 -04:00
parent ecc1c7957f
commit 30bec3921e
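
For orientation, the sketch below illustrates the caching policy the commit message describes: entries are keyed by epoch, so a change of epoch (and therefore of epoch staked nodes) forces a recompute, and each entry expires after a fixed time-to-live. The names here (EpochTtlCache, get_or_update) are illustrative assumptions, not the actual ClusterNodesCache API; in the diff below the real cache is built with ClusterNodesCache::<RetransmitStage>::new(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, CLUSTER_NODES_CACHE_TTL) and queried with cluster_nodes_cache.get(cluster_info, &working_bank).

// Illustrative sketch only: an epoch-keyed cache with TTL eviction, mirroring
// the policy described in the commit message. Not the crate's real API.
use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

type Epoch = u64;

struct EpochTtlCache<T> {
    ttl: Duration,
    entries: HashMap<Epoch, (Instant, T)>,
}

impl<T> EpochTtlCache<T> {
    fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            entries: HashMap::new(),
        }
    }

    // Return the value cached for `epoch`, recomputing it if the epoch has
    // not been seen before or the cached entry is older than the TTL.
    fn get_or_update(&mut self, epoch: Epoch, compute: impl FnOnce() -> T) -> &T {
        let now = Instant::now();
        let ttl = self.ttl;
        let stale = self
            .entries
            .get(&epoch)
            .map_or(true, |(created, _)| now.duration_since(*created) > ttl);
        if stale {
            self.entries.insert(epoch, (now, compute()));
        }
        &self.entries[&epoch].1
    }
}

fn main() {
    let mut cache = EpochTtlCache::new(Duration::from_secs(5));
    // First lookup for epoch 42 recomputes; repeat lookups within 5s hit the cache.
    let nodes = cache.get_or_update(42, || vec!["node-a", "node-b"]);
    println!("{nodes:?}");
}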

@@ -5,7 +5,7 @@ use {
     crate::{
         ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
         cluster_info_vote_listener::VerifiedVoteReceiver,
-        cluster_nodes::ClusterNodes,
+        cluster_nodes::ClusterNodesCache,
         cluster_slots::ClusterSlots,
         cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
         completed_data_sets_service::CompletedDataSetsSender,
@ -28,7 +28,7 @@ use {
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{ solana_sdk::{
clock::{Epoch, Slot}, clock::Slot,
epoch_schedule::EpochSchedule, epoch_schedule::EpochSchedule,
pubkey::Pubkey, pubkey::Pubkey,
timing::{timestamp, AtomicInterval}, timing::{timestamp, AtomicInterval},
@@ -58,6 +58,9 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 // it doesn't pull up too much work.
 const MAX_PACKET_BATCH_SIZE: usize = 100;
 
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
+
 #[derive(Default)]
 struct RetransmitStats {
     total_packets: AtomicU64,
@@ -275,36 +278,23 @@ fn check_if_first_shred_received(
     }
 }
 
-fn maybe_update_peers_cache(
-    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
+fn maybe_reset_shreds_received_cache(
     shreds_received: &Mutex<ShredFilterAndHasher>,
-    last_peer_update: &AtomicU64,
-    cluster_info: &ClusterInfo,
-    bank_epoch: Epoch,
-    working_bank: &Bank,
+    hasher_reset_ts: &AtomicU64,
 ) {
     const UPDATE_INTERVAL_MS: u64 = 1000;
-    if timestamp().saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
-        return;
-    }
-    {
-        // Write-lock cluster-nodes here so that only one thread does the
-        // computations to update peers.
-        let mut cluster_nodes = cluster_nodes.write().unwrap();
-        let now = timestamp();
-        if now.saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
-            return; // Some other thread has already done the update.
-        }
-        let epoch_staked_nodes = working_bank
-            .epoch_staked_nodes(bank_epoch)
-            .unwrap_or_default();
-        *cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
-        last_peer_update.store(now, Ordering::Release);
-    }
-    let mut shreds_received = shreds_received.lock().unwrap();
-    let (cache, hasher) = shreds_received.deref_mut();
-    cache.clear();
-    hasher.reset();
+    let now = timestamp();
+    let prev = hasher_reset_ts.load(Ordering::Acquire);
+    if now.saturating_sub(prev) > UPDATE_INTERVAL_MS
+        && hasher_reset_ts
+            .compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
+    {
+        let mut shreds_received = shreds_received.lock().unwrap();
+        let (cache, hasher) = shreds_received.deref_mut();
+        cache.clear();
+        hasher.reset();
+    }
 }
 
 #[allow(clippy::too_many_arguments)]
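Note how the hunk above replaces the old double-checked write-lock with a single compare_exchange on hasher_reset_ts: whichever thread wins the CAS performs the periodic reset of shreds_received, and every other thread simply skips the reset for this interval instead of blocking. A minimal, self-contained sketch of that pattern follows (the helper name try_reset and the explicit now_ms argument are assumptions for illustration; the real code calls timestamp()).

use std::sync::atomic::{AtomicU64, Ordering};

const UPDATE_INTERVAL_MS: u64 = 1000;

// Returns true only for the caller that wins the CAS on the shared timestamp,
// i.e. at most one caller per interval performs the (expensive) reset work.
fn try_reset(last_reset_ms: &AtomicU64, now_ms: u64) -> bool {
    let prev = last_reset_ms.load(Ordering::Acquire);
    now_ms.saturating_sub(prev) > UPDATE_INTERVAL_MS
        && last_reset_ms
            .compare_exchange(prev, now_ms, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
}

fn main() {
    let hasher_reset_ts = AtomicU64::new(0);
    assert!(try_reset(&hasher_reset_ts, 2_000)); // past the interval: this caller resets
    assert!(!try_reset(&hasher_reset_ts, 2_500)); // within the interval: skipped
}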
@@ -316,8 +306,8 @@ fn retransmit(
     sock: &UdpSocket,
     id: u32,
     stats: &RetransmitStats,
-    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicU64,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    hasher_reset_ts: &AtomicU64,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -343,20 +333,11 @@ fn retransmit(
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.working_bank(), bank_forks.root_bank())
     };
-    let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
     epoch_fetch.stop();
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    maybe_update_peers_cache(
-        cluster_nodes,
-        shreds_received,
-        last_peer_update,
-        cluster_info,
-        bank_epoch,
-        &working_bank,
-    );
-    let cluster_nodes = cluster_nodes.read().unwrap();
-    let peers_len = cluster_nodes.num_peers();
+    maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
+    let cluster_nodes = cluster_nodes_cache.get(cluster_info, &working_bank);
     epoch_cache_update.stop();
 
     let my_id = cluster_info.id();
@ -459,7 +440,7 @@ fn retransmit(
discard_total, discard_total,
repair_total, repair_total,
compute_turbine_peers_total, compute_turbine_peers_total,
peers_len, cluster_nodes.num_peers(),
packets_by_slot, packets_by_slot,
packets_by_source, packets_by_source,
epoch_fetch.as_us(), epoch_fetch.as_us(),
@@ -487,8 +468,11 @@ pub fn retransmitter(
     max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
 ) -> Vec<JoinHandle<()>> {
-    let cluster_nodes = Arc::default();
-    let last_peer_update = Arc::default();
+    let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new(
+        CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+        CLUSTER_NODES_CACHE_TTL,
+    ));
+    let hasher_reset_ts = Arc::default();
     let stats = Arc::new(RetransmitStats::default());
     let shreds_received = Arc::new(Mutex::new((
         LruCache::new(DEFAULT_LRU_SIZE),
@@ -503,8 +487,8 @@ pub fn retransmitter(
            let r = r.clone();
            let cluster_info = cluster_info.clone();
            let stats = stats.clone();
-           let cluster_nodes = Arc::clone(&cluster_nodes);
-           let last_peer_update = Arc::clone(&last_peer_update);
+           let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
+           let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
            let shreds_received = shreds_received.clone();
            let max_slots = max_slots.clone();
            let first_shreds_received = first_shreds_received.clone();
@@ -523,8 +507,8 @@ pub fn retransmitter(
                        &sockets[s],
                        s as u32,
                        &stats,
-                       &cluster_nodes,
-                       &last_peer_update,
+                       &cluster_nodes_cache,
+                       &hasher_reset_ts,
                        &shreds_received,
                        &max_slots,
                        &first_shreds_received,