Restore blob size fix (#5516)

* Revert "Revert "Fix gossip messages growing beyond blob size  (#5460)" (#5512)"

This reverts commit 97d57d168b.

* Fix Crds filters
This commit is contained in:
Sagar Dhawan
2019-08-13 18:04:14 -07:00
committed by GitHub
parent cd14a940d8
commit 1d0608200c
8 changed files with 351 additions and 100 deletions

View File

@@ -17,7 +17,7 @@ use crate::blocktree::Blocktree;
use crate::contact_info::ContactInfo;
use crate::crds_gossip::CrdsGossip;
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::repair_service::RepairType;
@@ -25,7 +25,7 @@ use crate::result::Result;
use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
use crate::weighted_shuffle::weighted_shuffle;
use bincode::{deserialize, serialize};
use bincode::{deserialize, serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
use rand::SeedableRng;
@@ -36,8 +36,7 @@ use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_err
use solana_netutil::{
bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
};
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
@@ -63,8 +62,8 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
/// Allow protocol messages to carry only 1KB of data a time
const TARGET_PROTOCOL_PAYLOAD_SIZE: u64 = 1024;
/// The maximum size of a protocol payload
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
@@ -157,7 +156,7 @@ impl Signable for PruneData {
#[allow(clippy::large_enum_variant)]
enum Protocol {
/// Gossip protocol messages
PullRequest(Bloom<Hash>, CrdsValue),
PullRequest(CrdsFilter, CrdsValue),
PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, PruneData),
@@ -832,7 +831,7 @@ impl ClusterInfo {
}
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom<Hash>, SocketAddr, CrdsValue)>) {
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) {
match &self.entrypoint {
Some(entrypoint) => {
let self_info = self
@@ -841,12 +840,13 @@ impl ClusterInfo {
.lookup(&CrdsValueLabel::ContactInfo(self.id()))
.unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
pulls.push((
entrypoint.id,
self.gossip.pull.build_crds_filter(&self.gossip.crds),
entrypoint.gossip,
self_info.clone(),
))
self.gossip
.pull
.build_crds_filters(&self.gossip.crds, Self::max_bloom_size())
.into_iter()
.for_each(|filter| {
pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone()))
})
}
None => (),
}
@@ -862,43 +862,65 @@ impl ClusterInfo {
let mut size = 0;
let mut payload = vec![];
while let Some(msg) = msgs.pop() {
// always put at least one msg. The PROTOCOL_PAYLOAD_SIZE is not a hard limit
let msg_size = msg.size();
size += msg_size;
payload.push(msg);
if size > TARGET_PROTOCOL_PAYLOAD_SIZE {
if size + msg_size > MAX_PROTOCOL_PAYLOAD_SIZE as u64 {
if msg_size < MAX_PROTOCOL_PAYLOAD_SIZE as u64 {
msgs.push(msg);
} else {
warn!(
"dropping message larger than the maximum payload size {:?}",
msg
);
}
break;
}
size += msg_size;
payload.push(msg);
}
messages.push(payload);
}
messages
}
// computes the maximum size for pull request blooms
pub fn max_bloom_size() -> usize {
let filter_size = serialized_size(&CrdsFilter::default())
.expect("unable to serialize default filter") as usize;
let protocol = Protocol::PullRequest(
CrdsFilter::default(),
CrdsValue::ContactInfo(ContactInfo::default()),
);
let protocol_size =
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
PACKET_DATA_SIZE - (protocol_size - filter_size)
}
fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp();
let pulls: Vec<_> = self
let mut pulls: Vec<_> = self
.gossip
.new_pull_request(now, stakes)
.new_pull_request(now, stakes, Self::max_bloom_size())
.ok()
.into_iter()
.collect();
let mut pr: Vec<_> = pulls
.into_iter()
.filter_map(|(peer, filter, self_info)| {
.filter_map(|(peer, filters, me)| {
let peer_label = CrdsValueLabel::ContactInfo(peer);
self.gossip
.crds
.lookup(&peer_label)
.and_then(CrdsValue::contact_info)
.map(|peer_info| (peer, filter, peer_info.gossip, self_info))
.map(move |peer_info| {
filters
.into_iter()
.map(move |f| (peer, f, peer_info.gossip, me.clone()))
})
})
.flatten()
.collect();
if pr.is_empty() {
self.add_entrypoint(&mut pr);
if pulls.is_empty() {
self.add_entrypoint(&mut pulls);
}
pr.into_iter()
pulls
.into_iter()
.map(|(peer, filter, gossip, self_info)| {
self.gossip.mark_pull_request_creation_time(&peer, now);
(gossip, Protocol::PullRequest(filter, self_info))
@@ -1093,7 +1115,7 @@ impl ClusterInfo {
fn handle_pull_request(
me: &Arc<RwLock<Self>>,
filter: Bloom<Hash>,
filter: CrdsFilter,
caller: CrdsValue,
from_addr: &SocketAddr,
) -> Vec<SharedBlob> {
@@ -2063,7 +2085,7 @@ mod tests {
let (_, _, val) = cluster_info
.gossip
.new_pull_request(timestamp(), &HashMap::new())
.new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size())
.ok()
.unwrap();
assert!(val.verify());
@@ -2251,7 +2273,7 @@ mod tests {
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len());
assert_eq!(1, pulls.len() as u64);
match pulls.get(0) {
Some((addr, msg)) => {
assert_eq!(*addr, entrypoint.gossip);
@@ -2278,7 +2300,7 @@ mod tests {
.write()
.unwrap()
.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len());
assert_eq!(1, pulls.len() as u64);
assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint));
}
@@ -2307,7 +2329,7 @@ mod tests {
fn test_split_messages(value: CrdsValue) {
const NUM_VALUES: usize = 30;
let value_size = value.size();
let expected_len = NUM_VALUES / (TARGET_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize;
let expected_len = NUM_VALUES / (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize;
let msgs = vec![value; NUM_VALUES];
let split = ClusterInfo::split_gossip_messages(msgs);