Filter push/pulls from spies (#11620) (#11703)

* Filter push/pulls from spies

* Don't pull from peers with shred version == 0, don't push to people with shred_version == 0

Co-authored-by: Carl <carl@solana.com>
(cherry picked from commit 0f0a2ddafe)

Co-authored-by: carllin <wumu727@gmail.com>
This commit is contained in:
mergify[bot]
2020-08-19 03:20:43 +00:00
committed by GitHub
parent 04eb35e679
commit 2c642d4639
3 changed files with 85 additions and 40 deletions

View File

@ -491,6 +491,21 @@ impl ClusterInfo {
.map(map) .map(map)
} }
pub fn lookup_contact_info_by_gossip_addr(
&self,
gossip_addr: &SocketAddr,
) -> Option<ContactInfo> {
for versioned_value in self.gossip.read().unwrap().crds.table.values() {
if let Some(contact_info) = CrdsValue::contact_info(&versioned_value.value) {
if contact_info.gossip == *gossip_addr {
return Some(contact_info.clone());
}
}
}
None
}
pub fn my_contact_info(&self) -> ContactInfo { pub fn my_contact_info(&self) -> ContactInfo {
self.my_contact_info.read().unwrap().clone() self.my_contact_info.read().unwrap().clone()
} }
@ -1445,11 +1460,13 @@ impl ClusterInfo {
fn handle_adopt_shred_version(self: &Arc<Self>, adopt_shred_version: &mut bool) { fn handle_adopt_shred_version(self: &Arc<Self>, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset // Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version { if *adopt_shred_version {
// If gossip was given an entrypoint, lookup its id // If gossip was given an entrypoint, look up the ContactInfo by the given
let entrypoint_id = self.entrypoint.read().unwrap().as_ref().map(|e| e.id); // entrypoint gossip adddress
if let Some(entrypoint_id) = entrypoint_id { let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful, it should exist in the crds table // If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = self.lookup_contact_info(&entrypoint_id, |ci| ci.clone()); let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);
if let Some(entrypoint) = entrypoint { if let Some(entrypoint) = entrypoint {
if entrypoint.shred_version == 0 { if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version"); info!("Unable to adopt entrypoint's shred version");
@ -1465,6 +1482,7 @@ impl ClusterInfo {
.unwrap() .unwrap()
.set_shred_version(entrypoint.shred_version); .set_shred_version(entrypoint.shred_version);
self.insert_self(); self.insert_self();
*self.entrypoint.write().unwrap() = Some(entrypoint);
*adopt_shred_version = false; *adopt_shred_version = false;
} }
} }
@ -1853,15 +1871,15 @@ impl ClusterInfo {
) -> (usize, usize, usize) { ) -> (usize, usize, usize) {
let len = crds_values.len(); let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id, from, len); trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
let shred_version = self
if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) { .lookup_contact_info(from, |ci| ci.shred_version)
.unwrap_or(0);
Self::filter_by_shred_version( Self::filter_by_shred_version(
from, from,
&mut crds_values, &mut crds_values,
shred_version, shred_version,
self.my_shred_version(), self.my_shred_version(),
); );
}
let filtered_len = crds_values.len(); let filtered_len = crds_values.len();
let mut pull_stats = ProcessPullStats::default(); let mut pull_stats = ProcessPullStats::default();
@ -1913,7 +1931,8 @@ impl ClusterInfo {
shred_version: u16, shred_version: u16,
my_shred_version: u16, my_shred_version: u16,
) { ) {
if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version { // Always run filter on spies
if my_shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they // Allow someone to update their own ContactInfo so they
// can change shred versions if needed. // can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data { crds_values.retain(|crds_value| match &crds_value.data {
@ -1934,14 +1953,15 @@ impl ClusterInfo {
self.stats.push_message_count.add_relaxed(1); self.stats.push_message_count.add_relaxed(1);
let len = crds_values.len(); let len = crds_values.len();
if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) { let shred_version = self
.lookup_contact_info(from, |ci| ci.shred_version)
.unwrap_or(0);
Self::filter_by_shred_version( Self::filter_by_shred_version(
from, from,
&mut crds_values, &mut crds_values,
shred_version, shred_version,
self.my_shred_version(), self.my_shred_version(),
); );
}
let filtered_len = crds_values.len(); let filtered_len = crds_values.len();
self.stats self.stats
.push_message_value_count .push_message_value_count
@ -3389,4 +3409,36 @@ mod tests {
let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new()); let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new());
assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE); assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE);
} }
#[test]
fn test_handle_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
// Simulating starting up with default entrypoint, no known id, only a gossip
// address
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.gossip = entrypoint_gossip_addr;
assert_eq!(entrypoint.shred_version, 0);
cluster_info.set_entrypoint(entrypoint);
// Simulate getting entrypoint ContactInfo from gossip
let mut gossiped_entrypoint_info =
ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
gossiped_entrypoint_info.gossip = entrypoint_gossip_addr;
gossiped_entrypoint_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert_eq!(cluster_info.my_shred_version(), 1);
}
} }

