Change EpochSlots to use BTreeSet so that serialization/deserialization returns the same order (#4404)

automerge

Author: carllin
Date: 2019-05-23 03:50:41 -07:00
Committed by: Grimes
Parent: 591fd72e0b
Commit: 6a9e0bc593
4 changed files with 80 additions and 40 deletions
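
Background on the change (not part of the commit itself): EpochSlots is signed and gossiped, and the signature is computed over the bincode encoding of its fields, including the slot set. A HashSet serializes in whatever order it happens to iterate, and that order is randomized, so the same logical set can produce different byte streams on different nodes or runs, and a signature made over one encoding may fail to verify against another. A BTreeSet always iterates in sorted order, so the encoding, and with it the signed payload, is deterministic. A minimal standalone sketch of that property, assuming only the bincode crate the diff already uses; the encode_slots helper and main driver are illustrative, not crate code:

    use std::collections::BTreeSet;

    // Illustrative helper, not part of the commit: serialize a slot set the
    // way the gossip code does (bincode over a serde collection).
    fn encode_slots(slots: &BTreeSet<u64>) -> Vec<u8> {
        bincode::serialize(slots).expect("serialize")
    }

    fn main() {
        let slots: BTreeSet<u64> = vec![8, 1, 6, 2, 3].into_iter().collect();

        // A BTreeSet iterates (and therefore serializes) in sorted key order,
        // so every encoding of the same set is byte-for-byte identical, and a
        // signature computed over one encoding verifies against any other.
        let first = encode_slots(&slots);
        for _ in 0..10 {
            let roundtrip: BTreeSet<u64> =
                bincode::deserialize(&first).expect("deserialize");
            assert_eq!(roundtrip, slots);
            assert_eq!(encode_slots(&roundtrip), first);
        }

        // A HashSet offers no such guarantee: its iteration order depends on
        // randomized hashing, so the serialized bytes (and the signed payload)
        // can differ between runs even for an identical set of slots.
    }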

core/src/cluster_info.rs

@@ -40,7 +40,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
 use solana_sdk::timing::{duration_as_ms, timestamp};
 use solana_sdk::transaction::Transaction;
 use std::cmp::min;
-use std::collections::HashSet;
+use std::collections::BTreeSet;
 use std::fmt;
 use std::io;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@@ -306,7 +306,7 @@ impl ClusterInfo {
         }
     }

-    pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: HashSet<u64>) {
+    pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet<u64>) {
         let now = timestamp();
         let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now));
         entry.sign(&self.keypair);
@@ -1205,11 +1205,13 @@ impl ClusterInfo {
     ) -> Vec<SharedBlob> {
         let self_id = me.read().unwrap().gossip.id;
         inc_new_counter_debug!("cluster_info-push_message", 1, 0, 1000);
         let prunes: Vec<_> = me
             .write()
             .unwrap()
             .gossip
             .process_push_message(data, timestamp());
         if !prunes.is_empty() {
             inc_new_counter_debug!("cluster_info-push_message-prunes", prunes.len());
             let ci = me.read().unwrap().lookup(from).cloned();

core/src/cluster_info_repair_listener.rs

@@ -444,7 +444,7 @@ mod tests {
     use crate::blocktree::tests::make_many_slot_entries;
     use crate::packet::{Blob, SharedBlob};
     use crate::streamer;
-    use std::collections::HashSet;
+    use std::collections::BTreeSet;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
     use std::sync::mpsc::Receiver;
@@ -528,7 +528,7 @@ mod tests {
         // Set up the repairee's EpochSlots, such that they are missing every odd indexed slot
         // in the range (repairee_root, num_slots]
         let repairee_root = 0;
-        let repairee_slots: HashSet<_> = (0..=num_slots).step_by(2).collect();
+        let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
         let repairee_epoch_slots =
             EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1);
@@ -608,7 +608,7 @@ mod tests {
         // Thus, no repairmen should send any blobs to this repairee b/c this repairee
         // already has all the slots for which they have a confirmed leader schedule
         let repairee_root = 0;
-        let repairee_slots: HashSet<_> = (0..=slots_per_epoch).collect();
+        let repairee_slots: BTreeSet<_> = (0..=slots_per_epoch).collect();
         let repairee_epoch_slots =
             EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots.clone(), 1);

core/src/crds_value.rs

@@ -3,7 +3,7 @@ use bincode::serialize;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, Signable, Signature};
 use solana_sdk::transaction::Transaction;
-use std::collections::HashSet;
+use std::collections::BTreeSet;
 use std::fmt;

 /// CrdsValue that is replicated across the cluster
@@ -21,13 +21,13 @@ pub enum CrdsValue {
 pub struct EpochSlots {
     pub from: Pubkey,
     pub root: u64,
-    pub slots: HashSet<u64>,
+    pub slots: BTreeSet<u64>,
     pub signature: Signature,
     pub wallclock: u64,
 }

 impl EpochSlots {
-    pub fn new(from: Pubkey, root: u64, slots: HashSet<u64>, wallclock: u64) -> Self {
+    pub fn new(from: Pubkey, root: u64, slots: BTreeSet<u64>, wallclock: u64) -> Self {
         Self {
             from,
             root,
@@ -47,7 +47,7 @@ impl Signable for EpochSlots {
         #[derive(Serialize)]
         struct SignData<'a> {
             root: u64,
-            slots: &'a HashSet<u64>,
+            slots: &'a BTreeSet<u64>,
             wallclock: u64,
         }
         let data = SignData {
@@ -220,7 +220,11 @@ impl Signable for CrdsValue {
     }

     fn get_signature(&self) -> Signature {
-        unimplemented!()
+        match self {
+            CrdsValue::ContactInfo(contact_info) => contact_info.get_signature(),
+            CrdsValue::Vote(vote) => vote.get_signature(),
+            CrdsValue::EpochSlots(epoch_slots) => epoch_slots.get_signature(),
+        }
     }

     fn set_signature(&mut self, _: Signature) {
@@ -233,6 +237,7 @@ mod test {
     use super::*;
     use crate::contact_info::ContactInfo;
     use crate::test_tx::test_tx;
+    use bincode::deserialize;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::timing::timestamp;
@@ -261,7 +266,7 @@ mod test {
         let key = v.clone().vote().unwrap().from;
         assert_eq!(v.label(), CrdsValueLabel::Vote(key));
-        let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, HashSet::new(), 0));
+        let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, BTreeSet::new(), 0));
         assert_eq!(v.wallclock(), 0);
         let key = v.clone().epoch_slots().unwrap().from;
         assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key));
@@ -275,15 +280,28 @@ mod test {
         verify_signatures(&mut v, &keypair, &wrong_keypair);
         v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp()));
         verify_signatures(&mut v, &keypair, &wrong_keypair);
-        v = CrdsValue::EpochSlots(EpochSlots::new(
-            keypair.pubkey(),
-            0,
-            HashSet::new(),
-            timestamp(),
-        ));
+        let btreeset: BTreeSet<u64> = vec![1, 2, 3, 6, 8].into_iter().collect();
+        v = CrdsValue::EpochSlots(EpochSlots::new(keypair.pubkey(), 0, btreeset, timestamp()));
         verify_signatures(&mut v, &keypair, &wrong_keypair);
     }

+    fn test_serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) {
+        let num_tries = 10;
+        value.sign(keypair);
+        let original_signature = value.get_signature();
+        for _ in 0..num_tries {
+            let serialized_value = serialize(value).unwrap();
+            let deserialized_value: CrdsValue = deserialize(&serialized_value).unwrap();
+
+            // Signatures shouldn't change
+            let deserialized_signature = deserialized_value.get_signature();
+            assert_eq!(original_signature, deserialized_signature);
+
+            // After deserializing, check that the signature is still the same
+            assert!(deserialized_value.verify());
+        }
+    }
+
     fn verify_signatures(
         value: &mut CrdsValue,
         correct_keypair: &Keypair,
@@ -294,5 +312,6 @@ mod test {
         assert!(value.verify());
         value.sign(&wrong_keypair);
         assert!(!value.verify());
+        test_serialize_deserialize_value(value, correct_keypair);
     }
 }

core/src/repair_service.rs

@@ -9,8 +9,9 @@ use crate::service::Service;
 use solana_metrics::datapoint_info;
 use solana_runtime::epoch_schedule::EpochSchedule;
 use solana_sdk::pubkey::Pubkey;
-use std::collections::HashSet;
+use std::collections::BTreeSet;
 use std::net::UdpSocket;
+use std::ops::Bound::{Excluded, Unbounded};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread::sleep;
@@ -89,20 +90,21 @@ impl RepairService {
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
     ) {
-        let mut epoch_slots: HashSet<u64> = HashSet::new();
+        let mut epoch_slots: BTreeSet<u64> = BTreeSet::new();
         let id = cluster_info.read().unwrap().id();
+        let mut current_root = 0;
         if let RepairStrategy::RepairAll {
             ref bank_forks,
             ref epoch_schedule,
             ..
         } = repair_strategy
         {
-            let root = bank_forks.read().unwrap().root();
+            current_root = bank_forks.read().unwrap().root();
             Self::initialize_epoch_slots(
                 id,
                 blocktree,
                 &mut epoch_slots,
-                root,
+                current_root,
                 epoch_schedule,
                 cluster_info,
             );
@@ -128,15 +130,16 @@ impl RepairService {
                     ref completed_slots_receiver,
                     ..
                 } => {
-                    let root = bank_forks.read().unwrap().root();
+                    let new_root = bank_forks.read().unwrap().root();
                     Self::update_epoch_slots(
                         id,
-                        root,
+                        new_root,
+                        &mut current_root,
                         &mut epoch_slots,
                         &cluster_info,
                         completed_slots_receiver,
                     );
-                    Self::generate_repairs(blocktree, root, MAX_REPAIR_LENGTH)
+                    Self::generate_repairs(blocktree, new_root, MAX_REPAIR_LENGTH)
                 }
             }
         };
@@ -281,7 +284,7 @@ impl RepairService {
     fn get_completed_slots_past_root(
         blocktree: &Blocktree,
-        slots_in_gossip: &mut HashSet<u64>,
+        slots_in_gossip: &mut BTreeSet<u64>,
         root: u64,
         epoch_schedule: &EpochSchedule,
     ) {
@@ -305,7 +308,7 @@ impl RepairService {
     fn initialize_epoch_slots(
         id: Pubkey,
         blocktree: &Blocktree,
-        slots_in_gossip: &mut HashSet<u64>,
+        slots_in_gossip: &mut BTreeSet<u64>,
         root: u64,
         epoch_schedule: &EpochSchedule,
         cluster_info: &RwLock<ClusterInfo>,
@@ -326,30 +329,44 @@ impl RepairService {
     // for details.
     fn update_epoch_slots(
         id: Pubkey,
-        root: u64,
-        slots_in_gossip: &mut HashSet<u64>,
+        latest_known_root: u64,
+        prev_root: &mut u64,
+        slots_in_gossip: &mut BTreeSet<u64>,
         cluster_info: &RwLock<ClusterInfo>,
         completed_slots_receiver: &CompletedSlotsReceiver,
     ) {
         let mut should_update = false;
         while let Ok(completed_slots) = completed_slots_receiver.try_recv() {
+            should_update = latest_known_root != *prev_root;
             for slot in completed_slots {
                 // If the newly completed slot > root, and the set did not contain this value
                 // before, we should update gossip.
-                if slot > root && slots_in_gossip.insert(slot) {
-                    should_update = true;
+                if slot > latest_known_root {
+                    should_update |= slots_in_gossip.insert(slot);
                 }
             }
         }

         if should_update {
-            slots_in_gossip.retain(|x| *x > root);
-            cluster_info
-                .write()
-                .unwrap()
-                .push_epoch_slots(id, root, slots_in_gossip.clone());
+            // Filter out everything <= root
+            if latest_known_root != *prev_root {
+                *prev_root = latest_known_root;
+                Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root);
+            }
+            cluster_info.write().unwrap().push_epoch_slots(
+                id,
                latest_known_root,
                slots_in_gossip.clone(),
            );
         }
     }

+    fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<u64>, root: u64) {
+        *slot_set = slot_set
+            .range((Excluded(&root), Unbounded))
+            .cloned()
+            .collect();
+    }
 }

 impl Service for RepairService {
@@ -603,7 +620,7 @@ mod test {
                 blobs
             })
             .collect();
-        let mut full_slots = HashSet::new();
+        let mut full_slots = BTreeSet::new();
         blocktree.write_blobs(&fork1_blobs).unwrap();
         blocktree.write_blobs(&fork2_incomplete_blobs).unwrap();
@@ -618,7 +635,7 @@ mod test {
             &epoch_schedule,
         );
-        let mut expected: HashSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
+        let mut expected: BTreeSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
         assert_eq!(full_slots, expected);

         // Test that slots past the last confirmed epoch boundary don't get included
@@ -682,7 +699,7 @@ mod test {
             })
             .unwrap();

-        let mut completed_slots = HashSet::new();
+        let mut completed_slots = BTreeSet::new();
         let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
         let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
             node_info.info.clone(),
@@ -692,13 +709,14 @@ mod test {
             RepairService::update_epoch_slots(
                 Pubkey::default(),
                 root,
+                &mut root.clone(),
                 &mut completed_slots,
                 &cluster_info,
                 &completed_slots_receiver,
             );
         }

-        let mut expected: HashSet<_> = (1..num_slots + 1).collect();
+        let mut expected: BTreeSet<_> = (1..num_slots + 1).collect();
         assert_eq!(completed_slots, expected);

         // Update with new root, should filter out the slots <= root
@@ -708,12 +726,13 @@ mod test {
         RepairService::update_epoch_slots(
             Pubkey::default(),
             root,
+            &mut 0,
             &mut completed_slots,
             &cluster_info,
             &completed_slots_receiver,
         );
         expected.insert(num_slots + 2);
-        expected.retain(|x| *x > root);
+        RepairService::retain_slots_greater_than_root(&mut expected, root);
         assert_eq!(completed_slots, expected);
         writer.join().unwrap();
     }
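
Background on the new retain_slots_greater_than_root helper above: because a BTreeSet keeps its elements sorted, dropping everything at or below the root can be expressed as taking the half-open range (root, unbounded) with std::ops::Bound and re-collecting it, where the previous HashSet code had to scan the whole set with retain(|x| *x > root). The sketch below reproduces the helper body from the diff and wraps it in a small illustrative driver (the driver is not crate code):

    use std::collections::BTreeSet;
    use std::ops::Bound::{Excluded, Unbounded};

    // Keep only slots strictly greater than `root`. This reproduces the helper
    // added in the diff: rather than the old retain(|x| *x > root) scan over a
    // HashSet, the ordered BTreeSet is sliced at the root and re-collected.
    fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<u64>, root: u64) {
        *slot_set = slot_set
            .range((Excluded(&root), Unbounded))
            .cloned()
            .collect();
    }

    fn main() {
        let mut slots: BTreeSet<u64> = (0..=10).collect();
        retain_slots_greater_than_root(&mut slots, 4);
        // Slots 0..=4 are dropped; 5..=10 remain, still in sorted order.
        assert_eq!(slots, (5..=10).collect::<BTreeSet<u64>>());
    }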