Processing prune messages acquires an exclusive lock on gossip:
https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L1824-L1825
This can be reduced to a shared lock if active-sets are changed to use
atomic bloom filters:
https://github.com/solana-labs/solana/blob/55b0428ff/core/src/crds_gossip_push.rs#L50
(cherry picked from commit 8f0796436a)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		| @@ -539,7 +539,7 @@ impl ClusterInfo { | |||||||
|  |  | ||||||
|     // Should only be used by tests and simulations |     // Should only be used by tests and simulations | ||||||
|     pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { |     pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { | ||||||
|         let mut gossip = self.gossip.read().unwrap().clone(); |         let mut gossip = self.gossip.read().unwrap().mock_clone(); | ||||||
|         gossip.id = *new_id; |         gossip.id = *new_id; | ||||||
|         let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); |         let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); | ||||||
|         my_contact_info.id = *new_id; |         my_contact_info.id = *new_id; | ||||||
| @@ -1856,8 +1856,7 @@ impl ClusterInfo { | |||||||
|         let mut prune_message_timeout = 0; |         let mut prune_message_timeout = 0; | ||||||
|         let mut bad_prune_destination = 0; |         let mut bad_prune_destination = 0; | ||||||
|         { |         { | ||||||
|             let mut gossip = |             let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune); | ||||||
|                 self.time_gossip_write_lock("process_prune", &self.stats.process_prune); |  | ||||||
|             let now = timestamp(); |             let now = timestamp(); | ||||||
|             for (from, data) in messages { |             for (from, data) in messages { | ||||||
|                 match gossip.process_prune_msg( |                 match gossip.process_prune_msg( | ||||||
|   | |||||||
| @@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet}; | |||||||
| ///The min size for bloom filters | ///The min size for bloom filters | ||||||
| pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500; | pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500; | ||||||
|  |  | ||||||
| #[derive(Clone)] |  | ||||||
| pub struct CrdsGossip { | pub struct CrdsGossip { | ||||||
|     pub crds: Crds, |     pub crds: Crds, | ||||||
|     pub id: Pubkey, |     pub id: Pubkey, | ||||||
| @@ -108,7 +107,7 @@ impl CrdsGossip { | |||||||
|  |  | ||||||
|     /// add the `from` to the peer's filter of nodes |     /// add the `from` to the peer's filter of nodes | ||||||
|     pub fn process_prune_msg( |     pub fn process_prune_msg( | ||||||
|         &mut self, |         &self, | ||||||
|         peer: &Pubkey, |         peer: &Pubkey, | ||||||
|         destination: &Pubkey, |         destination: &Pubkey, | ||||||
|         origin: &[Pubkey], |         origin: &[Pubkey], | ||||||
| @@ -266,6 +265,16 @@ impl CrdsGossip { | |||||||
|         self.pull.purge_failed_inserts(now); |         self.pull.purge_failed_inserts(now); | ||||||
|         rv |         rv | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     // Only for tests and simulations. | ||||||
|  |     pub(crate) fn mock_clone(&self) -> Self { | ||||||
|  |         Self { | ||||||
|  |             crds: self.crds.clone(), | ||||||
|  |             push: self.push.mock_clone(), | ||||||
|  |             pull: self.pull.clone(), | ||||||
|  |             ..*self | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Computes a normalized(log of actual stake) stake | /// Computes a normalized(log of actual stake) stake | ||||||
|   | |||||||
| @@ -20,7 +20,7 @@ use bincode::serialized_size; | |||||||
| use indexmap::map::IndexMap; | use indexmap::map::IndexMap; | ||||||
| use itertools::Itertools; | use itertools::Itertools; | ||||||
| use rand::{seq::SliceRandom, Rng}; | use rand::{seq::SliceRandom, Rng}; | ||||||
| use solana_runtime::bloom::Bloom; | use solana_runtime::bloom::{AtomicBloom, Bloom}; | ||||||
| use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; | use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; | ||||||
| use std::{ | use std::{ | ||||||
|     cmp, |     cmp, | ||||||
| @@ -42,12 +42,11 @@ const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000; | |||||||
| // 10 minutes | // 10 minutes | ||||||
| const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000; | const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000; | ||||||
|  |  | ||||||
| #[derive(Clone)] |  | ||||||
| pub struct CrdsGossipPush { | pub struct CrdsGossipPush { | ||||||
|     /// max bytes per message |     /// max bytes per message | ||||||
|     pub max_bytes: usize, |     pub max_bytes: usize, | ||||||
|     /// active set of validators for push |     /// active set of validators for push | ||||||
|     active_set: IndexMap<Pubkey, Bloom<Pubkey>>, |     active_set: IndexMap<Pubkey, AtomicBloom<Pubkey>>, | ||||||
|     /// push message queue |     /// push message queue | ||||||
|     push_messages: HashMap<CrdsValueLabel, Hash>, |     push_messages: HashMap<CrdsValueLabel, Hash>, | ||||||
|     /// Cache that tracks which validators a message was received from |     /// Cache that tracks which validators a message was received from | ||||||
| @@ -287,11 +286,11 @@ impl CrdsGossipPush { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// add the `from` to the peer's filter of nodes |     /// add the `from` to the peer's filter of nodes | ||||||
|     pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { |     pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { | ||||||
|         if let Some(peer) = self.active_set.get_mut(peer) { |         if let Some(filter) = self.active_set.get(peer) { | ||||||
|             for origin in origins { |             for origin in origins { | ||||||
|                 if origin != self_pubkey { |                 if origin != self_pubkey { | ||||||
|                     peer.add(origin); |                     filter.add(origin); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @@ -348,7 +347,7 @@ impl CrdsGossipPush { | |||||||
|                         continue; |                         continue; | ||||||
|                     } |                     } | ||||||
|                     let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size); |                     let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size); | ||||||
|                     let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); |                     let bloom: AtomicBloom<_> = Bloom::random(size, 0.1, 1024 * 8 * 4).into(); | ||||||
|                     bloom.add(&item.id); |                     bloom.add(&item.id); | ||||||
|                     new_items.insert(item.id, bloom); |                     new_items.insert(item.id, bloom); | ||||||
|                 } |                 } | ||||||
| @@ -427,6 +426,21 @@ impl CrdsGossipPush { | |||||||
|             !v.is_empty() |             !v.is_empty() | ||||||
|         }); |         }); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     // Only for tests and simulations. | ||||||
|  |     pub(crate) fn mock_clone(&self) -> Self { | ||||||
|  |         let mut active_set = IndexMap::<Pubkey, AtomicBloom<Pubkey>>::new(); | ||||||
|  |         for (k, v) in &self.active_set { | ||||||
|  |             active_set.insert(*k, v.mock_clone()); | ||||||
|  |         } | ||||||
|  |         Self { | ||||||
|  |             active_set, | ||||||
|  |             push_messages: self.push_messages.clone(), | ||||||
|  |             received_cache: self.received_cache.clone(), | ||||||
|  |             last_pushed_to: self.last_pushed_to.clone(), | ||||||
|  |             ..*self | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
|   | |||||||
| @@ -346,7 +346,7 @@ fn network_run_push( | |||||||
|                         network |                         network | ||||||
|                             .get(&from) |                             .get(&from) | ||||||
|                             .map(|node| { |                             .map(|node| { | ||||||
|                                 let mut node = node.lock().unwrap(); |                                 let node = node.lock().unwrap(); | ||||||
|                                 let destination = node.id; |                                 let destination = node.id; | ||||||
|                                 let now = timestamp(); |                                 let now = timestamp(); | ||||||
|                                 node.process_prune_msg(&to, &destination, &prune_keys, now, now) |                                 node.process_prune_msg(&to, &destination, &prune_keys, now, now) | ||||||
|   | |||||||
| @@ -135,7 +135,6 @@ fn bench_add_hash_atomic(bencher: &mut Bencher) { | |||||||
|         for hash_value in &hash_values { |         for hash_value in &hash_values { | ||||||
|             bloom.add(hash_value); |             bloom.add(hash_value); | ||||||
|         } |         } | ||||||
|         let bloom: Bloom<_> = bloom.into(); |  | ||||||
|         let index = rng.gen_range(0, hash_values.len()); |         let index = rng.gen_range(0, hash_values.len()); | ||||||
|         if !bloom.contains(&hash_values[index]) { |         if !bloom.contains(&hash_values[index]) { | ||||||
|             fail += 1; |             fail += 1; | ||||||
|   | |||||||
| @@ -146,16 +146,42 @@ impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> { | |||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: BloomHashIndex> AtomicBloom<T> { | impl<T: BloomHashIndex> AtomicBloom<T> { | ||||||
|     pub fn add(&self, key: &T) { |     fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) { | ||||||
|         for k in &self.keys { |         let pos = key.hash_at_index(hash_index) % self.num_bits; | ||||||
|             let pos = key.hash_at_index(*k) % self.num_bits; |  | ||||||
|         // Divide by 64 to figure out which of the |         // Divide by 64 to figure out which of the | ||||||
|         // AtomicU64 bit chunks we need to modify. |         // AtomicU64 bit chunks we need to modify. | ||||||
|         let index = pos >> 6; |         let index = pos >> 6; | ||||||
|         // (pos & 63) is equivalent to mod 64 so that we can find |         // (pos & 63) is equivalent to mod 64 so that we can find | ||||||
|         // the index of the bit within the AtomicU64 to modify. |         // the index of the bit within the AtomicU64 to modify. | ||||||
|             let bit = 1u64 << (pos & 63); |         let mask = 1u64 << (pos & 63); | ||||||
|             self.bits[index as usize].fetch_or(bit, Ordering::Relaxed); |         (index as usize, mask) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn add(&self, key: &T) { | ||||||
|  |         for k in &self.keys { | ||||||
|  |             let (index, mask) = self.pos(key, *k); | ||||||
|  |             self.bits[index].fetch_or(mask, Ordering::Relaxed); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn contains(&self, key: &T) -> bool { | ||||||
|  |         self.keys.iter().all(|k| { | ||||||
|  |             let (index, mask) = self.pos(key, *k); | ||||||
|  |             let bit = self.bits[index].load(Ordering::Relaxed) & mask; | ||||||
|  |             bit != 0u64 | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // Only for tests and simulations. | ||||||
|  |     pub fn mock_clone(&self) -> Self { | ||||||
|  |         Self { | ||||||
|  |             keys: self.keys.clone(), | ||||||
|  |             bits: self | ||||||
|  |                 .bits | ||||||
|  |                 .iter() | ||||||
|  |                 .map(|v| AtomicU64::new(v.load(Ordering::Relaxed))) | ||||||
|  |                 .collect(), | ||||||
|  |             ..*self | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -303,6 +329,9 @@ mod test { | |||||||
|         let bloom: AtomicBloom<_> = bloom.into(); |         let bloom: AtomicBloom<_> = bloom.into(); | ||||||
|         assert_eq!(bloom.num_bits, 9731); |         assert_eq!(bloom.num_bits, 9731); | ||||||
|         assert_eq!(bloom.bits.len(), (9731 + 63) / 64); |         assert_eq!(bloom.bits.len(), (9731 + 63) / 64); | ||||||
|  |         for hash_value in &hash_values { | ||||||
|  |             assert!(bloom.contains(hash_value)); | ||||||
|  |         } | ||||||
|         let bloom: Bloom<_> = bloom.into(); |         let bloom: Bloom<_> = bloom.into(); | ||||||
|         assert_eq!(bloom.num_bits_set, num_bits_set); |         assert_eq!(bloom.num_bits_set, num_bits_set); | ||||||
|         for hash_value in &hash_values { |         for hash_value in &hash_values { | ||||||
| @@ -311,6 +340,9 @@ mod test { | |||||||
|         // Round trip, re-inserting the same hash values. |         // Round trip, re-inserting the same hash values. | ||||||
|         let bloom: AtomicBloom<_> = bloom.into(); |         let bloom: AtomicBloom<_> = bloom.into(); | ||||||
|         hash_values.par_iter().for_each(|v| bloom.add(v)); |         hash_values.par_iter().for_each(|v| bloom.add(v)); | ||||||
|  |         for hash_value in &hash_values { | ||||||
|  |             assert!(bloom.contains(hash_value)); | ||||||
|  |         } | ||||||
|         let bloom: Bloom<_> = bloom.into(); |         let bloom: Bloom<_> = bloom.into(); | ||||||
|         assert_eq!(bloom.num_bits_set, num_bits_set); |         assert_eq!(bloom.num_bits_set, num_bits_set); | ||||||
|         assert_eq!(bloom.bits.len(), 9731); |         assert_eq!(bloom.bits.len(), 9731); | ||||||
| @@ -326,6 +358,17 @@ mod test { | |||||||
|         assert_eq!(bloom.num_bits, 9731); |         assert_eq!(bloom.num_bits, 9731); | ||||||
|         assert_eq!(bloom.bits.len(), (9731 + 63) / 64); |         assert_eq!(bloom.bits.len(), (9731 + 63) / 64); | ||||||
|         more_hash_values.par_iter().for_each(|v| bloom.add(v)); |         more_hash_values.par_iter().for_each(|v| bloom.add(v)); | ||||||
|  |         for hash_value in &hash_values { | ||||||
|  |             assert!(bloom.contains(hash_value)); | ||||||
|  |         } | ||||||
|  |         for hash_value in &more_hash_values { | ||||||
|  |             assert!(bloom.contains(hash_value)); | ||||||
|  |         } | ||||||
|  |         let false_positive = std::iter::repeat_with(|| solana_sdk::hash::new_rand(&mut rng)) | ||||||
|  |             .take(10_000) | ||||||
|  |             .filter(|hash_value| bloom.contains(hash_value)) | ||||||
|  |             .count(); | ||||||
|  |         assert!(false_positive < 2000, "false_positive: {}", false_positive); | ||||||
|         let bloom: Bloom<_> = bloom.into(); |         let bloom: Bloom<_> = bloom.into(); | ||||||
|         assert_eq!(bloom.bits.len(), 9731); |         assert_eq!(bloom.bits.len(), 9731); | ||||||
|         assert!(bloom.num_bits_set > num_bits_set); |         assert!(bloom.num_bits_set > num_bits_set); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user