Weight push peers by how long we haven't pushed to them (#12620) (#12651)

(cherry picked from commit 71c469c72b)

Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
mergify[bot]
2020-10-02 22:27:35 +00:00
committed by GitHub
parent a983430ddb
commit ffa0ee69ca

View File

@ -37,6 +37,9 @@ pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
// 10 minutes
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
#[derive(Clone)] #[derive(Clone)]
pub struct CrdsGossipPush { pub struct CrdsGossipPush {
/// max bytes per message /// max bytes per message
@ -50,6 +53,8 @@ pub struct CrdsGossipPush {
/// This cache represents a lagging view of which validators /// This cache represents a lagging view of which validators
/// currently have this node in their `active_set` /// currently have this node in their `active_set`
received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>, received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
last_pushed_to: HashMap<Pubkey, u64>,
last_pushed_to_cleanup_ts: u64,
pub num_active: usize, pub num_active: usize,
pub push_fanout: usize, pub push_fanout: usize,
pub msg_timeout: u64, pub msg_timeout: u64,
@ -67,6 +72,8 @@ impl Default for CrdsGossipPush {
active_set: IndexMap::new(), active_set: IndexMap::new(),
push_messages: HashMap::new(), push_messages: HashMap::new(),
received_cache: HashMap::new(), received_cache: HashMap::new(),
last_pushed_to: HashMap::new(),
last_pushed_to_cleanup_ts: 0,
num_active: CRDS_GOSSIP_NUM_ACTIVE, num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT, push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
@ -254,6 +261,15 @@ impl CrdsGossipPush {
self.push_messages.remove(&v.label()); self.push_messages.remove(&v.label());
} }
} }
for target_pubkey in push_messages.keys() {
*self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now;
}
if now - self.last_pushed_to_cleanup_ts > MAX_PUSHED_TO_TIMEOUT_MS {
self.last_pushed_to
.retain(|_id, timestamp| now - *timestamp > MAX_PUSHED_TO_TIMEOUT_MS);
self.last_pushed_to_cleanup_ts = now;
}
push_messages push_messages
} }
@ -357,10 +373,10 @@ impl CrdsGossipPush {
gossip_validators.contains(&info.id) gossip_validators.contains(&info.id)
}) })
}) })
.map(|(info, value)| { .map(|(info, _value)| {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
let last_updated: u64 = value.local_timestamp; let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0);
let since = ((timestamp() - last_updated) / 1024) as u32; let since = ((timestamp() - last_pushed_to) / 1024) as u32;
let stake = get_stake(&info.id, stakes); let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake); let weight = get_weight(max_weight, since, stake);
(weight, info) (weight, info)
@ -576,9 +592,10 @@ mod test {
} }
#[test] #[test]
fn test_active_set_refresh_with_bank() { fn test_active_set_refresh_with_bank() {
solana_logger::setup();
let time = timestamp() - 1024; //make sure there's at least a 1 second delay let time = timestamp() - 1024; //make sure there's at least a 1 second delay
let mut crds = Crds::default(); let mut crds = Crds::default();
let push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let mut stakes = HashMap::new(); let mut stakes = HashMap::new();
for i in 1..=100 { for i in 1..=100 {
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -588,6 +605,7 @@ mod test {
let id = peer.label().pubkey(); let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap(); crds.insert(peer.clone(), time).unwrap();
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
push.last_pushed_to.insert(id, time);
} }
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
assert!(!options.is_empty()); assert!(!options.is_empty());