* skips process_push_message for local messages (#18493)
received_cache is not relevant for local messages, and does not need to
be updated:
https://github.com/solana-labs/solana/blob/92c5cdab6/gossip/src/crds_gossip_push.rs#L166-L189
(cherry picked from commit 27cc7577a1
)
# Conflicts:
# gossip/src/cluster_info.rs
* removes backport merge conflicts
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -957,7 +957,12 @@ impl ClusterInfo {
|
||||
reset = true;
|
||||
}
|
||||
let mut gossip = self.gossip.write().unwrap();
|
||||
gossip.process_push_message(&self_pubkey, entries, timestamp());
|
||||
let now = timestamp();
|
||||
for entry in entries {
|
||||
if let Err(err) = gossip.crds.insert(entry, now) {
|
||||
error!("push_epoch_slots failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn time_gossip_read_lock<'a>(
|
||||
@ -1016,10 +1021,10 @@ impl ClusterInfo {
|
||||
let vote = Vote::new(self_pubkey, vote, now);
|
||||
let vote = CrdsData::Vote(vote_index, vote);
|
||||
let vote = CrdsValue::new_signed(vote, &self.keypair);
|
||||
self.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.process_push_message(&self_pubkey, vec![vote], now);
|
||||
let mut gossip = self.gossip.write().unwrap();
|
||||
if let Err(err) = gossip.crds.insert(vote, now) {
|
||||
error!("push_vote failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
|
||||
@ -1514,7 +1519,10 @@ impl ClusterInfo {
|
||||
pub fn flush_push_queue(&self) {
|
||||
let pending_push_messages = self.drain_push_queue();
|
||||
let mut gossip = self.gossip.write().unwrap();
|
||||
gossip.process_push_message(&self.id, pending_push_messages, timestamp());
|
||||
let now = timestamp();
|
||||
for entry in pending_push_messages {
|
||||
let _ = gossip.crds.insert(entry, now);
|
||||
}
|
||||
}
|
||||
fn new_push_requests(
|
||||
&self,
|
||||
@ -3513,7 +3521,6 @@ mod tests {
|
||||
None, // payer
|
||||
);
|
||||
cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone());
|
||||
cluster_info.flush_push_queue();
|
||||
let mut cursor = Cursor::default();
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes, vec![unrefresh_tx.clone()]);
|
||||
@ -3535,7 +3542,6 @@ mod tests {
|
||||
// Trying to refresh vote when it doesn't yet exist in gossip
|
||||
// shouldn't add the vote
|
||||
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
|
||||
cluster_info.flush_push_queue();
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes, vec![]);
|
||||
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
|
||||
@ -3544,7 +3550,6 @@ mod tests {
|
||||
|
||||
// Push the new vote for `refresh_slot`
|
||||
cluster_info.push_vote(&refresh_tower, refresh_tx.clone());
|
||||
cluster_info.flush_push_queue();
|
||||
|
||||
// Should be two votes in gossip
|
||||
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
|
||||
@ -3570,8 +3575,6 @@ mod tests {
|
||||
);
|
||||
cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot);
|
||||
}
|
||||
cluster_info.flush_push_queue();
|
||||
|
||||
// The diff since `max_ts` should only be the latest refreshed vote
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
@ -3612,7 +3615,6 @@ mod tests {
|
||||
);
|
||||
let tower = vec![7]; // Last slot in the vote.
|
||||
cluster_info.push_vote(&tower, tx.clone());
|
||||
cluster_info.flush_push_queue();
|
||||
|
||||
let (labels, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes, vec![tx]);
|
||||
|
@ -61,16 +61,16 @@ impl CrdsGossip {
|
||||
}
|
||||
|
||||
/// process a push message to the network
|
||||
/// Returns origins' pubkeys of upserted values.
|
||||
/// Returns unique origins' pubkeys of upserted values.
|
||||
pub fn process_push_message(
|
||||
&mut self,
|
||||
from: &Pubkey,
|
||||
values: Vec<CrdsValue>,
|
||||
now: u64,
|
||||
) -> Vec<Pubkey> {
|
||||
) -> HashSet<Pubkey> {
|
||||
values
|
||||
.into_iter()
|
||||
.flat_map(|val| {
|
||||
.filter_map(|val| {
|
||||
let origin = val.pubkey();
|
||||
self.push
|
||||
.process_push_message(&mut self.crds, from, val, now)
|
||||
@ -106,8 +106,9 @@ impl CrdsGossip {
|
||||
pending_push_messages: Vec<CrdsValue>,
|
||||
now: u64,
|
||||
) -> HashMap<Pubkey, Vec<CrdsValue>> {
|
||||
let self_pubkey = self.id;
|
||||
self.process_push_message(&self_pubkey, pending_push_messages, now);
|
||||
for entry in pending_push_messages {
|
||||
let _ = self.crds.insert(entry, now);
|
||||
}
|
||||
self.push.new_push_messages(&self.crds, now)
|
||||
}
|
||||
|
||||
@ -161,15 +162,17 @@ impl CrdsGossip {
|
||||
} else {
|
||||
offset
|
||||
};
|
||||
let entries = chunks
|
||||
.enumerate()
|
||||
.map(|(k, chunk)| {
|
||||
let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
|
||||
let data = CrdsData::DuplicateShred(index, chunk);
|
||||
CrdsValue::new_signed(data, keypair)
|
||||
})
|
||||
.collect();
|
||||
self.process_push_message(&pubkey, entries, timestamp());
|
||||
let entries = chunks.enumerate().map(|(k, chunk)| {
|
||||
let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
|
||||
let data = CrdsData::DuplicateShred(index, chunk);
|
||||
CrdsValue::new_signed(data, keypair)
|
||||
});
|
||||
let now = timestamp();
|
||||
for entry in entries {
|
||||
if let Err(err) = self.crds.insert(entry, now) {
|
||||
error!("push_duplicate_shred faild: {:?}", err);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -164,7 +164,7 @@ impl CrdsGossipPush {
|
||||
}
|
||||
|
||||
/// process a push message to the network
|
||||
pub fn process_push_message(
|
||||
pub(crate) fn process_push_message(
|
||||
&mut self,
|
||||
crds: &mut Crds,
|
||||
from: &Pubkey,
|
||||
|
Reference in New Issue
Block a user