adds more parallel processing to gossip packets handling (#12988) (#13287)

(cherry picked from commit 3738611f5c)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
mergify[bot] (committed via GitHub), 2020-10-29 18:05:52 +00:00
parent 06067dd823, commit 3a8c6f33a3
2 changed files with 226 additions and 131 deletions
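The heart of the change: instead of handling each gossip packet inline inside one receive loop, process_packets now splits the incoming packets by protocol type and hands each batch to a dedicated handler, some of which run on a rayon thread pool. A minimal, self-contained sketch of that split-then-batch pattern follows; the enum variants and handler bodies are simplified placeholders, not the actual solana-core API.

// Sketch of the "split by type, then handle in batches" pattern this commit
// applies. Types and handlers are illustrative stand-ins only.
use std::net::SocketAddr;

enum Protocol {
    PullRequest(u64),      // stand-in for (CrdsFilter, CrdsValue)
    PullResponse(Vec<u8>), // stand-in for (Pubkey, Vec<CrdsValue>)
    Ping(u32),
}

fn handle_batch_pull_requests(requests: Vec<(SocketAddr, u64)>) {
    // One dispatch for the whole batch instead of per-packet work in the loop.
    println!("handling {} pull requests", requests.len());
}

fn handle_batch_pull_responses(responses: Vec<(SocketAddr, Vec<u8>)>) {
    println!("handling {} pull responses", responses.len());
}

fn handle_batch_pings(pings: Vec<(SocketAddr, u32)>) {
    println!("handling {} pings", pings.len());
}

fn process_packets(packets: Vec<(SocketAddr, Protocol)>) {
    // Split packets based on their types, then hand each batch to its handler.
    let mut pull_requests = vec![];
    let mut pull_responses = vec![];
    let mut pings = vec![];
    for (addr, packet) in packets {
        match packet {
            Protocol::PullRequest(filter) => pull_requests.push((addr, filter)),
            Protocol::PullResponse(data) => pull_responses.push((addr, data)),
            Protocol::Ping(token) => pings.push((addr, token)),
        }
    }
    handle_batch_pings(pings);
    handle_batch_pull_responses(pull_responses);
    handle_batch_pull_requests(pull_requests);
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    process_packets(vec![
        (addr, Protocol::Ping(1)),
        (addr, Protocol::PullRequest(42)),
        (addr, Protocol::PullResponse(vec![7, 7])),
    ]);
}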


@@ -36,13 +36,12 @@ use solana_sdk::sanitize::{Sanitize, SanitizeError};
 use bincode::{serialize, serialized_size};
 use core::cmp;
 use itertools::Itertools;
-use rayon::iter::IntoParallelIterator;
-use rayon::iter::ParallelIterator;
+use rayon::prelude::*;
 use rayon::{ThreadPool, ThreadPoolBuilder};
 use solana_ledger::staking_utils;
 use solana_measure::measure::Measure;
 use solana_measure::thread_mem_usage;
-use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
+use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
 use solana_net_utils::{
     bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
     multi_bind_in_range, PortRange,
@@ -67,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
     borrow::Cow,
     cmp::min,
-    collections::{HashMap, HashSet},
+    collections::{hash_map::Entry, HashMap, HashSet},
     fmt,
     net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
     ops::{Deref, DerefMut},
@@ -1778,122 +1777,103 @@ impl ClusterInfo {
             .unwrap()
     }

-    #[allow(clippy::cognitive_complexity)]
-    fn handle_packets(
-        &self,
-        recycler: &PacketsRecycler,
-        stakes: &HashMap<Pubkey, u64>,
-        packets: Vec<(SocketAddr, Protocol)>,
-        response_sender: &PacketSender,
-        feature_set: Option<&FeatureSet>,
-        epoch_time_ms: u64,
-    ) {
-        // iter over the packets, collect pulls separately and process everything else
-        let allocated = thread_mem_usage::Allocatedp::default();
-        let mut gossip_pull_data: Vec<PullData> = vec![];
-        let timeouts = self
-            .gossip
-            .read()
-            .unwrap()
-            .make_timeouts(&stakes, epoch_time_ms);
-        let mut ping_messages = vec![];
-        let mut pong_messages = vec![];
-        let mut pull_responses = HashMap::new();
-        for (from_addr, packet) in packets {
-            match packet {
-                Protocol::PullRequest(filter, caller) => {
-                    let start = allocated.get();
-                    if let Some(contact_info) = caller.contact_info() {
-                        if contact_info.id == self.id() {
-                            warn!("PullRequest ignored, I'm talking to myself");
-                            inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
-                        } else if contact_info.shred_version == 0
-                            || contact_info.shred_version == self.my_shred_version()
-                            || self.my_shred_version() == 0
-                        {
-                            gossip_pull_data.push(PullData {
-                                from_addr,
-                                caller,
-                                filter,
-                            });
-                        } else {
-                            self.stats.skip_pull_shred_version.add_relaxed(1);
-                        }
-                    }
-                    datapoint_debug!(
-                        "solana-gossip-listen-memory",
-                        ("pull_request", (allocated.get() - start) as i64, i64),
-                    );
-                }
-                Protocol::PullResponse(from, data) => {
-                    let start = allocated.get();
-                    let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
-                    pull_entry.extend(data);
-                    datapoint_debug!(
-                        "solana-gossip-listen-memory",
-                        ("pull_response", (allocated.get() - start) as i64, i64),
-                    );
-                }
-                Protocol::PushMessage(from, data) => {
-                    let start = allocated.get();
-                    let rsp = self.handle_push_message(recycler, &from, data, stakes);
-                    if let Some(rsp) = rsp {
-                        let _ignore_disconnect = response_sender.send(rsp);
-                    }
-                    datapoint_debug!(
-                        "solana-gossip-listen-memory",
-                        ("push_message", (allocated.get() - start) as i64, i64),
-                    );
-                }
-                Protocol::PruneMessage(from, data) => {
-                    let start = allocated.get();
-                    self.stats.prune_message_count.add_relaxed(1);
-                    self.stats
-                        .prune_message_len
-                        .add_relaxed(data.prunes.len() as u64);
-                    match self
-                        .time_gossip_write_lock("process_prune", &self.stats.process_prune)
-                        .process_prune_msg(
-                            &from,
-                            &data.destination,
-                            &data.prunes,
-                            data.wallclock,
-                            timestamp(),
-                        ) {
-                        Err(CrdsGossipError::PruneMessageTimeout) => {
-                            inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
-                        }
-                        Err(CrdsGossipError::BadPruneDestination) => {
-                            inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
-                        }
-                        _ => (),
-                    }
-                    datapoint_debug!(
-                        "solana-gossip-listen-memory",
-                        ("prune_message", (allocated.get() - start) as i64, i64),
-                    );
-                }
-                Protocol::PingMessage(ping) => ping_messages.push((ping, from_addr)),
-                Protocol::PongMessage(pong) => pong_messages.push((pong, from_addr)),
-            }
-        }
-        if let Some(response) = self.handle_ping_messages(ping_messages, recycler) {
-            let _ = response_sender.send(response);
-        }
-        self.handle_pong_messages(pong_messages, Instant::now());
-        for (from, data) in pull_responses {
-            self.handle_pull_response(&from, data, &timeouts);
-        }
-        // process the collected pulls together
-        if !gossip_pull_data.is_empty() {
-            self.stats
-                .pull_requests_count
-                .add_relaxed(gossip_pull_data.len() as u64);
-            let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set);
-            if !rsp.is_empty() {
-                let _ignore_disconnect = response_sender.send(rsp);
-            }
-        }
-    }
+    fn handle_batch_prune_messages(&self, messages: Vec<(Pubkey, PruneData)>) {
+        if messages.is_empty() {
+            return;
+        }
+        self.stats
+            .prune_message_count
+            .add_relaxed(messages.len() as u64);
+        self.stats.prune_message_len.add_relaxed(
+            messages
+                .iter()
+                .map(|(_, data)| data.prunes.len() as u64)
+                .sum(),
+        );
+        let mut prune_message_timeout = 0;
+        let mut bad_prune_destination = 0;
+        {
+            let mut gossip =
+                self.time_gossip_write_lock("process_prune", &self.stats.process_prune);
+            let now = timestamp();
+            for (from, data) in messages {
+                match gossip.process_prune_msg(
+                    &from,
+                    &data.destination,
+                    &data.prunes,
+                    data.wallclock,
+                    now,
+                ) {
+                    Err(CrdsGossipError::PruneMessageTimeout) => {
+                        prune_message_timeout += 1;
+                    }
+                    Err(CrdsGossipError::BadPruneDestination) => {
+                        bad_prune_destination += 1;
+                    }
+                    _ => (),
+                }
+            }
+        }
+        if prune_message_timeout != 0 {
+            inc_new_counter_debug!("cluster_info-prune_message_timeout", prune_message_timeout);
+        }
+        if bad_prune_destination != 0 {
+            inc_new_counter_debug!("cluster_info-bad_prune_destination", bad_prune_destination);
+        }
+    }
+
+    fn handle_batch_pull_requests(
+        &self,
+        // from address, crds filter, caller contact info
+        requests: Vec<(SocketAddr, CrdsFilter, CrdsValue)>,
+        thread_pool: &ThreadPool,
+        recycler: &PacketsRecycler,
+        stakes: &HashMap<Pubkey, u64>,
+        response_sender: &PacketSender,
+        feature_set: Option<&FeatureSet>,
+    ) {
+        if requests.is_empty() {
+            return;
+        }
+        let self_pubkey = self.id();
+        let self_shred_version = self.my_shred_version();
+        let requests: Vec<_> = thread_pool.install(|| {
+            requests
+                .into_par_iter()
+                .with_min_len(1024)
+                .filter(|(_, _, caller)| match caller.contact_info() {
+                    None => false,
+                    Some(caller) if caller.id == self_pubkey => {
+                        warn!("PullRequest ignored, I'm talking to myself");
+                        inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
+                        false
+                    }
+                    Some(caller) => {
+                        if self_shred_version != 0
+                            && caller.shred_version != 0
+                            && caller.shred_version != self_shred_version
+                        {
+                            self.stats.skip_pull_shred_version.add_relaxed(1);
+                            false
+                        } else {
+                            true
+                        }
+                    }
+                })
+                .map(|(from_addr, filter, caller)| PullData {
+                    from_addr,
+                    caller,
+                    filter,
+                })
+                .collect()
+        });
+        if !requests.is_empty() {
+            self.stats
+                .pull_requests_count
+                .add_relaxed(requests.len() as u64);
+            let response = self.handle_pull_requests(recycler, requests, stakes, feature_set);
+            if !response.is_empty() {
+                let _ = response_sender.send(response);
+            }
+        }
+    }
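The pull-request filter above runs inside a dedicated rayon thread pool and uses with_min_len(1024) so small batches are not split into many tiny tasks. A stand-alone sketch of that idiom, assuming only rayon as a dependency; the predicate is a stand-in for the shred-version check.

// Sketch: run a parallel filter inside a thread pool, with a minimum
// chunk size so rayon does not over-split small inputs.
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    let thread_pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let requests: Vec<u64> = (0..10_000).collect();
    let kept: Vec<u64> = thread_pool.install(|| {
        requests
            .into_par_iter()
            .with_min_len(1024) // each rayon task gets at least 1024 items
            .filter(|x| x % 3 != 0) // stand-in for the shred-version check
            .collect()
    });
    println!("kept {} requests", kept.len());
}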
@@ -2065,6 +2045,70 @@ impl ClusterInfo {
         packets
     }

+    fn handle_batch_pull_responses(
+        &self,
+        responses: Vec<(Pubkey, Vec<CrdsValue>)>,
+        thread_pool: &ThreadPool,
+        stakes: &HashMap<Pubkey, u64>,
+        epoch_time_ms: u64,
+    ) {
+        if responses.is_empty() {
+            return;
+        }
+        fn extend<K, V>(hash_map: &mut HashMap<K, Vec<V>>, (key, mut value): (K, Vec<V>))
+        where
+            K: Eq + std::hash::Hash,
+        {
+            match hash_map.entry(key) {
+                Entry::Occupied(mut entry) => {
+                    let entry_value = entry.get_mut();
+                    if entry_value.len() < value.len() {
+                        std::mem::swap(entry_value, &mut value);
+                    }
+                    entry_value.extend(value);
+                }
+                Entry::Vacant(entry) => {
+                    entry.insert(value);
+                }
+            }
+        }
+        fn merge<K, V>(
+            mut hash_map: HashMap<K, Vec<V>>,
+            other: HashMap<K, Vec<V>>,
+        ) -> HashMap<K, Vec<V>>
+        where
+            K: Eq + std::hash::Hash,
+        {
+            if hash_map.len() < other.len() {
+                return merge(other, hash_map);
+            }
+            for kv in other {
+                extend(&mut hash_map, kv);
+            }
+            hash_map
+        }
+        let responses = thread_pool.install(|| {
+            responses
+                .into_par_iter()
+                .with_min_len(1024)
+                .fold(HashMap::new, |mut hash_map, kv| {
+                    extend(&mut hash_map, kv);
+                    hash_map
+                })
+                .reduce(HashMap::new, merge)
+        });
+        if !responses.is_empty() {
+            let timeouts = self
+                .gossip
+                .read()
+                .unwrap()
+                .make_timeouts(&stakes, epoch_time_ms);
+            for (from, data) in responses {
+                self.handle_pull_response(&from, data, &timeouts);
+            }
+        }
+    }
+
     // Returns (failed, timeout, success)
     fn handle_pull_response(
         &self,
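handle_batch_pull_responses groups responses by origin with rayon's fold/reduce: each worker folds its chunk into a private HashMap, and the per-worker maps are merged pairwise, always extending the longer Vec and keeping the larger map so fewer elements are moved. A simplified, runnable sketch of the same idiom; keys and values are stand-ins for Pubkey and CrdsValue.

// Sketch: parallel group-by using rayon fold/reduce with size-aware merging.
use rayon::prelude::*;
use std::collections::HashMap;

fn extend(map: &mut HashMap<u8, Vec<u32>>, (key, mut value): (u8, Vec<u32>)) {
    // Keep the longer Vec as the accumulator so fewer elements are copied.
    let slot = map.entry(key).or_default();
    if slot.len() < value.len() {
        std::mem::swap(slot, &mut value);
    }
    slot.extend(value);
}

fn merge(
    mut map: HashMap<u8, Vec<u32>>,
    other: HashMap<u8, Vec<u32>>,
) -> HashMap<u8, Vec<u32>> {
    // Always merge the smaller map into the larger one.
    if map.len() < other.len() {
        return merge(other, map);
    }
    for kv in other {
        extend(&mut map, kv);
    }
    map
}

fn main() {
    let responses: Vec<(u8, Vec<u32>)> =
        (0..10_000u32).map(|i| ((i % 7) as u8, vec![i])).collect();
    let grouped: HashMap<u8, Vec<u32>> = responses
        .into_par_iter()
        .fold(HashMap::new, |mut map, kv| {
            extend(&mut map, kv);
            map
        })
        .reduce(HashMap::new, merge);
    assert_eq!(grouped.values().map(Vec::len).sum::<usize>(), 10_000);
    println!("{} distinct origins", grouped.len());
}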
@@ -2149,13 +2193,26 @@ impl ClusterInfo {
         }
     }

+    fn handle_batch_ping_messages<I>(
+        &self,
+        pings: I,
+        recycler: &PacketsRecycler,
+        response_sender: &PacketSender,
+    ) where
+        I: IntoIterator<Item = (SocketAddr, Ping)>,
+    {
+        if let Some(response) = self.handle_ping_messages(pings, recycler) {
+            let _ = response_sender.send(response);
+        }
+    }
+
     fn handle_ping_messages<I>(&self, pings: I, recycler: &PacketsRecycler) -> Option<Packets>
     where
-        I: IntoIterator<Item = (Ping, SocketAddr)>,
+        I: IntoIterator<Item = (SocketAddr, Ping)>,
     {
         let packets: Vec<_> = pings
             .into_iter()
-            .filter_map(|(ping, addr)| {
+            .filter_map(|(addr, ping)| {
                 let pong = Pong::new(&ping, &self.keypair).ok()?;
                 let pong = Protocol::PongMessage(pong);
                 let packet = Packet::from_data(&addr, pong);
@@ -2171,19 +2228,34 @@ impl ClusterInfo {
         }
     }

-    fn handle_pong_messages<I>(&self, pongs: I, now: Instant)
+    fn handle_batch_pong_messages<I>(&self, pongs: I, now: Instant)
     where
-        I: IntoIterator<Item = (Pong, SocketAddr)>,
+        I: IntoIterator<Item = (SocketAddr, Pong)>,
     {
         let mut pongs = pongs.into_iter().peekable();
         if pongs.peek().is_some() {
             let mut ping_cache = self.ping_cache.write().unwrap();
-            for (pong, addr) in pongs {
+            for (addr, pong) in pongs {
                 ping_cache.add(&pong, addr, now);
             }
         }
     }

+    fn handle_batch_push_messages(
+        &self,
+        messages: Vec<(Pubkey, Vec<CrdsValue>)>,
+        recycler: &PacketsRecycler,
+        stakes: &HashMap<Pubkey, u64>,
+        response_sender: &PacketSender,
+    ) {
+        for (from, data) in messages {
+            let response = self.handle_push_message(recycler, &from, data, stakes);
+            if let Some(response) = response {
+                let _ = response_sender.send(response);
+            }
+        }
+    }
+
     fn handle_push_message(
         &self,
         recycler: &PacketsRecycler,
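handle_batch_pong_messages peeks at the iterator before taking the ping-cache write lock, so an empty batch never touches the lock and a non-empty batch takes it exactly once. A tiny sketch of that idiom, with an RwLock<Vec<_>> standing in for the ping cache.

// Sketch: peek before locking so empty batches skip the write lock entirely.
use std::sync::RwLock;

fn record_all<I: IntoIterator<Item = u64>>(cache: &RwLock<Vec<u64>>, pongs: I) {
    let mut pongs = pongs.into_iter().peekable();
    if pongs.peek().is_some() {
        let mut cache = cache.write().unwrap(); // one lock per non-empty batch
        cache.extend(pongs);
    }
}

fn main() {
    let cache = RwLock::new(Vec::new());
    record_all(&cache, std::iter::empty()); // no lock taken
    record_all(&cache, vec![1, 2, 3]); // single lock for the whole batch
    assert_eq!(cache.read().unwrap().len(), 3);
}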
@@ -2309,13 +2381,37 @@ impl ClusterInfo {
                 })
                 .collect()
         });
-        self.handle_packets(
-            recycler,
-            &stakes,
-            packets,
-            response_sender,
-            feature_set,
-            epoch_time_ms,
-        );
+        // Split packets based on their types.
+        let mut pull_requests = vec![];
+        let mut pull_responses = vec![];
+        let mut push_messages = vec![];
+        let mut prune_messages = vec![];
+        let mut ping_messages = vec![];
+        let mut pong_messages = vec![];
+        for (from_addr, packet) in packets {
+            match packet {
+                Protocol::PullRequest(filter, caller) => {
+                    pull_requests.push((from_addr, filter, caller))
+                }
+                Protocol::PullResponse(from, data) => pull_responses.push((from, data)),
+                Protocol::PushMessage(from, data) => push_messages.push((from, data)),
+                Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
+                Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
+                Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
+            }
+        }
+        self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
+        self.handle_batch_prune_messages(prune_messages);
+        self.handle_batch_push_messages(push_messages, recycler, &stakes, response_sender);
+        self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
+        self.handle_batch_pong_messages(pong_messages, Instant::now());
+        self.handle_batch_pull_requests(
+            pull_requests,
+            thread_pool,
+            recycler,
+            &stakes,
+            response_sender,
+            feature_set,
+        );
         self.stats
             .process_gossip_packets_time
@@ -2878,7 +2974,6 @@ mod tests {
     use super::*;
     use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
     use itertools::izip;
-    use rayon::prelude::*;
    use solana_perf::test_tx::test_tx;
     use solana_sdk::signature::{Keypair, Signer};
     use solana_vote_program::{vote_instruction, vote_state::Vote};
@@ -2966,13 +3061,13 @@
             })
             .collect()
         };
-        let pongs: Vec<(Pong, SocketAddr)> = pings
+        let pongs: Vec<(SocketAddr, Pong)> = pings
             .iter()
             .zip(&remote_nodes)
-            .map(|(ping, (keypair, socket))| (Pong::new(ping, keypair).unwrap(), *socket))
+            .map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap()))
             .collect();
         let now = now + Duration::from_millis(1);
-        cluster_info.handle_pong_messages(pongs, now);
+        cluster_info.handle_batch_pong_messages(pongs, now);
         // Assert that remote nodes now pass the ping/pong check.
         {
             let mut ping_cache = cluster_info.ping_cache.write().unwrap();
@@ -3015,9 +3110,10 @@
         let recycler = PacketsRecycler::default();
         let packets = cluster_info
             .handle_ping_messages(
-                pings
-                    .into_iter()
-                    .zip(remote_nodes.iter().map(|(_, socket)| *socket)),
+                remote_nodes
+                    .iter()
+                    .map(|(_, socket)| *socket)
+                    .zip(pings.into_iter()),
                 &recycler,
             )
             .unwrap()


@@ -288,12 +288,11 @@ impl CrdsGossipPush {
     /// add the `from` to the peer's filter of nodes
     pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
-        for origin in origins {
-            if origin == self_pubkey {
-                continue;
-            }
-            if let Some(p) = self.active_set.get_mut(peer) {
-                p.add(origin)
+        if let Some(peer) = self.active_set.get_mut(peer) {
+            for origin in origins {
+                if origin != self_pubkey {
+                    peer.add(origin);
+                }
             }
         }
     }
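The rewrite above hoists the active_set lookup out of the per-origin loop: the peer's filter is fetched once, and every origin other than self is added to it. A simplified stand-alone sketch, where a HashMap of Vecs stands in for the push active set and its per-peer bloom filters.

// Sketch: look up the peer's filter once, then update it for each origin.
use std::collections::HashMap;

fn process_prune(
    active_set: &mut HashMap<String, Vec<String>>,
    self_id: &str,
    peer: &str,
    origins: &[String],
) {
    if let Some(filter) = active_set.get_mut(peer) {
        for origin in origins {
            if origin != self_id {
                filter.push(origin.clone()); // stand-in for filter.add(origin)
            }
        }
    }
}

fn main() {
    let mut active_set = HashMap::new();
    active_set.insert("peer".to_string(), Vec::new());
    let origins = vec!["a".to_string(), "me".to_string(), "b".to_string()];
    process_prune(&mut active_set, "me", "peer", &origins);
    assert_eq!(active_set["peer"], vec!["a".to_string(), "b".to_string()]);
}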