Cluster has no way to know which slots are available (#8732)

automerge
This commit is contained in:
anatoly yakovenko
2020-03-11 21:31:50 -07:00
committed by GitHub
parent fe1c99c0cf
commit f64ab49307
9 changed files with 1026 additions and 782 deletions

View File

@@ -12,8 +12,6 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::crds_value::CompressionType::*;
use crate::crds_value::EpochIncompleteSlots;
use crate::packet::limited_deserialize;
use crate::streamer::{PacketReceiver, PacketSender};
use crate::{
@@ -21,14 +19,16 @@ use crate::{
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, SnapshotHash, Vote},
crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash, Vote,
},
epoch_slots::EpochSlots,
packet::{Packet, PACKET_DATA_SIZE},
result::{Error, Result},
sendmmsg::{multicast, send_mmsg},
weighted_shuffle::{weighted_best, weighted_shuffle},
};
use bincode::{serialize, serialized_size};
use compression::prelude::*;
use core::cmp;
use itertools::Itertools;
use rayon::iter::IntoParallelIterator;
@@ -46,7 +46,7 @@ use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::hash::Hash;
use solana_sdk::timing::duration_as_s;
use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
pubkey::Pubkey,
signature::{Keypair, Signable, Signature},
timing::{duration_as_ms, timestamp},
@@ -55,7 +55,7 @@ use solana_sdk::{
use std::{
borrow::Cow,
cmp::min,
collections::{BTreeSet, HashMap, HashSet},
collections::{HashMap, HashSet},
fmt,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
sync::atomic::{AtomicBool, Ordering},
@@ -71,7 +71,8 @@ pub const DATA_PLANE_FANOUT: usize = 200;
/// milliseconds we sleep for between gossip requests
pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// The maximum size of a bloom filter
pub const MAX_BLOOM_SIZE: usize = 1018;
pub const MAX_BLOOM_SIZE: usize = MAX_CRDS_OBJECT_SIZE;
pub const MAX_CRDS_OBJECT_SIZE: usize = 928;
/// The maximum size of a protocol payload
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size
@@ -81,9 +82,6 @@ const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
/// 128MB/PACKET_DATA_SIZE
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
const NUM_BITS_PER_BYTE: u64 = 8;
const MIN_SIZE_TO_COMPRESS_GZIP: u64 = 64;
/// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE
pub const MAX_SNAPSHOT_HASHES: usize = 16;
@@ -254,6 +252,16 @@ impl ClusterInfo {
self.lookup(&self.id()).cloned().unwrap()
}
pub fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots {
let entry = CrdsValueLabel::EpochSlots(ix, self.id());
self.gossip
.crds
.lookup(&entry)
.and_then(CrdsValue::epoch_slots)
.cloned()
.unwrap_or_else(|| EpochSlots::new(self.id(), timestamp()))
}
pub fn contact_info_trace(&self) -> String {
let now = timestamp();
let mut spy_nodes = 0;
@@ -328,121 +336,81 @@ impl ClusterInfo {
)
}
pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet<Slot>) -> EpochIncompleteSlots {
if !incomplete_slots.is_empty() {
let first_slot = incomplete_slots
.iter()
.next()
.expect("expected to find at least one slot");
let last_slot = incomplete_slots
.iter()
.next_back()
.expect("expected to find last slot");
let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1;
let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 {
1
} else {
0
} + num_uncompressed_bits / NUM_BITS_PER_BYTE;
let mut uncompressed = vec![0u8; num_uncompressed_bytes as usize];
incomplete_slots.iter().for_each(|slot| {
let offset_from_first_slot = slot.saturating_sub(*first_slot);
let index = offset_from_first_slot / NUM_BITS_PER_BYTE;
let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE;
uncompressed[index as usize] |= 1 << bit_index;
});
if num_uncompressed_bytes >= MIN_SIZE_TO_COMPRESS_GZIP {
if let Ok(compressed) = uncompressed
.iter()
.cloned()
.encode(&mut GZipEncoder::new(), Action::Finish)
.collect::<std::result::Result<Vec<u8>, _>>()
{
return EpochIncompleteSlots {
first: *first_slot,
compression: GZip,
compressed_list: compressed,
};
}
} else {
return EpochIncompleteSlots {
first: *first_slot,
compression: Uncompressed,
compressed_list: uncompressed,
};
}
}
EpochIncompleteSlots::default()
}
fn bitmap_to_slot_list(first: Slot, bitmap: &[u8]) -> BTreeSet<Slot> {
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
bitmap.iter().enumerate().for_each(|(i, val)| {
if *val != 0 {
(0..8).for_each(|bit_index| {
if (1 << bit_index & *val) != 0 {
let slot = first + i as u64 * NUM_BITS_PER_BYTE + bit_index as u64;
old_incomplete_slots.insert(slot);
}
})
}
});
old_incomplete_slots
}
pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet<Slot> {
match slots.compression {
Uncompressed => Self::bitmap_to_slot_list(slots.first, &slots.compressed_list),
GZip => {
if let Ok(decompressed) = slots
.compressed_list
.iter()
.cloned()
.decode(&mut GZipDecoder::new())
.collect::<std::result::Result<Vec<u8>, _>>()
{
Self::bitmap_to_slot_list(slots.first, &decompressed)
} else {
BTreeSet::new()
}
}
BZip2 => {
if let Ok(decompressed) = slots
.compressed_list
.iter()
.cloned()
.decode(&mut BZip2Decoder::new())
.collect::<std::result::Result<Vec<u8>, _>>()
{
Self::bitmap_to_slot_list(slots.first, &decompressed)
} else {
BTreeSet::new()
}
}
}
}
pub fn push_epoch_slots(
&mut self,
id: Pubkey,
root: Slot,
min: Slot,
slots: BTreeSet<Slot>,
incomplete_slots: &BTreeSet<Slot>,
) {
let compressed = Self::compress_incomplete_slots(incomplete_slots);
pub fn push_lowest_slot(&mut self, id: Pubkey, min: Slot) {
let now = timestamp();
let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(
0,
EpochSlots::new(id, root, min, slots, vec![compressed], now),
),
&self.keypair,
);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
let last = self
.gossip
.crds
.lookup(&CrdsValueLabel::LowestSlot(self.id()))
.and_then(|x| x.lowest_slot())
.map(|x| x.lowest)
.unwrap_or(0);
if min > last {
let entry = CrdsValue::new_signed(
CrdsData::LowestSlot(0, LowestSlot::new(id, min, now)),
&self.keypair,
);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
}
pub fn push_epoch_slots(&mut self, update: &[Slot]) {
let mut num = 0;
let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS)
.filter_map(|ix| {
Some((
self.gossip
.crds
.lookup(&CrdsValueLabel::EpochSlots(ix, self.id()))
.and_then(CrdsValue::epoch_slots)
.and_then(|x| Some((x.wallclock, x.first_slot()?)))?,
ix,
))
})
.collect();
current_slots.sort();
let min_slot: Slot = current_slots
.iter()
.map(|((_, s), _)| *s)
.min()
.unwrap_or(0);
let max_slot: Slot = update.iter().max().cloned().unwrap_or(0);
let total_slots = max_slot as isize - min_slot as isize;
// WARN if CRDS is not storing at least a full epoch worth of slots
if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
&& crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
{
inc_new_counter_warn!("cluster_info-epoch_slots-filled", 1);
warn!(
"EPOCH_SLOTS are filling up FAST {}/{}",
total_slots,
current_slots.len()
);
}
let mut reset = false;
let mut epoch_slot_index = current_slots.last().map(|(_, x)| *x).unwrap_or(0);
while num < update.len() {
let ix = (epoch_slot_index % crds_value::MAX_EPOCH_SLOTS) as u8;
let now = timestamp();
let mut slots = if !reset {
self.lookup_epoch_slots(ix)
} else {
EpochSlots::new(self.id(), now)
};
let n = slots.fill(&update[num..], now);
if n > 0 {
let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
num += n;
if num < update.len() {
epoch_slot_index += 1;
reset = true;
}
}
}
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES {
warn!(
@@ -526,21 +494,39 @@ impl ClusterInfo {
.map(|x| &x.value.snapshot_hash().unwrap().hashes)
}
pub fn get_epoch_state_for_node(
pub fn get_lowest_slot_for_node(
&self,
pubkey: &Pubkey,
since: Option<u64>,
) -> Option<(&EpochSlots, u64)> {
) -> Option<(&LowestSlot, u64)> {
self.gossip
.crds
.table
.get(&CrdsValueLabel::EpochSlots(*pubkey))
.get(&CrdsValueLabel::LowestSlot(*pubkey))
.filter(|x| {
since
.map(|since| x.insert_timestamp > since)
.unwrap_or(true)
})
.map(|x| (x.value.epoch_slots().unwrap(), x.insert_timestamp))
.map(|x| (x.value.lowest_slot().unwrap(), x.insert_timestamp))
}
pub fn get_epoch_slots_since(&self, since: Option<u64>) -> (Vec<EpochSlots>, Option<u64>) {
let vals: Vec<_> = self
.gossip
.crds
.table
.values()
.filter(|x| {
since
.map(|since| x.insert_timestamp > since)
.unwrap_or(true)
})
.filter_map(|x| Some((x.value.epoch_slots()?, x.insert_timestamp)))
.collect();
let max = vals.iter().map(|x| x.1).max().or(since);
let vec = vals.into_iter().map(|x| x.0).cloned().collect();
(vec, max)
}
pub fn get_contact_info_for_node(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
@@ -684,8 +670,8 @@ impl ClusterInfo {
&& x.shred_version == me.shred_version
&& ContactInfo::is_valid_address(&x.serve_repair)
&& {
self.get_epoch_state_for_node(&x.id, None)
.map(|(epoch_slots, _)| epoch_slots.lowest <= slot)
self.get_lowest_slot_for_node(&x.id, None)
.map(|(lowest_slot, _)| lowest_slot.lowest <= slot)
.unwrap_or_else(|| /* fallback to legacy behavior */ true)
}
})
@@ -2280,6 +2266,29 @@ mod tests {
assert_eq!(max_ts, new_max_ts);
}
#[test]
fn test_push_epoch_slots() {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let (slots, since) = cluster_info.get_epoch_slots_since(None);
assert!(slots.is_empty());
assert!(since.is_none());
cluster_info.push_epoch_slots(&[0]);
let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX));
assert!(slots.is_empty());
assert_eq!(since, Some(std::u64::MAX));
let (slots, since) = cluster_info.get_epoch_slots_since(None);
assert_eq!(slots.len(), 1);
assert!(since.is_some());
let (slots, since2) = cluster_info.get_epoch_slots_since(since.clone());
assert!(slots.is_empty());
assert_eq!(since2, since);
}
#[test]
fn test_add_entrypoint() {
let node_keypair = Arc::new(Keypair::new());
@@ -2333,20 +2342,9 @@ mod tests {
#[test]
fn test_split_messages_large() {
let mut btree_slots = BTreeSet::new();
for i in 0..128 {
btree_slots.insert(i);
}
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
let value = CrdsValue::new_unsigned(CrdsData::LowestSlot(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: btree_slots,
stash: vec![],
wallclock: 0,
},
LowestSlot::new(Pubkey::default(), 0, 0),
));
test_split_messages(value);
}
@@ -2358,39 +2356,19 @@ mod tests {
let payload: Vec<CrdsValue> = vec![];
let vec_size = serialized_size(&payload).unwrap();
let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size;
let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: BTreeSet::new(),
stash: vec![],
wallclock: 0,
},
));
let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHash(SnapshotHash {
from: Pubkey::default(),
hashes: vec![],
wallclock: 0,
}));
let mut i = 0;
while value.size() <= desired_size {
let slots = (0..i).collect::<BTreeSet<_>>();
if slots.len() > 200 {
panic!(
"impossible to match size: last {:?} vs desired {:?}",
serialized_size(&value).unwrap(),
desired_size
);
}
value.data = CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots,
stash: vec![],
wallclock: 0,
},
);
value.data = CrdsData::SnapshotHash(SnapshotHash {
from: Pubkey::default(),
hashes: vec![(0, Hash::default()); i],
wallclock: 0,
});
i += 1;
}
let split = ClusterInfo::split_gossip_messages(vec![value.clone()]);
@@ -2520,26 +2498,17 @@ mod tests {
node_keypair,
);
for i in 0..10 {
let mut peer_root = 5;
let mut peer_lowest = 0;
if i >= 5 {
// make these invalid for the upcoming repair request
peer_root = 15;
peer_lowest = 10;
}
let other_node_pubkey = Pubkey::new_rand();
let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
cluster_info.insert_info(other_node.clone());
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
let value = CrdsValue::new_unsigned(CrdsData::LowestSlot(
0,
EpochSlots::new(
other_node_pubkey,
peer_root,
peer_lowest,
BTreeSet::new(),
vec![],
timestamp(),
),
LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()),
));
let _ = cluster_info.gossip.crds.insert(value, timestamp());
}
@@ -2549,7 +2518,8 @@ mod tests {
#[test]
fn test_max_bloom_size() {
assert_eq!(MAX_BLOOM_SIZE, max_bloom_size());
// check that the constant fits into the dynamic size
assert!(MAX_BLOOM_SIZE <= max_bloom_size());
}
#[test]
@@ -2602,36 +2572,24 @@ mod tests {
}
#[test]
fn test_compress_incomplete_slots() {
let mut incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
assert_eq!(
EpochIncompleteSlots::default(),
ClusterInfo::compress_incomplete_slots(&incomplete_slots)
fn test_push_epoch_slots_large() {
use rand::Rng;
let node_keypair = Arc::new(Keypair::new());
let mut cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
);
incomplete_slots.insert(100);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
incomplete_slots.insert(104);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
incomplete_slots.insert(80);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
incomplete_slots.insert(10000);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
let mut range: Vec<Slot> = vec![];
//random should be hard to compress
for _ in 0..32000 {
let last = *range.last().unwrap_or(&0);
range.push(last + rand::thread_rng().gen_range(1, 32));
}
cluster_info.push_epoch_slots(&range[..16000]);
cluster_info.push_epoch_slots(&range[16000..]);
let (slots, since) = cluster_info.get_epoch_slots_since(None);
let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
assert_eq!(slots, range);
assert!(since.is_some());
}
}