Limit the size of gossip push and gossip pull response (#5348)

* Limit the size of gossip push and gossip pull response

* Remove Default::default

* Rename var
This commit is contained in:
Sagar Dhawan
2019-07-30 15:43:17 -07:00
committed by GitHub
parent a7a10e12c7
commit d7a2b790dc
2 changed files with 117 additions and 40 deletions

View File

@ -63,6 +63,9 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests /// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
/// Allow protocol messages to carry only about 1KB of data at a time.
/// This is a soft limit: `split_gossip_messages` may exceed it by a single message.
const TARGET_PROTOCOL_PAYLOAD_SIZE: u64 = 1024;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError { pub enum ClusterInfoError {
NoPeers, NoPeers,
@ -849,6 +852,29 @@ impl ClusterInfo {
} }
} }
/// Splits a Vec of CrdsValues into a nested Vec, trying to keep each inner
/// Vec at or under `TARGET_PROTOCOL_PAYLOAD_SIZE` serialized bytes.
/// Note: some messages cannot be contained within that size, so in the worst
/// case this returns N nested Vecs with 1 item each.
fn split_gossip_messages(mut msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> {
    let mut batches = vec![];
    // Drain `msgs` from the back; each outer iteration emits one batch.
    while let Some(first) = msgs.pop() {
        // A batch always holds at least one message, even when that single
        // message alone is larger than the target payload size.
        let mut bytes = first.size();
        let mut batch = vec![first];
        // Keep appending until the running byte total crosses the target
        // (a soft limit, not a hard one) or no messages remain.
        while bytes <= TARGET_PROTOCOL_PAYLOAD_SIZE {
            match msgs.pop() {
                Some(msg) => {
                    bytes += msg.size();
                    batch.push(msg);
                }
                None => break,
            }
        }
        batches.push(batch);
    }
    batches
}
fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> { fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp(); let now = timestamp();
let pulls: Vec<_> = self let pulls: Vec<_> = self
@ -892,7 +918,12 @@ impl ClusterInfo {
.and_then(CrdsValue::contact_info) .and_then(CrdsValue::contact_info)
.map(|p| (p.gossip, messages)) .map(|p| (p.gossip, messages))
}) })
.map(|(peer, msgs)| (peer, Protocol::PushMessage(self_id, msgs))) .map(|(peer, msgs)| {
Self::split_gossip_messages(msgs)
.into_iter()
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
})
.flatten()
.collect() .collect()
} }
@ -1088,11 +1119,18 @@ impl ClusterInfo {
.process_pull_request(caller, filter, now); .process_pull_request(caller, filter, now);
let len = data.len(); let len = data.len();
trace!("get updates since response {}", len); trace!("get updates since response {}", len);
let rsp = Protocol::PullResponse(self_id, data); let responses: Vec<_> = Self::split_gossip_messages(data)
.into_iter()
.map(move |payload| Protocol::PullResponse(self_id, payload))
.collect();
// The remote node may not know its public IP:PORT. Instead of responding to the caller's // The remote node may not know its public IP:PORT. Instead of responding to the caller's
// gossip addr, respond to the origin addr. // gossip addr, respond to the origin addr.
inc_new_counter_debug!("cluster_info-pull_request-rsp", len); inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
to_shared_blob(rsp, *from_addr).ok().into_iter().collect() responses
.into_iter()
.map(|rsp| to_shared_blob(rsp, *from_addr).ok().into_iter())
.flatten()
.collect()
} }
fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) { fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) {
@ -2201,45 +2239,79 @@ mod tests {
assert_eq!(votes, vec![]); assert_eq!(votes, vec![]);
assert_eq!(max_ts, new_max_ts); assert_eq!(max_ts, new_max_ts);
} }
}
#[test] #[test]
fn test_add_entrypoint() { fn test_add_entrypoint() {
let node_keypair = Arc::new(Keypair::new()); let node_keypair = Arc::new(Keypair::new());
let mut cluster_info = ClusterInfo::new( let mut cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair, node_keypair,
); );
let entrypoint_pubkey = Pubkey::new_rand(); let entrypoint_pubkey = Pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone()); cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&HashMap::new()); let pulls = cluster_info.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len()); assert_eq!(1, pulls.len());
match pulls.get(0) { match pulls.get(0) {
Some((addr, msg)) => { Some((addr, msg)) => {
assert_eq!(*addr, entrypoint.gossip); assert_eq!(*addr, entrypoint.gossip);
match msg { match msg {
Protocol::PullRequest(_, value) => { Protocol::PullRequest(_, value) => {
assert!(value.verify()); assert!(value.verify());
assert_eq!(value.pubkey(), cluster_info.id()) assert_eq!(value.pubkey(), cluster_info.id())
}
_ => panic!("wrong protocol"),
} }
_ => panic!("wrong protocol"),
} }
None => panic!("entrypoint should be a pull destination"),
} }
None => panic!("entrypoint should be a pull destination"),
// now add this message back to the table and make sure after the next pull, the entrypoint is unset
let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone());
let cluster_info = Arc::new(RwLock::new(cluster_info));
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
vec![entrypoint_crdsvalue],
);
let pulls = cluster_info
.write()
.unwrap()
.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len());
assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint));
}
#[test]
// Small values: many messages fit per batch, so relatively few batches result.
fn test_split_messages_small() {
let value = CrdsValue::ContactInfo(ContactInfo::default());
test_split_messages(value);
}
#[test]
// Large values: an EpochSlots with 128 slots can exceed the target payload
// size on its own, exercising the one-message-per-batch worst case.
fn test_split_messages_large() {
let mut btree_slots = BTreeSet::new();
for i in 0..128 {
btree_slots.insert(i);
}
let value = CrdsValue::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
slots: btree_slots,
signature: Signature::default(),
wallclock: 0,
});
test_split_messages(value);
}
// Shared driver: splits NUM_VALUES copies of `value` and checks the batch
// count against an upper bound derived from the per-value serialized size.
fn test_split_messages(value: CrdsValue) {
const NUM_VALUES: usize = 30;
let value_size = value.size();
// Upper bound on batch count; `.max(1)` guards against a zero divisor when
// a single value is larger than TARGET_PROTOCOL_PAYLOAD_SIZE.
let expected_len = NUM_VALUES / (TARGET_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize;
let msgs = vec![value; NUM_VALUES];
let split = ClusterInfo::split_gossip_messages(msgs);
assert!(split.len() <= expected_len);
} }
// now add this message back to the table and make sure after the next pull, the entrypoint is unset
let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone());
let cluster_info = Arc::new(RwLock::new(cluster_info));
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
vec![entrypoint_crdsvalue],
);
let pulls = cluster_info
.write()
.unwrap()
.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len());
assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint));
} }

View File

@ -1,5 +1,5 @@
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use bincode::serialize; use bincode::{serialize, serialized_size};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signable, Signature}; use solana_sdk::signature::{Keypair, Signable, Signature};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
@ -189,6 +189,11 @@ impl CrdsValue {
CrdsValueLabel::EpochSlots(*key), CrdsValueLabel::EpochSlots(*key),
] ]
} }
/// Returns the size (in bytes) of this `CrdsValue` when serialized with bincode.
pub fn size(&self) -> u64 {
    // Serializing an in-memory value is infallible in practice, so a panic
    // here indicates a bug. The message names the actual type being
    // serialized (the previous text said "contact info", a copy-paste error
    // — this method handles every CrdsValue variant).
    serialized_size(&self).expect("unable to serialize CrdsValue")
}
} }
impl Signable for CrdsValue { impl Signable for CrdsValue {