diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index cc3e25718b..4cff79f08c 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1423,13 +1423,13 @@ impl ClusterInfo { /// At random pick a node and try to get updated changes from them fn run_gossip( - obj: &Self, + &self, recycler: &PacketsRecycler, stakes: &HashMap, sender: &PacketSender, generate_pull_requests: bool, ) -> Result<()> { - let reqs = obj.generate_new_gossip_requests(&stakes, generate_pull_requests); + let reqs = self.generate_new_gossip_requests(&stakes, generate_pull_requests); if !reqs.is_empty() { let packets = to_packets_with_destination(recycler.clone(), &reqs); sender.send(packets)?; @@ -1437,14 +1437,14 @@ impl ClusterInfo { Ok(()) } - fn handle_adopt_shred_version(obj: &Arc, adopt_shred_version: &mut bool) { + fn handle_adopt_shred_version(self: &Arc, adopt_shred_version: &mut bool) { // Adopt the entrypoint's `shred_version` if ours is unset if *adopt_shred_version { // If gossip was given an entrypoint, lookup its id - let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id); + let entrypoint_id = self.entrypoint.read().unwrap().as_ref().map(|e| e.id); if let Some(entrypoint_id) = entrypoint_id { // If a pull from the entrypoint was successful, it should exist in the crds table - let entrypoint = obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone()); + let entrypoint = self.lookup_contact_info(&entrypoint_id, |ci| ci.clone()); if let Some(entrypoint) = entrypoint { if entrypoint.shred_version == 0 { info!("Unable to adopt entrypoint's shred version"); @@ -1453,13 +1453,13 @@ impl ClusterInfo { "Setting shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id ); - obj.my_contact_info.write().unwrap().shred_version = + self.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version; - obj.gossip + self.gossip .write() .unwrap() .set_shred_version(entrypoint.shred_version); - obj.insert_self(); + self.insert_self(); *adopt_shred_version = false; } } @@ -1468,7 +1468,7 @@ impl ClusterInfo { } fn handle_purge( - obj: &Arc, + self: &Arc, bank_forks: &Option>>, stakes: &HashMap, ) { @@ -1483,12 +1483,12 @@ impl ClusterInfo { CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS } }; - let timeouts = obj.gossip.read().unwrap().make_timeouts(stakes, timeout); - let num_purged = obj - .time_gossip_write_lock("purge", &obj.stats.purge) + let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout); + let num_purged = self + .time_gossip_write_lock("purge", &self.stats.purge) .purge(timestamp(), &timeouts); inc_new_counter_info!("cluster_info-purge-count", num_purged); - let table_size = obj.gossip.read().unwrap().crds.table.len(); + let table_size = self.gossip.read().unwrap().crds.table.len(); datapoint_debug!( "cluster_info-purge", ("table_size", table_size as i64, i64), @@ -1498,7 +1498,7 @@ impl ClusterInfo { /// randomly pick a node and ask them for updates asynchronously pub fn gossip( - obj: Arc, + self: Arc, bank_forks: Option>>, sender: PacketSender, exit: &Arc, @@ -1509,18 +1509,18 @@ impl ClusterInfo { .spawn(move || { let mut last_push = timestamp(); let mut last_contact_info_trace = timestamp(); - let mut adopt_shred_version = obj.my_shred_version() == 0; + let mut adopt_shred_version = self.my_shred_version() == 0; let recycler = PacketsRecycler::default(); - let message = CrdsData::Version(Version::new(obj.id())); - obj.push_message(CrdsValue::new_signed(message, &obj.keypair)); + let message = CrdsData::Version(Version::new(self.id())); + self.push_message(CrdsValue::new_signed(message, &self.keypair)); let mut generate_pull_requests = true; loop { let start = timestamp(); thread_mem_usage::datapoint("solana-gossip"); if start - last_contact_info_trace > 10000 { // Log contact info every 10 seconds - info!("\n{}", obj.contact_info_trace()); + info!("\n{}", self.contact_info_trace()); last_contact_info_trace = start; } @@ -1531,20 +1531,19 @@ impl ClusterInfo { None => HashMap::new(), }; - let _ = - Self::run_gossip(&obj, &recycler, &stakes, &sender, generate_pull_requests); + let _ = self.run_gossip(&recycler, &stakes, &sender, generate_pull_requests); if exit.load(Ordering::Relaxed) { return; } - Self::handle_purge(&obj, &bank_forks, &stakes); + self.handle_purge(&bank_forks, &stakes); - Self::handle_adopt_shred_version(&obj, &mut adopt_shred_version); + self.handle_adopt_shred_version(&mut adopt_shred_version); //TODO: possibly tune this parameter - //we saw a deadlock passing an obj.read().unwrap().timeout into sleep + //we saw a deadlock passing an self.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - obj.push_self(&stakes); + self.push_self(&stakes); last_push = timestamp(); } let elapsed = timestamp() - start; @@ -1560,17 +1559,21 @@ impl ClusterInfo { #[allow(clippy::cognitive_complexity)] fn handle_packets( - me: &Self, + &self, recycler: &PacketsRecycler, stakes: &HashMap, packets: Packets, response_sender: &PacketSender, - epoch_ms: u64, + epoch_time_ms: u64, ) { // iter over the packets, collect pulls separately and process everything else let allocated = thread_mem_usage::Allocatedp::default(); let mut gossip_pull_data: Vec = vec![]; - let timeouts = me.gossip.read().unwrap().make_timeouts(&stakes, epoch_ms); + let timeouts = self + .gossip + .read() + .unwrap() + .make_timeouts(&stakes, epoch_time_ms); let mut pull_responses = HashMap::new(); packets.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); @@ -1586,12 +1589,12 @@ impl ClusterInfo { 1 ); } else if let Some(contact_info) = caller.contact_info() { - if contact_info.id == me.id() { + if contact_info.id == self.id() { warn!("PullRequest ignored, I'm talking to myself"); inc_new_counter_debug!("cluster_info-window-request-loopback", 1); } else if contact_info.shred_version == 0 - || contact_info.shred_version == me.my_shred_version() - || me.my_shred_version() == 0 + || contact_info.shred_version == self.my_shred_version() + || self.my_shred_version() == 0 { gossip_pull_data.push(PullData { from_addr, @@ -1599,7 +1602,7 @@ impl ClusterInfo { filter, }); } else { - me.stats.skip_pull_shred_version.add_relaxed(1); + self.stats.skip_pull_shred_version.add_relaxed(1); } } datapoint_debug!( @@ -1638,7 +1641,7 @@ impl ClusterInfo { } ret }); - let rsp = Self::handle_push_message(me, recycler, &from, data, stakes); + let rsp = Self::handle_push_message(self, recycler, &from, data, stakes); if let Some(rsp) = rsp { let _ignore_disconnect = response_sender.send(rsp); } @@ -1655,8 +1658,8 @@ impl ClusterInfo { "cluster_info-prune_message-size", data.prunes.len() ); - match me - .time_gossip_write_lock("process_prune", &me.stats.process_prune) + match self + .time_gossip_write_lock("process_prune", &self.stats.process_prune) .process_prune_msg( &from, &data.destination, @@ -1684,11 +1687,11 @@ impl ClusterInfo { }); for (from, data) in pull_responses { - Self::handle_pull_response(me, &from, data, &timeouts); + self.handle_pull_response(&from, data, &timeouts); } // process the collected pulls together - let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data, stakes); + let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes); if let Some(rsp) = rsp { let _ignore_disconnect = response_sender.send(rsp); } @@ -1717,7 +1720,7 @@ impl ClusterInfo { // Pull requests take an incoming bloom filter of contained entries from a node // and tries to send back to them the values it detects are missing. fn handle_pull_requests( - me: &Self, + &self, recycler: &PacketsRecycler, requests: Vec, stakes: &HashMap, @@ -1726,19 +1729,22 @@ impl ClusterInfo { let mut caller_and_filters = vec![]; let mut addrs = vec![]; let mut time = Measure::start("handle_pull_requests"); - me.update_data_budget(stakes); + self.update_data_budget(stakes); for pull_data in requests { caller_and_filters.push((pull_data.caller, pull_data.filter)); addrs.push(pull_data.from_addr); } let now = timestamp(); - let self_id = me.id(); + let self_id = self.id(); - let pull_responses = me - .time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses) + let pull_responses = self + .time_gossip_read_lock( + "generate_pull_responses", + &self.stats.generate_pull_responses, + ) .generate_pull_responses(&caller_and_filters); - me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests) + self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) .process_pull_requests(caller_and_filters, now); // Filter bad to addresses @@ -1804,7 +1810,7 @@ impl ClusterInfo { let protocol = Protocol::PullResponse(self_id, vec![response]); let new_packet = Packet::from_data(&from_addr, protocol); { - let mut w_outbound_budget = me.outbound_budget.write().unwrap(); + let mut w_outbound_budget = self.outbound_budget.write().unwrap(); if w_outbound_budget.bytes > new_packet.meta.size { sent.insert(index); w_outbound_budget.bytes -= new_packet.meta.size; @@ -1837,31 +1843,31 @@ impl ClusterInfo { // Returns (failed, timeout, success) fn handle_pull_response( - me: &Self, + &self, from: &Pubkey, mut crds_values: Vec, timeouts: &HashMap, ) -> (usize, usize, usize) { let len = crds_values.len(); - trace!("PullResponse me: {} from: {} len={}", me.id, from, len); + trace!("PullResponse me: {} from: {} len={}", self.id, from, len); - if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) { + if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) { Self::filter_by_shred_version( from, &mut crds_values, shred_version, - me.my_shred_version(), + self.my_shred_version(), ); } let filtered_len = crds_values.len(); let mut pull_stats = ProcessPullStats::default(); - let (filtered_pulls, filtered_pulls_expired_timeout) = me - .time_gossip_read_lock("filter_pull_resp", &me.stats.filter_pull_response) + let (filtered_pulls, filtered_pulls_expired_timeout) = self + .time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response) .filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats); if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() { - me.time_gossip_write_lock("process_pull_resp", &me.stats.process_pull_response) + self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response) .process_pull_responses( from, filtered_pulls, @@ -1871,23 +1877,23 @@ impl ClusterInfo { ); } - me.stats + self.stats .skip_pull_response_shred_version .add_relaxed((len - filtered_len) as u64); - me.stats.process_pull_response_count.add_relaxed(1); - me.stats + self.stats.process_pull_response_count.add_relaxed(1); + self.stats .process_pull_response_len .add_relaxed(filtered_len as u64); - me.stats + self.stats .process_pull_response_timeout .add_relaxed(pull_stats.timeout_count as u64); - me.stats + self.stats .process_pull_response_fail_insert .add_relaxed(pull_stats.failed_insert as u64); - me.stats + self.stats .process_pull_response_fail_timeout .add_relaxed(pull_stats.failed_timeout as u64); - me.stats + self.stats .process_pull_response_success .add_relaxed(pull_stats.success as u64); @@ -1915,46 +1921,46 @@ impl ClusterInfo { } fn handle_push_message( - me: &Self, + &self, recycler: &PacketsRecycler, from: &Pubkey, mut crds_values: Vec, stakes: &HashMap, ) -> Option { - let self_id = me.id(); - me.stats.push_message_count.add_relaxed(1); + let self_id = self.id(); + self.stats.push_message_count.add_relaxed(1); let len = crds_values.len(); - if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) { + if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) { Self::filter_by_shred_version( from, &mut crds_values, shred_version, - me.my_shred_version(), + self.my_shred_version(), ); } let filtered_len = crds_values.len(); - me.stats + self.stats .push_message_value_count .add_relaxed(filtered_len as u64); - me.stats + self.stats .skip_push_message_shred_version .add_relaxed((len - filtered_len) as u64); - let updated: Vec<_> = me - .time_gossip_write_lock("process_push", &me.stats.process_push_message) + let updated: Vec<_> = self + .time_gossip_write_lock("process_push", &self.stats.process_push_message) .process_push_message(from, crds_values, timestamp()); let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect(); - let prunes_map: HashMap> = me - .time_gossip_write_lock("prune_received_cache", &me.stats.prune_received_cache) + let prunes_map: HashMap> = self + .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) .prune_received_cache(updated_labels, stakes); let rsp: Vec<_> = prunes_map .into_iter() .filter_map(|(from, prune_set)| { inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len()); - me.lookup_contact_info(&from, |ci| ci.clone()) + self.lookup_contact_info(&from, |ci| ci.clone()) .and_then(|ci| { let mut prune_msg = PruneData { pubkey: self_id, @@ -1963,7 +1969,7 @@ impl ClusterInfo { destination: from, wallclock: timestamp(), }; - prune_msg.sign(&me.keypair); + prune_msg.sign(&self.keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); Some((ci.gossip, rsp)) }) @@ -1973,11 +1979,11 @@ impl ClusterInfo { return None; } let mut packets = to_packets_with_destination(recycler.clone(), &rsp); - me.stats + self.stats .push_response_count .add_relaxed(packets.packets.len() as u64); if !packets.is_empty() { - let pushes: Vec<_> = me.new_push_requests(); + let pushes: Vec<_> = self.new_push_requests(); inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); pushes.into_iter().for_each(|(remote_gossip_addr, req)| { if !remote_gossip_addr.ip().is_unspecified() && remote_gossip_addr.port() != 0 { @@ -1993,9 +1999,48 @@ impl ClusterInfo { } } + fn get_stakes_and_epoch_time( + bank_forks: Option<&Arc>>, + ) -> (HashMap, u64) { + let epoch_time_ms; + let stakes: HashMap<_, _> = match bank_forks { + Some(ref bank_forks) => { + let bank = bank_forks.read().unwrap().working_bank(); + let epoch = bank.epoch(); + let epoch_schedule = bank.epoch_schedule(); + epoch_time_ms = epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT; + staking_utils::staked_nodes(&bank) + } + None => { + inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); + epoch_time_ms = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; + HashMap::new() + } + }; + + (stakes, epoch_time_ms) + } + + fn process_packets( + &self, + requests: Vec, + thread_pool: &ThreadPool, + recycler: &PacketsRecycler, + response_sender: &PacketSender, + stakes: HashMap, + epoch_time_ms: u64, + ) { + let sender = response_sender.clone(); + thread_pool.install(|| { + requests.into_par_iter().for_each_with(sender, |s, reqs| { + self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms) + }); + }); + } + /// Process messages from the network fn run_listen( - obj: &Self, + &self, recycler: &PacketsRecycler, bank_forks: Option<&Arc>>, requests_receiver: &PacketReceiver, @@ -2003,7 +2048,6 @@ impl ClusterInfo { thread_pool: &ThreadPool, last_print: &mut Instant, ) -> Result<()> { - //TODO cache connections let timeout = Duration::new(1, 0); let mut requests = vec![requests_receiver.recv_timeout(timeout)?]; let mut num_requests = requests.last().unwrap().packets.len(); @@ -2014,35 +2058,26 @@ impl ClusterInfo { num_requests += more_reqs.packets.len(); requests.push(more_reqs) } + if num_requests >= MAX_GOSSIP_TRAFFIC { warn!( "Too much gossip traffic, ignoring some messages (requests={}, max requests={})", num_requests, MAX_GOSSIP_TRAFFIC ); } - let epoch_ms; - let stakes: HashMap<_, _> = match bank_forks { - Some(ref bank_forks) => { - let bank = bank_forks.read().unwrap().working_bank(); - let epoch = bank.epoch(); - let epoch_schedule = bank.epoch_schedule(); - epoch_ms = epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT; - staking_utils::staked_nodes(&bank) - } - None => { - inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); - epoch_ms = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; - HashMap::new() - } - }; - let sender = response_sender.clone(); - thread_pool.install(|| { - requests.into_par_iter().for_each_with(sender, |s, reqs| { - Self::handle_packets(obj, &recycler, &stakes, reqs, s, epoch_ms) - }); - }); - Self::print_reset_stats(obj, last_print); + let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks); + + self.process_packets( + requests, + thread_pool, + recycler, + response_sender, + stakes, + epoch_time_ms, + ); + + self.print_reset_stats(last_print); Ok(()) } @@ -2214,7 +2249,7 @@ impl ClusterInfo { } pub fn listen( - me: Arc, + self: Arc, bank_forks: Option>>, requests_receiver: PacketReceiver, response_sender: PacketSender, @@ -2232,8 +2267,7 @@ impl ClusterInfo { .unwrap(); let mut last_print = Instant::now(); loop { - let e = Self::run_listen( - &me, + let e = self.run_listen( &recycler, bank_forks.as_ref(), &requests_receiver, @@ -2245,10 +2279,10 @@ impl ClusterInfo { return; } if e.is_err() { - let r_gossip = me.gossip.read().unwrap(); + let r_gossip = self.gossip.read().unwrap(); debug!( "{}: run_listen timeout, table size: {}", - me.id(), + self.id(), r_gossip.crds.table.len() ); }