From 1fcfbfccbbe439ea0d9da963bc37c3d645921134 Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Fri, 8 Oct 2021 08:20:35 -0500 Subject: [PATCH] Add fn to push IncrementalSnapshotHashes to cluster via gossip (#20395) --- gossip/src/cluster_info.rs | 61 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 2b6ecb5963..54bc76e26c 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -115,6 +115,10 @@ const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115; /// PACKET_DATA_SIZE. // TODO: Update this to 26 once payload sizes are upgraded across fleet. pub const MAX_SNAPSHOT_HASHES: usize = 16; +/// Maximum number of hashes in IncrementalSnapshotHashes a node publishes +/// such that the serialized size of the push/pull message stays below +/// PACKET_DATA_SIZE. +pub const MAX_INCREMENTAL_SNAPSHOT_HASHES: usize = 25; /// Maximum number of origin nodes that a PruneData may contain, such that the /// serialized size of the PruneMessage stays below PACKET_DATA_SIZE. const MAX_PRUNE_DATA_NODES: usize = 32; @@ -140,6 +144,7 @@ pub enum ClusterInfoError { NoLeader, BadContactInfo, BadGossipAddress, + TooManyIncrementalSnapshotHashes, } pub struct ClusterInfo { @@ -950,6 +955,26 @@ impl ClusterInfo { self.push_message(CrdsValue::new_signed(message, &self.keypair())); } + pub fn push_incremental_snapshot_hashes( + &self, + base: (Slot, Hash), + hashes: Vec<(Slot, Hash)>, + ) -> Result<(), ClusterInfoError> { + if hashes.len() > MAX_INCREMENTAL_SNAPSHOT_HASHES { + return Err(ClusterInfoError::TooManyIncrementalSnapshotHashes); + } + + let message = CrdsData::IncrementalSnapshotHashes(IncrementalSnapshotHashes { + from: self.id(), + base, + hashes, + wallclock: timestamp(), + }); + self.push_message(CrdsValue::new_signed(message, &self.keypair())); + + Ok(()) + } + pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) { assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY); let self_pubkey = self.id(); @@ -3214,6 +3239,42 @@ mod tests { } } + #[test] + fn test_max_incremental_snapshot_hashes_with_push_messages() { + let mut rng = rand::thread_rng(); + let incremental_snapshot_hashes = IncrementalSnapshotHashes { + from: Pubkey::new_unique(), + base: (Slot::default(), Hash::default()), + hashes: vec![(Slot::default(), Hash::default()); MAX_INCREMENTAL_SNAPSHOT_HASHES], + wallclock: timestamp(), + }; + let crds_value = CrdsValue::new_signed( + CrdsData::IncrementalSnapshotHashes(incremental_snapshot_hashes), + &Keypair::new(), + ); + let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]); + let socket = new_rand_socket_addr(&mut rng); + assert!(Packet::from_data(Some(&socket), message).is_ok()); + } + + #[test] + fn test_max_incremental_snapshot_hashes_with_pull_responses() { + let mut rng = rand::thread_rng(); + let incremental_snapshot_hashes = IncrementalSnapshotHashes { + from: Pubkey::new_unique(), + base: (Slot::default(), Hash::default()), + hashes: vec![(Slot::default(), Hash::default()); MAX_INCREMENTAL_SNAPSHOT_HASHES], + wallclock: timestamp(), + }; + let crds_value = CrdsValue::new_signed( + CrdsData::IncrementalSnapshotHashes(incremental_snapshot_hashes), + &Keypair::new(), + ); + let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]); + let socket = new_rand_socket_addr(&mut rng); + assert!(Packet::from_data(Some(&socket), response).is_ok()); + } + #[test] fn test_max_prune_data_pubkeys() { let mut rng = rand::thread_rng();