diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 611d46cda2..24357078df 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -487,11 +487,6 @@ impl ClusterInfo { contact_info_path: PathBuf::default(), contact_save_interval: 0, // disabled }; - { - let mut gossip = me.gossip.write().unwrap(); - gossip.set_self(&id); - gossip.set_shred_version(me.my_shred_version()); - } me.insert_self(); me.push_self(&HashMap::new(), None); me @@ -499,8 +494,7 @@ impl ClusterInfo { // Should only be used by tests and simulations pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { - let mut gossip = self.gossip.read().unwrap().mock_clone(); - gossip.id = *new_id; + let gossip = self.gossip.read().unwrap().mock_clone(); let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); my_contact_info.id = *new_id; ClusterInfo { @@ -548,10 +542,17 @@ impl ClusterInfo { .lock() .unwrap() .extend(entries); - self.gossip - .write() - .unwrap() - .refresh_push_active_set(stakes, gossip_validators); + let ContactInfo { + id: self_pubkey, + shred_version, + .. + } = *self.my_contact_info.read().unwrap(); + self.gossip.write().unwrap().refresh_push_active_set( + &self_pubkey, + shred_version, + stakes, + gossip_validators, + ); } // TODO kill insert_info, only used by tests @@ -1475,6 +1476,7 @@ impl ClusterInfo { match gossip.new_pull_request( thread_pool, self.keypair.deref(), + self.my_shred_version(), now, gossip_validators, stakes, @@ -1652,10 +1654,6 @@ impl ClusterInfo { entrypoint.shred_version, entrypoint.id ); self.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version; - self.gossip - .write() - .unwrap() - .set_shred_version(entrypoint.shred_version); } } self.my_shred_version() != 0 @@ -1670,14 +1668,15 @@ impl ClusterInfo { bank_forks: Option<&RwLock>, stakes: &HashMap, ) { + let self_pubkey = self.id(); let epoch_duration = get_epoch_duration(bank_forks); let timeouts = { let gossip = self.gossip.read().unwrap(); - gossip.make_timeouts(stakes, epoch_duration) + gossip.make_timeouts(self_pubkey, stakes, epoch_duration) }; let num_purged = self .time_gossip_write_lock("purge", &self.stats.purge) - .purge(thread_pool, timestamp(), &timeouts); + .purge(&self_pubkey, thread_pool, timestamp(), &timeouts); inc_new_counter_info!("cluster_info-purge-count", num_purged); } @@ -1823,11 +1822,13 @@ impl ClusterInfo { ); let mut prune_message_timeout = 0; let mut bad_prune_destination = 0; + let self_pubkey = self.id(); { let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune); let now = timestamp(); for (from, data) in messages { match gossip.process_prune_msg( + &self_pubkey, &from, &data.destination, &data.prunes, @@ -2114,9 +2115,10 @@ impl ClusterInfo { .reduce(HashMap::new, merge) }); if !responses.is_empty() { + let self_pubkey = self.id(); let timeouts = { let gossip = self.gossip.read().unwrap(); - gossip.make_timeouts(stakes, epoch_duration) + gossip.make_timeouts(self_pubkey, stakes, epoch_duration) }; for (from, data) in responses { self.handle_pull_response(&from, data, &timeouts); @@ -2263,9 +2265,10 @@ impl ClusterInfo { .collect() }; // Generate prune messages. + let self_pubkey = self.id(); let prunes = self .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) - .prune_received_cache(origins, stakes); + .prune_received_cache(&self_pubkey, origins, stakes); let prunes: Vec<(Pubkey /*from*/, Vec /*origins*/)> = prunes .into_iter() .flat_map(|(from, prunes)| { @@ -3349,17 +3352,21 @@ mod tests { let (spy, _, _) = ClusterInfo::spy_node(&solana_sdk::pubkey::new_rand(), 0); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); cluster_info.insert_info(spy); - cluster_info - .gossip - .write() - .unwrap() - .refresh_push_active_set(&HashMap::new(), None); + { + let mut gossip = cluster_info.gossip.write().unwrap(); + gossip.refresh_push_active_set( + &cluster_info.id(), + cluster_info.my_shred_version(), + &HashMap::new(), // stakes + None, // gossip validators + ); + } let reqs = cluster_info.generate_new_gossip_requests( &thread_pool, - None, // gossip_validators - &HashMap::new(), - true, // generate_pull_requests - false, // require_stake_for_gossip + None, // gossip_validators + &HashMap::new(), // stakes + true, // generate_pull_requests + false, // require_stake_for_gossip ); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { @@ -3465,11 +3472,15 @@ mod tests { .unwrap() .mock_pong(peer.id, peer.gossip, Instant::now()); cluster_info.insert_info(peer); - cluster_info - .gossip - .write() - .unwrap() - .refresh_push_active_set(&HashMap::new(), None); + { + let mut gossip = cluster_info.gossip.write().unwrap(); + gossip.refresh_push_active_set( + &cluster_info.id(), + cluster_info.my_shred_version(), + &HashMap::new(), // stakes + None, // gossip validators + ); + } //check that all types of gossip messages are signed correctly let push_messages = cluster_info .gossip @@ -3490,6 +3501,7 @@ mod tests { .new_pull_request( &thread_pool, cluster_info.keypair.deref(), + cluster_info.my_shred_version(), timestamp(), None, &HashMap::new(), @@ -3790,6 +3802,7 @@ mod tests { let timeouts = { let gossip = cluster_info.gossip.read().unwrap(); gossip.make_timeouts( + cluster_info.id(), &HashMap::default(), // stakes, Duration::from_millis(gossip.pull.crds_timeout), ) diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 84a9baec9f..6edbfc8efb 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -32,34 +32,14 @@ use { }, }; +#[derive(Default)] pub struct CrdsGossip { pub crds: Crds, - pub id: Pubkey, - pub shred_version: u16, pub push: CrdsGossipPush, pub pull: CrdsGossipPull, } -impl Default for CrdsGossip { - fn default() -> Self { - CrdsGossip { - crds: Crds::default(), - id: Pubkey::default(), - shred_version: 0, - push: CrdsGossipPush::default(), - pull: CrdsGossipPull::default(), - } - } -} - impl CrdsGossip { - pub fn set_self(&mut self, id: &Pubkey) { - self.id = *id; - } - pub fn set_shred_version(&mut self, shred_version: u16) { - self.shred_version = shred_version; - } - /// process a push message to the network /// Returns unique origins' pubkeys of upserted values. pub fn process_push_message( @@ -83,18 +63,18 @@ impl CrdsGossip { /// remove redundant paths in the network pub fn prune_received_cache( &mut self, + self_pubkey: &Pubkey, origins: I, // Unique pubkeys of crds values' owners. stakes: &HashMap, ) -> HashMap> where I: IntoIterator, { - let self_pubkey = self.id; origins .into_iter() .flat_map(|origin| { self.push - .prune_received_cache(&self_pubkey, &origin, stakes) + .prune_received_cache(self_pubkey, &origin, stakes) .into_iter() .zip(std::iter::repeat(origin)) }) @@ -179,6 +159,7 @@ impl CrdsGossip { /// add the `from` to the peer's filter of nodes pub fn process_prune_msg( &self, + self_pubkey: &Pubkey, peer: &Pubkey, destination: &Pubkey, origin: &[Pubkey], @@ -189,8 +170,8 @@ impl CrdsGossip { if expired { return Err(CrdsGossipError::PruneMessageTimeout); } - if self.id == *destination { - self.push.process_prune_msg(&self.id, peer, origin); + if self_pubkey == destination { + self.push.process_prune_msg(self_pubkey, peer, origin); Ok(()) } else { Err(CrdsGossipError::BadPruneDestination) @@ -201,6 +182,8 @@ impl CrdsGossip { /// * ratio - number of actives to rotate pub fn refresh_push_active_set( &mut self, + self_pubkey: &Pubkey, + self_shred_version: u16, stakes: &HashMap, gossip_validators: Option<&HashSet>, ) { @@ -208,18 +191,20 @@ impl CrdsGossip { &self.crds, stakes, gossip_validators, - &self.id, - self.shred_version, + self_pubkey, + self_shred_version, self.crds.num_nodes(), CRDS_GOSSIP_NUM_ACTIVE, ) } /// generate a random request + #[allow(clippy::too_many_arguments)] pub fn new_pull_request( &self, thread_pool: &ThreadPool, self_keypair: &Keypair, + self_shred_version: u16, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, @@ -231,7 +216,7 @@ impl CrdsGossip { thread_pool, &self.crds, self_keypair, - self.shred_version, + self_shred_version, now, gossip_validators, stakes, @@ -305,14 +290,16 @@ impl CrdsGossip { pub fn make_timeouts( &self, + self_pubkey: Pubkey, stakes: &HashMap, epoch_duration: Duration, ) -> HashMap { - self.pull.make_timeouts(self.id, stakes, epoch_duration) + self.pull.make_timeouts(self_pubkey, stakes, epoch_duration) } pub fn purge( &mut self, + self_pubkey: &Pubkey, thread_pool: &ThreadPool, now: u64, timeouts: &HashMap, @@ -324,7 +311,7 @@ impl CrdsGossip { } if now > self.pull.crds_timeout { //sanity check - assert_eq!(timeouts[&self.id], std::u64::MAX); + assert_eq!(timeouts[self_pubkey], std::u64::MAX); assert!(timeouts.contains_key(&Pubkey::default())); rv = self .pull @@ -342,7 +329,6 @@ impl CrdsGossip { crds: self.crds.clone(), push: self.push.mock_clone(), pull: self.pull.mock_clone(), - ..*self } } } @@ -374,11 +360,8 @@ mod test { #[test] fn test_prune_errors() { - let mut crds_gossip = CrdsGossip { - id: Pubkey::new(&[0; 32]), - ..CrdsGossip::default() - }; - let id = crds_gossip.id; + let mut crds_gossip = CrdsGossip::default(); + let id = Pubkey::new(&[0; 32]); let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let prune_pubkey = Pubkey::new(&[2; 32]); crds_gossip @@ -388,10 +371,16 @@ mod test { 0, ) .unwrap(); - crds_gossip.refresh_push_active_set(&HashMap::new(), None); + crds_gossip.refresh_push_active_set( + &id, + 0, // shred version + &HashMap::new(), // stakes + None, // gossip validators + ); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( + &id, &ci.id, &Pubkey::new(hash(&[1; 32]).as_ref()), &[prune_pubkey], @@ -400,11 +389,25 @@ mod test { ); assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); //correct dest - res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now); + res = crds_gossip.process_prune_msg( + &id, // self_pubkey + &ci.id, // peer + &id, // destination + &[prune_pubkey], // origins + now, + now, + ); res.unwrap(); //test timeout let timeout = now + crds_gossip.push.prune_timeout * 2; - res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout); + res = crds_gossip.process_prune_msg( + &id, // self_pubkey + &ci.id, // peer + &id, // destination + &[prune_pubkey], // origins + now, + timeout, + ); assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); } } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 0baa28da5c..342580bc65 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -116,11 +116,9 @@ fn star_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.crds.insert(entry.clone(), timestamp()).unwrap(); - node.set_self(&id); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); (new.label().pubkey(), node) }) @@ -128,7 +126,6 @@ fn star_network_create(num: usize) -> Network { let mut node = CrdsGossip::default(); let id = entry.label().pubkey(); node.crds.insert(entry, timestamp()).unwrap(); - node.set_self(&id); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); network.insert(id, node); Network::new(network) @@ -141,18 +138,14 @@ fn rstar_network_create(num: usize) -> Network { let mut origin = CrdsGossip::default(); let id = entry.label().pubkey(); origin.crds.insert(entry, timestamp()).unwrap(); - origin.set_self(&id); let mut network: HashMap<_, _> = (1..num) .map(|_| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); origin.crds.insert(new.clone(), timestamp()).unwrap(); - node.set_self(&id); - let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); (new.label().pubkey(), node) }) @@ -168,10 +161,8 @@ fn ring_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); - node.set_self(&id); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); (new.label().pubkey(), node) }) @@ -180,7 +171,7 @@ fn ring_network_create(num: usize) -> Network { for k in 0..keys.len() { let start_info = { let start = &network[&keys[k]]; - let start_id = start.lock().unwrap().id; + let start_id = keys[k]; let label = CrdsValueLabel::ContactInfo(start_id); let gossip = start.gossip.lock().unwrap(); gossip.crds.get(&label).unwrap().value.clone() @@ -202,10 +193,8 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); - node.set_self(&id); let node = Node::staked( node_keypair, contact_info, @@ -221,15 +210,14 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { .iter() .map(|k| { let start = &network[k].lock().unwrap(); - let start_id = start.id; - let start_label = CrdsValueLabel::ContactInfo(start_id); + let start_label = CrdsValueLabel::ContactInfo(*k); start.crds.get(&start_label).unwrap().value.clone() }) .collect(); - for end in network.values_mut() { + for (end_pubkey, end) in network.iter_mut() { for k in 0..keys.len() { let mut end = end.lock().unwrap(); - if keys[k] != end.id { + if keys[k] != *end_pubkey { let start_info = start_entries[k].clone(); end.crds.insert(start_info, timestamp()).unwrap(); } @@ -258,9 +246,13 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver // make sure there is someone in the active set let network_values: Vec = network.values().cloned().collect(); network_values.par_iter().for_each(|node| { - node.lock() - .unwrap() - .refresh_push_active_set(&HashMap::new(), None); + let node_pubkey = node.keypair.pubkey(); + node.lock().unwrap().refresh_push_active_set( + &node_pubkey, + 0, // shred version + &HashMap::new(), // stakes + None, // gossip validators + ); }); let mut total_bytes = bytes_tx; let mut ts = timestamp(); @@ -271,8 +263,9 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver ts += 1000; // push a message to the network network_values.par_iter().for_each(|locked_node| { + let node_pubkey = locked_node.keypair.pubkey(); let node = &mut locked_node.lock().unwrap(); - let label = CrdsValueLabel::ContactInfo(node.id); + let label = CrdsValueLabel::ContactInfo(node_pubkey); let entry = node.crds.get(&label).unwrap(); let mut m = entry.value.contact_info().cloned().unwrap(); m.wallclock = now; @@ -327,13 +320,15 @@ fn network_run_push( let requests: Vec<_> = network_values .par_iter() .map(|node| { + let node_pubkey = node.keypair.pubkey(); let mut node_lock = node.lock().unwrap(); let timeouts = node_lock.make_timeouts( + node_pubkey, &HashMap::default(), // stakes Duration::from_millis(node_lock.pull.crds_timeout), ); - node_lock.purge(thread_pool, now, &timeouts); - (node_lock.id, node_lock.new_push_messages(vec![], now)) + node_lock.purge(&node_pubkey, thread_pool, now, &timeouts); + (node_pubkey, node_lock.new_push_messages(vec![], now)) }) .collect(); let transfered: Vec<_> = requests @@ -356,7 +351,14 @@ fn network_run_push( .collect(); let prunes_map = network .get(&to) - .map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes)) + .map(|node| { + let node_pubkey = node.keypair.pubkey(); + node.lock().unwrap().prune_received_cache( + &node_pubkey, + origins, + &stakes, + ) + }) .unwrap(); for (from, prune_set) in prunes_map { @@ -371,11 +373,19 @@ fn network_run_push( network .get(&from) .map(|node| { + let node_pubkey = node.keypair.pubkey(); let node = node.lock().unwrap(); - let destination = node.id; + let destination = node_pubkey; let now = timestamp(); - node.process_prune_msg(&to, &destination, &prune_keys, now, now) - .unwrap() + node.process_prune_msg( + &node_pubkey, + &to, + &destination, + &prune_keys, + now, + now, + ) + .unwrap() }) .unwrap(); } @@ -399,9 +409,13 @@ fn network_run_push( } if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { network_values.par_iter().for_each(|node| { - node.lock() - .unwrap() - .refresh_push_active_set(&HashMap::new(), None); + let node_pubkey = node.keypair.pubkey(); + node.lock().unwrap().refresh_push_active_set( + &node_pubkey, + 0, // shred version + &HashMap::new(), // stakes + None, // gossip validators + ); }); } total = network_values @@ -468,6 +482,7 @@ fn network_run_pull( .new_pull_request( thread_pool, from.keypair.deref(), + 0, // shred version. now, None, &HashMap::new(), @@ -476,8 +491,9 @@ fn network_run_pull( &mut pings, ) .ok()?; + let from_pubkey = from.keypair.pubkey(); let gossip = from.gossip.lock().unwrap(); - let label = CrdsValueLabel::ContactInfo(gossip.id); + let label = CrdsValueLabel::ContactInfo(from_pubkey); let self_info = gossip.crds.get(&label).unwrap().value.clone(); Some((peer.id, filters, self_info)) }) @@ -676,11 +692,8 @@ fn test_star_network_large_push() { } #[test] fn test_prune_errors() { - let mut crds_gossip = CrdsGossip { - id: Pubkey::new(&[0; 32]), - ..CrdsGossip::default() - }; - let id = crds_gossip.id; + let mut crds_gossip = CrdsGossip::default(); + let id = Pubkey::new(&[0; 32]); let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let prune_pubkey = Pubkey::new(&[2; 32]); crds_gossip @@ -690,22 +703,42 @@ fn test_prune_errors() { 0, ) .unwrap(); - crds_gossip.refresh_push_active_set(&HashMap::new(), None); + crds_gossip.refresh_push_active_set( + &id, + 0, // shred version + &HashMap::new(), // stakes + None, // gossip validators + ); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( - &ci.id, - &Pubkey::new(hash(&[1; 32]).as_ref()), - &[prune_pubkey], + &id, // self_pubkey + &ci.id, // peer + &Pubkey::new(hash(&[1; 32]).as_ref()), // destination + &[prune_pubkey], // origins now, now, ); assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); //correct dest - res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now); + res = crds_gossip.process_prune_msg( + &id, // self_pubkey + &ci.id, // peer + &id, // destination + &[prune_pubkey], // origins + now, + now, + ); res.unwrap(); //test timeout let timeout = now + crds_gossip.push.prune_timeout * 2; - res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout); + res = crds_gossip.process_prune_msg( + &id, // self_pubkey + &ci.id, // peer + &id, // destination + &[prune_pubkey], // origins + now, + timeout, + ); assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); }