diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 55b2decf15..96d0b9cf0e 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -51,7 +51,7 @@ type PacketsAndOffsets = (Packets, Vec); pub type UnprocessedPackets = Vec; /// Transaction forwarding -pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4; +pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1; // Fixed thread size seems to be fastest on GCP setup pub const NUM_THREADS: u32 = 4; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 6de9864455..e9e2cbc7cb 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -254,6 +254,7 @@ pub struct ClusterInfo { my_contact_info: RwLock, id: Pubkey, stats: GossipStats, + socket: UdpSocket, } impl Default for ClusterInfo { @@ -409,6 +410,7 @@ impl ClusterInfo { my_contact_info: RwLock::new(contact_info), id, stats: GossipStats::default(), + socket: UdpSocket::bind("0.0.0.0:0").unwrap(), }; { let mut gossip = me.gossip.write().unwrap(); @@ -434,6 +436,7 @@ impl ClusterInfo { my_contact_info: RwLock::new(my_contact_info), id: *new_id, stats: GossipStats::default(), + socket: UdpSocket::bind("0.0.0.0:0").unwrap(), } } @@ -747,6 +750,13 @@ impl ClusterInfo { .process_push_message(&self.id(), vec![entry], now); } + pub fn send_vote(&self, vote: &Transaction) -> Result<()> { + let tpu = self.my_contact_info().tpu; + let buf = serialize(vote)?; + self.socket.send_to(&buf, &tpu)?; + Ok(()) + } + /// Get votes in the crds /// * since - The timestamp of when the vote inserted must be greater than /// since. This allows the bank to query for new votes only. diff --git a/core/src/crds.rs b/core/src/crds.rs index 109b4c22fe..a957d83ff9 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -36,6 +36,7 @@ use std::collections::HashMap; pub struct Crds { /// Stores the map of labels and values pub table: IndexMap, + pub num_inserts: usize, } #[derive(PartialEq, Debug)] @@ -84,6 +85,7 @@ impl Default for Crds { fn default() -> Self { Crds { table: IndexMap::new(), + num_inserts: 0, } } } @@ -125,6 +127,7 @@ impl Crds { .unwrap_or(true); if do_insert { let old = self.table.insert(label, new_value); + self.num_inserts += 1; Ok(old) } else { trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 0d578d89b6..817701b61a 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -76,17 +76,10 @@ impl CrdsGossip { stakes: &HashMap, ) -> HashMap> { let id = &self.id; - let crds = &self.crds; let push = &mut self.push; - let versioned = labels - .into_iter() - .filter_map(|label| crds.lookup_versioned(&label)); - let mut prune_map: HashMap> = HashMap::new(); - for val in versioned { - let origin = val.value.pubkey(); - let hash = val.value_hash; - let peers = push.prune_received_cache(id, &origin, hash, stakes); + for origin in labels.iter().map(|k| k.pubkey()) { + let peers = push.prune_received_cache(id, &origin, stakes); for from in peers { prune_map.entry(from).or_default().insert(origin); } @@ -113,7 +106,7 @@ impl CrdsGossip { return Err(CrdsGossipError::PruneMessageTimeout); } if self.id == *destination { - self.push.process_prune_msg(peer, origin); + self.push.process_prune_msg(&self.id, peer, origin); Ok(()) } else { Err(CrdsGossipError::BadPruneDestination) @@ -190,14 +183,15 @@ impl CrdsGossip { now: u64, process_pull_stats: &mut ProcessPullStats, ) { - self.pull.process_pull_responses( + let success = self.pull.process_pull_responses( &mut self.crds, from, responses, responses_expired_timeout, now, process_pull_stats, - ) + ); + self.push.push_pull_responses(success, now); } pub fn make_timeouts_test(&self) -> HashMap { diff --git a/core/src/crds_gossip_error.rs b/core/src/crds_gossip_error.rs index e99ae611e7..add95f5d44 100644 --- a/core/src/crds_gossip_error.rs +++ b/core/src/crds_gossip_error.rs @@ -2,7 +2,6 @@ pub enum CrdsGossipError { NoPeers, PushMessageTimeout, - PushMessageAlreadyReceived, PushMessageOldVersion, BadPruneDestination, PruneMessageTimeout, diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 466f60813f..5334c9fa32 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -134,6 +134,7 @@ pub struct CrdsGossipPull { purged_values: VecDeque<(Hash, u64)>, pub crds_timeout: u64, pub msg_timeout: u64, + pub num_pulls: usize, } impl Default for CrdsGossipPull { @@ -143,6 +144,7 @@ impl Default for CrdsGossipPull { pull_request_time: HashMap::new(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + num_pulls: 0, } } } @@ -313,18 +315,24 @@ impl CrdsGossipPull { responses_expired_timeout: Vec, now: u64, stats: &mut ProcessPullStats, - ) { + ) -> Vec<(CrdsValueLabel, Hash, u64)> { + let mut success = vec![]; let mut owners = HashSet::new(); for r in responses_expired_timeout { stats.failed_insert += crds.insert_versioned(r).is_err() as usize; } for r in responses { let owner = r.value.label().pubkey(); + let label = r.value.label(); + let wc = r.value.wallclock(); + let hash = r.value_hash; let old = crds.insert_versioned(r); if old.is_err() { stats.failed_insert += 1; } else { stats.success += 1; + self.num_pulls += 1; + success.push((label, hash, wc)); } old.ok().map(|opt| { owners.insert(owner); @@ -338,6 +346,7 @@ impl CrdsGossipPull { for owner in owners { crds.update_record_timestamp(&owner, now); } + success } // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 6b701f4c7e..1c2b53535d 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -35,6 +35,7 @@ pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; +pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; #[derive(Clone)] pub struct CrdsGossipPush { @@ -44,12 +45,18 @@ pub struct CrdsGossipPush { active_set: IndexMap>, /// push message queue push_messages: HashMap, - /// cache that tracks which validators a message was received from - received_cache: HashMap)>, + /// Cache that tracks which validators a message was received from + /// bool indicates it has been pruned. + /// This cache represents a lagging view of which validators + /// currently have this node in their `active_set` + received_cache: HashMap>, pub num_active: usize, pub push_fanout: usize, pub msg_timeout: u64, pub prune_timeout: u64, + pub num_total: usize, + pub num_old: usize, + pub num_pushes: usize, } impl Default for CrdsGossipPush { @@ -64,6 +71,9 @@ impl Default for CrdsGossipPush { push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS, + num_total: 0, + num_old: 0, + num_pushes: 0, } } } @@ -81,18 +91,21 @@ impl CrdsGossipPush { &mut self, self_pubkey: &Pubkey, origin: &Pubkey, - hash: Hash, stakes: &HashMap, ) -> Vec { let origin_stake = stakes.get(origin).unwrap_or(&0); let self_stake = stakes.get(self_pubkey).unwrap_or(&0); - let cache = self.received_cache.get(&hash); + let cache = self.received_cache.get(origin); if cache.is_none() { return Vec::new(); } + let peers = cache.unwrap(); - let peers = &cache.unwrap().1; - let peer_stake_total: u64 = peers.iter().map(|p| stakes.get(p).unwrap_or(&0)).sum(); + let peer_stake_total: u64 = peers + .iter() + .filter(|v| !(v.1).0) + .map(|v| stakes.get(v.0).unwrap_or(&0)) + .sum(); let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake); if peer_stake_total < prune_stake_threshold { return Vec::new(); @@ -100,7 +113,8 @@ impl CrdsGossipPush { let staked_peers: Vec<(Pubkey, u64)> = peers .iter() - .filter_map(|p| stakes.get(p).map(|s| (*p, *s))) + .filter(|v| !(v.1).0) + .filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s))) .filter(|(_, s)| *s > 0) .collect(); @@ -117,16 +131,27 @@ impl CrdsGossipPush { let (next_peer, next_stake) = staked_peers[next]; keep.insert(next_peer); peer_stake_sum += next_stake; - if peer_stake_sum >= prune_stake_threshold { + if peer_stake_sum >= prune_stake_threshold + && keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES + { break; } } - peers - .iter() + let pruned_peers: Vec = peers + .keys() .filter(|p| !keep.contains(p)) .cloned() - .collect() + .collect(); + pruned_peers.iter().for_each(|p| { + self.received_cache + .get_mut(origin) + .unwrap() + .get_mut(p) + .unwrap() + .0 = true; + }); + pruned_peers } /// process a push message to the network @@ -137,6 +162,7 @@ impl CrdsGossipPush { value: CrdsValue, now: u64, ) -> Result, CrdsGossipError> { + self.num_total += 1; if now > value .wallclock() @@ -149,21 +175,32 @@ impl CrdsGossipPush { return Err(CrdsGossipError::PushMessageTimeout); } let label = value.label(); + let origin = label.pubkey(); let new_value = crds.new_versioned(now, value); let value_hash = new_value.value_hash; - if let Some((_, ref mut received_set)) = self.received_cache.get_mut(&value_hash) { - received_set.insert(from.clone()); - return Err(CrdsGossipError::PushMessageAlreadyReceived); - } + let received_set = self + .received_cache + .entry(origin) + .or_insert_with(HashMap::new); + received_set.entry(*from).or_insert((false, 0)).1 = now; + let old = crds.insert_versioned(new_value); if old.is_err() { + self.num_old += 1; return Err(CrdsGossipError::PushMessageOldVersion); } - let mut received_set = HashSet::new(); - received_set.insert(from.clone()); self.push_messages.insert(label, value_hash); - self.received_cache.insert(value_hash, (now, received_set)); - Ok(old.ok().and_then(|opt| opt)) + Ok(old.unwrap()) + } + + /// push pull responses + pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) { + for (label, value_hash, wc) in values { + if now > wc.checked_add(self.msg_timeout).unwrap_or_else(|| 0) { + continue; + } + self.push_messages.insert(label, value_hash); + } } /// New push message to broadcast to peers. @@ -172,18 +209,10 @@ impl CrdsGossipPush { /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { - let max = self.active_set.len(); - let mut nodes: Vec<_> = (0..max).collect(); - nodes.shuffle(&mut rand::thread_rng()); - let peers: Vec = nodes - .into_iter() - .filter_map(|n| self.active_set.get_index(n)) - .take(self.push_fanout) - .map(|n| *n.0) - .collect(); let mut total_bytes: usize = 0; let mut values = vec![]; let mut push_messages: HashMap> = HashMap::new(); + trace!("new_push_messages {}", self.push_messages.len()); for (label, hash) in &self.push_messages { let res = crds.lookup_versioned(label); if res.is_none() { @@ -203,21 +232,37 @@ impl CrdsGossipPush { } values.push(value.clone()); } + trace!( + "new_push_messages {} {}", + values.len(), + self.active_set.len() + ); for v in values { - for p in peers.iter() { - let filter = self.active_set.get_mut(p); - if filter.is_some() && !filter.unwrap().contains(&v.label().pubkey()) { - push_messages.entry(*p).or_default().push(v.clone()); + //use a consistent index for the same origin so + //the active set learns the MST for that origin + let start = v.label().pubkey().as_ref()[0] as usize; + let max = self.push_fanout.min(self.active_set.len()); + for i in start..(start + max) { + let ix = i % self.active_set.len(); + if let Some((p, filter)) = self.active_set.get_index(ix) { + if !filter.contains(&v.label().pubkey()) { + trace!("new_push_messages insert {} {:?}", *p, v); + push_messages.entry(*p).or_default().push(v.clone()); + self.num_pushes += 1; + } } + self.push_messages.remove(&v.label()); } - self.push_messages.remove(&v.label()); } push_messages } /// add the `from` to the peer's filter of nodes - pub fn process_prune_msg(&mut self, peer: &Pubkey, origins: &[Pubkey]) { + pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { for origin in origins { + if origin == self_pubkey { + continue; + } if let Some(p) = self.active_set.get_mut(peer) { p.add(origin) } @@ -339,15 +384,11 @@ impl CrdsGossipPush { /// purge received push message cache pub fn purge_old_received_cache(&mut self, min_time: u64) { - let old_msgs: Vec = self - .received_cache - .iter() - .filter_map(|(k, (rcvd_time, _))| if *rcvd_time < min_time { Some(k) } else { None }) - .cloned() - .collect(); - for k in old_msgs { - self.received_cache.remove(&k); - } + self.received_cache + .iter_mut() + .for_each(|v| v.1.retain(|_, v| v.1 > min_time)); + + self.received_cache.retain(|_, v| !v.is_empty()); } } @@ -371,7 +412,6 @@ mod test { let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &origin, 0, ))); - let label = value.label(); let low_staked_peers = (0..10).map(|_| Pubkey::new_rand()); let mut low_staked_set = HashSet::new(); low_staked_peers.for_each(|p| { @@ -380,11 +420,7 @@ mod test { stakes.insert(p, 1); }); - let versioned = crds - .lookup_versioned(&label) - .expect("versioned value should exist"); - let hash = versioned.value_hash; - let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes); + let pruned = push.prune_received_cache(&self_id, &origin, &stakes); assert!( pruned.is_empty(), "should not prune if min threshold has not been reached" @@ -395,7 +431,7 @@ mod test { stakes.insert(high_staked_peer, high_stake); let _ = push.process_push_message(&mut crds, &high_staked_peer, value.clone(), 0); - let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes); + let pruned = push.prune_received_cache(&self_id, &origin, &stakes); assert!( pruned.len() < low_staked_set.len() + 1, "should not prune all peers" @@ -409,7 +445,7 @@ mod test { } #[test] - fn test_process_push() { + fn test_process_push_one() { let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -426,8 +462,8 @@ mod test { // push it again assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Err(CrdsGossipError::PushMessageAlreadyReceived) + push.process_push_message(&mut crds, &Pubkey::default(), value, 0), + Err(CrdsGossipError::PushMessageOldVersion) ); } #[test] @@ -690,6 +726,7 @@ mod test { #[test] fn test_process_prune() { let mut crds = Crds::default(); + let self_id = Pubkey::new_rand(); let mut push = CrdsGossipPush::default(); let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -707,7 +744,11 @@ mod test { push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0), Ok(None) ); - push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]); + push.process_prune_msg( + &self_id, + &peer.label().pubkey(), + &[new_msg.label().pubkey()], + ); assert_eq!(push.new_push_messages(&crds, 0), expected); } #[test] @@ -749,9 +790,9 @@ mod test { assert_eq!(crds.lookup(&label), Some(&value)); // push it again - assert_eq!( + assert_matches!( push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Err(CrdsGossipError::PushMessageAlreadyReceived) + Err(CrdsGossipError::PushMessageOldVersion) ); // purge the old pushed diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d1d63be15e..da65fe8424 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -758,7 +758,6 @@ impl ReplayStage { progress.get_fork_stats(bank.slot()).unwrap().total_staked, lockouts_sender, ); - Self::push_vote( cluster_info, bank, @@ -838,6 +837,7 @@ impl ReplayStage { let blockhash = bank.last_blockhash(); vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash); + let _ = cluster_info.send_vote(&vote_tx); cluster_info.push_vote(tower_index, vote_tx); }