From 6a9e0bc593102cc284dd4e0b8225f1a9641667b2 Mon Sep 17 00:00:00 2001
From: carllin
Date: Thu, 23 May 2019 03:50:41 -0700
Subject: [PATCH] Change EpochSlots to use BtreeSet so that
 serialization/deserialization returns the same order (#4404)

automerge
---
 core/src/cluster_info.rs                 |  6 ++-
 core/src/cluster_info_repair_listener.rs |  6 +--
 core/src/crds_value.rs                   | 43 +++++++++++-----
 core/src/repair_service.rs               | 65 +++++++++++++++---------
 4 files changed, 80 insertions(+), 40 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 9c9705515b..006e3d7697 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -40,7 +40,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
 use solana_sdk::timing::{duration_as_ms, timestamp};
 use solana_sdk::transaction::Transaction;
 use std::cmp::min;
-use std::collections::HashSet;
+use std::collections::BTreeSet;
 use std::fmt;
 use std::io;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@@ -306,7 +306,7 @@ impl ClusterInfo {
         }
     }
 
-    pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: HashSet<u64>) {
+    pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet<u64>) {
         let now = timestamp();
         let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now));
         entry.sign(&self.keypair);
@@ -1205,11 +1205,13 @@ impl ClusterInfo {
     ) -> Vec<SharedBlob> {
         let self_id = me.read().unwrap().gossip.id;
         inc_new_counter_debug!("cluster_info-push_message", 1, 0, 1000);
+
         let prunes: Vec<_> = me
             .write()
             .unwrap()
             .gossip
             .process_push_message(data, timestamp());
+
         if !prunes.is_empty() {
             inc_new_counter_debug!("cluster_info-push_message-prunes", prunes.len());
             let ci = me.read().unwrap().lookup(from).cloned();
diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs
index ba1eedf6ab..8778caabf7 100644
--- a/core/src/cluster_info_repair_listener.rs
+++ b/core/src/cluster_info_repair_listener.rs
@@ -444,7 +444,7 @@ mod tests {
     use crate::blocktree::tests::make_many_slot_entries;
     use crate::packet::{Blob, SharedBlob};
     use crate::streamer;
-    use std::collections::HashSet;
+    use std::collections::BTreeSet;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
     use std::sync::mpsc::Receiver;
@@ -528,7 +528,7 @@ mod tests {
         // Set up the repairee's EpochSlots, such that they are missing every odd indexed slot
         // in the range (repairee_root, num_slots]
         let repairee_root = 0;
-        let repairee_slots: HashSet<_> = (0..=num_slots).step_by(2).collect();
+        let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
         let repairee_epoch_slots =
             EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1);
 
@@ -608,7 +608,7 @@ mod tests {
         // Thus, no repairmen should send any blobs to this repairee b/c this repairee
         // already has all the slots for which they have a confirmed leader schedule
         let repairee_root = 0;
-        let repairee_slots: HashSet<_> = (0..=slots_per_epoch).collect();
+        let repairee_slots: BTreeSet<_> = (0..=slots_per_epoch).collect();
         let repairee_epoch_slots =
             EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots.clone(), 1);
 
diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs
index 6eeaa1f2eb..0fa14cc56c 100644
--- a/core/src/crds_value.rs
+++ b/core/src/crds_value.rs
@@ -3,7 +3,7 @@ use bincode::serialize;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, Signable, Signature};
 use solana_sdk::transaction::Transaction;
-use std::collections::HashSet;
+use std::collections::BTreeSet;
 use std::fmt;
 
 /// CrdsValue that is replicated across the cluster
@@ -21,13 +21,13 @@ pub enum CrdsValue {
 pub struct EpochSlots {
     pub from: Pubkey,
     pub root: u64,
-    pub slots: HashSet<u64>,
+    pub slots: BTreeSet<u64>,
     pub signature: Signature,
     pub wallclock: u64,
 }
 
 impl EpochSlots {
-    pub fn new(from: Pubkey, root: u64, slots: HashSet<u64>, wallclock: u64) -> Self {
+    pub fn new(from: Pubkey, root: u64, slots: BTreeSet<u64>, wallclock: u64) -> Self {
         Self {
             from,
             root,
@@ -47,7 +47,7 @@ impl Signable for EpochSlots {
         #[derive(Serialize)]
         struct SignData<'a> {
             root: u64,
-            slots: &'a HashSet<u64>,
+            slots: &'a BTreeSet<u64>,
             wallclock: u64,
         }
         let data = SignData {
@@ -220,7 +220,11 @@ impl Signable for CrdsValue {
     }
 
     fn get_signature(&self) -> Signature {
-        unimplemented!()
+        match self {
+            CrdsValue::ContactInfo(contact_info) => contact_info.get_signature(),
+            CrdsValue::Vote(vote) => vote.get_signature(),
+            CrdsValue::EpochSlots(epoch_slots) => epoch_slots.get_signature(),
+        }
     }
 
     fn set_signature(&mut self, _: Signature) {
@@ -233,6 +237,7 @@ mod test {
     use super::*;
     use crate::contact_info::ContactInfo;
     use crate::test_tx::test_tx;
+    use bincode::deserialize;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::timing::timestamp;
 
@@ -261,7 +266,7 @@ mod test {
         let key = v.clone().vote().unwrap().from;
         assert_eq!(v.label(), CrdsValueLabel::Vote(key));
 
-        let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, HashSet::new(), 0));
+        let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, BTreeSet::new(), 0));
         assert_eq!(v.wallclock(), 0);
         let key = v.clone().epoch_slots().unwrap().from;
         assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key));
@@ -275,15 +280,28 @@ mod test {
         verify_signatures(&mut v, &keypair, &wrong_keypair);
         v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp()));
         verify_signatures(&mut v, &keypair, &wrong_keypair);
-        v = CrdsValue::EpochSlots(EpochSlots::new(
-            keypair.pubkey(),
-            0,
-            HashSet::new(),
-            timestamp(),
-        ));
+        let btreeset: BTreeSet<u64> = vec![1, 2, 3, 6, 8].into_iter().collect();
+        v = CrdsValue::EpochSlots(EpochSlots::new(keypair.pubkey(), 0, btreeset, timestamp()));
         verify_signatures(&mut v, &keypair, &wrong_keypair);
     }
 
+    fn test_serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) {
+        let num_tries = 10;
+        value.sign(keypair);
+        let original_signature = value.get_signature();
+        for _ in 0..num_tries {
+            let serialized_value = serialize(value).unwrap();
+            let deserialized_value: CrdsValue = deserialize(&serialized_value).unwrap();
+
+            // Signatures shouldn't change
+            let deserialized_signature = deserialized_value.get_signature();
+            assert_eq!(original_signature, deserialized_signature);
+
+            // After deserializing, check that the signature is still the same
+            assert!(deserialized_value.verify());
+        }
+    }
+
     fn verify_signatures(
         value: &mut CrdsValue,
         correct_keypair: &Keypair,
@@ -294,5 +312,6 @@ mod test {
         assert!(value.verify());
         value.sign(&wrong_keypair);
         assert!(!value.verify());
+        test_serialize_deserialize_value(value, correct_keypair);
     }
 }
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 8e98b391ee..57186d56a6 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -9,8 +9,9 @@ use crate::service::Service;
 use solana_metrics::datapoint_info;
 use solana_runtime::epoch_schedule::EpochSchedule;
 use solana_sdk::pubkey::Pubkey;
-use std::collections::HashSet;
+use std::collections::BTreeSet;
 use std::net::UdpSocket;
+use std::ops::Bound::{Excluded, Unbounded};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread::sleep;
@@ -89,20 +90,21 @@ impl RepairService {
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
     ) {
-        let mut epoch_slots: HashSet<u64> = HashSet::new();
+        let mut epoch_slots: BTreeSet<u64> = BTreeSet::new();
         let id = cluster_info.read().unwrap().id();
+        let mut current_root = 0;
         if let RepairStrategy::RepairAll {
             ref bank_forks,
             ref epoch_schedule,
             ..
         } = repair_strategy
         {
-            let root = bank_forks.read().unwrap().root();
+            current_root = bank_forks.read().unwrap().root();
             Self::initialize_epoch_slots(
                 id,
                 blocktree,
                 &mut epoch_slots,
-                root,
+                current_root,
                 epoch_schedule,
                 cluster_info,
             );
@@ -128,15 +130,16 @@ impl RepairService {
                         ref completed_slots_receiver,
                         ..
                     } => {
-                        let new_root = bank_forks.read().unwrap().root();
+                        let new_root = bank_forks.read().unwrap().root();
                         Self::update_epoch_slots(
                             id,
-                            root,
+                            new_root,
+                            &mut current_root,
                             &mut epoch_slots,
                             &cluster_info,
                             completed_slots_receiver,
                         );
-                        Self::generate_repairs(blocktree, root, MAX_REPAIR_LENGTH)
+                        Self::generate_repairs(blocktree, new_root, MAX_REPAIR_LENGTH)
                     }
                 }
             };
@@ -281,7 +284,7 @@ impl RepairService {
 
     fn get_completed_slots_past_root(
         blocktree: &Blocktree,
-        slots_in_gossip: &mut HashSet<u64>,
+        slots_in_gossip: &mut BTreeSet<u64>,
         root: u64,
         epoch_schedule: &EpochSchedule,
     ) {
@@ -305,7 +308,7 @@ impl RepairService {
     fn initialize_epoch_slots(
         id: Pubkey,
         blocktree: &Blocktree,
-        slots_in_gossip: &mut HashSet<u64>,
+        slots_in_gossip: &mut BTreeSet<u64>,
         root: u64,
         epoch_schedule: &EpochSchedule,
         cluster_info: &RwLock<ClusterInfo>,
@@ -326,30 +329,44 @@ impl RepairService {
     // for details.
     fn update_epoch_slots(
         id: Pubkey,
-        root: u64,
-        slots_in_gossip: &mut HashSet<u64>,
+        latest_known_root: u64,
+        prev_root: &mut u64,
+        slots_in_gossip: &mut BTreeSet<u64>,
         cluster_info: &RwLock<ClusterInfo>,
         completed_slots_receiver: &CompletedSlotsReceiver,
     ) {
         let mut should_update = false;
         while let Ok(completed_slots) = completed_slots_receiver.try_recv() {
+            should_update = latest_known_root != *prev_root;
             for slot in completed_slots {
                 // If the newly completed slot > root, and the set did not contain this value
                 // before, we should update gossip.
-                if slot > root && slots_in_gossip.insert(slot) {
-                    should_update = true;
+                if slot > latest_known_root {
+                    should_update |= slots_in_gossip.insert(slot);
                 }
             }
         }
 
         if should_update {
-            slots_in_gossip.retain(|x| *x > root);
-            cluster_info
-                .write()
-                .unwrap()
-                .push_epoch_slots(id, root, slots_in_gossip.clone());
+            // Filter out everything <= root
+            if latest_known_root != *prev_root {
+                *prev_root = latest_known_root;
+                Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root);
+            }
+            cluster_info.write().unwrap().push_epoch_slots(
+                id,
+                latest_known_root,
+                slots_in_gossip.clone(),
+            );
         }
     }
+
+    fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<u64>, root: u64) {
+        *slot_set = slot_set
+            .range((Excluded(&root), Unbounded))
+            .cloned()
+            .collect();
+    }
 }
 
 impl Service for RepairService {
@@ -603,7 +620,7 @@ mod test {
                 blobs
             })
             .collect();
-        let mut full_slots = HashSet::new();
+        let mut full_slots = BTreeSet::new();
 
         blocktree.write_blobs(&fork1_blobs).unwrap();
         blocktree.write_blobs(&fork2_incomplete_blobs).unwrap();
@@ -618,7 +635,7 @@ mod test {
             &epoch_schedule,
         );
 
-        let mut expected: HashSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
+        let mut expected: BTreeSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
         assert_eq!(full_slots, expected);
 
         // Test that slots past the last confirmed epoch boundary don't get included
@@ -682,7 +699,7 @@ mod test {
             })
             .unwrap();
 
-        let mut completed_slots = HashSet::new();
+        let mut completed_slots = BTreeSet::new();
         let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
         let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
             node_info.info.clone(),
@@ -692,13 +709,14 @@ mod test {
             RepairService::update_epoch_slots(
                 Pubkey::default(),
                 root,
+                &mut root.clone(),
                 &mut completed_slots,
                 &cluster_info,
                 &completed_slots_receiver,
             );
         }
 
-        let mut expected: HashSet<_> = (1..num_slots + 1).collect();
+        let mut expected: BTreeSet<_> = (1..num_slots + 1).collect();
         assert_eq!(completed_slots, expected);
         // Update with new root, should filter out the slots <= root
         root = num_slots + 2;
         RepairService::update_epoch_slots(
             Pubkey::default(),
             root,
+            &mut 0,
             &mut completed_slots,
             &cluster_info,
             &completed_slots_receiver,
         );
         expected.insert(num_slots + 2);
-        expected.retain(|x| *x > root);
+        RepairService::retain_slots_greater_than_root(&mut expected, root);
         assert_eq!(completed_slots, expected);
         writer.join().unwrap();
     }
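
Editor's note (illustration, not part of the patch): EpochSlots is signed over its serialized bytes, and bincode serializes a BTreeSet in sorted key order, so the same set of slots always produces the same bytes; a HashSet's iteration order depends on its hasher state, so identical contents could serialize differently and fail signature verification after a serialize/deserialize round trip, which is what the new test_serialize_deserialize_value test guards against. A minimal standalone sketch of that property, assuming bincode 1.x (the crate the patch already uses):

    // Standalone sketch: BTreeSet serialization is insertion-order independent and stable.
    use std::collections::BTreeSet;

    fn main() {
        // Same slots inserted in different orders.
        let a: BTreeSet<u64> = vec![8, 1, 6, 3, 2].into_iter().collect();
        let b: BTreeSet<u64> = vec![2, 3, 6, 1, 8].into_iter().collect();

        // bincode walks the set in sorted order, so the bytes are identical.
        let bytes_a = bincode::serialize(&a).unwrap();
        let bytes_b = bincode::serialize(&b).unwrap();
        assert_eq!(bytes_a, bytes_b);

        // A deserialize/re-serialize round trip reproduces the same bytes,
        // which is what keeps a signature over those bytes verifiable.
        let round_trip: BTreeSet<u64> = bincode::deserialize(&bytes_a).unwrap();
        assert_eq!(bincode::serialize(&round_trip).unwrap(), bytes_a);
    }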