Revert "Fix gossip messages growing beyond blob size (#5460)" (#5512)

This reverts commit a8eb0409b7.
This commit is contained in:
Sagar Dhawan
2019-08-13 10:29:26 -07:00
committed by Michael Vines
parent 2b219228ce
commit 97d57d168b
8 changed files with 92 additions and 301 deletions

View File

@ -17,7 +17,7 @@ use crate::blocktree::Blocktree;
use crate::contact_info::ContactInfo;
use crate::crds_gossip::CrdsGossip;
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::repair_service::RepairType;
@ -25,7 +25,7 @@ use crate::result::Result;
use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
use crate::weighted_shuffle::weighted_shuffle;
use bincode::{deserialize, serialize, serialized_size};
use bincode::{deserialize, serialize};
use core::cmp;
use itertools::Itertools;
use rand::SeedableRng;
@ -36,7 +36,8 @@ use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_err
use solana_netutil::{
bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
};
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
@ -156,7 +157,7 @@ impl Signable for PruneData {
#[allow(clippy::large_enum_variant)]
enum Protocol {
/// Gossip protocol messages
PullRequest(CrdsFilter, CrdsValue),
PullRequest(Bloom<Hash>, CrdsValue),
PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, PruneData),
@ -831,7 +832,7 @@ impl ClusterInfo {
}
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) {
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom<Hash>, SocketAddr, CrdsValue)>) {
match &self.entrypoint {
Some(entrypoint) => {
let self_info = self
@ -840,13 +841,12 @@ impl ClusterInfo {
.lookup(&CrdsValueLabel::ContactInfo(self.id()))
.unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
self.gossip
.pull
.build_crds_filters(&self.gossip.crds, Self::max_bloom_size())
.into_iter()
.for_each(|filter| {
pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone()))
})
pulls.push((
entrypoint.id,
self.gossip.pull.build_crds_filter(&self.gossip.crds),
entrypoint.gossip,
self_info.clone(),
))
}
None => (),
}
@ -875,45 +875,30 @@ impl ClusterInfo {
messages
}
// computes the maximum size for pull request blooms
pub fn max_bloom_size() -> usize {
let filter_size = serialized_size(&CrdsFilter::default())
.expect("unable to serialize default filter") as usize;
let protocol = Protocol::PullRequest(
CrdsFilter::default(),
CrdsValue::ContactInfo(ContactInfo::default()),
);
let protocol_size =
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
PACKET_DATA_SIZE - (protocol_size - filter_size)
}
fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp();
let mut pulls: Vec<_> = self
let pulls: Vec<_> = self
.gossip
.new_pull_request(now, stakes, Self::max_bloom_size())
.new_pull_request(now, stakes)
.ok()
.into_iter()
.filter_map(|(peer, filters, me)| {
.collect();
let mut pr: Vec<_> = pulls
.into_iter()
.filter_map(|(peer, filter, self_info)| {
let peer_label = CrdsValueLabel::ContactInfo(peer);
self.gossip
.crds
.lookup(&peer_label)
.and_then(CrdsValue::contact_info)
.map(move |peer_info| {
filters
.into_iter()
.map(move |f| (peer, f, peer_info.gossip, me.clone()))
})
.map(|peer_info| (peer, filter, peer_info.gossip, self_info))
})
.flatten()
.collect();
if pulls.is_empty() {
self.add_entrypoint(&mut pulls);
if pr.is_empty() {
self.add_entrypoint(&mut pr);
}
pulls
.into_iter()
pr.into_iter()
.map(|(peer, filter, gossip, self_info)| {
self.gossip.mark_pull_request_creation_time(&peer, now);
(gossip, Protocol::PullRequest(filter, self_info))
@ -1108,7 +1093,7 @@ impl ClusterInfo {
fn handle_pull_request(
me: &Arc<RwLock<Self>>,
filter: CrdsFilter,
filter: Bloom<Hash>,
caller: CrdsValue,
from_addr: &SocketAddr,
) -> Vec<SharedBlob> {
@ -2078,7 +2063,7 @@ mod tests {
let (_, _, val) = cluster_info
.gossip
.new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size())
.new_pull_request(timestamp(), &HashMap::new())
.ok()
.unwrap();
assert!(val.verify());
@ -2266,7 +2251,7 @@ mod tests {
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len() as u64);
assert_eq!(1, pulls.len());
match pulls.get(0) {
Some((addr, msg)) => {
assert_eq!(*addr, entrypoint.gossip);
@ -2293,7 +2278,7 @@ mod tests {
.write()
.unwrap()
.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len() as u64);
assert_eq!(1, pulls.len());
assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint));
}