diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8f46488885..83fa34c9bc 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -21,6 +21,7 @@ use crate::{ self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash, Version, Vote, MAX_WALLCLOCK, }, + data_budget::DataBudget, epoch_slots::EpochSlots, result::{Error, Result}, weighted_shuffle::weighted_shuffle, @@ -103,12 +104,6 @@ pub enum ClusterInfoError { BadContactInfo, BadGossipAddress, } -#[derive(Clone)] -pub struct DataBudget { - bytes: usize, // amount of bytes we have in the budget to send - last_timestamp_ms: u64, // Last time that we upped the bytes count, - // used to detect when to up the bytes budget again -} struct GossipWriteLock<'a> { gossip: RwLockWriteGuard<'a, CrdsGossip>, @@ -252,7 +247,7 @@ pub struct ClusterInfo { pub(crate) keypair: Arc, /// The network entrypoint entrypoint: RwLock>, - outbound_budget: RwLock, + outbound_budget: DataBudget, my_contact_info: RwLock, id: Pubkey, stats: GossipStats, @@ -406,10 +401,7 @@ impl ClusterInfo { gossip: RwLock::new(CrdsGossip::default()), keypair, entrypoint: RwLock::new(None), - outbound_budget: RwLock::new(DataBudget { - bytes: 0, - last_timestamp_ms: 0, - }), + outbound_budget: DataBudget::default(), my_contact_info: RwLock::new(contact_info), id, stats: GossipStats::default(), @@ -435,7 +427,7 @@ impl ClusterInfo { gossip: RwLock::new(gossip), keypair: self.keypair.clone(), entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), - outbound_budget: RwLock::new(self.outbound_budget.read().unwrap().clone()), + outbound_budget: self.outbound_budget.clone_non_atomic(), my_contact_info: RwLock::new(my_contact_info), id: *new_id, stats: GossipStats::default(), @@ -1832,24 +1824,18 @@ impl ClusterInfo { } } - fn update_data_budget(&self, stakes: &HashMap) { - let mut w_outbound_budget = self.outbound_budget.write().unwrap(); - - let now = timestamp(); + fn update_data_budget(&self, num_staked: usize) { const INTERVAL_MS: u64 = 100; // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s const BYTES_PER_INTERVAL: usize = 5000; const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default - - if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS { - let len = std::cmp::max(stakes.len(), 2); - w_outbound_budget.bytes += len * BYTES_PER_INTERVAL; - w_outbound_budget.bytes = std::cmp::min( - w_outbound_budget.bytes, - MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, - ); - w_outbound_budget.last_timestamp_ms = now; - } + let num_staked = num_staked.max(2); + self.outbound_budget.update(INTERVAL_MS, |bytes| { + std::cmp::min( + bytes + num_staked * BYTES_PER_INTERVAL, + MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL, + ) + }); } // Pull requests take an incoming bloom filter of contained entries from a node @@ -1864,7 +1850,7 @@ impl ClusterInfo { let mut caller_and_filters = vec![]; let mut addrs = vec![]; let mut time = Measure::start("handle_pull_requests"); - self.update_data_budget(stakes); + self.update_data_budget(stakes.len()); for pull_data in requests { caller_and_filters.push((pull_data.caller, pull_data.filter)); addrs.push(pull_data.from_addr); @@ -1944,17 +1930,13 @@ impl ClusterInfo { let response = pull_responses[stat.to].0[stat.responses_index].clone(); let protocol = Protocol::PullResponse(self_id, vec![response]); let new_packet = Packet::from_data(&from_addr, protocol); - { - let mut w_outbound_budget = self.outbound_budget.write().unwrap(); - if w_outbound_budget.bytes > new_packet.meta.size { - sent.insert(index); - w_outbound_budget.bytes -= new_packet.meta.size; - total_bytes += new_packet.meta.size; - packets.packets.push(new_packet) - } else { - inc_new_counter_info!("gossip_pull_request-no_budget", 1); - break; - } + if self.outbound_budget.take(new_packet.meta.size) { + sent.insert(index); + total_bytes += new_packet.meta.size; + packets.packets.push(new_packet) + } else { + inc_new_counter_info!("gossip_pull_request-no_budget", 1); + break; } } time.stop(); diff --git a/core/src/data_budget.rs b/core/src/data_budget.rs new file mode 100644 index 0000000000..106c69a2c8 --- /dev/null +++ b/core/src/data_budget.rs @@ -0,0 +1,112 @@ +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; + +#[derive(Default)] +pub struct DataBudget { + // Amount of bytes we have in the budget to send. + bytes: AtomicUsize, + // Last time that we upped the bytes count, used + // to detect when to up the bytes budget again + last_timestamp_ms: AtomicU64, +} + +impl DataBudget { + // If there are enough bytes in the budget, consumes from + // the budget and returns true. Otherwise returns false. + #[must_use] + pub fn take(&self, size: usize) -> bool { + let mut budget = self.bytes.load(Ordering::Acquire); + loop { + if budget < size { + return false; + } + match self.bytes.compare_exchange_weak( + budget, + budget - size, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return true, + Err(bytes) => budget = bytes, + } + } + } + + // Updates timestamp and returns true, if at least given milliseconds + // has passed since last update. Otherwise returns false. + fn can_update(&self, duration_millis: u64) -> bool { + let now = solana_sdk::timing::timestamp(); + let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire); + loop { + if now < last_timestamp + duration_millis { + return false; + } + match self.last_timestamp_ms.compare_exchange_weak( + last_timestamp, + now, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return true, + Err(ts) => last_timestamp = ts, + } + } + } + + // Updates the budget if at least given milliseconds has passed since last + // update. Updater function maps current value of bytes to the new one. + pub fn update(&self, duration_millis: u64, updater: F) + where + F: Fn(usize) -> usize, + { + if !self.can_update(duration_millis) { + return; + } + let mut bytes = self.bytes.load(Ordering::Acquire); + loop { + match self.bytes.compare_exchange_weak( + bytes, + updater(bytes), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(b) => bytes = b, + } + } + } + + // Non-atomic clone only for tests and simulations. + pub fn clone_non_atomic(&self) -> Self { + Self { + bytes: AtomicUsize::new(self.bytes.load(Ordering::Acquire)), + last_timestamp_ms: AtomicU64::new(self.last_timestamp_ms.load(Ordering::Acquire)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_data_budget() { + let budget = DataBudget::default(); + assert!(!budget.take(1)); // budget = 0. + + budget.update(1000, |bytes| bytes + 5); // budget updates to 5. + assert!(budget.take(1)); + assert!(budget.take(2)); + assert!(!budget.take(3)); // budget = 2, out of budget. + + budget.update(30, |_| 10); // no update, budget = 2. + assert!(!budget.take(3)); // budget = 2, out of budget. + + std::thread::sleep(Duration::from_millis(50)); + budget.update(30, |bytes| bytes * 2); // budget updates to 4. + + assert!(budget.take(3)); + assert!(budget.take(1)); + assert!(!budget.take(1)); // budget = 0. + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 2a520ca77c..ae6e2679ae 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod crds_gossip_pull; pub mod crds_gossip_push; pub mod crds_shards; pub mod crds_value; +pub mod data_budget; pub mod epoch_slots; pub mod fetch_stage; pub mod fork_choice;