Bitwise compress incomplete epoch slots (#8341)

This commit is contained in:
Pankaj Garg
2020-02-19 20:24:09 -08:00
committed by GitHub
parent 221866f74e
commit ea8d9d1aea
2 changed files with 109 additions and 94 deletions

View File

@@ -12,6 +12,7 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::crds_value::EpochIncompleteSlots;
use crate::packet::limited_deserialize;
use crate::streamer::{PacketReceiver, PacketSender};
use crate::{
@@ -77,6 +78,8 @@ const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
/// 128MB/PACKET_DATA_SIZE
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
const NUM_BITS_PER_BYTE: u64 = 8;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
NoPeers,
@@ -316,7 +319,7 @@ impl ClusterInfo {
)
}
pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet<Slot>) -> (Slot, Vec<u8>) {
pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet<Slot>) -> EpochIncompleteSlots {
if !incomplete_slots.is_empty() {
let first_slot = incomplete_slots
.iter()
@@ -326,9 +329,18 @@ impl ClusterInfo {
.iter()
.next_back()
.expect("expected to find last slot");
let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize];
let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1;
let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 {
1
} else {
0
} + num_uncompressed_bits / NUM_BITS_PER_BYTE;
let mut uncompressed = vec![0u8; num_uncompressed_bytes as usize];
incomplete_slots.iter().for_each(|slot| {
uncompressed[slot.saturating_sub(*first_slot) as usize] = 1;
let offset_from_first_slot = slot.saturating_sub(*first_slot);
let index = offset_from_first_slot / NUM_BITS_PER_BYTE;
let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE;
uncompressed[index as usize] |= 1 << bit_index;
});
if let Ok(compressed) = uncompressed
.iter()
@@ -336,27 +348,33 @@ impl ClusterInfo {
.encode(&mut GZipEncoder::new(), Action::Finish)
.collect::<std::result::Result<Vec<u8>, _>>()
{
(*first_slot, compressed)
} else {
(0, vec![])
return EpochIncompleteSlots {
first: *first_slot,
compressed_list: compressed,
};
}
} else {
(0, vec![])
}
EpochIncompleteSlots::default()
}
pub fn decompress_incomplete_slots(first_slot: u64, compressed: &[u8]) -> BTreeSet<Slot> {
pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet<Slot> {
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
if let Ok(decompressed) = compressed
if let Ok(decompressed) = slots
.compressed_list
.iter()
.cloned()
.decode(&mut GZipDecoder::new())
.collect::<std::result::Result<Vec<u8>, _>>()
{
decompressed.iter().enumerate().for_each(|(i, val)| {
if *val == 1 {
old_incomplete_slots.insert(first_slot + i as u64);
if *val != 0 {
(0..8).for_each(|bit_index| {
if (1 << bit_index & *val) != 0 {
let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index;
old_incomplete_slots.insert(slot as u64);
}
})
}
})
}
@@ -372,19 +390,13 @@ impl ClusterInfo {
slots: BTreeSet<Slot>,
incomplete_slots: &BTreeSet<Slot>,
) {
let (first_missing_slot, compressed_map) =
Self::compress_incomplete_slots(incomplete_slots);
let compressed = Self::compress_incomplete_slots(incomplete_slots);
let now = timestamp();
let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(EpochSlots::new(
id,
root,
min,
slots,
first_missing_slot,
compressed_map,
now,
)),
CrdsData::EpochSlots(
0,
EpochSlots::new(id, root, min, slots, vec![compressed], now),
),
&self.keypair,
);
self.gossip
@@ -2221,15 +2233,17 @@ mod tests {
for i in 0..128 {
btree_slots.insert(i);
}
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: btree_slots,
first_missing: 0,
stash: vec![],
wallclock: 0,
}));
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: btree_slots,
stash: vec![],
wallclock: 0,
},
));
test_split_messages(value);
}
@@ -2240,15 +2254,17 @@ mod tests {
let payload: Vec<CrdsValue> = vec![];
let vec_size = serialized_size(&payload).unwrap();
let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size;
let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: BTreeSet::new(),
first_missing: 0,
stash: vec![],
wallclock: 0,
}));
let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: BTreeSet::new(),
stash: vec![],
wallclock: 0,
},
));
let mut i = 0;
while value.size() <= desired_size {
@@ -2260,15 +2276,17 @@ mod tests {
desired_size
);
}
value.data = CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots,
first_missing: 0,
stash: vec![],
wallclock: 0,
});
value.data = CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots,
stash: vec![],
wallclock: 0,
},
);
i += 1;
}
let split = ClusterInfo::split_gossip_messages(vec![value.clone()]);
@@ -2408,15 +2426,17 @@ mod tests {
let other_node_pubkey = Pubkey::new_rand();
let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
cluster_info.insert_info(other_node.clone());
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
other_node_pubkey,
peer_root,
peer_lowest,
BTreeSet::new(),
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
vec![],
timestamp(),
)));
EpochSlots::new(
other_node_pubkey,
peer_root,
peer_lowest,
BTreeSet::new(),
vec![],
timestamp(),
),
));
let _ = cluster_info.gossip.crds.insert(value, timestamp());
}
// only half the visible peers should be eligible to serve this repair
@@ -2482,26 +2502,26 @@ mod tests {
let mut incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
assert_eq!(
(0, vec![]),
EpochIncompleteSlots::default(),
ClusterInfo::compress_incomplete_slots(&incomplete_slots)
);
incomplete_slots.insert(100);
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, first);
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
incomplete_slots.insert(104);
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, first);
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
incomplete_slots.insert(80);
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, first);
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
}
}