From aa32738dd5afb1dd4bced7bac0fdacd733650112 Mon Sep 17 00:00:00 2001 From: behzad nouri <behzadnouri@gmail.com> Date: Thu, 29 Jul 2021 12:25:56 -0400 Subject: [PATCH] uses cluster-nodes cache in broadcast-stage * Current caching mechanism does not update cluster-nodes when the epoch (and so epoch staked nodes) changes: https://github.com/solana-labs/solana/blob/19bd30262/core/src/broadcast_stage/standard_broadcast_run.rs#L332-L344 * Additionally, the cache update has a concurrency bug in which the thread which does compare_and_swap may be blocked when it tries to obtain the write-lock on cache, while other threads will keep running ahead with the outdated cache (since the atomic timestamp is already updated). In the new ClusterNodesCache, entries are keyed by epoch, and so if epoch changes cluster-nodes will be recalculated. The time-to-live eviction policy is also encapsulated and rigidly enforced. --- core/src/broadcast_stage.rs | 3 ++ .../broadcast_duplicates_run.rs | 30 +++++++++++-------- .../fail_entry_verification_broadcast_run.rs | 24 ++++++++------- .../broadcast_stage/standard_broadcast_run.rs | 19 ++++++------ 4 files changed, 45 insertions(+), 31 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 9f2c2b3f6e..15c4c815d4 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -44,6 +44,9 @@ pub(crate) mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; +const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; +const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); + pub(crate) const NUM_INSERT_THREADS: usize = 2; pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>; pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>; diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 8e7ba4a479..a593973a5e 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ 
b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -1,11 +1,14 @@ -use super::*; -use solana_entry::entry::Entry; -use solana_ledger::shred::Shredder; -use solana_runtime::blockhash_queue::BlockhashQueue; -use solana_sdk::{ - hash::Hash, - signature::{Keypair, Signer}, - system_transaction, +use { + super::*, + crate::cluster_nodes::ClusterNodesCache, + solana_entry::entry::Entry, + solana_ledger::shred::Shredder, + solana_runtime::blockhash_queue::BlockhashQueue, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signer}, + system_transaction, + }, }; pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; @@ -32,10 +35,15 @@ pub(super) struct BroadcastDuplicatesRun { recent_blockhash: Option<Hash>, prev_entry_hash: Option<Hash>, num_slots_broadcasted: usize, + cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>, } impl BroadcastDuplicatesRun { pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); Self { config, duplicate_queue: BlockhashQueue::default(), @@ -47,6 +55,7 @@ impl BroadcastDuplicatesRun { recent_blockhash: None, prev_entry_hash: None, num_slots_broadcasted: 0, + cluster_nodes_cache, } } } @@ -220,11 +229,8 @@ impl BroadcastRun for BroadcastDuplicatesRun { ) -> Result<()> { let ((slot, shreds), _) = receiver.lock().unwrap().recv()?; let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch = root_bank.get_leader_schedule_epoch(slot); - let stakes = root_bank.epoch_staked_nodes(epoch); // Broadcast data - let cluster_nodes = - ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default()); + let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info); broadcast_shreds( sock, &shreds, diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 39d92af654..978421d33d --- 
a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,9 +1,10 @@ -use super::*; -use crate::cluster_nodes::ClusterNodes; -use solana_ledger::shred::Shredder; -use solana_sdk::hash::Hash; -use solana_sdk::signature::Keypair; -use std::{thread::sleep, time::Duration}; +use { + super::*, + crate::cluster_nodes::ClusterNodesCache, + solana_ledger::shred::Shredder, + solana_sdk::{hash::Hash, signature::Keypair}, + std::{thread::sleep, time::Duration}, +}; pub const NUM_BAD_SLOTS: u64 = 10; pub const SLOT_TO_RESOLVE: u64 = 32; @@ -14,15 +15,21 @@ pub(super) struct FailEntryVerificationBroadcastRun { good_shreds: Vec<Shred>, current_slot: Slot, next_shred_index: u32, + cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>, } impl FailEntryVerificationBroadcastRun { pub(super) fn new(shred_version: u16) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); Self { shred_version, good_shreds: vec![], current_slot: 0, next_shred_index: 0, + cluster_nodes_cache, } } } @@ -132,11 +139,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) -> Result<()> { let ((slot, shreds), _) = receiver.lock().unwrap().recv()?; let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch = root_bank.get_leader_schedule_epoch(slot); - let stakes = root_bank.epoch_staked_nodes(epoch); // Broadcast data - let cluster_nodes = - ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default()); + let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info); broadcast_shreds( sock, &shreds, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index db02a0b3e4..7688abefcc 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -5,7 +5,9 @@ use { broadcast_utils::{self, ReceiveResults}, *, }, - 
crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes}, + crate::{ + broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache, + }, solana_entry::entry::Entry, solana_ledger::shred::{ ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, @@ -29,12 +31,16 @@ pub struct StandardBroadcastRun { shred_version: u16, last_datapoint_submit: Arc<AtomicU64>, num_batches: usize, - cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>, + cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>, last_peer_update: Arc<AtomicInterval>, } impl StandardBroadcastRun { pub(super) fn new(shred_version: u16) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); Self { process_shreds_stats: ProcessShredsStats::default(), transmit_shreds_stats: Arc::default(), @@ -45,7 +51,7 @@ impl StandardBroadcastRun { shred_version, last_datapoint_submit: Arc::default(), num_batches: 0, - cluster_nodes: Arc::default(), + cluster_nodes_cache, last_peer_update: Arc::new(AtomicInterval::default()), } } @@ -342,12 +348,8 @@ impl StandardBroadcastRun { // Get the list of peers to broadcast to let mut get_peers_time = Measure::start("broadcast::get_peers"); let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch = root_bank.get_leader_schedule_epoch(slot); - let stakes = root_bank.epoch_staked_nodes(epoch); - *self.cluster_nodes.write().unwrap() = - ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default()); + let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info); get_peers_time.stop(); - let cluster_nodes = self.cluster_nodes.read().unwrap(); let mut transmit_stats = TransmitShredsStats::default(); // Broadcast the shreds @@ -363,7 +365,6 @@ impl StandardBroadcastRun { bank_forks, cluster_info.socket_addr_space(), )?; - drop(cluster_nodes); transmit_time.stop(); transmit_stats.transmit_elapsed = transmit_time.as_us();