(cherry picked from commit 05cf15a382)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -21,6 +21,7 @@ use crate::{
|
|||||||
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
|
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
|
||||||
Version, Vote, MAX_WALLCLOCK,
|
Version, Vote, MAX_WALLCLOCK,
|
||||||
},
|
},
|
||||||
|
data_budget::DataBudget,
|
||||||
epoch_slots::EpochSlots,
|
epoch_slots::EpochSlots,
|
||||||
result::{Error, Result},
|
result::{Error, Result},
|
||||||
weighted_shuffle::weighted_shuffle,
|
weighted_shuffle::weighted_shuffle,
|
||||||
@ -103,12 +104,6 @@ pub enum ClusterInfoError {
|
|||||||
BadContactInfo,
|
BadContactInfo,
|
||||||
BadGossipAddress,
|
BadGossipAddress,
|
||||||
}
|
}
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DataBudget {
|
|
||||||
bytes: usize, // amount of bytes we have in the budget to send
|
|
||||||
last_timestamp_ms: u64, // Last time that we upped the bytes count,
|
|
||||||
// used to detect when to up the bytes budget again
|
|
||||||
}
|
|
||||||
|
|
||||||
struct GossipWriteLock<'a> {
|
struct GossipWriteLock<'a> {
|
||||||
gossip: RwLockWriteGuard<'a, CrdsGossip>,
|
gossip: RwLockWriteGuard<'a, CrdsGossip>,
|
||||||
@ -252,7 +247,7 @@ pub struct ClusterInfo {
|
|||||||
pub(crate) keypair: Arc<Keypair>,
|
pub(crate) keypair: Arc<Keypair>,
|
||||||
/// The network entrypoint
|
/// The network entrypoint
|
||||||
entrypoint: RwLock<Option<ContactInfo>>,
|
entrypoint: RwLock<Option<ContactInfo>>,
|
||||||
outbound_budget: RwLock<DataBudget>,
|
outbound_budget: DataBudget,
|
||||||
my_contact_info: RwLock<ContactInfo>,
|
my_contact_info: RwLock<ContactInfo>,
|
||||||
id: Pubkey,
|
id: Pubkey,
|
||||||
stats: GossipStats,
|
stats: GossipStats,
|
||||||
@ -406,10 +401,7 @@ impl ClusterInfo {
|
|||||||
gossip: RwLock::new(CrdsGossip::default()),
|
gossip: RwLock::new(CrdsGossip::default()),
|
||||||
keypair,
|
keypair,
|
||||||
entrypoint: RwLock::new(None),
|
entrypoint: RwLock::new(None),
|
||||||
outbound_budget: RwLock::new(DataBudget {
|
outbound_budget: DataBudget::default(),
|
||||||
bytes: 0,
|
|
||||||
last_timestamp_ms: 0,
|
|
||||||
}),
|
|
||||||
my_contact_info: RwLock::new(contact_info),
|
my_contact_info: RwLock::new(contact_info),
|
||||||
id,
|
id,
|
||||||
stats: GossipStats::default(),
|
stats: GossipStats::default(),
|
||||||
@ -435,7 +427,7 @@ impl ClusterInfo {
|
|||||||
gossip: RwLock::new(gossip),
|
gossip: RwLock::new(gossip),
|
||||||
keypair: self.keypair.clone(),
|
keypair: self.keypair.clone(),
|
||||||
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
|
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
|
||||||
outbound_budget: RwLock::new(self.outbound_budget.read().unwrap().clone()),
|
outbound_budget: self.outbound_budget.clone_non_atomic(),
|
||||||
my_contact_info: RwLock::new(my_contact_info),
|
my_contact_info: RwLock::new(my_contact_info),
|
||||||
id: *new_id,
|
id: *new_id,
|
||||||
stats: GossipStats::default(),
|
stats: GossipStats::default(),
|
||||||
@ -1832,24 +1824,18 @@ impl ClusterInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_data_budget(&self, stakes: &HashMap<Pubkey, u64>) {
|
fn update_data_budget(&self, num_staked: usize) {
|
||||||
let mut w_outbound_budget = self.outbound_budget.write().unwrap();
|
|
||||||
|
|
||||||
let now = timestamp();
|
|
||||||
const INTERVAL_MS: u64 = 100;
|
const INTERVAL_MS: u64 = 100;
|
||||||
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
|
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
|
||||||
const BYTES_PER_INTERVAL: usize = 5000;
|
const BYTES_PER_INTERVAL: usize = 5000;
|
||||||
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default
|
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default
|
||||||
|
let num_staked = num_staked.max(2);
|
||||||
if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS {
|
self.outbound_budget.update(INTERVAL_MS, |bytes| {
|
||||||
let len = std::cmp::max(stakes.len(), 2);
|
std::cmp::min(
|
||||||
w_outbound_budget.bytes += len * BYTES_PER_INTERVAL;
|
bytes + num_staked * BYTES_PER_INTERVAL,
|
||||||
w_outbound_budget.bytes = std::cmp::min(
|
MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL,
|
||||||
w_outbound_budget.bytes,
|
)
|
||||||
MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
|
});
|
||||||
);
|
|
||||||
w_outbound_budget.last_timestamp_ms = now;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pull requests take an incoming bloom filter of contained entries from a node
|
// Pull requests take an incoming bloom filter of contained entries from a node
|
||||||
@ -1864,7 +1850,7 @@ impl ClusterInfo {
|
|||||||
let mut caller_and_filters = vec![];
|
let mut caller_and_filters = vec![];
|
||||||
let mut addrs = vec![];
|
let mut addrs = vec![];
|
||||||
let mut time = Measure::start("handle_pull_requests");
|
let mut time = Measure::start("handle_pull_requests");
|
||||||
self.update_data_budget(stakes);
|
self.update_data_budget(stakes.len());
|
||||||
for pull_data in requests {
|
for pull_data in requests {
|
||||||
caller_and_filters.push((pull_data.caller, pull_data.filter));
|
caller_and_filters.push((pull_data.caller, pull_data.filter));
|
||||||
addrs.push(pull_data.from_addr);
|
addrs.push(pull_data.from_addr);
|
||||||
@ -1944,17 +1930,13 @@ impl ClusterInfo {
|
|||||||
let response = pull_responses[stat.to].0[stat.responses_index].clone();
|
let response = pull_responses[stat.to].0[stat.responses_index].clone();
|
||||||
let protocol = Protocol::PullResponse(self_id, vec![response]);
|
let protocol = Protocol::PullResponse(self_id, vec![response]);
|
||||||
let new_packet = Packet::from_data(&from_addr, protocol);
|
let new_packet = Packet::from_data(&from_addr, protocol);
|
||||||
{
|
if self.outbound_budget.take(new_packet.meta.size) {
|
||||||
let mut w_outbound_budget = self.outbound_budget.write().unwrap();
|
sent.insert(index);
|
||||||
if w_outbound_budget.bytes > new_packet.meta.size {
|
total_bytes += new_packet.meta.size;
|
||||||
sent.insert(index);
|
packets.packets.push(new_packet)
|
||||||
w_outbound_budget.bytes -= new_packet.meta.size;
|
} else {
|
||||||
total_bytes += new_packet.meta.size;
|
inc_new_counter_info!("gossip_pull_request-no_budget", 1);
|
||||||
packets.packets.push(new_packet)
|
break;
|
||||||
} else {
|
|
||||||
inc_new_counter_info!("gossip_pull_request-no_budget", 1);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.stop();
|
time.stop();
|
||||||
|
112
core/src/data_budget.rs
Normal file
112
core/src/data_budget.rs
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct DataBudget {
|
||||||
|
// Amount of bytes we have in the budget to send.
|
||||||
|
bytes: AtomicUsize,
|
||||||
|
// Last time that we upped the bytes count, used
|
||||||
|
// to detect when to up the bytes budget again
|
||||||
|
last_timestamp_ms: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataBudget {
|
||||||
|
// If there are enough bytes in the budget, consumes from
|
||||||
|
// the budget and returns true. Otherwise returns false.
|
||||||
|
#[must_use]
|
||||||
|
pub fn take(&self, size: usize) -> bool {
|
||||||
|
let mut budget = self.bytes.load(Ordering::Acquire);
|
||||||
|
loop {
|
||||||
|
if budget < size {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
match self.bytes.compare_exchange_weak(
|
||||||
|
budget,
|
||||||
|
budget - size,
|
||||||
|
Ordering::AcqRel,
|
||||||
|
Ordering::Acquire,
|
||||||
|
) {
|
||||||
|
Ok(_) => return true,
|
||||||
|
Err(bytes) => budget = bytes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updates timestamp and returns true, if at least given milliseconds
|
||||||
|
// has passed since last update. Otherwise returns false.
|
||||||
|
fn can_update(&self, duration_millis: u64) -> bool {
|
||||||
|
let now = solana_sdk::timing::timestamp();
|
||||||
|
let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire);
|
||||||
|
loop {
|
||||||
|
if now < last_timestamp + duration_millis {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
match self.last_timestamp_ms.compare_exchange_weak(
|
||||||
|
last_timestamp,
|
||||||
|
now,
|
||||||
|
Ordering::AcqRel,
|
||||||
|
Ordering::Acquire,
|
||||||
|
) {
|
||||||
|
Ok(_) => return true,
|
||||||
|
Err(ts) => last_timestamp = ts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updates the budget if at least given milliseconds has passed since last
|
||||||
|
// update. Updater function maps current value of bytes to the new one.
|
||||||
|
pub fn update<F>(&self, duration_millis: u64, updater: F)
|
||||||
|
where
|
||||||
|
F: Fn(usize) -> usize,
|
||||||
|
{
|
||||||
|
if !self.can_update(duration_millis) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut bytes = self.bytes.load(Ordering::Acquire);
|
||||||
|
loop {
|
||||||
|
match self.bytes.compare_exchange_weak(
|
||||||
|
bytes,
|
||||||
|
updater(bytes),
|
||||||
|
Ordering::AcqRel,
|
||||||
|
Ordering::Acquire,
|
||||||
|
) {
|
||||||
|
Ok(_) => break,
|
||||||
|
Err(b) => bytes = b,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-atomic clone only for tests and simulations.
|
||||||
|
pub fn clone_non_atomic(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
bytes: AtomicUsize::new(self.bytes.load(Ordering::Acquire)),
|
||||||
|
last_timestamp_ms: AtomicU64::new(self.last_timestamp_ms.load(Ordering::Acquire)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::{thread, time::Duration};

    /// Walks one budget through consumption, a refresh that is rejected
    /// because it comes too early, and a refresh that is accepted once the
    /// interval has elapsed.
    #[test]
    fn test_data_budget() {
        let data_budget = DataBudget::default();
        // A fresh budget is empty.
        assert!(!data_budget.take(1));

        // First refresh goes through: 0 -> 5 bytes.
        data_budget.update(1000, |current| current + 5);
        assert!(data_budget.take(1));
        assert!(data_budget.take(2));
        // Only 2 bytes remain, so 3 cannot be taken.
        assert!(!data_budget.take(3));

        // Too soon after the previous refresh: the budget stays at 2.
        data_budget.update(30, |_| 10);
        assert!(!data_budget.take(3));

        // After waiting longer than the interval the refresh applies: 2 -> 4.
        thread::sleep(Duration::from_millis(50));
        data_budget.update(30, |current| current * 2);

        assert!(data_budget.take(3));
        assert!(data_budget.take(1));
        // Everything has been spent.
        assert!(!data_budget.take(1));
    }
}
|
@ -32,6 +32,7 @@ pub mod crds_gossip_pull;
|
|||||||
pub mod crds_gossip_push;
|
pub mod crds_gossip_push;
|
||||||
pub mod crds_shards;
|
pub mod crds_shards;
|
||||||
pub mod crds_value;
|
pub mod crds_value;
|
||||||
|
pub mod data_budget;
|
||||||
pub mod epoch_slots;
|
pub mod epoch_slots;
|
||||||
pub mod fetch_stage;
|
pub mod fetch_stage;
|
||||||
pub mod fork_choice;
|
pub mod fork_choice;
|
||||||
|
Reference in New Issue
Block a user