Remove Blobs and switch to Packets (#6937)

* Remove Blobs and switch to Packets

* Fix some gossip messages not respecting MTU size

* Failure to serialize is not fatal

* Add log macros

* Remove unused extern

* Apparently macro use is required

* Explicitly scope macro

* Fix test compile
Author: Sagar Dhawan
Date: 2019-11-14 10:24:53 -08:00
Committed by: GitHub
Parent: d6cbb02c92
Commit: f108f483b7
14 changed files with 279 additions and 712 deletions

core/src/cluster_info.rs

@@ -12,8 +12,9 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
+use crate::packet::limited_deserialize;
+use crate::streamer::{PacketReceiver, PacketSender};
use crate::{
-    blob::{limited_deserialize, to_shared_blob, Blob, SharedBlob},
contact_info::ContactInfo,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
@@ -23,7 +24,6 @@ use crate::{
repair_service::RepairType,
result::{Error, Result},
sendmmsg::{multicast, send_mmsg},
-    streamer::{BlobReceiver, BlobSender},
weighted_shuffle::{weighted_best, weighted_shuffle},
};
use bincode::{serialize, serialized_size};
@@ -36,6 +36,7 @@ use solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
multi_bind_in_range, PortRange,
};
+use solana_perf::packet::{to_packets_with_destination, Packets};
use solana_sdk::{
clock::Slot,
pubkey::Pubkey,
@@ -64,9 +65,12 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
+/// The maximum size of a bloom filter
+pub const MAX_BLOOM_SIZE: usize = 1030;
/// The maximum size of a protocol payload
-const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64;
+const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
+/// The largest protocol header size
+const MAX_PROTOCOL_HEADER_SIZE: u64 = 202;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
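For orientation, these constants line up by construction. A minimal sketch of the arithmetic, assuming PACKET_DATA_SIZE is the SDK's 1232 bytes (the 1280-byte IPv6 minimum MTU less the 40-byte IPv6 and 8-byte UDP headers):

// Sketch only; PACKET_DATA_SIZE = 1232 is an assumption about the SDK value.
const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; // 1232
const MAX_PROTOCOL_HEADER_SIZE: u64 = 202;
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;

fn main() {
    // 1232 - 202 = 1030: a pull-request bloom filter may use exactly the
    // payload that remains after the worst-case protocol header.
    assert_eq!(MAX_PROTOCOL_PAYLOAD_SIZE, 1030); // == MAX_BLOOM_SIZE
}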
@@ -109,7 +113,7 @@ impl fmt::Debug for Locality {
}
}
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Default, Deserialize, Serialize)]
pub struct PruneData {
/// Pubkey of the node that sent this prune data
pub pubkey: Pubkey,
@@ -892,7 +896,7 @@ impl ClusterInfo {
return self
.gossip
.pull
-            .build_crds_filters(&self.gossip.crds, Self::max_bloom_size())
+            .build_crds_filters(&self.gossip.crds, MAX_BLOOM_SIZE)
.into_iter()
.for_each(|filter| {
pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone()))
@@ -908,8 +912,8 @@ impl ClusterInfo {
fn split_gossip_messages(mut msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> {
let mut messages = vec![];
while !msgs.is_empty() {
-            let mut size = 0;
let mut payload = vec![];
+            let mut size = serialized_size(&payload).expect("Couldn't check size");
while let Some(msg) = msgs.pop() {
let msg_size = msg.size();
if size + msg_size > MAX_PROTOCOL_PAYLOAD_SIZE as u64 {
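The change above seeds `size` with the serialized size of the empty payload Vec instead of 0, so bincode's 8-byte length prefix is charged against the MTU budget; previously a full payload could serialize a few bytes over the limit (the "gossip messages not respecting MTU size" in the commit message). A self-contained sketch of the greedy split under that accounting; String stands in for CrdsValue, and the drop-if-it-can-never-fit guard is an assumption added to keep the sketch total:

use bincode::serialized_size;

fn split_messages(mut msgs: Vec<String>, max_payload: u64) -> Vec<Vec<String>> {
    let empty = serialized_size(&Vec::<String>::new()).unwrap(); // 8-byte len prefix
    let mut chunks = vec![];
    while !msgs.is_empty() {
        let mut payload: Vec<String> = vec![];
        // Seed with the empty Vec's size so the length prefix is budgeted.
        let mut size = empty;
        while let Some(msg) = msgs.pop() {
            let msg_size = serialized_size(&msg).unwrap();
            if size + msg_size > max_payload {
                if empty + msg_size <= max_payload {
                    msgs.push(msg); // fits in a fresh chunk; retry there
                }
                // else: drop a message that can never fit in one packet
                break;
            }
            size += msg_size;
            payload.push(msg);
        }
        if !payload.is_empty() {
            chunks.push(payload);
        }
    }
    chunks
}

fn main() {
    let msgs: Vec<String> = (0..10).map(|i| format!("value-{}", i)).collect();
    for chunk in split_messages(msgs, 64) {
        assert!(serialized_size(&chunk).unwrap() <= 64);
    }
}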
@@ -931,24 +935,11 @@ impl ClusterInfo {
messages
}
-    // computes the maximum size for pull request blooms
-    pub fn max_bloom_size() -> usize {
-        let filter_size = serialized_size(&CrdsFilter::default())
-            .expect("unable to serialize default filter") as usize;
-        let protocol = Protocol::PullRequest(
-            CrdsFilter::default(),
-            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())),
-        );
-        let protocol_size =
-            serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
-        PACKET_DATA_SIZE - (protocol_size - filter_size)
-    }
fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp();
let mut pulls: Vec<_> = self
.gossip
-            .new_pull_request(now, stakes, Self::max_bloom_size())
+            .new_pull_request(now, stakes, MAX_BLOOM_SIZE)
.ok()
.into_iter()
.filter_map(|(peer, filters, me)| {
@@ -1006,14 +997,11 @@ impl ClusterInfo {
fn run_gossip(
obj: &Arc<RwLock<Self>>,
stakes: &HashMap<Pubkey, u64>,
-        blob_sender: &BlobSender,
+        sender: &PacketSender,
) -> Result<()> {
let reqs = obj.write().unwrap().gossip_request(&stakes);
-        let blobs = reqs
-            .into_iter()
-            .filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok())
-            .collect();
-        blob_sender.send(blobs)?;
+        let packets = to_packets_with_destination(&reqs);
+        sender.send(packets)?;
Ok(())
}
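`to_packets_with_destination` (solana_perf) replaces the per-blob allocations: it serializes each (destination, message) pair into one packet of a single batch. A rough sketch of its shape with simplified stand-in types; the real Packets type uses fixed-size buffers and the exact signature may differ:

use serde::Serialize;
use std::net::SocketAddr;

const PACKET_DATA_SIZE: usize = 1232; // assumed SDK packet payload size

// Simplified stand-in for solana_perf::packet::Packet.
struct Packet {
    data: Vec<u8>,
    dest: SocketAddr,
}

// Serialize each (destination, message) pair into one packet. Messages
// that fail to serialize or exceed the MTU are skipped rather than
// aborting the batch ("failure to serialize is not fatal").
fn to_packets_with_destination<T: Serialize>(reqs: &[(SocketAddr, T)]) -> Vec<Packet> {
    reqs.iter()
        .filter_map(|(dest, msg)| {
            let data = bincode::serialize(msg).ok()?;
            if data.len() > PACKET_DATA_SIZE {
                return None;
            }
            Some(Packet { data, dest: *dest })
        })
        .collect()
}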
@@ -1021,7 +1009,7 @@ impl ClusterInfo {
pub fn gossip(
obj: Arc<RwLock<Self>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
-        blob_sender: BlobSender,
+        sender: PacketSender,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
@@ -1044,7 +1032,7 @@ impl ClusterInfo {
}
None => HashMap::new(),
};
-                let _ = Self::run_gossip(&obj, &stakes, &blob_sender);
+                let _ = Self::run_gossip(&obj, &stakes, &sender);
if exit.load(Ordering::Relaxed) {
return;
}
@@ -1065,13 +1053,20 @@ impl ClusterInfo {
.unwrap()
}
-    fn get_data_shred_as_blob(
+    fn get_data_shred_as_packet(
blocktree: &Arc<Blocktree>,
slot: Slot,
shred_index: u64,
-    ) -> Result<Option<Blob>> {
-        let bytes = blocktree.get_data_shred(slot, shred_index)?;
-        Ok(bytes.map(|bytes| Blob::new(&bytes)))
+        dest: &SocketAddr,
+    ) -> Result<Option<Packet>> {
+        let data = blocktree.get_data_shred(slot, shred_index)?;
+        Ok(data.map(|data| {
+            let mut packet = Packet::default();
+            packet.meta.size = data.len();
+            packet.meta.set_addr(dest);
+            packet.data.copy_from_slice(&data);
+            packet
+        }))
}
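A Packet here is a fixed-size buffer plus metadata: meta.size records how many bytes are valid and the meta addr records the destination. A minimal sketch of the conversion above with stand-in types; note that the diff's `copy_from_slice` over the whole buffer assumes a data shred is exactly PACKET_DATA_SIZE bytes, while this sketch copies into a prefix:

use std::net::SocketAddr;

const PACKET_DATA_SIZE: usize = 1232; // assumed SDK packet payload size

// Stand-ins for solana_perf's Packet/Meta.
struct Packet {
    data: [u8; PACKET_DATA_SIZE],
    size: usize,      // meta.size: number of valid bytes in `data`
    addr: SocketAddr, // meta addr: where this packet should be sent
}

fn shred_to_packet(bytes: &[u8], dest: SocketAddr) -> Option<Packet> {
    if bytes.len() > PACKET_DATA_SIZE {
        return None; // a shred must fit in a single packet
    }
    let mut data = [0u8; PACKET_DATA_SIZE];
    data[..bytes.len()].copy_from_slice(bytes);
    Some(Packet { data, size: bytes.len(), addr: dest })
}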
fn run_window_request(
@@ -1080,17 +1075,15 @@ impl ClusterInfo {
blocktree: Option<&Arc<Blocktree>>,
me: &ContactInfo,
slot: Slot,
-        blob_index: u64,
-    ) -> Vec<SharedBlob> {
+        shred_index: u64,
+    ) -> Packets {
if let Some(blocktree) = blocktree {
// Try to find the requested index in one of the slots
-            let blob = Self::get_data_shred_as_blob(blocktree, slot, blob_index);
+            let packet = Self::get_data_shred_as_packet(blocktree, slot, shred_index, from_addr);
-            if let Ok(Some(mut blob)) = blob {
+            if let Ok(Some(packet)) = packet {
inc_new_counter_debug!("cluster_info-window-request-ledger", 1);
-                blob.meta.set_addr(from_addr);
-                return vec![Arc::new(RwLock::new(blob))];
+                return Packets::new(vec![packet]);
}
}
@@ -1100,10 +1093,10 @@ impl ClusterInfo {
me.id,
from.id,
slot,
-            blob_index,
+            shred_index,
);
-        vec![]
+        Packets::default()
}
fn run_highest_window_request(
@@ -1111,7 +1104,7 @@ impl ClusterInfo {
blocktree: Option<&Arc<Blocktree>>,
slot: Slot,
highest_index: u64,
-    ) -> Vec<SharedBlob> {
+    ) -> Packets {
if let Some(blocktree) = blocktree {
// Try to find the requested index in one of the slots
let meta = blocktree.meta(slot);
@@ -1119,17 +1112,21 @@ impl ClusterInfo {
if let Ok(Some(meta)) = meta {
if meta.received > highest_index {
// meta.received must be at least 1 by this point
-                    let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1);
+                    let packet = Self::get_data_shred_as_packet(
+                        blocktree,
+                        slot,
+                        meta.received - 1,
+                        from_addr,
+                    );
-                    if let Ok(Some(mut blob)) = blob {
-                        blob.meta.set_addr(from_addr);
-                        return vec![Arc::new(RwLock::new(blob))];
+                    if let Ok(Some(packet)) = packet {
+                        return Packets::new(vec![packet]);
}
}
}
}
-        vec![]
+        Packets::default()
}
fn run_orphan(
@@ -1137,20 +1134,20 @@ impl ClusterInfo {
blocktree: Option<&Arc<Blocktree>>,
mut slot: Slot,
max_responses: usize,
-    ) -> Vec<SharedBlob> {
-        let mut res = vec![];
+    ) -> Packets {
+        let mut res = Packets::default();
if let Some(blocktree) = blocktree {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blocktree.meta(slot) {
if meta.received == 0 {
break;
}
-                let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1);
-                if let Ok(Some(mut blob)) = blob {
-                    blob.meta.set_addr(from_addr);
-                    res.push(Arc::new(RwLock::new(blob)));
+                let packet =
+                    Self::get_data_shred_as_packet(blocktree, slot, meta.received - 1, from_addr);
+                if let Ok(Some(packet)) = packet {
+                    res.packets.push(packet);
}
-                if meta.is_parent_set() && res.len() <= max_responses {
+                if meta.is_parent_set() && res.packets.len() <= max_responses {
slot = meta.parent_slot;
} else {
break;
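run_orphan walks the parent chain: for the requested slot and up to max_responses ancestors, it returns each slot's latest received shred so the requester can repair backwards from an orphan. A toy sketch of that walk, with a HashMap standing in for the Blocktree metadata API:

use std::collections::HashMap;

// Toy slot metadata: (received shred count, parent slot), standing in
// for Blocktree's SlotMeta.
type SlotMeta = (u64, Option<u64>);

// Return (slot, last_shred_index) pairs for `slot` and its ancestors,
// stopping after max_responses entries or at a slot with no shreds.
fn orphan_walk(metas: &HashMap<u64, SlotMeta>, mut slot: u64, max_responses: usize) -> Vec<(u64, u64)> {
    let mut res = vec![];
    while let Some(&(received, parent)) = metas.get(&slot) {
        if received == 0 {
            break;
        }
        res.push((slot, received - 1)); // highest shred index in this slot
        match parent {
            Some(p) if res.len() <= max_responses => slot = p,
            _ => break,
        }
    }
    res
}

fn main() {
    let metas: HashMap<u64, SlotMeta> =
        [(3, (10, Some(2))), (2, (7, Some(1))), (1, (4, None))].into_iter().collect();
    // For slot 3 we get the latest shred of slots 3, 2, 1, in that order.
    assert_eq!(orphan_walk(&metas, 3, 5), vec![(3, 9), (2, 6), (1, 3)]);
}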
@@ -1161,19 +1158,18 @@ impl ClusterInfo {
res
}
-    fn handle_blobs(
+    fn handle_packets(
me: &Arc<RwLock<Self>>,
blocktree: Option<&Arc<Blocktree>>,
stakes: &HashMap<Pubkey, u64>,
-        blobs: &[SharedBlob],
-        response_sender: &BlobSender,
+        packets: Packets,
+        response_sender: &PacketSender,
) {
// iter over the blobs, collect pulls separately and process everything else
let mut gossip_pull_data: Vec<PullData> = vec![];
-        blobs.iter().for_each(|blob| {
-            let blob = blob.read().unwrap();
-            let from_addr = blob.meta.addr();
-            limited_deserialize(&blob.data[..blob.meta.size])
+        packets.packets.iter().for_each(|packet| {
+            let from_addr = packet.meta.addr();
+            limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.for_each(|request| match request {
Protocol::PullRequest(filter, caller) => {
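`limited_deserialize` (now in crate::packet) caps how many bytes bincode may consume, so a hostile packet cannot use a forged length prefix to trigger huge allocations; a failed deserialize simply skips the packet. One plausible implementation, assuming bincode 1.x's Options API:

use bincode::Options;

const PACKET_DATA_SIZE: usize = 1232; // assumed SDK packet payload size

fn limited_deserialize<T>(data: &[u8]) -> bincode::Result<T>
where
    T: serde::de::DeserializeOwned,
{
    // Refuse to deserialize anything claiming to be larger than a packet.
    bincode::options()
        .with_limit(PACKET_DATA_SIZE as u64)
        .with_fixint_encoding() // match bincode::serialize's default encoding
        .allow_trailing_bytes()
        .deserialize(data)
}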
@@ -1259,7 +1255,7 @@ impl ClusterInfo {
response_sender.send(Self::handle_pull_requests(me, gossip_pull_data));
}
-    fn handle_pull_requests(me: &Arc<RwLock<Self>>, requests: Vec<PullData>) -> Vec<SharedBlob> {
+    fn handle_pull_requests(me: &Arc<RwLock<Self>>, requests: Vec<PullData>) -> Packets {
// split the requests into addrs and filters
let mut caller_and_filters = vec![];
let mut addrs = vec![];
@@ -1274,24 +1270,27 @@ impl ClusterInfo {
.unwrap()
.gossip
.process_pull_requests(caller_and_filters, now);
+        let mut packets = Packets::default();
pull_responses
.into_iter()
.zip(addrs.into_iter())
-            .flat_map(|(response, from_addr)| {
+            .for_each(|(response, from_addr)| {
let len = response.len();
trace!("get updates since response {}", len);
inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
Self::split_gossip_messages(response)
.into_iter()
-                    .filter_map(move |payload| {
+                    .for_each(|payload| {
let protocol = Protocol::PullResponse(self_id, payload);
// The remote node may not know its public IP:PORT. Instead of responding to the caller's
// gossip addr, respond to the origin addr. The last origin addr is picked from the list of
// addrs.
-                        to_shared_blob(protocol, from_addr).ok()
+                        packets
+                            .packets
+                            .push(Packet::from_data(&from_addr, protocol))
})
-            })
-            .collect()
+            });
+        packets
}
fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) {
@@ -1314,7 +1313,7 @@ impl ClusterInfo {
from: &Pubkey,
data: Vec<CrdsValue>,
stakes: &HashMap<Pubkey, u64>,
-    ) -> Vec<SharedBlob> {
+    ) -> Packets {
let self_id = me.read().unwrap().gossip.id;
inc_new_counter_debug!("cluster_info-push_message", 1);
@@ -1331,9 +1330,9 @@ impl ClusterInfo {
.gossip
.prune_received_cache(updated_labels, stakes);
-        let mut rsp: Vec<_> = prunes_map
+        let rsp: Vec<_> = prunes_map
.into_iter()
-            .map(|(from, prune_set)| {
+            .filter_map(|(from, prune_set)| {
inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len());
me.read().unwrap().lookup(&from).cloned().and_then(|ci| {
let mut prune_msg = PruneData {
@@ -1345,25 +1344,22 @@ impl ClusterInfo {
};
prune_msg.sign(&me.read().unwrap().keypair);
let rsp = Protocol::PruneMessage(self_id, prune_msg);
-                    to_shared_blob(rsp, ci.gossip).ok()
+                    Some((ci.gossip, rsp))
})
})
-            .flatten()
.collect();
+        let mut packets = to_packets_with_destination(&rsp);
-        if !rsp.is_empty() {
+        if !packets.is_empty() {
let pushes: Vec<_> = me.write().unwrap().new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
-            let mut blobs: Vec<_> = pushes
-                .into_iter()
-                .filter_map(|(remote_gossip_addr, req)| {
-                    to_shared_blob(req, remote_gossip_addr).ok()
-                })
-                .collect();
-            rsp.append(&mut blobs);
-            rsp
+            pushes.into_iter().for_each(|(remote_gossip_addr, req)| {
+                let p = Packet::from_data(&remote_gossip_addr, &req);
+                packets.packets.push(p);
+            });
+            packets
} else {
-            vec![]
+            Packets::default()
}
}
@@ -1381,7 +1377,7 @@ impl ClusterInfo {
from_addr: &SocketAddr,
blocktree: Option<&Arc<Blocktree>>,
request: Protocol,
-    ) -> Vec<SharedBlob> {
+    ) -> Packets {
let now = Instant::now();
//TODO this doesn't depend on cluster_info module, could be moved
@@ -1396,7 +1392,7 @@ impl ClusterInfo {
self_id, from.id,
);
inc_new_counter_debug!("cluster_info-handle-repair--eq", 1);
-            return vec![];
+            return Packets::default();
}
me.write()
@@ -1456,15 +1452,12 @@ impl ClusterInfo {
obj: &Arc<RwLock<Self>>,
blocktree: Option<&Arc<Blocktree>>,
bank_forks: Option<&Arc<RwLock<BankForks>>>,
-        requests_receiver: &BlobReceiver,
-        response_sender: &BlobSender,
+        requests_receiver: &PacketReceiver,
+        response_sender: &PacketSender,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
-        let mut reqs = requests_receiver.recv_timeout(timeout)?;
-        while let Ok(mut more) = requests_receiver.try_recv() {
-            reqs.append(&mut more);
-        }
+        let reqs = requests_receiver.recv_timeout(timeout)?;
let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => {
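One plausible reading of the receive change above: the old loop drained `try_recv` into an ever-growing Vec of blobs before responding, while a PacketReceiver already delivers a whole batch per channel message, so `run_listen` now services one batch per call, bounding memory and per-iteration latency. The pattern, as a sketch over a std mpsc channel:

use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// One batch per iteration: block briefly for a batch, process it, and
// return to the caller's loop instead of draining the whole channel.
fn run_listen_once<T>(receiver: &Receiver<T>, handle: impl Fn(T)) -> Result<(), RecvTimeoutError> {
    let timeout = Duration::new(1, 0);
    let batch = receiver.recv_timeout(timeout)?;
    handle(batch);
    Ok(())
}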
@@ -1473,15 +1466,15 @@ impl ClusterInfo {
None => HashMap::new(),
};
-        Self::handle_blobs(obj, blocktree, &stakes, &reqs, response_sender);
+        Self::handle_packets(obj, blocktree, &stakes, reqs, response_sender);
Ok(())
}
pub fn listen(
me: Arc<RwLock<Self>>,
blocktree: Option<Arc<Blocktree>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
-        requests_receiver: BlobReceiver,
-        response_sender: BlobSender,
+        requests_receiver: PacketReceiver,
+        response_sender: PacketSender,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
@@ -2005,10 +1998,9 @@ mod tests {
);
assert!(!rv.is_empty());
let rv: Vec<Shred> = rv
+            .packets
.into_iter()
-            .filter_map(|b| {
-                Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok()
-            })
+            .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.collect();
assert_eq!(rv[0].index(), 1);
assert_eq!(rv[0].slot(), 2);
@@ -2039,10 +2031,9 @@ mod tests {
let rv =
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1);
let rv: Vec<Shred> = rv
+            .packets
.into_iter()
-            .filter_map(|b| {
-                Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok()
-            })
+            .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.collect();
assert!(!rv.is_empty());
let index = blocktree.meta(2).unwrap().unwrap().received - 1;
@@ -2084,16 +2075,22 @@ mod tests {
// For slot 3, we should return the highest blobs from slots 3, 2, 1 respectively
// for this request
let rv: Vec<_> = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 3, 5)
+            .packets
.iter()
-            .map(|b| b.read().unwrap().clone())
+            .map(|b| b.clone())
.collect();
let expected: Vec<_> = (1..=3)
.rev()
.map(|slot| {
let index = blocktree.meta(slot).unwrap().unwrap().received - 1;
-                ClusterInfo::get_data_shred_as_blob(&blocktree, slot, index)
-                    .unwrap()
-                    .unwrap()
+                ClusterInfo::get_data_shred_as_packet(
+                    &blocktree,
+                    slot,
+                    index,
+                    &socketaddr_any!(),
+                )
+                .unwrap()
+                .unwrap()
})
.collect();
assert_eq!(rv, expected)
@@ -2200,7 +2197,7 @@ mod tests {
let (_, _, val) = cluster_info
.gossip
-            .new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size())
+            .new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE)
.ok()
.unwrap();
assert!(val.verify());
@@ -2457,7 +2454,7 @@ mod tests {
check_pull_request_size(CrdsFilter::new_rand(1000, 10));
check_pull_request_size(CrdsFilter::new_rand(1000, 1000));
check_pull_request_size(CrdsFilter::new_rand(100000, 1000));
-        check_pull_request_size(CrdsFilter::new_rand(100000, ClusterInfo::max_bloom_size()));
+        check_pull_request_size(CrdsFilter::new_rand(100000, MAX_BLOOM_SIZE));
}
fn check_pull_request_size(filter: CrdsFilter) {
@@ -2543,4 +2540,77 @@ mod tests {
assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
}
+    #[test]
+    fn test_max_bloom_size() {
+        assert_eq!(MAX_BLOOM_SIZE, max_bloom_size());
+    }
+
+    #[test]
+    fn test_protocol_size() {
+        let contact_info = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
+        let dummy_vec =
+            vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); 10];
+        let dummy_vec_size = serialized_size(&dummy_vec).unwrap();
+        let mut max_protocol_size;
+
+        max_protocol_size =
+            serialized_size(&Protocol::PullRequest(CrdsFilter::default(), contact_info)).unwrap()
+                - serialized_size(&CrdsFilter::default()).unwrap();
+        max_protocol_size = max_protocol_size.max(
+            serialized_size(&Protocol::PullResponse(
+                Pubkey::default(),
+                dummy_vec.clone(),
+            ))
+            .unwrap()
+                - dummy_vec_size,
+        );
+        max_protocol_size = max_protocol_size.max(
+            serialized_size(&Protocol::PushMessage(Pubkey::default(), dummy_vec)).unwrap()
+                - dummy_vec_size,
+        );
+        max_protocol_size = max_protocol_size.max(
+            serialized_size(&Protocol::PruneMessage(
+                Pubkey::default(),
+                PruneData::default(),
+            ))
+            .unwrap()
+                - serialized_size(&PruneData::default()).unwrap(),
+        );
+
+        // make sure repairs are always smaller than the gossip messages
+        assert!(
+            max_protocol_size
+                > serialized_size(&Protocol::RequestWindowIndex(ContactInfo::default(), 0, 0))
+                    .unwrap()
+        );
+        assert!(
+            max_protocol_size
+                > serialized_size(&Protocol::RequestHighestWindowIndex(
+                    ContactInfo::default(),
+                    0,
+                    0
+                ))
+                .unwrap()
+        );
+        assert!(
+            max_protocol_size
+                > serialized_size(&Protocol::RequestOrphan(ContactInfo::default(), 0)).unwrap()
+        );
+        // finally assert the header size estimation is correct
+        assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size);
+    }
+
+    // computes the maximum size for pull request blooms
+    fn max_bloom_size() -> usize {
+        let filter_size = serialized_size(&CrdsFilter::default())
+            .expect("unable to serialize default filter") as usize;
+        let protocol = Protocol::PullRequest(
+            CrdsFilter::default(),
+            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())),
+        );
+        let protocol_size =
+            serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
+        PACKET_DATA_SIZE - (protocol_size - filter_size)
+    }
}
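Taken together, the two new tests pin the constants: test_max_bloom_size checks that MAX_BLOOM_SIZE still equals the derived max_bloom_size(), and test_protocol_size recomputes the worst-case header (serialized message minus its payload) over every gossip variant and asserts it is exactly MAX_PROTOCOL_HEADER_SIZE, so any Protocol change that grows the header fails loudly instead of silently producing oversized packets. The header/payload split they encode, sketched on a hypothetical two-variant enum (not the real Protocol):

use bincode::serialized_size;
use serde::Serialize;

// Hypothetical protocol, standing in for the gossip Protocol enum.
#[derive(Serialize)]
enum Msg {
    Push(Vec<u8>), // payload-carrying variant
    Ping(u64),     // small fixed-size variant
}

fn main() {
    let payload = vec![0u8; 32];
    let payload_size = serialized_size(&payload).unwrap();
    // Header = everything that is not payload: enum tag plus fields.
    let header = serialized_size(&Msg::Push(payload)).unwrap() - payload_size;
    let mtu: u64 = 1232; // assumed PACKET_DATA_SIZE
    let max_payload = mtu - header;
    // Small variants must fit well under the payload budget, mirroring
    // the repair-message asserts above.
    assert!(serialized_size(&Msg::Ping(0)).unwrap() < max_payload);
    println!("header = {} bytes, payload budget = {} bytes", header, max_payload);
}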