From aa2098d11503931f357b8d299b0590d5aec07c16 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 2 Sep 2021 11:05:15 +0000 Subject: [PATCH] Write helper for multithread update (#18808) (#19282) Co-authored-by: sakridge --- core/benches/cluster_info.rs | 14 +++--- core/src/banking_stage.rs | 18 ++----- core/src/broadcast_stage.rs | 14 ++---- .../fail_entry_verification_broadcast_run.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 23 +++++---- core/src/retransmit_stage.rs | 28 +++++------ runtime/src/accounts_db.rs | 47 +++---------------- runtime/src/secondary_index.rs | 16 ++----- sdk/src/timing.rs | 37 +++++++++++++++ 9 files changed, 85 insertions(+), 114 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index c366599848..c7a83021ef 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -12,13 +12,13 @@ use solana_gossip::{ contact_info::ContactInfo, }; use solana_ledger::shred::Shred; -use solana_sdk::{pubkey, signature::Keypair, timing::timestamp}; -use solana_streamer::socket::SocketAddrSpace; -use std::{ - collections::HashMap, - net::UdpSocket, - sync::{atomic::AtomicU64, Arc}, +use solana_sdk::{ + pubkey, + signature::Keypair, + timing::{timestamp, AtomicInterval}, }; +use solana_streamer::socket::SocketAddrSpace; +use std::{collections::HashMap, net::UdpSocket, sync::Arc}; use test::Bencher; #[bench] @@ -46,7 +46,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { let cluster_info = Arc::new(cluster_info); let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); let shreds = Arc::new(shreds); - let last_datapoint = Arc::new(AtomicU64::new(0)); + let last_datapoint = Arc::new(AtomicInterval::default()); bencher.iter(move || { let shreds = shreds.clone(); broadcast_shreds( diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 386d41024e..3215bc67f5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -36,7 +36,7 @@ use solana_sdk::{ pubkey::Pubkey, short_vec::decode_shortu16_len, signature::Signature, - timing::{duration_as_ms, timestamp}, + timing::{duration_as_ms, timestamp, AtomicInterval}, transaction::{self, Transaction, TransactionError}, }; use solana_transaction_status::token_balances::{ @@ -78,7 +78,7 @@ const DEFAULT_LRU_SIZE: usize = 200_000; #[derive(Debug, Default)] pub struct BankingStageStats { - last_report: AtomicU64, + last_report: AtomicInterval, id: u32, process_packets_count: AtomicUsize, new_tx_count: AtomicUsize, @@ -107,19 +107,7 @@ impl BankingStageStats { } fn report(&self, report_interval_ms: u64) { - let should_report = { - let last = self.last_report.load(Ordering::Relaxed); - let now = solana_sdk::timing::timestamp(); - now.saturating_sub(last) > report_interval_ms - && self.last_report.compare_exchange( - last, - now, - Ordering::Relaxed, - Ordering::Relaxed, - ) == Ok(last) - }; - - if should_report { + if self.last_report.should_update(report_interval_ms) { datapoint_info!( "banking_stage-loop-stats", ("id", self.id as i64, i64), diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 668ebee6c6..a57b02ccd2 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -20,10 +20,9 @@ use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_poh::poh_recorder::WorkingBankEntry; use solana_runtime::bank::Bank; -use solana_sdk::timing::timestamp; +use solana_sdk::timing::{timestamp, AtomicInterval}; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use solana_streamer::{sendmmsg::send_mmsg, socket::SocketAddrSpace}; -use std::sync::atomic::AtomicU64; use std::{ collections::HashMap, net::UdpSocket, @@ -362,14 +361,9 @@ impl BroadcastStage { fn update_peer_stats( num_live_peers: i64, broadcast_len: i64, - last_datapoint_submit: &Arc, + last_datapoint_submit: &Arc, ) { - let now = timestamp(); - let last = last_datapoint_submit.load(Ordering::Relaxed); - #[allow(deprecated)] - if now.saturating_sub(last) > 1000 - && last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last - { + if last_datapoint_submit.should_update(1000) { datapoint_info!( "cluster_info-num_nodes", ("live_count", num_live_peers, i64), @@ -384,7 +378,7 @@ pub fn broadcast_shreds( s: &UdpSocket, shreds: &[Shred], cluster_nodes: &ClusterNodes, - last_datapoint_submit: &Arc, + last_datapoint_submit: &Arc, transmit_stats: &mut TransmitShredsStats, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 4fb06e416c..79a7637bd8 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -143,7 +143,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { sock, &shreds, &cluster_nodes, - &Arc::new(AtomicU64::new(0)), + &Arc::new(AtomicInterval::default()), &mut TransmitShredsStats::default(), cluster_info.socket_addr_space(), )?; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 27caca8eab..ecafc69d75 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -12,7 +12,11 @@ use solana_ledger::{ SHRED_TICK_REFERENCE_MASK, }, }; -use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; +use solana_sdk::{ + pubkey::Pubkey, + signature::Keypair, + timing::{duration_as_us, AtomicInterval}, +}; use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration}; #[derive(Clone)] @@ -25,10 +29,10 @@ pub struct StandardBroadcastRun { slot_broadcast_start: Option, keypair: Arc, shred_version: u16, - last_datapoint_submit: Arc, + last_datapoint_submit: Arc, num_batches: usize, cluster_nodes: Arc>>, - last_peer_update: Arc, + last_peer_update: Arc, } impl StandardBroadcastRun { @@ -45,7 +49,7 @@ impl StandardBroadcastRun { last_datapoint_submit: Arc::default(), num_batches: 0, cluster_nodes: Arc::default(), - last_peer_update: Arc::default(), + last_peer_update: Arc::new(AtomicInterval::default()), } } @@ -339,14 +343,9 @@ impl StandardBroadcastRun { trace!("Broadcasting {:?} shreds", shreds.len()); // Get the list of peers to broadcast to let mut get_peers_time = Measure::start("broadcast::get_peers"); - let now = timestamp(); - let last = self.last_peer_update.load(Ordering::Relaxed); - #[allow(deprecated)] - if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS - && self - .last_peer_update - .compare_and_swap(last, now, Ordering::Relaxed) - == last + if self + .last_peer_update + .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false) { *self.cluster_nodes.write().unwrap() = ClusterNodes::::new( cluster_info, diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 50f837e1c1..2f8011afb0 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -28,7 +28,12 @@ use solana_rpc::{ rpc_subscriptions::RpcSubscriptions, }; use solana_runtime::{bank::Bank, bank_forks::BankForks}; -use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; +use solana_sdk::{ + clock::Slot, + epoch_schedule::EpochSchedule, + pubkey::Pubkey, + timing::{timestamp, AtomicInterval}, +}; use solana_streamer::streamer::PacketReceiver; use std::{ collections::hash_set::HashSet, @@ -61,7 +66,7 @@ struct RetransmitStats { repair_total: AtomicU64, discard_total: AtomicU64, retransmit_total: AtomicU64, - last_ts: AtomicU64, + last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, retransmit_tree_mismatch: AtomicU64, packets_by_slot: Mutex>, @@ -121,12 +126,7 @@ fn update_retransmit_stats( } } - let now = timestamp(); - let last = stats.last_ts.load(Ordering::Relaxed); - #[allow(deprecated)] - if now.saturating_sub(last) > 2000 - && stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last - { + if stats.last_ts.should_update(2000) { datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!( "retransmit-stage", @@ -284,7 +284,7 @@ fn retransmit( id: u32, stats: &RetransmitStats, cluster_nodes: &RwLock>, - last_peer_update: &AtomicU64, + last_peer_update: &AtomicInterval, shreds_received: &Mutex, max_slots: &MaxSlots, first_shreds_received: &Mutex>, @@ -314,12 +314,7 @@ fn retransmit( epoch_fetch.stop(); let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); - let now = timestamp(); - let last = last_peer_update.load(Ordering::Relaxed); - #[allow(deprecated)] - if now.saturating_sub(last) > 1000 - && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last - { + if last_peer_update.should_update_ext(1000, false) { let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch); *cluster_nodes.write().unwrap() = ClusterNodes::::new( cluster_info, @@ -478,7 +473,7 @@ pub fn retransmitter( let cluster_info = cluster_info.clone(); let stats = stats.clone(); let cluster_nodes = Arc::default(); - let last_peer_update = Arc::new(AtomicU64::new(0)); + let last_peer_update = Arc::new(AtomicInterval::default()); let shreds_received = shreds_received.clone(); let max_slots = max_slots.clone(); let first_shreds_received = first_shreds_received.clone(); @@ -671,6 +666,7 @@ mod tests { let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap(); // need to make sure tvu and tpu are valid addresses me.tvu_forwards = me_retransmit.local_addr().unwrap(); + let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap(); me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port)) .unwrap() diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 12e646c2b4..2a3e9a49dc 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -52,6 +52,7 @@ use solana_sdk::{ genesis_config::ClusterType, hash::{Hash, Hasher}, pubkey::Pubkey, + timing::AtomicInterval, }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ @@ -960,7 +961,7 @@ struct AccountsStats { delta_hash_accumulate_time_total_us: AtomicU64, delta_hash_num: AtomicU64, - last_store_report: AtomicU64, + last_store_report: AtomicInterval, store_hash_accounts: AtomicU64, calc_stored_meta: AtomicU64, store_accounts: AtomicU64, @@ -980,7 +981,7 @@ struct AccountsStats { #[derive(Debug, Default)] struct PurgeStats { - last_report: AtomicU64, + last_report: AtomicInterval, safety_checks_elapsed: AtomicU64, remove_cache_elapsed: AtomicU64, remove_storage_entries_elapsed: AtomicU64, @@ -999,18 +1000,7 @@ struct PurgeStats { impl PurgeStats { fn report(&self, metric_name: &'static str, report_interval_ms: Option) { let should_report = report_interval_ms - .map(|report_interval_ms| { - let last = self.last_report.load(Ordering::Relaxed); - let now = solana_sdk::timing::timestamp(); - now.saturating_sub(last) > report_interval_ms - && self.last_report.compare_exchange( - last, - now, - Ordering::Relaxed, - Ordering::Relaxed, - ) == Ok(last) - && last != 0 - }) + .map(|report_interval_ms| self.last_report.should_update(report_interval_ms)) .unwrap_or(true); if should_report { @@ -1184,7 +1174,7 @@ impl CleanAccountsStats { #[derive(Debug, Default)] struct ShrinkStats { - last_report: AtomicU64, + last_report: AtomicInterval, num_slots_shrunk: AtomicUsize, storage_read_elapsed: AtomicU64, index_read_elapsed: AtomicU64, @@ -1205,20 +1195,7 @@ struct ShrinkStats { impl ShrinkStats { fn report(&self) { - let last = self.last_report.load(Ordering::Relaxed); - let now = solana_sdk::timing::timestamp(); - - // last is initialized to 0 by ::default() - // thus, the first 'report' call would always log. - // Instead, the first call now initialializes 'last_report' to now. - let is_first_call = last == 0; - let should_report = now.saturating_sub(last) > 1000 - && self - .last_report - .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed) - == Ok(last); - - if !is_first_call && should_report { + if self.last_report.should_update(1000) { datapoint_info!( "shrink_stats", ( @@ -5522,17 +5499,7 @@ impl AccountsDb { } fn report_store_timings(&self) { - let last = self.stats.last_store_report.load(Ordering::Relaxed); - let now = solana_sdk::timing::timestamp(); - - if now.saturating_sub(last) > 1000 - && self.stats.last_store_report.compare_exchange( - last, - now, - Ordering::Relaxed, - Ordering::Relaxed, - ) == Ok(last) - { + if self.stats.last_store_report.should_update(1000) { let (read_only_cache_hits, read_only_cache_misses) = self.read_only_accounts_cache.get_and_reset_stats(); datapoint_info!( diff --git a/runtime/src/secondary_index.rs b/runtime/src/secondary_index.rs index 37199f2668..6b0e045423 100644 --- a/runtime/src/secondary_index.rs +++ b/runtime/src/secondary_index.rs @@ -1,5 +1,5 @@ use dashmap::{mapref::entry::Entry::Occupied, DashMap}; -use solana_sdk::pubkey::Pubkey; +use solana_sdk::{pubkey::Pubkey, timing::AtomicInterval}; use std::{ collections::HashSet, fmt::Debug, @@ -26,7 +26,7 @@ pub trait SecondaryIndexEntry: Debug { #[derive(Debug, Default)] pub struct SecondaryIndexStats { - last_report: AtomicU64, + last_report: AtomicInterval, num_inner_keys: AtomicU64, } @@ -142,17 +142,7 @@ impl } } - let now = solana_sdk::timing::timestamp(); - let last = self.stats.last_report.load(Ordering::Relaxed); - let should_report = now.saturating_sub(last) > 1000 - && self.stats.last_report.compare_exchange( - last, - now, - Ordering::Relaxed, - Ordering::Relaxed, - ) == Ok(last); - - if should_report { + if self.stats.last_report.should_update(1000) { datapoint_info!( self.metrics_name, ("num_secondary_keys", self.index.len() as i64, i64), diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs index 1edc1de325..f415848a49 100644 --- a/sdk/src/timing.rs +++ b/sdk/src/timing.rs @@ -1,5 +1,6 @@ //! The `timing` module provides std::time utility functions. use crate::unchecked_div_by_const; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; pub fn duration_as_ns(d: &Duration) -> u64 { @@ -59,10 +60,46 @@ pub fn slot_duration_from_slots_per_year(slots_per_year: f64) -> Duration { Duration::from_nanos(slot_in_ns as u64) } +#[derive(Debug, Default)] +pub struct AtomicInterval { + last_update: AtomicU64, +} + +impl AtomicInterval { + pub fn should_update(&self, interval_time: u64) -> bool { + self.should_update_ext(interval_time, true) + } + + pub fn should_update_ext(&self, interval_time: u64, skip_first: bool) -> bool { + let now = timestamp(); + let last = self.last_update.load(Ordering::Relaxed); + now.saturating_sub(last) > interval_time + && self + .last_update + .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed) + == Ok(last) + && !(skip_first && last == 0) + } +} + #[cfg(test)] mod test { use super::*; + #[test] + fn test_interval_update() { + solana_logger::setup(); + let i = AtomicInterval::default(); + assert!(!i.should_update(1000)); + + let i = AtomicInterval::default(); + assert!(i.should_update_ext(1000, false)); + + std::thread::sleep(Duration::from_millis(10)); + assert!(i.should_update(9)); + assert!(!i.should_update(100)); + } + #[test] #[allow(clippy::float_cmp)] fn test_years_as_slots() {