From a705764ca72ba975643774dde1ee43665562d1f7 Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 8 Jun 2020 17:05:55 -0700 Subject: [PATCH] v1.1 gossip lock optimizations (#10459) * Skip gossip requests with different shred version and split lock (#10240) (cherry picked from commit 3f508b37fdfceceaf39bae5c6d4576e0f584438e) * More cluster stats and add epoch stakes cache in retransmit stage (#10345) * More cluster info metrics for push request/response counts * Cache staked peers for the epoch (cherry picked from commit ef37b82ffa8ca38b2c6db02bbb5283ac043d1f1e) * Cache tvu peers for broadcast (#10373) (cherry picked from commit 2cf719ac2c988a7c8cb989e97e04e7eb78439212) * Add pull request count metrics (#10421) (cherry picked from commit 3d2230f1a94c45ae55c2c3c7474d4a75ff039d59) --- core/benches/cluster_info.rs | 3 +- core/src/broadcast_stage.rs | 9 +- core/src/broadcast_stage/broadcast_metrics.rs | 46 +-- .../fail_entry_verification_broadcast_run.rs | 3 +- .../broadcast_stage/standard_broadcast_run.rs | 56 +++- core/src/cluster_info.rs | 301 ++++++++++++++++-- core/src/crds_gossip.rs | 18 +- core/src/crds_gossip_pull.rs | 37 ++- core/src/retransmit_stage.rs | 88 ++++- core/tests/crds_gossip.rs | 22 +- 10 files changed, 472 insertions(+), 111 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index bc839f1449..bd3b00cfaf 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -3,6 +3,7 @@ extern crate test; use rand::{thread_rng, Rng}; +use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats; use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers}; use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::contact_info::ContactInfo; @@ -48,7 +49,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { &peers_and_stakes, &peers, &last_datapoint, - &mut 0, + &mut TransmitShredsStats::default(), ) .unwrap(); }); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index db4ad5160f..71a467be35 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -35,7 +35,7 @@ use std::{ }; mod broadcast_fake_shreds_run; -pub(crate) mod broadcast_metrics; +pub mod broadcast_metrics; pub(crate) mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; @@ -374,13 +374,14 @@ pub fn broadcast_shreds( peers_and_stakes: &[(u64, usize)], peers: &[ContactInfo], last_datapoint_submit: &Arc, - send_mmsg_total: &mut u64, + transmit_stats: &mut TransmitShredsStats, ) -> Result<()> { let broadcast_len = peers_and_stakes.len(); if broadcast_len == 0 { update_peer_stats(1, 1, last_datapoint_submit); return Ok(()); } + let mut shred_select = Measure::start("shred_select"); let packets: Vec<_> = shreds .iter() .map(|shred| { @@ -389,6 +390,8 @@ pub fn broadcast_shreds( (&shred.payload, &peers[broadcast_index].tvu) }) .collect(); + shred_select.stop(); + transmit_stats.shred_select += shred_select.as_us(); let mut sent = 0; let mut send_mmsg_time = Measure::start("send_mmsg"); @@ -401,7 +404,7 @@ pub fn broadcast_shreds( } } send_mmsg_time.stop(); - *send_mmsg_total += send_mmsg_time.as_us(); + transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us(); let num_live_peers = num_live_peers(&peers); update_peer_stats( diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs index dbe5af44a4..2e59b74909 100644 --- a/core/src/broadcast_stage/broadcast_metrics.rs +++ 
b/core/src/broadcast_stage/broadcast_metrics.rs @@ -29,11 +29,12 @@ impl ProcessShredsStats { } #[derive(Default, Clone)] -pub(crate) struct TransmitShredsStats { - pub(crate) transmit_elapsed: u64, - pub(crate) send_mmsg_elapsed: u64, - pub(crate) get_peers_elapsed: u64, - pub(crate) num_shreds: usize, +pub struct TransmitShredsStats { + pub transmit_elapsed: u64, + pub send_mmsg_elapsed: u64, + pub get_peers_elapsed: u64, + pub shred_select: u64, + pub num_shreds: usize, } impl BroadcastStats for TransmitShredsStats { @@ -42,6 +43,7 @@ impl BroadcastStats for TransmitShredsStats { self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed; self.get_peers_elapsed += new_stats.get_peers_elapsed; self.num_shreds += new_stats.num_shreds; + self.shred_select += new_stats.shred_select; } fn report_stats(&mut self, slot: Slot, slot_start: Instant) { datapoint_info!( @@ -58,6 +60,7 @@ impl BroadcastStats for TransmitShredsStats { ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64), ("get_peers_elapsed", self.get_peers_elapsed as i64, i64), ("num_shreds", self.num_shreds as i64, i64), + ("shred_select", self.shred_select as i64, i64), ); } } @@ -176,15 +179,16 @@ mod test { } #[test] - fn test_update() { + fn test_update_broadcast() { let start = Instant::now(); let mut slot_broadcast_stats = SlotBroadcastStats::default(); slot_broadcast_stats.update( &TransmitShredsStats { transmit_elapsed: 1, - get_peers_elapsed: 1, - send_mmsg_elapsed: 1, - num_shreds: 1, + get_peers_elapsed: 2, + send_mmsg_elapsed: 3, + shred_select: 4, + num_shreds: 5, }, &Some(BroadcastShredBatchInfo { slot: 0, @@ -198,16 +202,18 @@ mod test { assert_eq!(slot_0_stats.num_batches, 1); assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2); assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1); - assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1); - assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1); - assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2); + assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3); + assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4); + assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5); slot_broadcast_stats.update( &TransmitShredsStats { - transmit_elapsed: 1, - get_peers_elapsed: 1, - send_mmsg_elapsed: 1, - num_shreds: 1, + transmit_elapsed: 7, + get_peers_elapsed: 8, + send_mmsg_elapsed: 9, + shred_select: 10, + num_shreds: 11, }, &None, ); @@ -217,9 +223,10 @@ mod test { assert_eq!(slot_0_stats.num_batches, 1); assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2); assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1); - assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1); - assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1); - assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2); + assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3); + assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4); + assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5); // If another batch is given, then total number of batches == num_expected_batches == 2, // so the batch should be purged from the HashMap @@ -228,6 +235,7 @@ mod test { transmit_elapsed: 1, get_peers_elapsed: 1, send_mmsg_elapsed: 1, + shred_select: 1, num_shreds: 1, }, &Some(BroadcastShredBatchInfo { diff --git 
a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 870b05219a..24ffa94dc3 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -81,14 +81,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { // Broadcast data let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); - let mut send_mmsg_total = 0; broadcast_shreds( sock, &shreds, &peers_and_stakes, &peers, &Arc::new(AtomicU64::new(0)), - &mut send_mmsg_total, + &mut TransmitShredsStats::default(), )?; Ok(()) diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 0fb89cb9cb..829dec871b 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -9,6 +9,7 @@ use solana_ledger::{ }; use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; use std::collections::HashMap; +use std::sync::RwLock; use std::time::Duration; #[derive(Clone)] @@ -23,6 +24,14 @@ pub struct StandardBroadcastRun { shred_version: u16, last_datapoint_submit: Arc, num_batches: usize, + broadcast_peer_cache: Arc>, + last_peer_update: Arc, +} + +#[derive(Default)] +struct BroadcastPeerCache { + peers: Vec, + peers_and_stakes: Vec<(u64, usize)>, } impl StandardBroadcastRun { @@ -38,6 +47,8 @@ impl StandardBroadcastRun { shred_version, last_datapoint_submit: Arc::new(AtomicU64::new(0)), num_batches: 0, + broadcast_peer_cache: Arc::new(RwLock::new(BroadcastPeerCache::default())), + last_peer_update: Arc::new(AtomicU64::new(0)), } } @@ -293,33 +304,46 @@ impl StandardBroadcastRun { shreds: Arc>, broadcast_shred_batch_info: Option, ) -> Result<()> { + const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000; trace!("Broadcasting {:?} shreds", shreds.len()); // Get the list of peers to broadcast to - let get_peers_start = Instant::now(); - let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); - let get_peers_elapsed = get_peers_start.elapsed(); + let mut get_peers_time = Measure::start("broadcast::get_peers"); + let now = timestamp(); + let last = self.last_peer_update.load(Ordering::Relaxed); + if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS + && self + .last_peer_update + .compare_and_swap(now, last, Ordering::Relaxed) + == last + { + let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap(); + let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); + w_broadcast_peer_cache.peers = peers; + w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes; + } + get_peers_time.stop(); + let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap(); + let mut transmit_stats = TransmitShredsStats::default(); // Broadcast the shreds - let transmit_start = Instant::now(); - let mut send_mmsg_total = 0; + let mut transmit_time = Measure::start("broadcast_shreds"); broadcast_shreds( sock, &shreds, - &peers_and_stakes, - &peers, + &r_broadcast_peer_cache.peers_and_stakes, + &r_broadcast_peer_cache.peers, &self.last_datapoint_submit, - &mut send_mmsg_total, + &mut transmit_stats, )?; - let transmit_elapsed = transmit_start.elapsed(); - let new_transmit_shreds_stats = TransmitShredsStats { - transmit_elapsed: duration_as_us(&transmit_elapsed), - get_peers_elapsed: duration_as_us(&get_peers_elapsed), - send_mmsg_elapsed: send_mmsg_total, - num_shreds: shreds.len(), - 
}; + drop(r_broadcast_peer_cache); + transmit_time.stop(); + + transmit_stats.transmit_elapsed = transmit_time.as_us(); + transmit_stats.get_peers_elapsed = get_peers_time.as_us(); + transmit_stats.num_shreds = shreds.len(); // Process metrics - self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info); + self.update_transmit_metrics(&transmit_stats, &broadcast_shred_batch_info); Ok(()) } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index feab4ae8fa..00ac5030e4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -214,11 +214,15 @@ struct GossipStats { repair_peers: Counter, new_push_requests: Counter, new_push_requests2: Counter, + new_push_requests_num: Counter, process_pull_response: Counter, process_pull_response_count: Counter, process_pull_response_len: Counter, process_pull_response_timeout: Counter, + process_pull_response_fail: Counter, + process_pull_response_success: Counter, process_pull_requests: Counter, + generate_pull_responses: Counter, process_prune: Counter, process_push_message: Counter, prune_received_cache: Counter, @@ -227,7 +231,14 @@ struct GossipStats { epoch_slots_push: Counter, push_message: Counter, new_pull_requests: Counter, + new_pull_requests_count: Counter, mark_pull_request: Counter, + skip_pull_response_shred_version: Counter, + skip_pull_shred_version: Counter, + skip_push_message_shred_version: Counter, + push_message_count: Counter, + push_message_value_count: Counter, + push_response_count: Counter, } pub struct ClusterInfo { @@ -1397,6 +1408,9 @@ impl ClusterInfo { .collect() }; self.append_entrypoint_to_pulls(&mut pulls); + self.stats + .new_pull_requests_count + .add_relaxed(pulls.len() as u64); pulls .into_iter() .map(|(peer, filter, gossip, self_info)| { @@ -1411,7 +1425,7 @@ impl ClusterInfo { let (_, push_messages) = self .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests) .new_push_messages(timestamp()); - push_messages + let messages: Vec<_> = push_messages .into_iter() .filter_map(|(peer, messages)| { let peer_label = CrdsValueLabel::ContactInfo(peer); @@ -1426,7 +1440,11 @@ impl ClusterInfo { .into_iter() .map(move |payload| (peer, Protocol::PushMessage(self_id, payload))) }) - .collect() + .collect(); + self.stats + .new_push_requests_num + .add_relaxed(messages.len() as u64); + messages } fn gossip_request(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { @@ -1583,12 +1601,17 @@ impl ClusterInfo { if contact_info.id == me.id() { warn!("PullRequest ignored, I'm talking to myself"); inc_new_counter_debug!("cluster_info-window-request-loopback", 1); - } else { + } else if contact_info.shred_version == 0 + || contact_info.shred_version == me.my_shred_version() + || me.my_shred_version() == 0 + { gossip_pull_data.push(PullData { from_addr, caller, filter, }); + } else { + me.stats.skip_pull_shred_version.add_relaxed(1); } } datapoint_debug!( @@ -1677,6 +1700,26 @@ impl ClusterInfo { } } + fn update_data_budget(&self, stakes: &HashMap) { + let mut w_outbound_budget = self.outbound_budget.write().unwrap(); + + let now = timestamp(); + const INTERVAL_MS: u64 = 100; + // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s + const BYTES_PER_INTERVAL: usize = 5000; + const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default + + if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS { + let len = std::cmp::max(stakes.len(), 2); + w_outbound_budget.bytes += len * BYTES_PER_INTERVAL; + 
w_outbound_budget.bytes = std::cmp::min( + w_outbound_budget.bytes, + MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, + ); + w_outbound_budget.last_timestamp_ms = now; + } + } + // Pull requests take an incoming bloom filter of contained entries from a node // and tries to send back to them the values it detects are missing. fn handle_pull_requests( @@ -1689,33 +1732,19 @@ impl ClusterInfo { let mut caller_and_filters = vec![]; let mut addrs = vec![]; let mut time = Measure::start("handle_pull_requests"); - { - let mut w_outbound_budget = me.outbound_budget.write().unwrap(); - - let now = timestamp(); - const INTERVAL_MS: u64 = 100; - // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s - const BYTES_PER_INTERVAL: usize = 5000; - const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default - - if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS { - let len = std::cmp::max(stakes.len(), 2); - w_outbound_budget.bytes += len * BYTES_PER_INTERVAL; - w_outbound_budget.bytes = std::cmp::min( - w_outbound_budget.bytes, - MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, - ); - w_outbound_budget.last_timestamp_ms = now; - } - } + me.update_data_budget(stakes); for pull_data in requests { caller_and_filters.push((pull_data.caller, pull_data.filter)); addrs.push(pull_data.from_addr); } let now = timestamp(); let self_id = me.id(); + let pull_responses = me - .time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests) + .time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses) + .generate_pull_responses(&caller_and_filters); + + me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests) .process_pull_requests(caller_and_filters, now); // Filter bad to addresses @@ -1812,37 +1841,94 @@ impl ClusterInfo { Some(packets) } + // Returns (failed, timeout, success) fn handle_pull_response( me: &Self, from: &Pubkey, - data: Vec, + mut crds_values: Vec, timeouts: &HashMap, - ) { - let len = data.len(); + ) -> (usize, usize, usize) { + let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", me.id, from, len); - let (_fail, timeout_count) = me + + if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) { + Self::filter_by_shred_version( + from, + &mut crds_values, + shred_version, + me.my_shred_version(), + ); + } + let filtered_len = crds_values.len(); + + let (fail, timeout_count, success) = me .time_gossip_write_lock("process_pull", &me.stats.process_pull_response) - .process_pull_response(from, timeouts, data, timestamp()); + .process_pull_response(from, timeouts, crds_values, timestamp()); + + me.stats + .skip_pull_response_shred_version + .add_relaxed((len - filtered_len) as u64); me.stats.process_pull_response_count.add_relaxed(1); - me.stats.process_pull_response_len.add_relaxed(len as u64); + me.stats + .process_pull_response_len + .add_relaxed(filtered_len as u64); me.stats .process_pull_response_timeout .add_relaxed(timeout_count as u64); + me.stats.process_pull_response_fail.add_relaxed(fail as u64); + me.stats + .process_pull_response_success + .add_relaxed(success as u64); + + (fail, timeout_count, success) + } + + fn filter_by_shred_version( + from: &Pubkey, + crds_values: &mut Vec, + shred_version: u16, + my_shred_version: u16, + ) { + if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version { + // Allow someone to update their own ContactInfo so they + // can change shred versions if needed. 
+ crds_values.retain(|crds_value| match &crds_value.data { + CrdsData::ContactInfo(contact_info) => contact_info.id == *from, + _ => false, + }); + } } fn handle_push_message( me: &Self, recycler: &PacketsRecycler, from: &Pubkey, - data: Vec, + mut crds_values: Vec, stakes: &HashMap, ) -> Option { let self_id = me.id(); - inc_new_counter_debug!("cluster_info-push_message", 1); + me.stats.push_message_count.add_relaxed(1); + let len = crds_values.len(); + + if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) { + Self::filter_by_shred_version( + from, + &mut crds_values, + shred_version, + me.my_shred_version(), + ); + } + let filtered_len = crds_values.len(); + me.stats + .push_message_value_count + .add_relaxed(filtered_len as u64); + me.stats + .skip_push_message_shred_version + .add_relaxed((len - filtered_len) as u64); let updated: Vec<_> = me .time_gossip_write_lock("process_push", &me.stats.process_push_message) - .process_push_message(from, data, timestamp()); + .process_push_message(from, crds_values, timestamp()); let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect(); let prunes_map: HashMap> = me @@ -1872,6 +1958,9 @@ impl ClusterInfo { return None; } let mut packets = to_packets_with_destination(recycler.clone(), &rsp); + me.stats + .push_response_count + .add_relaxed(packets.packets.len() as u64); if !packets.is_empty() { let pushes: Vec<_> = me.new_push_requests(); inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); @@ -1963,6 +2052,11 @@ impl ClusterInfo { ), ("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64), ("tvu_peers", self.stats.tvu_peers.clear(), i64), + ( + "new_push_requests_num", + self.stats.new_push_requests2.clear(), + i64 + ), ); datapoint_info!( "cluster_info_stats2", @@ -1989,6 +2083,26 @@ impl ClusterInfo { self.stats.process_pull_response_count.clear(), i64 ), + ( + "process_pull_resp_success", + self.stats.process_pull_response_success.clear(), + i64 + ), + ( + "process_pull_resp_timeout", + self.stats.process_pull_response_timeout.clear(), + i64 + ), + ( + "process_pull_resp_fail", + self.stats.process_pull_response_fail.clear(), + i64 + ), + ( + "push_response_count", + self.stats.push_response_count.clear(), + i64 + ), ); datapoint_info!( "cluster_info_stats3", @@ -2002,6 +2116,11 @@ impl ClusterInfo { self.stats.process_pull_requests.clear(), i64 ), + ( + "generate_pull_responses", + self.stats.generate_pull_responses.clear(), + i64 + ), ("process_prune", self.stats.process_prune.clear(), i64), ( "process_push_message", @@ -2031,6 +2150,39 @@ impl ClusterInfo { i64 ), ); + datapoint_info!( + "cluster_info_stats4", + ( + "skip_push_message_shred_version", + self.stats.skip_push_message_shred_version.clear(), + i64 + ), + ( + "skip_pull_response_shred_version", + self.stats.skip_pull_response_shred_version.clear(), + i64 + ), + ( + "skip_pull_shred_version", + self.stats.skip_pull_shred_version.clear(), + i64 + ), + ( + "push_message_count", + self.stats.push_message_count.clear(), + i64 + ), + ( + "push_message_value_count", + self.stats.push_message_value_count.clear(), + i64 + ), + ( + "new_pull_requests_count", + self.stats.new_pull_requests_count.clear(), + i64 + ), + ); *last_print = Instant::now(); } @@ -2412,6 +2564,91 @@ mod tests { assert!(ClusterInfo::is_spy_node(&node)); } + #[test] + fn test_handle_pull() { + let node = Node::new_localhost(); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); + + let entrypoint_pubkey = 
Pubkey::new_rand(); + let data = test_crds_values(entrypoint_pubkey); + let timeouts = HashMap::new(); + assert_eq!( + (0, 0, 1), + ClusterInfo::handle_pull_response( + &cluster_info, + &entrypoint_pubkey, + data.clone(), + &timeouts + ) + ); + + let entrypoint_pubkey2 = Pubkey::new_rand(); + assert_eq!( + (1, 0, 0), + ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts) + ); + } + + fn test_crds_values(pubkey: Pubkey) -> Vec { + let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp()); + let entrypoint_crdsvalue = + CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); + vec![entrypoint_crdsvalue] + } + + #[test] + fn test_filter_shred_version() { + let from = Pubkey::new_rand(); + let my_shred_version = 1; + let other_shred_version = 1; + + // Allow same shred_version + let mut values = test_crds_values(from); + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + + // Allow shred_version=0. + let other_shred_version = 0; + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + + // Change to sender's ContactInfo version, allow that. + let other_shred_version = 2; + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + + let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash { + from: Pubkey::new_rand(), + hashes: vec![], + wallclock: 0, + })); + values.push(snapshot_hash_data); + // Change to sender's ContactInfo version, allow that. + let other_shred_version = 2; + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + } + #[test] fn test_cluster_spy_gossip() { //check that gossip doesn't try to push to invalid addresses diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 7cce505532..f6e2739dc7 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -158,14 +158,18 @@ impl CrdsGossip { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response - pub fn process_pull_requests( - &mut self, - filters: Vec<(CrdsValue, CrdsFilter)>, - now: u64, - ) -> Vec> { + pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) { self.pull - .process_pull_requests(&mut self.crds, filters, now) + .process_pull_requests(&mut self.crds, filters, now); } + + pub fn generate_pull_responses( + &self, + filters: &[(CrdsValue, CrdsFilter)], + ) -> Vec> { + self.pull.generate_pull_responses(&self.crds, filters) + } + /// process a pull response pub fn process_pull_response( &mut self, @@ -173,7 +177,7 @@ impl CrdsGossip { timeouts: &HashMap, response: Vec, now: u64, - ) -> (usize, usize) { + ) -> (usize, usize, usize) { self.pull .process_pull_response(&mut self.crds, from, timeouts, response, now) } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 750df1d4ae..534abea0bb 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -204,14 +204,13 @@ impl CrdsGossipPull { self.purged_values.push_back((hash, timestamp)) } - /// process a pull request and create a response + /// process a pull request pub fn process_pull_requests( &mut self, crds: &mut Crds, requests: Vec<(CrdsValue, CrdsFilter)>, now: u64, - ) -> Vec> { - 
let rv = self.filter_crds_values(crds, &requests); + ) { requests.into_iter().for_each(|(caller, _)| { let key = caller.label().pubkey(); let old = crds.insert(caller, now); @@ -221,8 +220,17 @@ impl CrdsGossipPull { } crds.update_record_timestamp(&key, now); }); - rv } + + /// Create gossip responses to pull requests + pub fn generate_pull_responses( + &self, + crds: &Crds, + requests: &[(CrdsValue, CrdsFilter)], + ) -> Vec> { + self.filter_crds_values(crds, requests) + } + /// process a pull response pub fn process_pull_response( &mut self, @@ -231,9 +239,10 @@ impl CrdsGossipPull { timeouts: &HashMap, response: Vec, now: u64, - ) -> (usize, usize) { + ) -> (usize, usize, usize) { let mut failed = 0; let mut timeout_count = 0; + let mut success = 0; for r in response { let owner = r.label().pubkey(); // Check if the crds value is older than the msg_timeout @@ -274,7 +283,11 @@ impl CrdsGossipPull { } } let old = crds.insert(r, now); - failed += old.is_err() as usize; + if old.is_err() { + failed += 1; + } else { + success += 1; + } old.ok().map(|opt| { crds.update_record_timestamp(&owner, now); opt.map(|val| { @@ -284,7 +297,7 @@ impl CrdsGossipPull { }); } crds.update_record_timestamp(from, now); - (failed, timeout_count) + (failed, timeout_count, success) } // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter @@ -573,8 +586,9 @@ mod test { let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); let (_, filters, caller) = req.unwrap(); - let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let rsp = dest.process_pull_requests(&mut dest_crds, filters, 1); + let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); + let rsp = dest.generate_pull_responses(&dest_crds, &filters); + dest.process_pull_requests(&mut dest_crds, filters, 1); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.lookup(&caller.label()).is_some()); assert_eq!( @@ -643,8 +657,9 @@ mod test { PACKET_DATA_SIZE, ); let (_, filters, caller) = req.unwrap(); - let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let mut rsp = dest.process_pull_requests(&mut dest_crds, filters, 0); + let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); + let mut rsp = dest.generate_pull_responses(&dest_crds, &filters); + dest.process_pull_requests(&mut dest_crds, filters, 0); // if there is a false positive this is empty // prob should be around 0.1 per iteration if rsp.is_empty() { diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 34ebd210a9..e764480f45 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,6 +3,7 @@ use crate::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, cluster_slots::ClusterSlots, + contact_info::ContactInfo, repair_service::RepairStrategy, result::{Error, Result}, window_service::{should_retransmit_and_persist, WindowService}, @@ -17,8 +18,9 @@ use solana_ledger::{ use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_error; use solana_perf::packet::Packets; -use solana_sdk::clock::Slot; +use solana_sdk::clock::{Epoch, Slot}; use solana_sdk::epoch_schedule::EpochSchedule; +use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use solana_streamer::streamer::PacketReceiver; use std::{ @@ -43,6 +45,8 @@ struct RetransmitStats { total_packets: AtomicU64, total_batches: AtomicU64, 
total_time: AtomicU64, + epoch_fetch: AtomicU64, + epoch_cache_update: AtomicU64, repair_total: AtomicU64, discard_total: AtomicU64, retransmit_total: AtomicU64, @@ -64,6 +68,8 @@ fn update_retransmit_stats( peers_len: usize, packets_by_slot: HashMap, packets_by_source: HashMap, + epoch_fetch: u64, + epoch_cach_update: u64, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats @@ -82,6 +88,10 @@ fn update_retransmit_stats( .compute_turbine_peers_total .fetch_add(compute_turbine_peers_total, Ordering::Relaxed); stats.total_batches.fetch_add(1, Ordering::Relaxed); + stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed); + stats + .epoch_cache_update + .fetch_add(epoch_cach_update, Ordering::Relaxed); { let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); for (slot, count) in packets_by_slot { @@ -106,6 +116,16 @@ fn update_retransmit_stats( stats.total_time.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "epoch_fetch", + stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "epoch_cache_update", + stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "total_batches", stats.total_batches.swap(0, Ordering::Relaxed) as i64, @@ -147,6 +167,14 @@ fn update_retransmit_stats( } } +#[derive(Default)] +struct EpochStakesCache { + epoch: Epoch, + stakes: Option>>, + peers: Vec, + stakes_and_index: Vec<(u64, usize)>, +} + fn retransmit( bank_forks: &Arc>, leader_schedule_cache: &Arc, @@ -155,6 +183,8 @@ fn retransmit( sock: &UdpSocket, id: u32, stats: &Arc, + epoch_stakes_cache: &Arc>, + last_peer_update: &Arc, ) -> Result<()> { let timer = Duration::new(1, 0); let r_lock = r.lock().unwrap(); @@ -171,12 +201,42 @@ fn retransmit( } drop(r_lock); + let mut epoch_fetch = Measure::start("retransmit_epoch_fetch"); let r_bank = bank_forks.read().unwrap().working_bank(); let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot()); + epoch_fetch.stop(); + + let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); + let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); + if r_epoch_stakes_cache.epoch != bank_epoch { + drop(r_epoch_stakes_cache); + let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); + if w_epoch_stakes_cache.epoch != bank_epoch { + let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch); + let stakes = stakes.map(Arc::new); + w_epoch_stakes_cache.stakes = stakes; + w_epoch_stakes_cache.epoch = bank_epoch; + } + drop(w_epoch_stakes_cache); + r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); + } + + let now = timestamp(); + let last = last_peer_update.load(Ordering::Relaxed); + if now - last > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last + { + drop(r_epoch_stakes_cache); + let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); + let (peers, stakes_and_index) = + cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone()); + w_epoch_stakes_cache.peers = peers; + w_epoch_stakes_cache.stakes_and_index = stakes_and_index; + drop(w_epoch_stakes_cache); + r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); + } let mut peers_len = 0; - let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch); - let stakes = stakes.map(Arc::new); - let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes); + epoch_cache_update.stop(); + let my_id = cluster_info.id(); let mut discard_total = 0; let mut repair_total = 0; @@ -201,8 +261,8 @@ fn 
retransmit( let mut compute_turbine_peers = Measure::start("turbine_start"); let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( &my_id, - &peers, - &stakes_and_index, + &r_epoch_stakes_cache.peers, + &r_epoch_stakes_cache.stakes_and_index, packet.meta.seed, ); peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); @@ -215,8 +275,14 @@ fn retransmit( let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes); - let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect(); - let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect(); + let neighbors: Vec<_> = neighbors + .into_iter() + .map(|index| &r_epoch_stakes_cache.peers[index]) + .collect(); + let children: Vec<_> = children + .into_iter() + .map(|index| &r_epoch_stakes_cache.peers[index]) + .collect(); compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); @@ -257,6 +323,8 @@ fn retransmit( peers_len, packets_by_slot, packets_by_source, + epoch_fetch.as_us(), + epoch_cache_update.as_us(), ); Ok(()) @@ -286,6 +354,8 @@ pub fn retransmitter( let r = r.clone(); let cluster_info = cluster_info.clone(); let stats = stats.clone(); + let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default())); + let last_peer_update = Arc::new(AtomicU64::new(0)); Builder::new() .name("solana-retransmitter".to_string()) @@ -300,6 +370,8 @@ pub fn retransmitter( &sockets[s], s as u32, &stats, + &epoch_stakes_cache, + &last_peer_update, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 244d52d5df..0c37d9f0e4 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -426,23 +426,21 @@ fn network_run_pull( .map(|f| f.filter.bits.len() as usize / 8) .sum::(); bytes += serialized_size(&caller_info).unwrap() as usize; - let filters = filters + let filters: Vec<_> = filters .into_iter() .map(|f| (caller_info.clone(), f)) .collect(); - let rsp = network + let rsp: Vec<_> = network .get(&to) .map(|node| { - let mut rsp = vec![]; - rsp.append( - &mut node - .lock() - .unwrap() - .process_pull_requests(filters, now) - .into_iter() - .flatten() - .collect(), - ); + let rsp = node + .lock() + .unwrap() + .generate_pull_responses(&filters) + .into_iter() + .flatten() + .collect(); + node.lock().unwrap().process_pull_requests(filters, now); rsp }) .unwrap();
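
The standard_broadcast_run.rs and retransmit_stage.rs hunks above share one locking pattern: keep the expensive peer list in an RwLock-protected cache, and let at most one thread per interval pay for a refresh by racing on an AtomicU64 timestamp with compare_and_swap. Below is a minimal, self-contained sketch of that pattern; `PeerCache`, `refresh_peers`, and `now_ms` are illustrative stand-ins (the patch uses `BroadcastPeerCache`/`EpochStakesCache`, `get_broadcast_peers` / `sorted_retransmit_peers_and_stakes`, and `solana_sdk::timing::timestamp`), and the compare_and_swap argument order follows the retransmit_stage variant.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical cached peer set; in the patch this is BroadcastPeerCache /
// EpochStakesCache holding ContactInfo and (stake, index) pairs.
#[derive(Default)]
struct PeerCache {
    peers: Vec<String>,
}

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Stand-in for the expensive gossip query (get_broadcast_peers /
// sorted_retransmit_peers_and_stakes in the patch).
fn refresh_peers() -> Vec<String> {
    vec!["peer-a".to_string(), "peer-b".to_string()]
}

const PEER_UPDATE_INTERVAL_MS: u64 = 1_000;

fn read_peers(cache: &Arc<RwLock<PeerCache>>, last_update: &Arc<AtomicU64>) -> Vec<String> {
    let now = now_ms();
    let last = last_update.load(Ordering::Relaxed);
    // Only one thread per interval wins the compare_and_swap and pays the
    // refresh cost; everyone else keeps using the cached peers.
    if now.saturating_sub(last) > PEER_UPDATE_INTERVAL_MS
        && last_update.compare_and_swap(last, now, Ordering::Relaxed) == last
    {
        let peers = refresh_peers();
        cache.write().unwrap().peers = peers;
    }
    cache.read().unwrap().peers.clone()
}

fn main() {
    let cache = Arc::new(RwLock::new(PeerCache::default()));
    let last_update = Arc::new(AtomicU64::new(0));
    let peers = read_peers(&cache, &last_update);
    println!("{} peers cached", peers.len());
}
```

The point of the compare_and_swap guard is that losing threads skip the refresh entirely instead of queueing on the write lock, so the common path is a cheap read-lock acquisition.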
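
The cluster_info.rs hunks filter gossip traffic by shred version: filter_by_shred_version keeps only the sender's own ContactInfo when the sender's shred version differs from ours, and a shred version of 0 on either side disables the filter. A small sketch of that retain rule with simplified stand-in types (the real code works on CrdsValue/CrdsData and Pubkey):

```rust
// Simplified stand-ins for CrdsValue/CrdsData; `from: u64` stands in for Pubkey.
#[derive(Debug)]
enum GossipValue {
    ContactInfo { from: u64, shred_version: u16 },
    Other { from: u64 },
}

fn filter_by_shred_version(
    sender: u64,
    values: &mut Vec<GossipValue>,
    sender_shred_version: u16,
    my_shred_version: u16,
) {
    // shred_version == 0 on either side means "unknown / just booted",
    // so nothing is filtered in that case (mirrors the patch).
    if my_shred_version != 0
        && sender_shred_version != 0
        && sender_shred_version != my_shred_version
    {
        // Keep only the sender's own ContactInfo so it can still announce a
        // shred-version change; drop everything else it relayed.
        values.retain(|v| matches!(v, GossipValue::ContactInfo { from, .. } if *from == sender));
    }
}

fn main() {
    let mut values = vec![
        GossipValue::ContactInfo { from: 1, shred_version: 2 },
        GossipValue::Other { from: 7 },
    ];
    filter_by_shred_version(1, &mut values, 2, 1);
    assert_eq!(values.len(), 1); // only the sender's own ContactInfo survives
    println!("{:?}", values);
}
```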
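
update_data_budget is a small token bucket: every 100 ms the outbound budget grows by BYTES_PER_INTERVAL per known staked validator (always at least two), capped at MAX_BUDGET_MULTIPLE intervals' worth, per the in-code comment estimating roughly 50 kBps per staked validator. A self-contained sketch under those constants; the `take()` helper is an assumption about how the budget gets spent, not code from this patch:

```rust
// Illustrative data budget, refilled on a fixed interval and capped so the
// budget cannot build up more than MAX_BUDGET_MULTIPLE intervals' worth.
struct DataBudget {
    bytes: usize,
    last_timestamp_ms: u64,
}

const INTERVAL_MS: u64 = 100;
// ~50 kBps per staked validator; epoch slots + votes ~= 1.5 kB/slot ~= 4 kB/s.
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5;

impl DataBudget {
    fn update(&mut self, now_ms: u64, num_staked: usize) {
        if now_ms.saturating_sub(self.last_timestamp_ms) > INTERVAL_MS {
            // Assume at least two validators so the budget never collapses
            // to zero on an empty stake map (matches the patch's max(.., 2)).
            let n = num_staked.max(2);
            self.bytes = (self.bytes + n * BYTES_PER_INTERVAL)
                .min(MAX_BUDGET_MULTIPLE * n * BYTES_PER_INTERVAL);
            self.last_timestamp_ms = now_ms;
        }
    }

    // Hypothetical spend helper: succeed only if enough budget remains.
    fn take(&mut self, bytes: usize) -> bool {
        if self.bytes >= bytes {
            self.bytes -= bytes;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut budget = DataBudget { bytes: 0, last_timestamp_ms: 0 };
    budget.update(200, 10);
    assert!(budget.take(1000));
    println!("remaining budget: {} bytes", budget.bytes);
}
```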
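
Splitting the old process_pull_requests into generate_pull_responses (read-only) plus a trimmed process_pull_requests (write) lets handle_pull_requests build responses under the gossip read lock and hold the write lock only long enough to record the callers. The toy table below sketches that read/write split in isolation; it is an analogy for the pattern, not the actual Crds data structure:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Toy stand-in for the gossip table: a map from key to value.
#[derive(Default)]
struct Table {
    entries: HashMap<String, String>,
}

impl Table {
    // Read-only phase: build responses while holding only a read lock
    // (generate_pull_responses in the patch takes &self).
    fn generate_responses(&self, wanted: &[String]) -> Vec<Option<String>> {
        wanted.iter().map(|k| self.entries.get(k).cloned()).collect()
    }

    // Write phase: record the callers themselves; this is the only part
    // that still needs the write lock (process_pull_requests in the patch).
    fn record_callers(&mut self, callers: Vec<(String, String)>) {
        for (k, v) in callers {
            self.entries.insert(k, v);
        }
    }
}

fn main() {
    let table = RwLock::new(Table::default());
    table.write().unwrap().record_callers(vec![("a".into(), "1".into())]);

    let wanted = vec!["a".to_string(), "b".to_string()];
    // Many request handlers can run this concurrently under the read lock.
    let responses = table.read().unwrap().generate_responses(&wanted);
    // Only the short caller-insertion step runs under the write lock.
    table.write().unwrap().record_callers(vec![("b".into(), "2".into())]);

    println!("{:?}", responses); // [Some("1"), None]
}
```

Because the response-building step takes `&self`, many packet-handling threads can serve pull requests concurrently; only the short insertion step serializes on the write lock.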
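
retransmit() now keeps staked nodes per leader-schedule epoch in an EpochStakesCache and refreshes it with a double-checked read-then-write pattern, so concurrent retransmit threads do not all recompute stakes when the epoch rolls over. A compact sketch of that refresh, with `compute_stakes` standing in for staking_utils::staked_nodes_at_epoch and u64 keys standing in for Pubkey:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type Epoch = u64;

#[derive(Default)]
struct EpochStakesCache {
    epoch: Epoch,
    stakes: Option<Arc<HashMap<u64, u64>>>, // node id stand-in -> stake
}

// Stand-in for staking_utils::staked_nodes_at_epoch.
fn compute_stakes(_epoch: Epoch) -> Option<HashMap<u64, u64>> {
    Some(vec![(1u64, 100u64), (2u64, 200u64)].into_iter().collect())
}

fn stakes_for_epoch(
    cache: &Arc<RwLock<EpochStakesCache>>,
    bank_epoch: Epoch,
) -> Option<Arc<HashMap<u64, u64>>> {
    {
        let r = cache.read().unwrap();
        if r.epoch == bank_epoch {
            return r.stakes.clone();
        }
    } // drop the read lock before upgrading

    let mut w = cache.write().unwrap();
    // Re-check under the write lock: another thread may have refreshed
    // the cache while we were waiting.
    if w.epoch != bank_epoch {
        w.stakes = compute_stakes(bank_epoch).map(Arc::new);
        w.epoch = bank_epoch;
    }
    w.stakes.clone()
}

fn main() {
    let cache = Arc::new(RwLock::new(EpochStakesCache::default()));
    let stakes = stakes_for_epoch(&cache, 3);
    println!("{:?}", stakes.map(|s| s.len())); // Some(2)
}
```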