View File

@ -193,9 +193,7 @@ impl CrdsGossipPull {
.filter(|v| { .filter(|v| {
v.id != *self_id v.id != *self_id
&& ContactInfo::is_valid_address(&v.gossip) && ContactInfo::is_valid_address(&v.gossip)
&& (self_shred_version == 0 && (self_shred_version == 0 || self_shred_version == v.shred_version)
|| v.shred_version == 0
|| self_shred_version == v.shred_version)
}) })
.map(|item| { .map(|item| {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
@ -591,14 +589,14 @@ mod test {
crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456.clone(), 0).unwrap(); crds.insert(node_456.clone(), 0).unwrap();
// shred version 123 should ignore 456 nodes // shred version 123 should ignore nodes with versions 0 and 456
let options = node let options = node
.pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes) .pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert_eq!(options.len(), 2); assert_eq!(options.len(), 1);
assert!(options.contains(&spy.pubkey())); assert!(!options.contains(&spy.pubkey()));
assert!(options.contains(&node_123.pubkey())); assert!(options.contains(&node_123.pubkey()));
// spy nodes will see all // spy nodes will see all

View File

@ -344,9 +344,7 @@ impl CrdsGossipPush {
.filter(|(info, _)| { .filter(|(info, _)| {
info.id != *self_id info.id != *self_id
&& ContactInfo::is_valid_address(&info.gossip) && ContactInfo::is_valid_address(&info.gossip)
&& (self_shred_version == 0 && self_shred_version == info.shred_version
|| info.shred_version == 0
|| self_shred_version == info.shred_version)
}) })
.map(|(info, value)| { .map(|(info, value)| {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
@ -641,28 +639,25 @@ mod test {
crds.insert(me.clone(), 0).unwrap(); crds.insert(me.clone(), 0).unwrap();
crds.insert(spy.clone(), 0).unwrap(); crds.insert(spy.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456.clone(), 0).unwrap(); crds.insert(node_456, 0).unwrap();
// shred version 123 should ignore 456 nodes // shred version 123 should ignore nodes with versions 0 and 456
let options = node let options = node
.push_options(&crds, &me.label().pubkey(), 123, &stakes) .push_options(&crds, &me.label().pubkey(), 123, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert_eq!(options.len(), 2); assert_eq!(options.len(), 1);
assert!(options.contains(&spy.pubkey())); assert!(!options.contains(&spy.pubkey()));
assert!(options.contains(&node_123.pubkey())); assert!(options.contains(&node_123.pubkey()));
// spy nodes will see all // spy nodes should not push to people on different shred versions
let options = node let options = node
.push_options(&crds, &spy.label().pubkey(), 0, &stakes) .push_options(&crds, &spy.label().pubkey(), 0, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert_eq!(options.len(), 3); assert!(options.is_empty());
assert!(options.contains(&me.pubkey()));
assert!(options.contains(&node_123.pubkey()));
assert!(options.contains(&node_456.pubkey()));
} }
#[test] #[test]
fn test_new_push_messages() { fn test_new_push_messages() {