From 8f0796436a5ed1efa0fab7b30709a969b62e5b83 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 5 Nov 2020 15:42:00 +0000 Subject: [PATCH] shares the lock on gossip when processing prune messages (#13339) Processing prune messages acquires an exclusive lock on gossip: https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L1824-L1825 This can be reduced to a shared lock if active-sets are changed to use atomic bloom filters: https://github.com/solana-labs/solana/blob/55b0428ff/core/src/crds_gossip_push.rs#L50 --- core/src/cluster_info.rs | 5 ++- core/src/crds_gossip.rs | 13 ++++++-- core/src/crds_gossip_push.rs | 28 ++++++++++++----- core/tests/crds_gossip.rs | 2 +- runtime/benches/bloom.rs | 1 - runtime/src/bloom.rs | 59 +++++++++++++++++++++++++++++++----- 6 files changed, 86 insertions(+), 22 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 43a7a78575..990528b6dd 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -539,7 +539,7 @@ impl ClusterInfo { // Should only be used by tests and simulations pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { - let mut gossip = self.gossip.read().unwrap().clone(); + let mut gossip = self.gossip.read().unwrap().mock_clone(); gossip.id = *new_id; let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); my_contact_info.id = *new_id; @@ -1856,8 +1856,7 @@ impl ClusterInfo { let mut prune_message_timeout = 0; let mut bad_prune_destination = 0; { - let mut gossip = - self.time_gossip_write_lock("process_prune", &self.stats.process_prune); + let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune); let now = timestamp(); for (from, data) in messages { match gossip.process_prune_msg( diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index d0a437876b..a04068d1e4 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet}; ///The min size for bloom filters pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500; -#[derive(Clone)] pub struct CrdsGossip { pub crds: Crds, pub id: Pubkey, @@ -108,7 +107,7 @@ impl CrdsGossip { /// add the `from` to the peer's filter of nodes pub fn process_prune_msg( - &mut self, + &self, peer: &Pubkey, destination: &Pubkey, origin: &[Pubkey], @@ -266,6 +265,16 @@ impl CrdsGossip { self.pull.purge_failed_inserts(now); rv } + + // Only for tests and simulations. + pub(crate) fn mock_clone(&self) -> Self { + Self { + crds: self.crds.clone(), + push: self.push.mock_clone(), + pull: self.pull.clone(), + ..*self + } + } } /// Computes a normalized(log of actual stake) stake diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 958fcbb16d..c18d156183 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -20,7 +20,7 @@ use bincode::serialized_size; use indexmap::map::IndexMap; use itertools::Itertools; use rand::{seq::SliceRandom, Rng}; -use solana_runtime::bloom::Bloom; +use solana_runtime::bloom::{AtomicBloom, Bloom}; use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; use std::{ cmp, @@ -42,12 +42,11 @@ const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000; // 10 minutes const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000; -#[derive(Clone)] pub struct CrdsGossipPush { /// max bytes per message pub max_bytes: usize, /// active set of validators for push - active_set: IndexMap>, + active_set: IndexMap>, /// push message queue push_messages: HashMap, /// Cache that tracks which validators a message was received from @@ -287,11 +286,11 @@ impl CrdsGossipPush { } /// add the `from` to the peer's filter of nodes - pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { - if let Some(peer) = self.active_set.get_mut(peer) { + pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { + if let Some(filter) = self.active_set.get(peer) { for origin in origins { if origin != self_pubkey { - peer.add(origin); + filter.add(origin); } } } @@ -348,7 +347,7 @@ impl CrdsGossipPush { continue; } let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size); - let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); + let bloom: AtomicBloom<_> = Bloom::random(size, 0.1, 1024 * 8 * 4).into(); bloom.add(&item.id); new_items.insert(item.id, bloom); } @@ -427,6 +426,21 @@ impl CrdsGossipPush { !v.is_empty() }); } + + // Only for tests and simulations. + pub(crate) fn mock_clone(&self) -> Self { + let mut active_set = IndexMap::>::new(); + for (k, v) in &self.active_set { + active_set.insert(*k, v.mock_clone()); + } + Self { + active_set, + push_messages: self.push_messages.clone(), + received_cache: self.received_cache.clone(), + last_pushed_to: self.last_pushed_to.clone(), + ..*self + } + } } #[cfg(test)] diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index c1011f6ef7..96df3679db 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -346,7 +346,7 @@ fn network_run_push( network .get(&from) .map(|node| { - let mut node = node.lock().unwrap(); + let node = node.lock().unwrap(); let destination = node.id; let now = timestamp(); node.process_prune_msg(&to, &destination, &prune_keys, now, now) diff --git a/runtime/benches/bloom.rs b/runtime/benches/bloom.rs index a6f06e0c1f..a1b953bd4f 100644 --- a/runtime/benches/bloom.rs +++ b/runtime/benches/bloom.rs @@ -135,7 +135,6 @@ fn bench_add_hash_atomic(bencher: &mut Bencher) { for hash_value in &hash_values { bloom.add(hash_value); } - let bloom: Bloom<_> = bloom.into(); let index = rng.gen_range(0, hash_values.len()); if !bloom.contains(&hash_values[index]) { fail += 1; diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index 4efea1f600..7f1db09e11 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -146,16 +146,42 @@ impl From> for AtomicBloom { } impl AtomicBloom { + fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) { + let pos = key.hash_at_index(hash_index) % self.num_bits; + // Divide by 64 to figure out which of the + // AtomicU64 bit chunks we need to modify. + let index = pos >> 6; + // (pos & 63) is equivalent to mod 64 so that we can find + // the index of the bit within the AtomicU64 to modify. + let mask = 1u64 << (pos & 63); + (index as usize, mask) + } + pub fn add(&self, key: &T) { for k in &self.keys { - let pos = key.hash_at_index(*k) % self.num_bits; - // Divide by 64 to figure out which of the - // AtomicU64 bit chunks we need to modify. - let index = pos >> 6; - // (pos & 63) is equivalent to mod 64 so that we can find - // the index of the bit within the AtomicU64 to modify. - let bit = 1u64 << (pos & 63); - self.bits[index as usize].fetch_or(bit, Ordering::Relaxed); + let (index, mask) = self.pos(key, *k); + self.bits[index].fetch_or(mask, Ordering::Relaxed); + } + } + + pub fn contains(&self, key: &T) -> bool { + self.keys.iter().all(|k| { + let (index, mask) = self.pos(key, *k); + let bit = self.bits[index].load(Ordering::Relaxed) & mask; + bit != 0u64 + }) + } + + // Only for tests and simulations. + pub fn mock_clone(&self) -> Self { + Self { + keys: self.keys.clone(), + bits: self + .bits + .iter() + .map(|v| AtomicU64::new(v.load(Ordering::Relaxed))) + .collect(), + ..*self } } } @@ -303,6 +329,9 @@ mod test { let bloom: AtomicBloom<_> = bloom.into(); assert_eq!(bloom.num_bits, 9731); assert_eq!(bloom.bits.len(), (9731 + 63) / 64); + for hash_value in &hash_values { + assert!(bloom.contains(hash_value)); + } let bloom: Bloom<_> = bloom.into(); assert_eq!(bloom.num_bits_set, num_bits_set); for hash_value in &hash_values { @@ -311,6 +340,9 @@ mod test { // Round trip, re-inserting the same hash values. let bloom: AtomicBloom<_> = bloom.into(); hash_values.par_iter().for_each(|v| bloom.add(v)); + for hash_value in &hash_values { + assert!(bloom.contains(hash_value)); + } let bloom: Bloom<_> = bloom.into(); assert_eq!(bloom.num_bits_set, num_bits_set); assert_eq!(bloom.bits.len(), 9731); @@ -326,6 +358,17 @@ mod test { assert_eq!(bloom.num_bits, 9731); assert_eq!(bloom.bits.len(), (9731 + 63) / 64); more_hash_values.par_iter().for_each(|v| bloom.add(v)); + for hash_value in &hash_values { + assert!(bloom.contains(hash_value)); + } + for hash_value in &more_hash_values { + assert!(bloom.contains(hash_value)); + } + let false_positive = std::iter::repeat_with(|| solana_sdk::hash::new_rand(&mut rng)) + .take(10_000) + .filter(|hash_value| bloom.contains(hash_value)) + .count(); + assert!(false_positive < 2000, "false_positive: {}", false_positive); let bloom: Bloom<_> = bloom.into(); assert_eq!(bloom.bits.len(), 9731); assert!(bloom.num_bits_set > num_bits_set);