From 7e613c7a788f2a43157027367e1d4bc25473713d Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 9 Jul 2021 13:33:40 +0000 Subject: [PATCH] skips process_push_message for local messages (backport #18493) (#18532) * skips process_push_message for local messages (#18493) received_cache is not relevant for local messages, and does not need to be updated: https://github.com/solana-labs/solana/blob/92c5cdab6/gossip/src/crds_gossip_push.rs#L166-L189 (cherry picked from commit 27cc7577a172c6a7dd493c6e2a761ccef0bf67fc) # Conflicts: # gossip/src/cluster_info.rs * removes backport merge conflicts Co-authored-by: behzad nouri --- gossip/src/cluster_info.rs | 26 ++++++++++++++------------ gossip/src/crds_gossip.rs | 31 +++++++++++++++++-------------- gossip/src/crds_gossip_push.rs | 2 +- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 990568cf98..611d46cda2 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -957,7 +957,12 @@ impl ClusterInfo { reset = true; } let mut gossip = self.gossip.write().unwrap(); - gossip.process_push_message(&self_pubkey, entries, timestamp()); + let now = timestamp(); + for entry in entries { + if let Err(err) = gossip.crds.insert(entry, now) { + error!("push_epoch_slots failed: {:?}", err); + } + } } fn time_gossip_read_lock<'a>( @@ -1016,10 +1021,10 @@ impl ClusterInfo { let vote = Vote::new(self_pubkey, vote, now); let vote = CrdsData::Vote(vote_index, vote); let vote = CrdsValue::new_signed(vote, &self.keypair); - self.gossip - .write() - .unwrap() - .process_push_message(&self_pubkey, vec![vote], now); + let mut gossip = self.gossip.write().unwrap(); + if let Err(err) = gossip.crds.insert(vote, now) { + error!("push_vote failed: {:?}", err); + } } pub fn push_vote(&self, tower: &[Slot], vote: Transaction) { @@ -1514,7 +1519,10 @@ impl ClusterInfo { pub fn flush_push_queue(&self) { let pending_push_messages = self.drain_push_queue(); let mut gossip = self.gossip.write().unwrap(); - gossip.process_push_message(&self.id, pending_push_messages, timestamp()); + let now = timestamp(); + for entry in pending_push_messages { + let _ = gossip.crds.insert(entry, now); + } } fn new_push_requests( &self, @@ -3513,7 +3521,6 @@ mod tests { None, // payer ); cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone()); - cluster_info.flush_push_queue(); let mut cursor = Cursor::default(); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![unrefresh_tx.clone()]); @@ -3535,7 +3542,6 @@ mod tests { // Trying to refresh vote when it doesn't yet exist in gossip // shouldn't add the vote cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); - cluster_info.flush_push_queue(); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); @@ -3544,7 +3550,6 @@ mod tests { // Push the new vote for `refresh_slot` cluster_info.push_vote(&refresh_tower, refresh_tx.clone()); - cluster_info.flush_push_queue(); // Should be two votes in gossip let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); @@ -3570,8 +3575,6 @@ mod tests { ); cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot); } - cluster_info.flush_push_queue(); - // The diff since `max_ts` should only be the latest refreshed vote let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); @@ -3612,7 +3615,6 @@ mod tests { ); let tower = vec![7]; // Last slot in the vote. cluster_info.push_vote(&tower, tx.clone()); - cluster_info.flush_push_queue(); let (labels, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![tx]); diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 8fd8b9423e..84a9baec9f 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -61,16 +61,16 @@ impl CrdsGossip { } /// process a push message to the network - /// Returns origins' pubkeys of upserted values. + /// Returns unique origins' pubkeys of upserted values. pub fn process_push_message( &mut self, from: &Pubkey, values: Vec, now: u64, - ) -> Vec { + ) -> HashSet { values .into_iter() - .flat_map(|val| { + .filter_map(|val| { let origin = val.pubkey(); self.push .process_push_message(&mut self.crds, from, val, now) @@ -106,8 +106,9 @@ impl CrdsGossip { pending_push_messages: Vec, now: u64, ) -> HashMap> { - let self_pubkey = self.id; - self.process_push_message(&self_pubkey, pending_push_messages, now); + for entry in pending_push_messages { + let _ = self.crds.insert(entry, now); + } self.push.new_push_messages(&self.crds, now) } @@ -161,15 +162,17 @@ impl CrdsGossip { } else { offset }; - let entries = chunks - .enumerate() - .map(|(k, chunk)| { - let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS; - let data = CrdsData::DuplicateShred(index, chunk); - CrdsValue::new_signed(data, keypair) - }) - .collect(); - self.process_push_message(&pubkey, entries, timestamp()); + let entries = chunks.enumerate().map(|(k, chunk)| { + let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS; + let data = CrdsData::DuplicateShred(index, chunk); + CrdsValue::new_signed(data, keypair) + }); + let now = timestamp(); + for entry in entries { + if let Err(err) = self.crds.insert(entry, now) { + error!("push_duplicate_shred faild: {:?}", err); + } + } Ok(()) } diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index ca836546b6..24179f7ab9 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -164,7 +164,7 @@ impl CrdsGossipPush { } /// process a push message to the network - pub fn process_push_message( + pub(crate) fn process_push_message( &mut self, crds: &mut Crds, from: &Pubkey,