Filter push/pulls from spies (#11620)

* Filter push/pulls from spies

* Don't pull from peers with shred version == 0, don't push to people with shred_version == 0

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin
2020-08-18 18:52:45 -07:00
committed by GitHub
parent 46830124f8
commit 0f0a2ddafe
3 changed files with 85 additions and 40 deletions

View File

@@ -491,6 +491,21 @@ impl ClusterInfo {
.map(map)
}
pub fn lookup_contact_info_by_gossip_addr(
&self,
gossip_addr: &SocketAddr,
) -> Option<ContactInfo> {
for versioned_value in self.gossip.read().unwrap().crds.table.values() {
if let Some(contact_info) = CrdsValue::contact_info(&versioned_value.value) {
if contact_info.gossip == *gossip_addr {
return Some(contact_info.clone());
}
}
}
None
}
pub fn my_contact_info(&self) -> ContactInfo {
self.my_contact_info.read().unwrap().clone()
}
@@ -1446,11 +1461,13 @@ impl ClusterInfo {
fn handle_adopt_shred_version(self: &Arc<Self>, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version {
// If gossip was given an entrypoint, lookup its id
let entrypoint_id = self.entrypoint.read().unwrap().as_ref().map(|e| e.id);
if let Some(entrypoint_id) = entrypoint_id {
// If gossip was given an entrypoint, look up the ContactInfo by the given
// entrypoint gossip adddress
let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = self.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);
if let Some(entrypoint) = entrypoint {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
@@ -1466,6 +1483,7 @@ impl ClusterInfo {
.unwrap()
.set_shred_version(entrypoint.shred_version);
self.insert_self();
*self.entrypoint.write().unwrap() = Some(entrypoint);
*adopt_shred_version = false;
}
}
@@ -1854,15 +1872,15 @@ impl ClusterInfo {
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
self.my_shred_version(),
);
}
let shred_version = self
.lookup_contact_info(from, |ci| ci.shred_version)
.unwrap_or(0);
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
self.my_shred_version(),
);
let filtered_len = crds_values.len();
let mut pull_stats = ProcessPullStats::default();
@@ -1914,7 +1932,8 @@ impl ClusterInfo {
shred_version: u16,
my_shred_version: u16,
) {
if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version {
// Always run filter on spies
if my_shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they
// can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data {
@@ -1935,14 +1954,15 @@ impl ClusterInfo {
self.stats.push_message_count.add_relaxed(1);
let len = crds_values.len();
if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
self.my_shred_version(),
);
}
let shred_version = self
.lookup_contact_info(from, |ci| ci.shred_version)
.unwrap_or(0);
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
self.my_shred_version(),
);
let filtered_len = crds_values.len();
self.stats
.push_message_value_count
@@ -3393,4 +3413,36 @@ mod tests {
let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new());
assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE);
}
#[test]
fn test_handle_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
// Simulating starting up with default entrypoint, no known id, only a gossip
// address
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.gossip = entrypoint_gossip_addr;
assert_eq!(entrypoint.shred_version, 0);
cluster_info.set_entrypoint(entrypoint);
// Simulate getting entrypoint ContactInfo from gossip
let mut gossiped_entrypoint_info =
ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
gossiped_entrypoint_info.gossip = entrypoint_gossip_addr;
gossiped_entrypoint_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert_eq!(cluster_info.my_shred_version(), 1);
}
}