From 10fa4f45ab15e0592c15ab8acbcdd0f42394be2b Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 4 Nov 2020 19:15:58 +0000
Subject: [PATCH] uses thread-pool when handling push messages (#13338)

From runtime profiles, the majority of the solana-listen thread's time:
https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L2720
is spent handling push messages. The code here:
https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L2272-L2364
may utilize the otherwise idle gossip thread-pool.
---
 core/src/cluster_info.rs | 197 ++++++++++++++++++++++-----------------
 core/src/crds.rs         |   6 ++
 2 files changed, 119 insertions(+), 84 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 6f53568506..80cb9abddf 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -68,6 +68,7 @@ use std::{
     cmp::min,
     collections::{hash_map::Entry, HashMap, HashSet},
     fmt,
+    iter::FromIterator,
     net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
     ops::{Deref, DerefMut},
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
@@ -1606,16 +1607,19 @@ impl ClusterInfo {
         let (_, push_messages) = self
             .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
             .new_push_messages(self.drain_push_queue(), timestamp());
+        let push_messages: Vec<_> = {
+            let gossip =
+                self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2);
+            push_messages
+                .into_iter()
+                .filter_map(|(pubkey, messages)| {
+                    let peer = gossip.crds.get_contact_info(&pubkey)?;
+                    Some((peer.gossip, messages))
+                })
+                .collect()
+        };
         let messages: Vec<_> = push_messages
             .into_iter()
-            .filter_map(|(peer, messages)| {
-                let peer_label = CrdsValueLabel::ContactInfo(peer);
-                self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2)
-                    .crds
-                    .lookup(&peer_label)
-                    .and_then(CrdsValue::contact_info)
-                    .map(|p| (p.gossip, messages))
-            })
             .flat_map(|(peer, msgs)| {
                 Self::split_gossip_messages(msgs)
                     .into_iter()
@@ -2272,95 +2276,114 @@ impl ClusterInfo {
     fn handle_batch_push_messages(
         &self,
         messages: Vec<(Pubkey, Vec<CrdsValue>)>,
+        thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         stakes: &HashMap<Pubkey, u64>,
         response_sender: &PacketSender,
     ) {
-        for (from, data) in messages {
-            let response = self.handle_push_message(recycler, &from, data, stakes);
-            if let Some(response) = response {
-                let _ = response_sender.send(response);
-            }
+        if messages.is_empty() {
+            return;
         }
-    }
-
-    fn handle_push_message(
-        &self,
-        recycler: &PacketsRecycler,
-        from: &Pubkey,
-        mut crds_values: Vec<CrdsValue>,
-        stakes: &HashMap<Pubkey, u64>,
-    ) -> Option<Packets> {
-        let self_id = self.id();
-        self.stats.push_message_count.add_relaxed(1);
-        let len = crds_values.len();
-
-        let shred_version = self
-            .lookup_contact_info(from, |ci| ci.shred_version)
-            .unwrap_or(0);
-        Self::filter_by_shred_version(
-            from,
-            &mut crds_values,
-            shred_version,
-            self.my_shred_version(),
-        );
-        let filtered_len = crds_values.len();
         self.stats
-            .push_message_value_count
-            .add_relaxed(filtered_len as u64);
-        self.stats
-            .skip_push_message_shred_version
-            .add_relaxed((len - filtered_len) as u64);
-
-        let updated: Vec<_> = self
-            .time_gossip_write_lock("process_push", &self.stats.process_push_message)
-            .process_push_message(from, crds_values, timestamp());
-
-        let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
-        let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = self
-            .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
-            .prune_received_cache(updated_labels, stakes);
-
-        let rsp: Vec<_> = prunes_map
-            .into_iter()
-            .filter_map(|(from, prune_set)| {
-                inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len());
-                self.lookup_contact_info(&from, |ci| ci.clone()).map(|ci| {
-                    let mut prune_msg = PruneData {
-                        pubkey: self_id,
-                        prunes: prune_set.into_iter().collect(),
-                        signature: Signature::default(),
-                        destination: from,
-                        wallclock: timestamp(),
-                    };
-                    prune_msg.sign(&self.keypair);
-                    let rsp = Protocol::PruneMessage(self_id, prune_msg);
-                    (ci.gossip, rsp)
+            .push_message_count
+            .add_relaxed(messages.len() as u64);
+        // Obtain shred versions of the origins.
+        let shred_versions: Vec<_> = {
+            let gossip = self.gossip.read().unwrap();
+            messages
+                .iter()
+                .map(|(from, _)| match gossip.crds.get_contact_info(from) {
+                    None => 0,
+                    Some(info) => info.shred_version,
                 })
+                .collect()
+        };
+        // Filter out data if the origin has different shred version.
+        let self_shred_version = self.my_shred_version();
+        let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
+        let messages: Vec<_> = messages
+            .into_iter()
+            .zip(shred_versions)
+            .filter_map(|((from, mut crds_values), shred_version)| {
+                Self::filter_by_shred_version(
+                    &from,
+                    &mut crds_values,
+                    shred_version,
+                    self_shred_version,
+                );
+                if crds_values.is_empty() {
+                    None
+                } else {
+                    Some((from, crds_values))
+                }
             })
             .collect();
-        if rsp.is_empty() {
-            return None;
+        let num_filtered_crds_values = messages.iter().map(|(_, data)| data.len() as u64).sum();
+        self.stats
+            .push_message_value_count
+            .add_relaxed(num_filtered_crds_values);
+        self.stats
+            .skip_push_message_shred_version
+            .add_relaxed(num_crds_values - num_filtered_crds_values);
+        // Update crds values and obtain updated keys.
+        let updated_labels: Vec<_> = {
+            let mut gossip =
+                self.time_gossip_write_lock("process_push", &self.stats.process_push_message);
+            let now = timestamp();
+            messages
+                .into_iter()
+                .flat_map(|(from, crds_values)| {
+                    gossip.process_push_message(&from, crds_values, now)
+                })
+                .map(|v| v.value.label())
+                .collect()
+        };
+        // Generate prune messages.
+        let prunes_map = self
+            .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
+            .prune_received_cache(updated_labels, stakes);
+        let prune_messages: Vec<_> = {
+            let gossip = self.gossip.read().unwrap();
+            let wallclock = timestamp();
+            let self_pubkey = self.id();
+            thread_pool.install(|| {
+                Vec::from_iter(prunes_map)
+                    .into_par_iter()
+                    .with_min_len(256)
+                    .filter_map(|(from, prune_set)| {
+                        let peer = gossip.crds.get_contact_info(&from)?;
+                        let mut prune_data = PruneData {
+                            pubkey: self_pubkey,
+                            prunes: Vec::from_iter(prune_set),
+                            signature: Signature::default(),
+                            destination: from,
+                            wallclock,
+                        };
+                        prune_data.sign(&self.keypair);
+                        let prune_message = Protocol::PruneMessage(self_pubkey, prune_data);
+                        Some((peer.gossip, prune_message))
+                    })
+                    .collect()
+            })
+        };
+        if prune_messages.is_empty() {
+            return;
         }
-        let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
+        let mut packets = to_packets_with_destination(recycler.clone(), &prune_messages);
         self.stats
             .push_response_count
             .add_relaxed(packets.packets.len() as u64);
-        if !packets.is_empty() {
-            let pushes: Vec<_> = self.new_push_requests();
-            inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
-            pushes.into_iter().for_each(|(remote_gossip_addr, req)| {
-                if !remote_gossip_addr.ip().is_unspecified() && remote_gossip_addr.port() != 0 {
-                    let p = Packet::from_data(&remote_gossip_addr, &req);
-                    packets.packets.push(p);
-                } else {
-                    trace!("Dropping Gossip push response, as destination is unknown");
-                }
-            });
-            Some(packets)
-        } else {
-            None
+        let new_push_requests = self.new_push_requests();
+        inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len());
+        for (address, request) in new_push_requests {
+            if ContactInfo::is_valid_address(&address) {
+                let packet = Packet::from_data(&address, &request);
+                packets.packets.push(packet);
+            } else {
+                trace!("Dropping Gossip push response, as destination is unknown");
+            }
         }
+        let _ = response_sender.send(packets);
     }
 
     fn get_stakes_and_epoch_time(
@@ -2430,7 +2453,13 @@
         }
         self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
         self.handle_batch_prune_messages(prune_messages);
-        self.handle_batch_push_messages(push_messages, recycler, &stakes, response_sender);
+        self.handle_batch_push_messages(
+            push_messages,
+            thread_pool,
+            recycler,
+            &stakes,
+            response_sender,
+        );
         self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
         self.handle_batch_pong_messages(pong_messages, Instant::now());
         self.handle_batch_pull_requests(
diff --git a/core/src/crds.rs b/core/src/crds.rs
index 98973d66d0..24d5561a78 100644
--- a/core/src/crds.rs
+++ b/core/src/crds.rs
@@ -24,6 +24,7 @@
 //! A value is updated to a new version if the labels match, and the value
 //! wallclock is later, or the value hash is greater.
 
+use crate::contact_info::ContactInfo;
 use crate::crds_shards::CrdsShards;
 use crate::crds_value::{CrdsValue, CrdsValueLabel};
 use bincode::serialize;
@@ -160,6 +161,11 @@ impl Crds {
         self.table.get(label)
     }
 
+    pub fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
+        let label = CrdsValueLabel::ContactInfo(*pubkey);
+        self.table.get(&label)?.value.contact_info()
+    }
+
     fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
         if let Some(e) = self.table.get_mut(id) {
             e.local_timestamp = cmp::max(e.local_timestamp, now);
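Note on the pattern used in handle_batch_push_messages above: generating prune
messages signs one message per origin, and that per-message signing dominates
the cost, so the patch fans the work out over the gossip thread-pool via
rayon's ThreadPool::install plus an indexed parallel iterator. Below is a
minimal self-contained sketch of that same pattern, assuming only the rayon
crate; expensive_sign and build_prune_messages are hypothetical stand-ins for
PruneData::sign and the code above (the sketch builds its own pool, whereas
the patch receives the already-constructed gossip pool from the caller):

    use rayon::prelude::*;
    use rayon::{ThreadPool, ThreadPoolBuilder};
    use std::collections::{HashMap, HashSet};
    use std::iter::FromIterator;

    // Hypothetical stand-in for PruneData::sign: some CPU-heavy,
    // per-message computation.
    fn expensive_sign(destination: u64, prunes: &[u64]) -> u64 {
        prunes
            .iter()
            .fold(destination, |acc, x| acc.wrapping_mul(31).wrapping_add(*x))
    }

    fn build_prune_messages(
        thread_pool: &ThreadPool,
        prunes_map: HashMap<u64, HashSet<u64>>,
    ) -> Vec<(u64, u64)> {
        // Run the parallel iterator on the given pool instead of
        // rayon's global pool.
        thread_pool.install(|| {
            // Materialize the map into a Vec first: with_min_len needs an
            // indexed parallel iterator, which HashMap's is not.
            Vec::from_iter(prunes_map)
                .into_par_iter()
                // Hand each rayon task at least 256 items so scheduling
                // overhead stays small relative to the signing work.
                .with_min_len(256)
                .map(|(destination, prune_set)| {
                    let prunes = Vec::from_iter(prune_set);
                    (destination, expensive_sign(destination, &prunes))
                })
                .collect()
        })
    }

    fn main() {
        let thread_pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
        let prunes_map: HashMap<u64, HashSet<u64>> = (0..1_000u64)
            .map(|k| (k, HashSet::from_iter(k..k + 32)))
            .collect();
        let messages = build_prune_messages(&thread_pool, prunes_map);
        assert_eq!(messages.len(), 1_000);
    }

The with_min_len(256) bound caps how finely rayon may split the input, so each
task amortizes its scheduling overhead over at least 256 signatures; this
mirrors the .with_min_len(256) call in the patch.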