adds more parallel processing to gossip packets handling (#12988) (#13282)

(cherry picked from commit 3738611f5c)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
mergify[bot] authored on 2020-10-29 16:47:37 +00:00, committed by GitHub
parent 38a99c0c25
commit 9922f09a1d
2 changed files with 226 additions and 131 deletions


@@ -36,13 +36,12 @@ use solana_sdk::sanitize::{Sanitize, SanitizeError};
use bincode::{serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use solana_ledger::staking_utils;
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
use solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
multi_bind_in_range, PortRange,
@@ -67,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{
borrow::Cow,
cmp::min,
collections::{HashMap, HashSet},
collections::{hash_map::Entry, HashMap, HashSet},
fmt,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
ops::{Deref, DerefMut},
@@ -1806,122 +1805,103 @@ impl ClusterInfo {
.unwrap()
}
#[allow(clippy::cognitive_complexity)]
fn handle_packets(
&self,
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
packets: Vec<(SocketAddr, Protocol)>,
response_sender: &PacketSender,
feature_set: Option<&FeatureSet>,
epoch_time_ms: u64,
) {
// iter over the packets, collect pulls separately and process everything else
let allocated = thread_mem_usage::Allocatedp::default();
let mut gossip_pull_data: Vec<PullData> = vec![];
let timeouts = self
.gossip
.read()
.unwrap()
.make_timeouts(&stakes, epoch_time_ms);
let mut ping_messages = vec![];
let mut pong_messages = vec![];
let mut pull_responses = HashMap::new();
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
let start = allocated.get();
if let Some(contact_info) = caller.contact_info() {
if contact_info.id == self.id() {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
} else if contact_info.shred_version == 0
|| contact_info.shred_version == self.my_shred_version()
|| self.my_shred_version() == 0
{
gossip_pull_data.push(PullData {
from_addr,
caller,
filter,
});
} else {
self.stats.skip_pull_shred_version.add_relaxed(1);
}
fn handle_batch_prune_messages(&self, messages: Vec<(Pubkey, PruneData)>) {
if messages.is_empty() {
return;
}
self.stats
.prune_message_count
.add_relaxed(messages.len() as u64);
self.stats.prune_message_len.add_relaxed(
messages
.iter()
.map(|(_, data)| data.prunes.len() as u64)
.sum(),
);
let mut prune_message_timeout = 0;
let mut bad_prune_destination = 0;
{
let mut gossip =
self.time_gossip_write_lock("process_prune", &self.stats.process_prune);
let now = timestamp();
for (from, data) in messages {
match gossip.process_prune_msg(
&from,
&data.destination,
&data.prunes,
data.wallclock,
now,
) {
Err(CrdsGossipError::PruneMessageTimeout) => {
prune_message_timeout += 1;
}
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_request", (allocated.get() - start) as i64, i64),
);
}
Protocol::PullResponse(from, data) => {
let start = allocated.get();
let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
pull_entry.extend(data);
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_response", (allocated.get() - start) as i64, i64),
);
}
Protocol::PushMessage(from, data) => {
let start = allocated.get();
let rsp = self.handle_push_message(recycler, &from, data, stakes);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
Err(CrdsGossipError::BadPruneDestination) => {
bad_prune_destination += 1;
}
datapoint_debug!(
"solana-gossip-listen-memory",
("push_message", (allocated.get() - start) as i64, i64),
);
_ => (),
}
Protocol::PruneMessage(from, data) => {
let start = allocated.get();
self.stats.prune_message_count.add_relaxed(1);
self.stats
.prune_message_len
.add_relaxed(data.prunes.len() as u64);
match self
.time_gossip_write_lock("process_prune", &self.stats.process_prune)
.process_prune_msg(
&from,
&data.destination,
&data.prunes,
data.wallclock,
timestamp(),
) {
Err(CrdsGossipError::PruneMessageTimeout) => {
inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
}
Err(CrdsGossipError::BadPruneDestination) => {
inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
}
_ => (),
}
datapoint_debug!(
"solana-gossip-listen-memory",
("prune_message", (allocated.get() - start) as i64, i64),
);
}
Protocol::PingMessage(ping) => ping_messages.push((ping, from_addr)),
Protocol::PongMessage(pong) => pong_messages.push((pong, from_addr)),
}
}
if let Some(response) = self.handle_ping_messages(ping_messages, recycler) {
let _ = response_sender.send(response);
if prune_message_timeout != 0 {
inc_new_counter_debug!("cluster_info-prune_message_timeout", prune_message_timeout);
}
self.handle_pong_messages(pong_messages, Instant::now());
for (from, data) in pull_responses {
self.handle_pull_response(&from, data, &timeouts);
if bad_prune_destination != 0 {
inc_new_counter_debug!("cluster_info-bad_prune_destination", bad_prune_destination);
}
}
// process the collected pulls together
if !gossip_pull_data.is_empty() {
fn handle_batch_pull_requests(
&self,
// from address, crds filter, caller contact info
requests: Vec<(SocketAddr, CrdsFilter, CrdsValue)>,
thread_pool: &ThreadPool,
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
response_sender: &PacketSender,
feature_set: Option<&FeatureSet>,
) {
if requests.is_empty() {
return;
}
let self_pubkey = self.id();
let self_shred_version = self.my_shred_version();
let requests: Vec<_> = thread_pool.install(|| {
requests
.into_par_iter()
.with_min_len(1024)
.filter(|(_, _, caller)| match caller.contact_info() {
None => false,
Some(caller) if caller.id == self_pubkey => {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
false
}
Some(caller) => {
if self_shred_version != 0
&& caller.shred_version != 0
&& caller.shred_version != self_shred_version
{
self.stats.skip_pull_shred_version.add_relaxed(1);
false
} else {
true
}
}
})
.map(|(from_addr, filter, caller)| PullData {
from_addr,
caller,
filter,
})
.collect()
});
if !requests.is_empty() {
self.stats
.pull_requests_count
.add_relaxed(gossip_pull_data.len() as u64);
let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set);
if !rsp.is_empty() {
let _ignore_disconnect = response_sender.send(rsp);
.add_relaxed(requests.len() as u64);
let response = self.handle_pull_requests(recycler, requests, stakes, feature_set);
if !response.is_empty() {
let _ = response_sender.send(response);
}
}
}
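For reference, a minimal sketch (not part of this commit) of the rayon pattern used by handle_batch_pull_requests above: the batch is filtered inside an explicitly supplied thread pool, and with_min_len keeps small batches from being split across many tiny tasks. The pool size and the predicate below are illustrative only.

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn keep_even(values: Vec<u64>) -> Vec<u64> {
    // Hypothetical pool built inline for the example; a long-lived pool
    // would normally be reused instead of rebuilt per call.
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    pool.install(|| {
        values
            .into_par_iter()
            // Keep at least 1024 items per rayon task so small batches stay
            // on one thread instead of paying work-splitting overhead.
            .with_min_len(1024)
            .filter(|x| x % 2 == 0)
            .collect()
    })
}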
@@ -2093,6 +2073,70 @@ impl ClusterInfo {
packets
}
fn handle_batch_pull_responses(
&self,
responses: Vec<(Pubkey, Vec<CrdsValue>)>,
thread_pool: &ThreadPool,
stakes: &HashMap<Pubkey, u64>,
epoch_time_ms: u64,
) {
if responses.is_empty() {
return;
}
fn extend<K, V>(hash_map: &mut HashMap<K, Vec<V>>, (key, mut value): (K, Vec<V>))
where
K: Eq + std::hash::Hash,
{
match hash_map.entry(key) {
Entry::Occupied(mut entry) => {
let entry_value = entry.get_mut();
if entry_value.len() < value.len() {
std::mem::swap(entry_value, &mut value);
}
entry_value.extend(value);
}
Entry::Vacant(entry) => {
entry.insert(value);
}
}
}
fn merge<K, V>(
mut hash_map: HashMap<K, Vec<V>>,
other: HashMap<K, Vec<V>>,
) -> HashMap<K, Vec<V>>
where
K: Eq + std::hash::Hash,
{
if hash_map.len() < other.len() {
return merge(other, hash_map);
}
for kv in other {
extend(&mut hash_map, kv);
}
hash_map
}
let responses = thread_pool.install(|| {
responses
.into_par_iter()
.with_min_len(1024)
.fold(HashMap::new, |mut hash_map, kv| {
extend(&mut hash_map, kv);
hash_map
})
.reduce(HashMap::new, merge)
});
if !responses.is_empty() {
let timeouts = self
.gossip
.read()
.unwrap()
.make_timeouts(&stakes, epoch_time_ms);
for (from, data) in responses {
self.handle_pull_response(&from, data, &timeouts);
}
}
}
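A minimal sketch (not part of this commit) of the fold/reduce grouping used by handle_batch_pull_responses above, with integer keys and values standing in for Pubkey and CrdsValue and without the size-based swap in the extend/merge helpers: fold builds one partial map per rayon task, and reduce merges the partial maps into a single map of values per sender.

use rayon::prelude::*;
use std::collections::HashMap;

fn group_by_key(pairs: Vec<(u8, Vec<u32>)>) -> HashMap<u8, Vec<u32>> {
    pairs
        .into_par_iter()
        // Each rayon task folds its share of pairs into a private HashMap.
        .fold(HashMap::new, |mut acc: HashMap<u8, Vec<u32>>, (key, vals)| {
            acc.entry(key).or_insert_with(Vec::new).extend(vals);
            acc
        })
        // The per-task maps are then merged pairwise into one result.
        .reduce(HashMap::new, |mut left, right| {
            for (key, vals) in right {
                left.entry(key).or_insert_with(Vec::new).extend(vals);
            }
            left
        })
}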
// Returns (failed, timeout, success)
fn handle_pull_response(
&self,
@@ -2177,13 +2221,26 @@
}
}
fn handle_batch_ping_messages<I>(
&self,
pings: I,
recycler: &PacketsRecycler,
response_sender: &PacketSender,
) where
I: IntoIterator<Item = (SocketAddr, Ping)>,
{
if let Some(response) = self.handle_ping_messages(pings, recycler) {
let _ = response_sender.send(response);
}
}
fn handle_ping_messages<I>(&self, pings: I, recycler: &PacketsRecycler) -> Option<Packets>
where
I: IntoIterator<Item = (Ping, SocketAddr)>,
I: IntoIterator<Item = (SocketAddr, Ping)>,
{
let packets: Vec<_> = pings
.into_iter()
.filter_map(|(ping, addr)| {
.filter_map(|(addr, ping)| {
let pong = Pong::new(&ping, &self.keypair).ok()?;
let pong = Protocol::PongMessage(pong);
let packet = Packet::from_data(&addr, pong);
@@ -2199,19 +2256,34 @@
}
}
fn handle_pong_messages<I>(&self, pongs: I, now: Instant)
fn handle_batch_pong_messages<I>(&self, pongs: I, now: Instant)
where
I: IntoIterator<Item = (Pong, SocketAddr)>,
I: IntoIterator<Item = (SocketAddr, Pong)>,
{
let mut pongs = pongs.into_iter().peekable();
if pongs.peek().is_some() {
let mut ping_cache = self.ping_cache.write().unwrap();
for (pong, addr) in pongs {
for (addr, pong) in pongs {
ping_cache.add(&pong, addr, now);
}
}
}
fn handle_batch_push_messages(
&self,
messages: Vec<(Pubkey, Vec<CrdsValue>)>,
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
response_sender: &PacketSender,
) {
for (from, data) in messages {
let response = self.handle_push_message(recycler, &from, data, stakes);
if let Some(response) = response {
let _ = response_sender.send(response);
}
}
}
fn handle_push_message(
&self,
recycler: &PacketsRecycler,
@@ -2337,13 +2409,37 @@
})
.collect()
});
self.handle_packets(
// Split packets based on their types.
let mut pull_requests = vec![];
let mut pull_responses = vec![];
let mut push_messages = vec![];
let mut prune_messages = vec![];
let mut ping_messages = vec![];
let mut pong_messages = vec![];
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller))
}
Protocol::PullResponse(from, data) => pull_responses.push((from, data)),
Protocol::PushMessage(from, data) => push_messages.push((from, data)),
Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
}
}
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
self.handle_batch_prune_messages(prune_messages);
self.handle_batch_push_messages(push_messages, recycler, &stakes, response_sender);
self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
self.handle_batch_pong_messages(pong_messages, Instant::now());
self.handle_batch_pull_requests(
pull_requests,
thread_pool,
recycler,
&stakes,
packets,
response_sender,
feature_set,
epoch_time_ms,
);
self.stats
.process_gossip_packets_time
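A minimal sketch (not part of this commit) of the dispatch above: packets are first bucketed by message type, then each bucket is handed to a single batch handler. The Msg enum and handler names are hypothetical stand-ins for Protocol and the handle_batch_* methods.

enum Msg {
    Ping(u64),
    Pong(u64),
    Push(Vec<u8>),
}

fn handle_pings(_pings: Vec<u64>) {}
fn handle_pongs(_pongs: Vec<u64>) {}
fn handle_pushes(_pushes: Vec<Vec<u8>>) {}

fn dispatch(msgs: Vec<Msg>) {
    // Bucket the mixed stream by message type.
    let mut pings = vec![];
    let mut pongs = vec![];
    let mut pushes = vec![];
    for msg in msgs {
        match msg {
            Msg::Ping(token) => pings.push(token),
            Msg::Pong(token) => pongs.push(token),
            Msg::Push(data) => pushes.push(data),
        }
    }
    // Each handler now sees its whole batch at once instead of one packet
    // at a time, which is what enables the batch-level parallelism.
    handle_pings(pings);
    handle_pongs(pongs);
    handle_pushes(pushes);
}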
@@ -2906,7 +3002,6 @@ mod tests {
use super::*;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
use itertools::izip;
use rayon::prelude::*;
use solana_perf::test_tx::test_tx;
use solana_sdk::signature::{Keypair, Signer};
use solana_vote_program::{vote_instruction, vote_state::Vote};
@@ -2994,13 +3089,13 @@
})
.collect()
};
let pongs: Vec<(Pong, SocketAddr)> = pings
let pongs: Vec<(SocketAddr, Pong)> = pings
.iter()
.zip(&remote_nodes)
.map(|(ping, (keypair, socket))| (Pong::new(ping, keypair).unwrap(), *socket))
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap()))
.collect();
let now = now + Duration::from_millis(1);
cluster_info.handle_pong_messages(pongs, now);
cluster_info.handle_batch_pong_messages(pongs, now);
// Assert that remote nodes now pass the ping/pong check.
{
let mut ping_cache = cluster_info.ping_cache.write().unwrap();
@@ -3043,9 +3138,10 @@
let recycler = PacketsRecycler::default();
let packets = cluster_info
.handle_ping_messages(
pings
.into_iter()
.zip(remote_nodes.iter().map(|(_, socket)| *socket)),
remote_nodes
.iter()
.map(|(_, socket)| *socket)
.zip(pings.into_iter()),
&recycler,
)
.unwrap()


@@ -288,12 +288,11 @@ impl CrdsGossipPush {
/// add the `from` to the peer's filter of nodes
pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
for origin in origins {
if origin == self_pubkey {
continue;
}
if let Some(p) = self.active_set.get_mut(peer) {
p.add(origin)
if let Some(peer) = self.active_set.get_mut(peer) {
for origin in origins {
if origin != self_pubkey {
peer.add(origin);
}
}
}
}
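The rewritten process_prune_msg above looks up the peer in active_set once per prune message instead of once per origin. A minimal sketch (not part of this commit) of that shape, with a HashSet standing in for the per-peer bloom filter and integer ids standing in for Pubkey:

use std::collections::{HashMap, HashSet};

fn apply_prunes(
    active_set: &mut HashMap<u32, HashSet<u32>>,
    self_id: u32,
    peer: u32,
    origins: &[u32],
) {
    // One lookup covers the whole prune list...
    if let Some(filter) = active_set.get_mut(&peer) {
        // ...then every origin except ourselves is added to the peer's filter.
        for &origin in origins {
            if origin != self_id {
                filter.insert(origin);
            }
        }
    }
}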