uses thread-pool when handling push messages (#13338)
From runtime profiles, the majority time of solana-listen thread: https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L2720 is spent handling push messages. The code here: https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L2272-L2364 may utilize the idle gossip thread-pool.
This commit is contained in:
		@@ -68,6 +68,7 @@ use std::{
 | 
				
			|||||||
    cmp::min,
 | 
					    cmp::min,
 | 
				
			||||||
    collections::{hash_map::Entry, HashMap, HashSet},
 | 
					    collections::{hash_map::Entry, HashMap, HashSet},
 | 
				
			||||||
    fmt,
 | 
					    fmt,
 | 
				
			||||||
 | 
					    iter::FromIterator,
 | 
				
			||||||
    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
 | 
					    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
 | 
				
			||||||
    ops::{Deref, DerefMut},
 | 
					    ops::{Deref, DerefMut},
 | 
				
			||||||
    sync::atomic::{AtomicBool, AtomicU64, Ordering},
 | 
					    sync::atomic::{AtomicBool, AtomicU64, Ordering},
 | 
				
			||||||
@@ -1606,16 +1607,19 @@ impl ClusterInfo {
 | 
				
			|||||||
        let (_, push_messages) = self
 | 
					        let (_, push_messages) = self
 | 
				
			||||||
            .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
 | 
					            .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
 | 
				
			||||||
            .new_push_messages(self.drain_push_queue(), timestamp());
 | 
					            .new_push_messages(self.drain_push_queue(), timestamp());
 | 
				
			||||||
 | 
					        let push_messages: Vec<_> = {
 | 
				
			||||||
 | 
					            let gossip =
 | 
				
			||||||
 | 
					                self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2);
 | 
				
			||||||
 | 
					            push_messages
 | 
				
			||||||
 | 
					                .into_iter()
 | 
				
			||||||
 | 
					                .filter_map(|(pubkey, messages)| {
 | 
				
			||||||
 | 
					                    let peer = gossip.crds.get_contact_info(&pubkey)?;
 | 
				
			||||||
 | 
					                    Some((peer.gossip, messages))
 | 
				
			||||||
 | 
					                })
 | 
				
			||||||
 | 
					                .collect()
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
        let messages: Vec<_> = push_messages
 | 
					        let messages: Vec<_> = push_messages
 | 
				
			||||||
            .into_iter()
 | 
					            .into_iter()
 | 
				
			||||||
            .filter_map(|(peer, messages)| {
 | 
					 | 
				
			||||||
                let peer_label = CrdsValueLabel::ContactInfo(peer);
 | 
					 | 
				
			||||||
                self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2)
 | 
					 | 
				
			||||||
                    .crds
 | 
					 | 
				
			||||||
                    .lookup(&peer_label)
 | 
					 | 
				
			||||||
                    .and_then(CrdsValue::contact_info)
 | 
					 | 
				
			||||||
                    .map(|p| (p.gossip, messages))
 | 
					 | 
				
			||||||
            })
 | 
					 | 
				
			||||||
            .flat_map(|(peer, msgs)| {
 | 
					            .flat_map(|(peer, msgs)| {
 | 
				
			||||||
                Self::split_gossip_messages(msgs)
 | 
					                Self::split_gossip_messages(msgs)
 | 
				
			||||||
                    .into_iter()
 | 
					                    .into_iter()
 | 
				
			||||||
@@ -2272,95 +2276,114 @@ impl ClusterInfo {
 | 
				
			|||||||
    fn handle_batch_push_messages(
 | 
					    fn handle_batch_push_messages(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        messages: Vec<(Pubkey, Vec<CrdsValue>)>,
 | 
					        messages: Vec<(Pubkey, Vec<CrdsValue>)>,
 | 
				
			||||||
 | 
					        thread_pool: &ThreadPool,
 | 
				
			||||||
        recycler: &PacketsRecycler,
 | 
					        recycler: &PacketsRecycler,
 | 
				
			||||||
        stakes: &HashMap<Pubkey, u64>,
 | 
					        stakes: &HashMap<Pubkey, u64>,
 | 
				
			||||||
        response_sender: &PacketSender,
 | 
					        response_sender: &PacketSender,
 | 
				
			||||||
    ) {
 | 
					    ) {
 | 
				
			||||||
        for (from, data) in messages {
 | 
					        if messages.is_empty() {
 | 
				
			||||||
            let response = self.handle_push_message(recycler, &from, data, stakes);
 | 
					            return;
 | 
				
			||||||
            if let Some(response) = response {
 | 
					 | 
				
			||||||
                let _ = response_sender.send(response);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    fn handle_push_message(
 | 
					 | 
				
			||||||
        &self,
 | 
					 | 
				
			||||||
        recycler: &PacketsRecycler,
 | 
					 | 
				
			||||||
        from: &Pubkey,
 | 
					 | 
				
			||||||
        mut crds_values: Vec<CrdsValue>,
 | 
					 | 
				
			||||||
        stakes: &HashMap<Pubkey, u64>,
 | 
					 | 
				
			||||||
    ) -> Option<Packets> {
 | 
					 | 
				
			||||||
        let self_id = self.id();
 | 
					 | 
				
			||||||
        self.stats.push_message_count.add_relaxed(1);
 | 
					 | 
				
			||||||
        let len = crds_values.len();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let shred_version = self
 | 
					 | 
				
			||||||
            .lookup_contact_info(from, |ci| ci.shred_version)
 | 
					 | 
				
			||||||
            .unwrap_or(0);
 | 
					 | 
				
			||||||
        Self::filter_by_shred_version(
 | 
					 | 
				
			||||||
            from,
 | 
					 | 
				
			||||||
            &mut crds_values,
 | 
					 | 
				
			||||||
            shred_version,
 | 
					 | 
				
			||||||
            self.my_shred_version(),
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        let filtered_len = crds_values.len();
 | 
					 | 
				
			||||||
        self.stats
 | 
					        self.stats
 | 
				
			||||||
            .push_message_value_count
 | 
					            .push_message_count
 | 
				
			||||||
            .add_relaxed(filtered_len as u64);
 | 
					            .add_relaxed(messages.len() as u64);
 | 
				
			||||||
        self.stats
 | 
					        // Obtain shred versions of the origins.
 | 
				
			||||||
            .skip_push_message_shred_version
 | 
					        let shred_versions: Vec<_> = {
 | 
				
			||||||
            .add_relaxed((len - filtered_len) as u64);
 | 
					            let gossip = self.gossip.read().unwrap();
 | 
				
			||||||
 | 
					            messages
 | 
				
			||||||
        let updated: Vec<_> = self
 | 
					                .iter()
 | 
				
			||||||
            .time_gossip_write_lock("process_push", &self.stats.process_push_message)
 | 
					                .map(|(from, _)| match gossip.crds.get_contact_info(from) {
 | 
				
			||||||
            .process_push_message(from, crds_values, timestamp());
 | 
					                    None => 0,
 | 
				
			||||||
 | 
					                    Some(info) => info.shred_version,
 | 
				
			||||||
        let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
 | 
					 | 
				
			||||||
        let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = self
 | 
					 | 
				
			||||||
            .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
 | 
					 | 
				
			||||||
            .prune_received_cache(updated_labels, stakes);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let rsp: Vec<_> = prunes_map
 | 
					 | 
				
			||||||
            .into_iter()
 | 
					 | 
				
			||||||
            .filter_map(|(from, prune_set)| {
 | 
					 | 
				
			||||||
                inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len());
 | 
					 | 
				
			||||||
                self.lookup_contact_info(&from, |ci| ci.clone()).map(|ci| {
 | 
					 | 
				
			||||||
                    let mut prune_msg = PruneData {
 | 
					 | 
				
			||||||
                        pubkey: self_id,
 | 
					 | 
				
			||||||
                        prunes: prune_set.into_iter().collect(),
 | 
					 | 
				
			||||||
                        signature: Signature::default(),
 | 
					 | 
				
			||||||
                        destination: from,
 | 
					 | 
				
			||||||
                        wallclock: timestamp(),
 | 
					 | 
				
			||||||
                    };
 | 
					 | 
				
			||||||
                    prune_msg.sign(&self.keypair);
 | 
					 | 
				
			||||||
                    let rsp = Protocol::PruneMessage(self_id, prune_msg);
 | 
					 | 
				
			||||||
                    (ci.gossip, rsp)
 | 
					 | 
				
			||||||
                })
 | 
					                })
 | 
				
			||||||
 | 
					                .collect()
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        // Filter out data if the origin has different shred version.
 | 
				
			||||||
 | 
					        let self_shred_version = self.my_shred_version();
 | 
				
			||||||
 | 
					        let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
 | 
				
			||||||
 | 
					        let messages: Vec<_> = messages
 | 
				
			||||||
 | 
					            .into_iter()
 | 
				
			||||||
 | 
					            .zip(shred_versions)
 | 
				
			||||||
 | 
					            .filter_map(|((from, mut crds_values), shred_version)| {
 | 
				
			||||||
 | 
					                Self::filter_by_shred_version(
 | 
				
			||||||
 | 
					                    &from,
 | 
				
			||||||
 | 
					                    &mut crds_values,
 | 
				
			||||||
 | 
					                    shred_version,
 | 
				
			||||||
 | 
					                    self_shred_version,
 | 
				
			||||||
 | 
					                );
 | 
				
			||||||
 | 
					                if crds_values.is_empty() {
 | 
				
			||||||
 | 
					                    None
 | 
				
			||||||
 | 
					                } else {
 | 
				
			||||||
 | 
					                    Some((from, crds_values))
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
            .collect();
 | 
					            .collect();
 | 
				
			||||||
        if rsp.is_empty() {
 | 
					        let num_filtered_crds_values = messages.iter().map(|(_, data)| data.len() as u64).sum();
 | 
				
			||||||
            return None;
 | 
					        self.stats
 | 
				
			||||||
 | 
					            .push_message_value_count
 | 
				
			||||||
 | 
					            .add_relaxed(num_filtered_crds_values);
 | 
				
			||||||
 | 
					        self.stats
 | 
				
			||||||
 | 
					            .skip_push_message_shred_version
 | 
				
			||||||
 | 
					            .add_relaxed(num_crds_values - num_filtered_crds_values);
 | 
				
			||||||
 | 
					        // Update crds values and obtain updated keys.
 | 
				
			||||||
 | 
					        let updated_labels: Vec<_> = {
 | 
				
			||||||
 | 
					            let mut gossip =
 | 
				
			||||||
 | 
					                self.time_gossip_write_lock("process_push", &self.stats.process_push_message);
 | 
				
			||||||
 | 
					            let now = timestamp();
 | 
				
			||||||
 | 
					            messages
 | 
				
			||||||
 | 
					                .into_iter()
 | 
				
			||||||
 | 
					                .flat_map(|(from, crds_values)| {
 | 
				
			||||||
 | 
					                    gossip.process_push_message(&from, crds_values, now)
 | 
				
			||||||
 | 
					                })
 | 
				
			||||||
 | 
					                .map(|v| v.value.label())
 | 
				
			||||||
 | 
					                .collect()
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        // Generate prune messages.
 | 
				
			||||||
 | 
					        let prunes_map = self
 | 
				
			||||||
 | 
					            .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
 | 
				
			||||||
 | 
					            .prune_received_cache(updated_labels, stakes);
 | 
				
			||||||
 | 
					        let prune_messages: Vec<_> = {
 | 
				
			||||||
 | 
					            let gossip = self.gossip.read().unwrap();
 | 
				
			||||||
 | 
					            let wallclock = timestamp();
 | 
				
			||||||
 | 
					            let self_pubkey = self.id();
 | 
				
			||||||
 | 
					            thread_pool.install(|| {
 | 
				
			||||||
 | 
					                Vec::from_iter(prunes_map)
 | 
				
			||||||
 | 
					                    .into_par_iter()
 | 
				
			||||||
 | 
					                    .with_min_len(256)
 | 
				
			||||||
 | 
					                    .filter_map(|(from, prune_set)| {
 | 
				
			||||||
 | 
					                        let peer = gossip.crds.get_contact_info(&from)?;
 | 
				
			||||||
 | 
					                        let mut prune_data = PruneData {
 | 
				
			||||||
 | 
					                            pubkey: self_pubkey,
 | 
				
			||||||
 | 
					                            prunes: Vec::from_iter(prune_set),
 | 
				
			||||||
 | 
					                            signature: Signature::default(),
 | 
				
			||||||
 | 
					                            destination: from,
 | 
				
			||||||
 | 
					                            wallclock,
 | 
				
			||||||
 | 
					                        };
 | 
				
			||||||
 | 
					                        prune_data.sign(&self.keypair);
 | 
				
			||||||
 | 
					                        let prune_message = Protocol::PruneMessage(self_pubkey, prune_data);
 | 
				
			||||||
 | 
					                        Some((peer.gossip, prune_message))
 | 
				
			||||||
 | 
					                    })
 | 
				
			||||||
 | 
					                    .collect()
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        if prune_messages.is_empty() {
 | 
				
			||||||
 | 
					            return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
 | 
					        let mut packets = to_packets_with_destination(recycler.clone(), &prune_messages);
 | 
				
			||||||
        self.stats
 | 
					        self.stats
 | 
				
			||||||
            .push_response_count
 | 
					            .push_response_count
 | 
				
			||||||
            .add_relaxed(packets.packets.len() as u64);
 | 
					            .add_relaxed(packets.packets.len() as u64);
 | 
				
			||||||
        if !packets.is_empty() {
 | 
					        let new_push_requests = self.new_push_requests();
 | 
				
			||||||
            let pushes: Vec<_> = self.new_push_requests();
 | 
					        inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len());
 | 
				
			||||||
            inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
 | 
					        for (address, request) in new_push_requests {
 | 
				
			||||||
            pushes.into_iter().for_each(|(remote_gossip_addr, req)| {
 | 
					            if ContactInfo::is_valid_address(&address) {
 | 
				
			||||||
                if !remote_gossip_addr.ip().is_unspecified() && remote_gossip_addr.port() != 0 {
 | 
					                let packet = Packet::from_data(&address, &request);
 | 
				
			||||||
                    let p = Packet::from_data(&remote_gossip_addr, &req);
 | 
					                packets.packets.push(packet);
 | 
				
			||||||
                    packets.packets.push(p);
 | 
					            } else {
 | 
				
			||||||
                } else {
 | 
					                trace!("Dropping Gossip push response, as destination is unknown");
 | 
				
			||||||
                    trace!("Dropping Gossip push response, as destination is unknown");
 | 
					            }
 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            });
 | 
					 | 
				
			||||||
            Some(packets)
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            None
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        let _ = response_sender.send(packets);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn get_stakes_and_epoch_time(
 | 
					    fn get_stakes_and_epoch_time(
 | 
				
			||||||
@@ -2430,7 +2453,13 @@ impl ClusterInfo {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
 | 
					        self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
 | 
				
			||||||
        self.handle_batch_prune_messages(prune_messages);
 | 
					        self.handle_batch_prune_messages(prune_messages);
 | 
				
			||||||
        self.handle_batch_push_messages(push_messages, recycler, &stakes, response_sender);
 | 
					        self.handle_batch_push_messages(
 | 
				
			||||||
 | 
					            push_messages,
 | 
				
			||||||
 | 
					            thread_pool,
 | 
				
			||||||
 | 
					            recycler,
 | 
				
			||||||
 | 
					            &stakes,
 | 
				
			||||||
 | 
					            response_sender,
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
        self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
 | 
					        self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
 | 
				
			||||||
        self.handle_batch_pong_messages(pong_messages, Instant::now());
 | 
					        self.handle_batch_pong_messages(pong_messages, Instant::now());
 | 
				
			||||||
        self.handle_batch_pull_requests(
 | 
					        self.handle_batch_pull_requests(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -24,6 +24,7 @@
 | 
				
			|||||||
//! A value is updated to a new version if the labels match, and the value
 | 
					//! A value is updated to a new version if the labels match, and the value
 | 
				
			||||||
//! wallclock is later, or the value hash is greater.
 | 
					//! wallclock is later, or the value hash is greater.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use crate::contact_info::ContactInfo;
 | 
				
			||||||
use crate::crds_shards::CrdsShards;
 | 
					use crate::crds_shards::CrdsShards;
 | 
				
			||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
 | 
					use crate::crds_value::{CrdsValue, CrdsValueLabel};
 | 
				
			||||||
use bincode::serialize;
 | 
					use bincode::serialize;
 | 
				
			||||||
@@ -160,6 +161,11 @@ impl Crds {
 | 
				
			|||||||
        self.table.get(label)
 | 
					        self.table.get(label)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
 | 
				
			||||||
 | 
					        let label = CrdsValueLabel::ContactInfo(*pubkey);
 | 
				
			||||||
 | 
					        self.table.get(&label)?.value.contact_info()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
 | 
					    fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
 | 
				
			||||||
        if let Some(e) = self.table.get_mut(id) {
 | 
					        if let Some(e) = self.table.get_mut(id) {
 | 
				
			||||||
            e.local_timestamp = cmp::max(e.local_timestamp, now);
 | 
					            e.local_timestamp = cmp::max(e.local_timestamp, now);
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user