Randomize avalanche broadcast peer table for each blob (#4529)

* fix clippy warnings
commit 70a16e91a5
parent 41daf1ef0c
Author: Pankaj Garg
Date: 2019-06-03 20:38:05 -07:00
Committed by: GitHub
5 changed files with 151 additions and 230 deletions
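The heart of the change: instead of one broadcast peer table per slot, every blob derives a 32-byte seed from its own (index, slot), and that seed drives a ChaChaRng that orders the peer table. The ordering therefore varies blob-to-blob, yet any node holding the blob can reproduce it without coordination. A minimal sketch of the idea follows — helper names are illustrative, and a plain shuffle stands in for the stake-weighted ordering that `sorted_tvu_peers` actually uses:

```rust
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;

// Derive a per-blob seed the same way the new Blob::seed() does: blob index in
// bytes 0..8, slot in bytes 8..16, both little-endian, remaining bytes zero.
fn blob_seed(index: u64, slot: u64) -> [u8; 32] {
    let mut seed = [0u8; 32];
    seed[0..8].copy_from_slice(&index.to_le_bytes());
    seed[8..16].copy_from_slice(&slot.to_le_bytes());
    seed
}

// Order the peer table for one blob; a plain shuffle stands in for the
// stake-weighted shuffle the real code applies in sorted_tvu_peers.
fn peer_order(peers: &[&'static str], index: u64, slot: u64) -> Vec<&'static str> {
    let mut rng = ChaChaRng::from_seed(blob_seed(index, slot));
    let mut order = peers.to_vec();
    order.shuffle(&mut rng);
    order
}

fn main() {
    let peers = ["a", "b", "c", "d", "e"];
    // Any node that reads the same (index, slot) out of a blob reproduces the
    // identical ordering, so leader and validators agree with no extra messages.
    assert_eq!(peer_order(&peers, 0, 7), peer_order(&peers, 0, 7));
    // Different blobs in the same slot typically start from different peers.
    println!("blob 0 order: {:?}", peer_order(&peers, 0, 7));
    println!("blob 1 order: {:?}", peer_order(&peers, 1, 7));
}
```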

View File

@@ -1,7 +1,7 @@
//! A stage to broadcast data from a leader node to validators
//!
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::entry::EntrySlice;
use crate::erasure::CodingGenerator;
use crate::packet::index_blobs_with_genesis;
@@ -9,13 +9,10 @@ use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use rayon::prelude::*;
use rayon::ThreadPool;
use solana_metrics::{
datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info,
inc_new_counter_warn,
};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
@@ -86,18 +83,6 @@ impl Broadcast {
}
}
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&bank.slot().to_le_bytes());
let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(
staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref(),
ChaChaRng::from_seed(seed),
);
inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table.len() + 1);
// Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
let to_blobs_start = Instant::now();
@@ -127,9 +112,7 @@ impl Broadcast {
bank.parent().map_or(0, |parent| parent.slot()),
);
let contains_last_tick = last_tick == max_tick_height;
if contains_last_tick {
if last_tick == max_tick_height {
blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
}
@@ -141,13 +124,26 @@ impl Broadcast {
let broadcast_start = Instant::now();
// Send out data
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
inc_new_counter_debug!("streamer-broadcast-sent", blobs.len());
if let Some(nodes) = stakes.as_ref() {
if nodes.len() > 1 {
// Send out data
cluster_info
.read()
.unwrap()
.broadcast(sock, &blobs, stakes.as_ref())?;
// send out erasures
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
inc_new_counter_debug!("streamer-broadcast-sent", blobs.len());
// send out erasures
cluster_info
.read()
.unwrap()
.broadcast(sock, &coding, stakes.as_ref())?;
}
}
self.update_broadcast_stats(
duration_as_ms(&broadcast_start.elapsed()),
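
With this change the broadcast stage no longer builds and truncates its own fanout table; it looks up the staked nodes for the bank's stakers epoch and hands them straight to `ClusterInfo::broadcast`, and the data and coding sends only happen when the stake map holds more than one node. A small sketch of that guard, with a hypothetical helper name and a plain `String` key standing in for `Pubkey`:

```rust
use std::collections::HashMap;

// Broadcast only when the staked-node map holds someone besides the leader.
fn should_broadcast(stakes: Option<&HashMap<String, u64>>) -> bool {
    stakes.map_or(false, |nodes| nodes.len() > 1)
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert("leader".to_string(), 100u64);
    assert!(!should_broadcast(Some(&nodes))); // single staked node: nothing to send
    nodes.insert("validator".to_string(), 50);
    assert!(should_broadcast(Some(&nodes))); // at least one peer: send data and coding
    assert!(!should_broadcast(None)); // no stake information at all
}
```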

View File

@@ -28,10 +28,13 @@ use crate::weighted_shuffle::weighted_shuffle;
use bincode::{deserialize, serialize};
use core::cmp;
use itertools::Itertools;
use rand::SeedableRng;
use rand::{thread_rng, Rng};
use rand_chacha::ChaChaRng;
use rayon::prelude::*;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
use solana_metrics::{
datapoint_debug, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_warn,
};
use solana_netutil::{
bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
};
@@ -44,7 +47,6 @@ use solana_sdk::transaction::Transaction;
use std::cmp::min;
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@@ -526,7 +528,7 @@ impl ClusterInfo {
}
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
fn shuffle_peers_and_index<S: std::hash::BuildHasher>(
pub fn shuffle_peers_and_index<S: std::hash::BuildHasher>(
&self,
stakes: Option<&HashMap<Pubkey, u64, S>>,
rng: ChaChaRng,
@@ -538,13 +540,11 @@ impl ClusterInfo {
let peers: Vec<_> = contacts_and_stakes
.into_iter()
.enumerate()
.filter_map(|(i, (_, peer))| {
.map(|(i, (_, peer))| {
if peer.id == self.id() {
index = i;
None
} else {
Some(peer)
}
peer
})
.collect();
(index, peers)
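
`shuffle_peers_and_index` becomes public and no longer filters the local node out of the shuffled list: it keeps every peer and simply reports the local node's position, leaving it to callers to drop that entry if they must not forward to themselves (the retransmit stage below does exactly that). A sketch of the new behavior, with an illustrative helper name and string ids in place of `ContactInfo`:

```rust
// Keep every peer in the (already shuffled) list and report where the local node sits.
fn index_of_self(shuffled: &[&str], me: &str) -> (usize, Vec<String>) {
    let mut index = 0;
    let peers: Vec<String> = shuffled
        .iter()
        .enumerate()
        .map(|(i, peer)| {
            if *peer == me {
                index = i;
            }
            peer.to_string()
        })
        .collect();
    (index, peers)
}

fn main() {
    let (my_index, mut peers) = index_of_self(&["a", "b", "me", "c"], "me");
    assert_eq!(my_index, 2);
    // Callers that must not forward to themselves drop their own entry first.
    peers.remove(my_index);
    assert_eq!(peers, vec!["a", "b", "c"]);
}
```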
@@ -709,33 +709,32 @@ impl ClusterInfo {
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast(
id: &Pubkey,
contains_last_tick: bool,
broadcast_table: &[ContactInfo],
&self,
s: &UdpSocket,
blobs: &[SharedBlob],
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Result<()> {
if broadcast_table.is_empty() {
debug!("{}:not enough peers in cluster_info table", id);
inc_new_counter_error!("cluster_info-broadcast-not_enough_peers_error", 1);
Err(ClusterInfoError::NoPeers)?;
}
let mut last_err = Ok(());
let mut broadcast_table_len = 0;
blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
let broadcast_table = self.sorted_tvu_peers(stakes, ChaChaRng::from_seed(blob.seed()));
broadcast_table_len = cmp::max(broadcast_table_len, broadcast_table.len());
let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table);
trace!("broadcast orders table {}", orders.len());
let errs = Self::send_orders(id, s, orders);
for e in errs {
if let Err(e) = &e {
trace!("{}: broadcast result {:?}", id, e);
if !broadcast_table.is_empty() {
if let Err(e) = s.send_to(&blob.data[..blob.meta.size], &broadcast_table[0].tvu) {
trace!("{}: broadcast result {:?}", self.id(), e);
last_err = Err(e);
}
}
e?;
}
});
last_err?;
inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs.len());
if broadcast_table_len != 0 {
inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table_len + 1);
}
Ok(())
}
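
Read without the diff interleaving, the reshaped `broadcast` body boils down to: for each blob, rebuild the peer table from that blob's seed, send the blob only to the first peer in the rebuilt table, and remember the largest table seen for the num_peers counter. Below is a runnable sketch of that loop under simplified assumptions — `FakeBlob` and the free `broadcast` function are stand-ins, not the crate's API, and a plain shuffle again replaces the stake-weighted ordering:

```rust
use std::net::{SocketAddr, UdpSocket};

use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;

// Stand-in for a data or coding blob: a payload plus the 32-byte seed that the
// real code derives from the blob's index and slot via Blob::seed().
struct FakeBlob {
    seed: [u8; 32],
    data: Vec<u8>,
}

// For every blob, rebuild the peer table from that blob's seed and send the blob
// to the first peer in the rebuilt table; retransmit handles the rest of the fanout.
fn broadcast(
    sock: &UdpSocket,
    blobs: &[FakeBlob],
    peers: &[SocketAddr],
) -> std::io::Result<usize> {
    let mut max_table_len = 0;
    for blob in blobs {
        let mut table = peers.to_vec();
        table.shuffle(&mut ChaChaRng::from_seed(blob.seed));
        max_table_len = max_table_len.max(table.len());
        if let Some(target) = table.first() {
            sock.send_to(&blob.data, target)?;
        }
    }
    // The real code feeds the largest observed table size into the num_peers counter.
    Ok(max_table_len)
}

fn main() -> std::io::Result<()> {
    let sock = UdpSocket::bind("127.0.0.1:0")?;
    let peer = UdpSocket::bind("127.0.0.1:0")?; // stands in for a validator TVU socket
    let blobs = vec![
        FakeBlob { seed: [1; 32], data: vec![1, 2, 3] },
        FakeBlob { seed: [2; 32], data: vec![4, 5, 6] },
    ];
    broadcast(&sock, &blobs, &[peer.local_addr()?])?;
    Ok(())
}
```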
@@ -788,94 +787,6 @@ impl ClusterInfo {
Ok(())
}
fn send_orders(
id: &Pubkey,
s: &UdpSocket,
orders: Vec<(SharedBlob, Vec<&ContactInfo>)>,
) -> Vec<io::Result<usize>> {
orders
.into_iter()
.flat_map(|(b, vs)| {
let blob = b.read().unwrap();
let ids_and_tvus = if log_enabled!(log::Level::Trace) {
let v_ids = vs.iter().map(|v| v.id);
let tvus = vs.iter().map(|v| v.tvu);
let ids_and_tvus = v_ids.zip(tvus).collect();
trace!(
"{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
id,
blob.index(),
blob.meta.size,
ids_and_tvus,
blob.is_coding()
);
ids_and_tvus
} else {
vec![]
};
assert!(blob.meta.size <= BLOB_SIZE);
let send_errs_for_blob: Vec<_> = vs
.iter()
.map(move |v| {
let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu);
trace!(
"{}: done broadcast {} to {:?}",
id,
blob.meta.size,
ids_and_tvus
);
e
})
.collect();
send_errs_for_blob
})
.collect()
}
pub fn create_broadcast_orders<'a, T>(
contains_last_tick: bool,
blobs: &[T],
broadcast_table: &'a [ContactInfo],
) -> Vec<(T, Vec<&'a ContactInfo>)>
where
T: Clone,
{
// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node.
if blobs.is_empty() {
return vec![];
}
let mut orders = Vec::with_capacity(blobs.len());
let x = thread_rng().gen_range(0, broadcast_table.len());
for (i, blob) in blobs.iter().enumerate() {
let br_idx = (x + i) % broadcast_table.len();
trace!("broadcast order data br_idx {}", br_idx);
orders.push((blob.clone(), vec![&broadcast_table[br_idx]]));
}
if contains_last_tick {
// Broadcast the last tick to everyone on the network so it doesn't get dropped
// (Need to maximize probability the next leader in line sees this handoff tick
// despite packet drops)
// If we had a tick at max_tick_height, then we know it must be the last
// Blob in the broadcast, There cannot be an entry that got sent after the
// last tick, guaranteed by the PohService).
orders.push((
blobs.last().unwrap().clone(),
broadcast_table.iter().collect(),
));
}
orders
}
pub fn window_index_request_bytes(&self, slot: u64, blob_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestWindowIndex(self.my_data().clone(), slot, blob_index);
let out = serialize(&req)?;
@@ -1511,16 +1422,11 @@ impl ClusterInfo {
/// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
stakes: Option<&HashMap<Pubkey, u64, S>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
pub fn compute_retransmit_peers(
fanout: usize,
rng: ChaChaRng,
my_index: usize,
peers: Vec<ContactInfo>,
) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
let (my_index, peers) = cluster_info
.read()
.unwrap()
.shuffle_peers_and_index(stakes, rng);
//calc num_layers and num_neighborhoods using the total number of nodes
let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout);

View File

@@ -399,6 +399,13 @@ impl Blob {
LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
}
pub fn seed(&self) -> [u8; 32] {
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&self.index().to_le_bytes());
seed[8..16].copy_from_slice(&self.slot().to_le_bytes());
seed
}
/// sender id, we use this for identifying if its a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use for spam filtering
pub fn id(&self) -> Pubkey {

View File

@@ -44,18 +44,15 @@ fn retransmit(
let r_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
for blob in &blobs {
let slot = blob.read().unwrap().slot();
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&slot.to_le_bytes());
let (neighbors, children) = avalanche_topology_cache.entry(slot).or_insert_with(|| {
cache_history.push(slot);
compute_retransmit_peers(
staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
cluster_info,
DATA_PLANE_FANOUT,
ChaChaRng::from_seed(seed),
)
});
let (my_index, mut peers) = cluster_info.read().unwrap().shuffle_peers_and_index(
staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
ChaChaRng::from_seed(blob.read().unwrap().seed()),
);
peers.remove(my_index);
let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, peers);
let leader = leader_schedule_cache
.slot_leader_at(blob.read().unwrap().slot(), Some(r_bank.as_ref()));
if blob.read().unwrap().meta.forward {