From d06dc6c8a6837f75fd3695390a4ccae8c2c3660c Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Thu, 29 Jul 2021 16:20:15 +0000
Subject: [PATCH] shares cluster-nodes between retransmit threads (#18947)

cluster_nodes and last_peer_update are not shared between retransmit
threads, as each thread has its own value:
https://github.com/solana-labs/solana/blob/65ccfed86/core/src/retransmit_stage.rs#L476-L477

Additionally, with shared references, this code:
https://github.com/solana-labs/solana/blob/0167daa11/core/src/retransmit_stage.rs#L315-L328
has a concurrency bug where the thread which does compare_and_swap
updates cluster_nodes much later, after other threads have run with
outdated cluster_nodes for a while. In particular, the write-lock there
may block.
---
 core/benches/retransmit_stage.rs |  66 +++++++------
 core/src/retransmit_stage.rs     | 162 +++++++++++++++++++------------
 2 files changed, 136 insertions(+), 92 deletions(-)

diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index 43db1e58ac..4c3bba2858 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -3,34 +3,42 @@
 extern crate solana_core;
 extern crate test;
 
-use log::*;
-use solana_core::retransmit_stage::retransmitter;
-use solana_entry::entry::Entry;
-use solana_gossip::cluster_info::{ClusterInfo, Node};
-use solana_gossip::contact_info::ContactInfo;
-use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
-use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
-use solana_ledger::shred::Shredder;
-use solana_measure::measure::Measure;
-use solana_perf::packet::{Packet, Packets};
-use solana_rpc::max_slots::MaxSlots;
-use solana_runtime::bank::Bank;
-use solana_runtime::bank_forks::BankForks;
-use solana_sdk::hash::Hash;
-use solana_sdk::pubkey;
-use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::system_transaction;
-use solana_sdk::timing::timestamp;
-use solana_streamer::socket::SocketAddrSpace;
-use std::net::UdpSocket;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::channel;
-use std::sync::Mutex;
-use std::sync::{Arc, RwLock};
-use std::thread::sleep;
-use std::thread::Builder;
-use std::time::Duration;
-use test::Bencher;
+use {
+    log::*,
+    solana_core::retransmit_stage::retransmitter,
+    solana_entry::entry::Entry,
+    solana_gossip::{
+        cluster_info::{ClusterInfo, Node},
+        contact_info::ContactInfo,
+    },
+    solana_ledger::{
+        genesis_utils::{create_genesis_config, GenesisConfigInfo},
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::Shredder,
+    },
+    solana_measure::measure::Measure,
+    solana_perf::packet::{Packet, Packets},
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{
+        hash::Hash,
+        pubkey,
+        signature::{Keypair, Signer},
+        system_transaction,
+        timing::timestamp,
+    },
+    solana_streamer::socket::SocketAddrSpace,
+    std::{
+        net::UdpSocket,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            mpsc::channel,
+            Arc, Mutex, RwLock,
+        },
+        thread::{sleep, Builder},
+        time::Duration,
+    },
+    test::Bencher,
+};
 
 #[bench]
 #[allow(clippy::same_item_push)]
@@ -102,7 +110,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         &leader_schedule_cache,
         cluster_info,
         packet_receiver,
-        &Arc::new(MaxSlots::default()),
+        Arc::default(), // solana_rpc::max_slots::MaxSlots
         None,
     );
 
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 1efd2ff21e..aa61933d60 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -1,47 +1,54 @@
 //! The `retransmit_stage` retransmits shreds between validators
 #![allow(clippy::rc_buffer)]
 
-use crate::{
-    ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
-    cluster_info_vote_listener::VerifiedVoteReceiver,
-    cluster_nodes::ClusterNodes,
-    cluster_slots::ClusterSlots,
-    cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
-    completed_data_sets_service::CompletedDataSetsSender,
-    repair_service::{DuplicateSlotsResetSender, RepairInfo},
-    result::{Error, Result},
-    window_service::{should_retransmit_and_persist, WindowService},
-};
-use crossbeam_channel::{Receiver, Sender};
-use lru::LruCache;
-use solana_client::rpc_response::SlotUpdate;
-use solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT};
-use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
-use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};
-use solana_measure::measure::Measure;
-use solana_metrics::inc_new_counter_error;
-use solana_perf::packet::{Packet, Packets};
-use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{
-    clock::Slot,
-    epoch_schedule::EpochSchedule,
-    pubkey::Pubkey,
-    timing::{timestamp, AtomicInterval},
-};
-use solana_streamer::streamer::PacketReceiver;
-use std::{
-    collections::hash_set::HashSet,
-    collections::{BTreeMap, BTreeSet, HashMap},
-    net::UdpSocket,
-    ops::{Deref, DerefMut},
-    sync::atomic::{AtomicBool, AtomicU64, Ordering},
-    sync::mpsc::channel,
-    sync::mpsc::RecvTimeoutError,
-    sync::Mutex,
-    sync::{Arc, RwLock},
-    thread::{self, Builder, JoinHandle},
-    time::Duration,
+use {
+    crate::{
+        ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
+        cluster_info_vote_listener::VerifiedVoteReceiver,
+        cluster_nodes::ClusterNodes,
+        cluster_slots::ClusterSlots,
+        cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
+        completed_data_sets_service::CompletedDataSetsSender,
+        packet_hasher::PacketHasher,
+        repair_service::{DuplicateSlotsResetSender, RepairInfo},
+        result::{Error, Result},
+        window_service::{should_retransmit_and_persist, WindowService},
+    },
+    crossbeam_channel::{Receiver, Sender},
+    lru::LruCache,
+    solana_client::rpc_response::SlotUpdate,
+    solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
+    solana_ledger::{
+        shred::{get_shred_slot_index_type, ShredFetchStats},
+        {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
+    },
+    solana_measure::measure::Measure,
+    solana_metrics::inc_new_counter_error,
+    solana_perf::packet::{Packet, Packets},
+    solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{
+        clock::{Epoch, Slot},
+        epoch_schedule::EpochSchedule,
+        pubkey::Pubkey,
+        timing::{timestamp, AtomicInterval},
+    },
+    solana_streamer::streamer::PacketReceiver,
+    std::{
+        collections::{
+            hash_set::HashSet,
+            {BTreeMap, BTreeSet, HashMap},
+        },
+        net::UdpSocket,
+        ops::DerefMut,
+        sync::{
+            atomic::{AtomicBool, AtomicU64, Ordering},
+            mpsc::{channel, RecvTimeoutError},
+            Arc, Mutex, RwLock,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::Duration,
+    },
 };
 
 const MAX_DUPLICATE_COUNT: usize = 2;
@@ -209,7 +216,6 @@ fn update_retransmit_stats(
     }
 }
 
-use crate::packet_hasher::PacketHasher;
 // Map of shred (slot, index, is_data) => list of hash values seen for that key.
 pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
 
@@ -269,6 +275,38 @@ fn check_if_first_shred_received(
     }
 }
 
+fn maybe_update_peers_cache(
+    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
+    shreds_received: &Mutex<ShredFilterAndHasher>,
+    last_peer_update: &AtomicU64,
+    cluster_info: &ClusterInfo,
+    bank_epoch: Epoch,
+    working_bank: &Bank,
+) {
+    const UPDATE_INTERVAL_MS: u64 = 1000;
+    if timestamp().saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
+        return;
+    }
+    {
+        // Write-lock cluster-nodes here so that only one thread does the
+        // computations to update peers.
+        let mut cluster_nodes = cluster_nodes.write().unwrap();
+        let now = timestamp();
+        if now.saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
+            return; // Some other thread has already done the update.
+        }
+        let epoch_staked_nodes = working_bank
+            .epoch_staked_nodes(bank_epoch)
+            .unwrap_or_default();
+        *cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
+        last_peer_update.store(now, Ordering::Release);
+    }
+    let mut shreds_received = shreds_received.lock().unwrap();
+    let (cache, hasher) = shreds_received.deref_mut();
+    cache.clear();
+    hasher.reset();
+}
+
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &RwLock<BankForks>,
@@ -279,7 +317,7 @@ fn retransmit(
     id: u32,
     stats: &RetransmitStats,
     cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicInterval,
+    last_peer_update: &AtomicU64,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -301,26 +339,22 @@ fn retransmit(
     drop(r_lock);
 
     let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
-    let (r_bank, root_bank) = {
+    let (working_bank, root_bank) = {
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.working_bank(), bank_forks.root_bank())
     };
-    let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
+    let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
     epoch_fetch.stop();
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    if last_peer_update.should_update_ext(1000, false) {
-        let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
-        *cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
-            cluster_info,
-            &epoch_staked_nodes.unwrap_or_default(),
-        );
-        {
-            let mut sr = shreds_received.lock().unwrap();
-            sr.0.clear();
-            sr.1.reset();
-        }
-    }
+    maybe_update_peers_cache(
+        cluster_nodes,
+        shreds_received,
+        last_peer_update,
+        cluster_info,
+        bank_epoch,
+        &working_bank,
+    );
     let cluster_nodes = cluster_nodes.read().unwrap();
     let mut peers_len = 0;
     epoch_cache_update.stop();
@@ -363,7 +397,7 @@ fn retransmit(
         }
 
         let mut compute_turbine_peers = Measure::start("turbine_start");
-        let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(r_bank.deref()));
+        let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
@@ -451,9 +485,11 @@ pub fn retransmitter(
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: Arc<ClusterInfo>,
     r: Arc<Mutex<PacketReceiver>>,
-    max_slots: &Arc<MaxSlots>,
+    max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
 ) -> Vec<JoinHandle<()>> {
+    let cluster_nodes = Arc::default();
+    let last_peer_update = Arc::default();
     let stats = Arc::new(RetransmitStats::default());
     let shreds_received = Arc::new(Mutex::new((
         LruCache::new(DEFAULT_LRU_SIZE),
@@ -468,8 +504,8 @@ pub fn retransmitter(
             let r = r.clone();
             let cluster_info = cluster_info.clone();
             let stats = stats.clone();
-            let cluster_nodes = Arc::default();
-            let last_peer_update = Arc::new(AtomicInterval::default());
+            let cluster_nodes = Arc::clone(&cluster_nodes);
+            let last_peer_update = Arc::clone(&last_peer_update);
             let shreds_received = shreds_received.clone();
             let max_slots = max_slots.clone();
             let first_shreds_received = first_shreds_received.clone();
@@ -552,7 +588,7 @@ impl RetransmitStage {
             leader_schedule_cache,
             cluster_info.clone(),
             retransmit_receiver,
-            max_slots,
+            Arc::clone(max_slots),
            rpc_subscriptions,
         );
 
@@ -686,7 +722,7 @@ mod tests {
            &leader_schedule_cache,
            cluster_info,
            Arc::new(Mutex::new(retransmit_receiver)),
-            &Arc::new(MaxSlots::default()),
+            Arc::default(), // MaxSlots
            None,
        );
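
The heart of this change is the double-checked, rate-limited refresh in maybe_update_peers_cache above: every retransmit thread now shares one cluster-nodes cache behind an Arc<RwLock<..>> plus one AtomicU64 timestamp. The first thread past the interval check takes the write lock, re-checks the timestamp, recomputes the peers, and publishes the new timestamp; any thread that blocked on the lock in the meantime sees the fresh timestamp and returns without redoing the work. The following is a minimal, self-contained sketch of that pattern under assumed placeholder names (Cache, maybe_refresh, now_ms are illustrative only, not Solana APIs), not the patch's actual implementation:

use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, RwLock,
    },
    thread,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

const UPDATE_INTERVAL_MS: u64 = 1000;

#[derive(Default)]
struct Cache {
    refreshes: u64, // stands in for the recomputed peer list
}

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Double-checked, rate-limited refresh: a cheap atomic check first, then the
// write lock, then a re-check so at most one thread per interval does the work.
fn maybe_refresh(cache: &RwLock<Cache>, last_update: &AtomicU64) {
    if now_ms().saturating_sub(last_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
        return;
    }
    let mut cache = cache.write().unwrap();
    let now = now_ms();
    if now.saturating_sub(last_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
        return; // Another thread finished the refresh while we waited for the lock.
    }
    cache.refreshes += 1; // The expensive recomputation would go here.
    last_update.store(now, Ordering::Release);
}

fn main() {
    let cache = Arc::new(RwLock::new(Cache::default()));
    let last_update = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cache = Arc::clone(&cache);
            let last_update = Arc::clone(&last_update);
            thread::spawn(move || {
                for _ in 0..10 {
                    maybe_refresh(&cache, &last_update);
                    thread::sleep(Duration::from_millis(300));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Four threads looping for ~3 seconds trigger only a handful of refreshes.
    println!("refreshes: {}", cache.read().unwrap().refreshes);
}

The second timestamp check under the write lock is what closes the race described in the commit message: a thread that blocked on the lock while another thread refreshed observes the new timestamp and returns, instead of running with (or overwriting the cache using) stale data.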