ClusterInfo::process_packets handles incoming packets in a thread_pool:
https://github.com/solana-labs/solana/blob/87311cce7/core/src/cluster_info.rs#L2118-L2134
However, runtime profiling shows that the threads are not well utilized and
that a lot of the processing is done sequentially.
This commit redistributes the work that is done in parallel. Testing on a GCE
cluster shows a 20%+ improvement in processing gossip packets, with much
smaller variation.
(cherry picked from commit 75d62ca095)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		@@ -211,6 +211,7 @@ struct GossipStats {
 | 
			
		||||
    new_push_requests2: Counter,
 | 
			
		||||
    new_push_requests_num: Counter,
 | 
			
		||||
    filter_pull_response: Counter,
 | 
			
		||||
    process_gossip_packets_time: Counter,
 | 
			
		||||
    process_pull_response: Counter,
 | 
			
		||||
    process_pull_response_count: Counter,
 | 
			
		||||
    process_pull_response_len: Counter,
 | 
			
		||||
@@ -366,6 +367,59 @@ enum Protocol {
 | 
			
		||||
    PruneMessage(Pubkey, PruneData),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Protocol {
 | 
			
		||||
    fn par_verify(self) -> Option<Self> {
 | 
			
		||||
        match self {
 | 
			
		||||
            Protocol::PullRequest(_, ref caller) => {
 | 
			
		||||
                if caller.verify() {
 | 
			
		||||
                    Some(self)
 | 
			
		||||
                } else {
 | 
			
		||||
                    inc_new_counter_info!("cluster_info-gossip_pull_request_verify_fail", 1);
 | 
			
		||||
                    None
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            Protocol::PullResponse(from, data) => {
 | 
			
		||||
                let size = data.len();
 | 
			
		||||
                let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect();
 | 
			
		||||
                if size != data.len() {
 | 
			
		||||
                    inc_new_counter_info!(
 | 
			
		||||
                        "cluster_info-gossip_pull_response_verify_fail",
 | 
			
		||||
                        size - data.len()
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                if data.is_empty() {
 | 
			
		||||
                    None
 | 
			
		||||
                } else {
 | 
			
		||||
                    Some(Protocol::PullResponse(from, data))
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            Protocol::PushMessage(from, data) => {
 | 
			
		||||
                let size = data.len();
 | 
			
		||||
                let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect();
 | 
			
		||||
                if size != data.len() {
 | 
			
		||||
                    inc_new_counter_info!(
 | 
			
		||||
                        "cluster_info-gossip_push_msg_verify_fail",
 | 
			
		||||
                        size - data.len()
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                if data.is_empty() {
 | 
			
		||||
                    None
 | 
			
		||||
                } else {
 | 
			
		||||
                    Some(Protocol::PushMessage(from, data))
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            Protocol::PruneMessage(_, ref data) => {
 | 
			
		||||
                if data.verify() {
 | 
			
		||||
                    Some(self)
 | 
			
		||||
                } else {
 | 
			
		||||
                    inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
 | 
			
		||||
                    None
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Sanitize for Protocol {
 | 
			
		||||
    fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
 | 
			
		||||
        match self {
 | 
			
		||||
@@ -1650,7 +1704,7 @@ impl ClusterInfo {
 | 
			
		||||
        &self,
 | 
			
		||||
        recycler: &PacketsRecycler,
 | 
			
		||||
        stakes: &HashMap<Pubkey, u64>,
 | 
			
		||||
        packets: Packets,
 | 
			
		||||
        packets: Vec<(SocketAddr, Protocol)>,
 | 
			
		||||
        response_sender: &PacketSender,
 | 
			
		||||
        feature_set: Option<&FeatureSet>,
 | 
			
		||||
        epoch_time_ms: u64,
 | 
			
		||||
@@ -1664,115 +1718,82 @@ impl ClusterInfo {
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .make_timeouts(&stakes, epoch_time_ms);
 | 
			
		||||
        let mut pull_responses = HashMap::new();
 | 
			
		||||
        packets.packets.iter().for_each(|packet| {
 | 
			
		||||
            let from_addr = packet.meta.addr();
 | 
			
		||||
            limited_deserialize(&packet.data[..packet.meta.size])
 | 
			
		||||
                .into_iter()
 | 
			
		||||
                .filter(|r: &Protocol| r.sanitize().is_ok())
 | 
			
		||||
                .for_each(|request| match request {
 | 
			
		||||
                    Protocol::PullRequest(filter, caller) => {
 | 
			
		||||
                        let start = allocated.get();
 | 
			
		||||
                        if !caller.verify() {
 | 
			
		||||
                            inc_new_counter_info!(
 | 
			
		||||
                                "cluster_info-gossip_pull_request_verify_fail",
 | 
			
		||||
                                1
 | 
			
		||||
                            );
 | 
			
		||||
                        } else if let Some(contact_info) = caller.contact_info() {
 | 
			
		||||
                            if contact_info.id == self.id() {
 | 
			
		||||
                                warn!("PullRequest ignored, I'm talking to myself");
 | 
			
		||||
                                inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
 | 
			
		||||
                            } else if contact_info.shred_version == 0
 | 
			
		||||
                                || contact_info.shred_version == self.my_shred_version()
 | 
			
		||||
                                || self.my_shred_version() == 0
 | 
			
		||||
                            {
 | 
			
		||||
                                gossip_pull_data.push(PullData {
 | 
			
		||||
                                    from_addr,
 | 
			
		||||
                                    caller,
 | 
			
		||||
                                    filter,
 | 
			
		||||
                                });
 | 
			
		||||
                            } else {
 | 
			
		||||
                                self.stats.skip_pull_shred_version.add_relaxed(1);
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        datapoint_debug!(
 | 
			
		||||
                            "solana-gossip-listen-memory",
 | 
			
		||||
                            ("pull_request", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                        );
 | 
			
		||||
                    }
 | 
			
		||||
                    Protocol::PullResponse(from, mut data) => {
 | 
			
		||||
                        let start = allocated.get();
 | 
			
		||||
                        data.retain(|v| {
 | 
			
		||||
                            let ret = v.verify();
 | 
			
		||||
                            if !ret {
 | 
			
		||||
                                inc_new_counter_info!(
 | 
			
		||||
                                    "cluster_info-gossip_pull_response_verify_fail",
 | 
			
		||||
                                    1
 | 
			
		||||
                                );
 | 
			
		||||
                            }
 | 
			
		||||
                            ret
 | 
			
		||||
                        });
 | 
			
		||||
                        let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
 | 
			
		||||
                        pull_entry.extend(data);
 | 
			
		||||
                        datapoint_debug!(
 | 
			
		||||
                            "solana-gossip-listen-memory",
 | 
			
		||||
                            ("pull_response", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                        );
 | 
			
		||||
                    }
 | 
			
		||||
                    Protocol::PushMessage(from, mut data) => {
 | 
			
		||||
                        let start = allocated.get();
 | 
			
		||||
                        data.retain(|v| {
 | 
			
		||||
                            let ret = v.verify();
 | 
			
		||||
                            if !ret {
 | 
			
		||||
                                inc_new_counter_info!(
 | 
			
		||||
                                    "cluster_info-gossip_push_msg_verify_fail",
 | 
			
		||||
                                    1
 | 
			
		||||
                                );
 | 
			
		||||
                            }
 | 
			
		||||
                            ret
 | 
			
		||||
                        });
 | 
			
		||||
                        let rsp = self.handle_push_message(recycler, &from, data, stakes);
 | 
			
		||||
                        if let Some(rsp) = rsp {
 | 
			
		||||
                            let _ignore_disconnect = response_sender.send(rsp);
 | 
			
		||||
                        }
 | 
			
		||||
                        datapoint_debug!(
 | 
			
		||||
                            "solana-gossip-listen-memory",
 | 
			
		||||
                            ("push_message", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                        );
 | 
			
		||||
                    }
 | 
			
		||||
                    Protocol::PruneMessage(from, data) => {
 | 
			
		||||
                        let start = allocated.get();
 | 
			
		||||
                        if data.verify() {
 | 
			
		||||
                            self.stats.prune_message_count.add_relaxed(1);
 | 
			
		||||
                            self.stats
 | 
			
		||||
                                .prune_message_len
 | 
			
		||||
                                .add_relaxed(data.prunes.len() as u64);
 | 
			
		||||
                            match self
 | 
			
		||||
                                .time_gossip_write_lock("process_prune", &self.stats.process_prune)
 | 
			
		||||
                                .process_prune_msg(
 | 
			
		||||
                                    &from,
 | 
			
		||||
                                    &data.destination,
 | 
			
		||||
                                    &data.prunes,
 | 
			
		||||
                                    data.wallclock,
 | 
			
		||||
                                    timestamp(),
 | 
			
		||||
                                ) {
 | 
			
		||||
                                Err(CrdsGossipError::PruneMessageTimeout) => {
 | 
			
		||||
                                    inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
 | 
			
		||||
                                }
 | 
			
		||||
                                Err(CrdsGossipError::BadPruneDestination) => {
 | 
			
		||||
                                    inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
 | 
			
		||||
                                }
 | 
			
		||||
                                _ => (),
 | 
			
		||||
                            }
 | 
			
		||||
        for (from_addr, packet) in packets {
 | 
			
		||||
            match packet {
 | 
			
		||||
                Protocol::PullRequest(filter, caller) => {
 | 
			
		||||
                    let start = allocated.get();
 | 
			
		||||
                    if let Some(contact_info) = caller.contact_info() {
 | 
			
		||||
                        if contact_info.id == self.id() {
 | 
			
		||||
                            warn!("PullRequest ignored, I'm talking to myself");
 | 
			
		||||
                            inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
 | 
			
		||||
                        } else if contact_info.shred_version == 0
 | 
			
		||||
                            || contact_info.shred_version == self.my_shred_version()
 | 
			
		||||
                            || self.my_shred_version() == 0
 | 
			
		||||
                        {
 | 
			
		||||
                            gossip_pull_data.push(PullData {
 | 
			
		||||
                                from_addr,
 | 
			
		||||
                                caller,
 | 
			
		||||
                                filter,
 | 
			
		||||
                            });
 | 
			
		||||
                        } else {
 | 
			
		||||
                            inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
 | 
			
		||||
                            self.stats.skip_pull_shred_version.add_relaxed(1);
 | 
			
		||||
                        }
 | 
			
		||||
                        datapoint_debug!(
 | 
			
		||||
                            "solana-gossip-listen-memory",
 | 
			
		||||
                            ("prune_message", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                        );
 | 
			
		||||
                    }
 | 
			
		||||
                })
 | 
			
		||||
        });
 | 
			
		||||
                    datapoint_debug!(
 | 
			
		||||
                        "solana-gossip-listen-memory",
 | 
			
		||||
                        ("pull_request", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                Protocol::PullResponse(from, data) => {
 | 
			
		||||
                    let start = allocated.get();
 | 
			
		||||
                    let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
 | 
			
		||||
                    pull_entry.extend(data);
 | 
			
		||||
                    datapoint_debug!(
 | 
			
		||||
                        "solana-gossip-listen-memory",
 | 
			
		||||
                        ("pull_response", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                Protocol::PushMessage(from, data) => {
 | 
			
		||||
                    let start = allocated.get();
 | 
			
		||||
                    let rsp = self.handle_push_message(recycler, &from, data, stakes);
 | 
			
		||||
                    if let Some(rsp) = rsp {
 | 
			
		||||
                        let _ignore_disconnect = response_sender.send(rsp);
 | 
			
		||||
                    }
 | 
			
		||||
                    datapoint_debug!(
 | 
			
		||||
                        "solana-gossip-listen-memory",
 | 
			
		||||
                        ("push_message", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                Protocol::PruneMessage(from, data) => {
 | 
			
		||||
                    let start = allocated.get();
 | 
			
		||||
                    self.stats.prune_message_count.add_relaxed(1);
 | 
			
		||||
                    self.stats
 | 
			
		||||
                        .prune_message_len
 | 
			
		||||
                        .add_relaxed(data.prunes.len() as u64);
 | 
			
		||||
                    match self
 | 
			
		||||
                        .time_gossip_write_lock("process_prune", &self.stats.process_prune)
 | 
			
		||||
                        .process_prune_msg(
 | 
			
		||||
                            &from,
 | 
			
		||||
                            &data.destination,
 | 
			
		||||
                            &data.prunes,
 | 
			
		||||
                            data.wallclock,
 | 
			
		||||
                            timestamp(),
 | 
			
		||||
                        ) {
 | 
			
		||||
                        Err(CrdsGossipError::PruneMessageTimeout) => {
 | 
			
		||||
                            inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
 | 
			
		||||
                        }
 | 
			
		||||
                        Err(CrdsGossipError::BadPruneDestination) => {
 | 
			
		||||
                            inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
 | 
			
		||||
                        }
 | 
			
		||||
                        _ => (),
 | 
			
		||||
                    }
 | 
			
		||||
                    datapoint_debug!(
 | 
			
		||||
                        "solana-gossip-listen-memory",
 | 
			
		||||
                        ("prune_message", (allocated.get() - start) as i64, i64),
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (from, data) in pull_responses {
 | 
			
		||||
            self.handle_pull_response(&from, data, &timeouts);
 | 
			
		||||
@@ -2125,12 +2146,31 @@ impl ClusterInfo {
 | 
			
		||||
        feature_set: Option<&FeatureSet>,
 | 
			
		||||
        epoch_time_ms: u64,
 | 
			
		||||
    ) {
 | 
			
		||||
        let sender = response_sender.clone();
 | 
			
		||||
        thread_pool.install(|| {
 | 
			
		||||
            requests.into_par_iter().for_each_with(sender, |s, reqs| {
 | 
			
		||||
                self.handle_packets(&recycler, &stakes, reqs, s, feature_set, epoch_time_ms)
 | 
			
		||||
            });
 | 
			
		||||
        let mut timer = Measure::start("process_gossip_packets_time");
 | 
			
		||||
        let packets: Vec<_> = thread_pool.install(|| {
 | 
			
		||||
            requests
 | 
			
		||||
                .into_par_iter()
 | 
			
		||||
                .flat_map(|request| request.packets.into_par_iter())
 | 
			
		||||
                .filter_map(|packet| {
 | 
			
		||||
                    let protocol: Protocol =
 | 
			
		||||
                        limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
 | 
			
		||||
                    protocol.sanitize().ok()?;
 | 
			
		||||
                    let protocol = protocol.par_verify()?;
 | 
			
		||||
                    Some((packet.meta.addr(), protocol))
 | 
			
		||||
                })
 | 
			
		||||
                .collect()
 | 
			
		||||
        });
 | 
			
		||||
        self.handle_packets(
 | 
			
		||||
            recycler,
 | 
			
		||||
            &stakes,
 | 
			
		||||
            packets,
 | 
			
		||||
            response_sender,
 | 
			
		||||
            feature_set,
 | 
			
		||||
            epoch_time_ms,
 | 
			
		||||
        );
 | 
			
		||||
        self.stats
 | 
			
		||||
            .process_gossip_packets_time
 | 
			
		||||
            .add_measure(&mut timer);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Process messages from the network
 | 
			
		||||
@@ -2241,6 +2281,11 @@ impl ClusterInfo {
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                ("purge", self.stats.purge.clear(), i64),
 | 
			
		||||
                (
 | 
			
		||||
                    "process_gossip_packets_time",
 | 
			
		||||
                    self.stats.process_gossip_packets_time.clear(),
 | 
			
		||||
                    i64
 | 
			
		||||
                ),
 | 
			
		||||
                (
 | 
			
		||||
                    "process_pull_resp",
 | 
			
		||||
                    self.stats.process_pull_response.clear(),
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user