revises turbine peers shuffling order (#20480)

Turbine randomly shuffles cluster nodes into a broadcast tree for each
shred. This requires knowing the stakes and the nodes' contact-infos (from
gossip).

However, gossip is subject to partitioning and propagation delays.
Additionally, unstaked nodes may join or leave the cluster at any moment,
so the view of the cluster can differ from one node to another.

This commit:
* Always arranges unstaked nodes at the bottom of the turbine broadcast
  tree.
* Always includes staked nodes, regardless of whether their contact-info
  is available in gossip.
* Uses the unbiased WeightedShuffle construct for shuffling nodes.
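
At its core, the new ordering is a two-phase shuffle: staked nodes are
shuffled with stake-proportional weights, then unstaked nodes are shuffled
uniformly and appended, so unstaked nodes always land at the bottom of the
tree. The following is a minimal, self-contained sketch of that ordering,
using a simplified Node type and plain rand (0.8) calls instead of the
crate's WeightedShuffle, so it only approximates the actual implementation:

use rand::{seq::SliceRandom, Rng};

// Simplified stand-in for the crate's Node type.
struct Node {
    stake: u64,
}

// Sketch only: nodes are assumed pre-sorted by (stake, pubkey) in descending
// order, as in ClusterNodes, so all staked nodes precede all unstaked ones.
fn shuffle_nodes_sketch<'a, R: Rng>(rng: &mut R, nodes: &[&'a Node]) -> Vec<&'a Node> {
    let num_staked = nodes.iter().take_while(|node| node.stake > 0).count();
    let (staked, unstaked) = nodes.split_at(num_staked);
    // Phase 1: stake-weighted sampling without replacement, so higher-stake
    // nodes tend to land earlier in the broadcast tree.
    let mut staked = staked.to_vec();
    let mut out = Vec::with_capacity(nodes.len());
    while !staked.is_empty() {
        let total: u64 = staked.iter().map(|node| node.stake).sum();
        let mut sample = rng.gen_range(0..total);
        let index = staked
            .iter()
            .position(|node| {
                if sample < node.stake {
                    true
                } else {
                    sample -= node.stake;
                    false
                }
            })
            .unwrap();
        out.push(staked.swap_remove(index));
    }
    // Phase 2: unstaked nodes are shuffled uniformly and always appended last.
    let mut unstaked = unstaked.to_vec();
    unstaked.shuffle(rng);
    out.extend(unstaked);
    out
}

fn main() {
    // Made-up stakes, sorted in descending order; the last two are unstaked.
    let nodes = vec![
        Node { stake: 300 },
        Node { stake: 100 },
        Node { stake: 0 },
        Node { stake: 0 },
    ];
    let refs: Vec<&Node> = nodes.iter().collect();
    let shuffled = shuffle_nodes_sketch(&mut rand::thread_rng(), &refs);
    // Unstaked nodes always occupy the last two slots, regardless of the seed.
    assert!(shuffled[2..].iter().all(|node| node.stake == 0));
}
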
Author: behzad nouri
Date: 2021-10-14 15:09:36 +00:00
Committed by: GitHub
Parent: 6649dfa899
Commit: 0c0384ec32
7 changed files with 355 additions and 64 deletions


@@ -17,7 +17,7 @@ use {
Sender as CrossbeamSender,
},
itertools::Itertools,
solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError},
solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT},
solana_ledger::{blockstore::Blockstore, shred::Shred},
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
@@ -33,6 +33,7 @@ use {
},
std::{
collections::HashMap,
iter::repeat,
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
@@ -412,8 +413,6 @@ pub fn broadcast_shreds(
) -> Result<()> {
let mut result = Ok(());
let mut shred_select = Measure::start("shred_select");
// Only the leader broadcasts shreds.
let leader = cluster_info.id();
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
@@ -427,12 +426,13 @@
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.filter_map(move |shred| {
let seed = shred.seed(leader, &root_bank);
let node = cluster_nodes.get_broadcast_peer(seed)?;
socket_addr_space
.check(&node.tvu)
.then(|| (&shred.payload, node.tvu))
shreds.flat_map(move |shred| {
repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs(
shred,
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,
))
})
})
.collect();
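
With this change the leader no longer picks a single broadcast peer per
shred: get_broadcast_addrs may return several socket addresses, and the
flat_map pairs the shred payload with each of them. The repeat/zip idiom in
isolation, with plain strings standing in for payloads and addresses, purely
for illustration:

use std::iter::repeat;

fn main() {
    // Illustrative stand-ins: each "shred" has one payload plus the addresses
    // that would be returned for it.
    let shreds = vec![
        ("payload-0", vec!["addr-a", "addr-b"]),
        ("payload-1", vec!["addr-c"]),
    ];
    let packets: Vec<(&str, &str)> = shreds
        .iter()
        .flat_map(|(payload, addrs)| repeat(*payload).zip(addrs.iter().copied()))
        .collect();
    // Each payload is duplicated once per destination address.
    assert_eq!(
        packets,
        vec![
            ("payload-0", "addr-a"),
            ("payload-0", "addr-b"),
            ("payload-1", "addr-c"),
        ],
    );
}
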


@@ -3,6 +3,7 @@ use {
crate::cluster_nodes::ClusterNodesCache,
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
solana_ledger::shred::Shredder,
solana_sdk::{
hash::Hash,
@@ -246,6 +247,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
(bank_forks.root_bank(), bank_forks.working_bank())
};
let self_pubkey = cluster_info.id();
let nodes: Vec<_> = cluster_info
.all_peers()
.into_iter()
.map(|(node, _)| node)
.collect();
// Create cluster partition.
let cluster_partition: HashSet<Pubkey> = {
@@ -273,8 +279,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let packets: Vec<_> = shreds
.iter()
.filter_map(|shred| {
let seed = shred.seed(self_pubkey, &root_bank);
let node = cluster_nodes.get_broadcast_peer(seed)?;
let addr = cluster_nodes
.get_broadcast_addrs(shred, &root_bank, DATA_PLANE_FANOUT, socket_addr_space)
.first()
.copied()?;
let node = nodes.iter().find(|node| node.tvu == addr)?;
if !socket_addr_space.check(&node.tvu) {
return None;
}


@@ -2,22 +2,31 @@ use {
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
itertools::Itertools,
lru::LruCache,
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
contact_info::ContactInfo,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
weighted_shuffle::{weighted_best, weighted_shuffle},
weighted_shuffle::{
weighted_best, weighted_sample_single, weighted_shuffle, WeightedShuffle,
},
},
solana_ledger::shred::Shred,
solana_runtime::bank::Bank,
solana_sdk::{
clock::{Epoch, Slot},
feature_set,
pubkey::Pubkey,
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::{
any::TypeId,
cmp::Reverse,
collections::HashMap,
marker::PhantomData,
net::SocketAddr,
ops::Deref,
sync::{Arc, Mutex},
time::{Duration, Instant},
@@ -41,6 +50,9 @@ pub struct ClusterNodes<T> {
// All staked nodes + other known tvu-peers + the node itself;
// sorted by (stake, pubkey) in descending order.
nodes: Vec<Node>,
// Cumulative stakes (excluding the node itself), used for sampling
// broadcast peers.
cumulative_weights: Vec<u64>,
// Weights and indices for sampling peers. weighted_{shuffle,best} expect
// weights >= 1. For backward compatibility we use max(1, stake) for
// weights and exclude nodes with no contact-info.
@@ -102,9 +114,68 @@ impl ClusterNodes<BroadcastStage> {
new_cluster_nodes(cluster_info, stakes)
}
pub(crate) fn get_broadcast_addrs(
&self,
shred: &Shred,
root_bank: &Bank,
fanout: usize,
socket_addr_space: &SocketAddrSpace,
) -> Vec<SocketAddr> {
const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
let shred_seed = shred.seed(self.pubkey, root_bank);
if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
if let Some(node) = self.get_broadcast_peer(shred_seed) {
if socket_addr_space.check(&node.tvu) {
return vec![node.tvu];
}
}
return Vec::default();
}
let mut rng = ChaChaRng::from_seed(shred_seed);
let index = match weighted_sample_single(&mut rng, &self.cumulative_weights) {
None => return Vec::default(),
Some(index) => index,
};
if let Some(node) = self.nodes[index].contact_info() {
let now = timestamp();
let age = Duration::from_millis(now.saturating_sub(node.wallclock));
if age < MAX_CONTACT_INFO_AGE
&& ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
{
return vec![node.tvu];
}
}
let nodes: Vec<_> = self
.nodes
.iter()
.filter(|node| node.pubkey() != self.pubkey)
.collect();
if nodes.is_empty() {
return Vec::default();
}
let mut rng = ChaChaRng::from_seed(shred_seed);
let nodes = shuffle_nodes(&mut rng, &nodes);
let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes);
neighbors[..1]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu))
.chain(
neighbors[1..]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards)),
)
.chain(
children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu)),
)
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
}
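
When the shuffle patch is active, get_broadcast_addrs first draws a single
tree root in proportion to stake via weighted_sample_single over the
precomputed cumulative weights; only if that node's contact-info is stale or
its address invalid does it fall back to shuffling the whole node list and
returning the first-layer addresses, as in the code above. The helper itself
lives in solana_gossip and is not part of this diff; a minimal sketch of that
kind of sampling, assuming cumulative_weights[i] holds the prefix sum
w_0 + ... + w_i, could be:

use rand::Rng;

// Hypothetical sketch, not the solana_gossip implementation: draw an index
// with probability proportional to its marginal weight, given prefix sums.
fn weighted_sample_single_sketch<R: Rng>(
    rng: &mut R,
    cumulative_weights: &[u64],
) -> Option<usize> {
    let total = *cumulative_weights.last()?;
    if total == 0 {
        return None;
    }
    let sample = rng.gen_range(0..total);
    // First index whose cumulative weight exceeds the sample; entries with a
    // zero marginal weight (e.g. the node itself) can never be selected.
    Some(cumulative_weights.partition_point(|&w| w <= sample))
}
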
/// Returns the root of turbine broadcast tree, which the leader sends the
/// shred to.
pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
if self.index.is_empty() {
None
} else {
@@ -118,14 +189,82 @@ impl ClusterNodes<BroadcastStage> {
}
impl ClusterNodes<RetransmitStage> {
pub(crate) fn get_retransmit_peers(
pub(crate) fn get_retransmit_addrs(
&self,
slot_leader: Pubkey,
shred: &Shred,
root_bank: &Bank,
fanout: usize,
) -> Vec<SocketAddr> {
let (neighbors, children) =
self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
// children and also tvu_forwards socket of its neighbors. Otherwise it
// should only forward to tvu_forwards socket of its children.
if neighbors[0].pubkey() != self.pubkey {
return children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards))
.collect();
}
// First neighbor is this node itself, so skip it.
neighbors[1..]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards))
.chain(
children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu)),
)
.collect()
}
fn get_retransmit_peers(
&self,
slot_leader: Pubkey,
shred: &Shred,
root_bank: &Bank,
fanout: usize,
) -> (
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
let shred_seed = shred.seed(slot_leader, root_bank);
if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
}
// Exclude slot leader from list of nodes.
let nodes: Vec<_> = if slot_leader == self.pubkey {
error!("retransmit from slot leader: {}", slot_leader);
self.nodes.iter().collect()
} else {
self.nodes
.iter()
.filter(|node| node.pubkey() != slot_leader)
.collect()
};
let mut rng = ChaChaRng::from_seed(shred_seed);
let nodes = shuffle_nodes(&mut rng, &nodes);
let self_index = nodes
.iter()
.position(|node| node.pubkey() == self.pubkey)
.unwrap();
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
// Assert that the node itself is included in the set of neighbors, at
// the right offset.
debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
(neighbors, children)
}
fn get_retransmit_peers_compat(
&self,
shred_seed: [u8; 32],
fanout: usize,
slot_leader: Pubkey,
) -> (
Vec<&ContactInfo>, // neighbors
Vec<&ContactInfo>, // children
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
// Exclude leader from list of nodes.
let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
@@ -153,16 +292,36 @@ impl ClusterNodes<RetransmitStage> {
self.nodes[neighbors[self_index % fanout]].pubkey(),
self.pubkey
);
let get_contact_infos = |index: Vec<usize>| -> Vec<&ContactInfo> {
index
.into_iter()
.map(|i| self.nodes[i].contact_info().unwrap())
.collect()
};
(get_contact_infos(neighbors), get_contact_infos(children))
let neighbors = neighbors.into_iter().map(|i| &self.nodes[i]).collect();
let children = children.into_iter().map(|i| &self.nodes[i]).collect();
(neighbors, children)
}
}
fn build_cumulative_weights(self_pubkey: Pubkey, nodes: &[Node]) -> Vec<u64> {
let cumulative_stakes: Vec<_> = nodes
.iter()
.scan(0, |acc, node| {
if node.pubkey() != self_pubkey {
*acc += node.stake;
}
Some(*acc)
})
.collect();
if cumulative_stakes.last() != Some(&0) {
return cumulative_stakes;
}
nodes
.iter()
.scan(0, |acc, node| {
if node.pubkey() != self_pubkey {
*acc += 1;
}
Some(*acc)
})
.collect()
}
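
The cumulative weights built here are prefix sums of stake in which the
node's own entry contributes nothing; if every other node were unstaked, the
sums would all be zero, hence the second pass that falls back to a weight of
one per peer. A small worked example of the prefix-sum shape (skipping the
pubkey comparison and simply treating the self entry as zero stake):

fn main() {
    // Made-up stakes; index 1 stands for the node itself and contributes zero.
    let stakes = [30u64, 0, 20, 0, 0];
    let cumulative: Vec<u64> = stakes
        .iter()
        .scan(0u64, |acc, stake| {
            *acc += stake;
            Some(*acc)
        })
        .collect();
    // Zero-stake entries repeat the previous prefix sum, so stake-proportional
    // sampling over these values never lands on them.
    assert_eq!(cumulative, vec![30u64, 30, 50, 50, 50]);
}
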
fn new_cluster_nodes<T: 'static>(
cluster_info: &ClusterInfo,
stakes: &HashMap<Pubkey, u64>,
@@ -170,6 +329,11 @@ fn new_cluster_nodes<T: 'static>(
let self_pubkey = cluster_info.id();
let nodes = get_nodes(cluster_info, stakes);
let broadcast = TypeId::of::<T>() == TypeId::of::<BroadcastStage>();
let cumulative_weights = if broadcast {
build_cumulative_weights(self_pubkey, &nodes)
} else {
Vec::default()
};
// For backward compatibility:
// * nodes which do not have contact-info are excluded.
// * stakes are floored at 1.
@@ -187,6 +351,7 @@ fn new_cluster_nodes<T: 'static>(
ClusterNodes {
pubkey: self_pubkey,
nodes,
cumulative_weights,
index,
_phantom: PhantomData::default(),
}
@@ -225,6 +390,44 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}
fn enable_turbine_peers_shuffle_patch(shred_slot: Slot, root_bank: &Bank) -> bool {
let feature_slot = root_bank
.feature_set
.activated_slot(&feature_set::turbine_peers_shuffle::id());
match feature_slot {
None => false,
Some(feature_slot) => {
let epoch_schedule = root_bank.epoch_schedule();
let feature_epoch = epoch_schedule.get_epoch(feature_slot);
let shred_epoch = epoch_schedule.get_epoch(shred_slot);
feature_epoch < shred_epoch
}
}
}
// Shuffles nodes w.r.t their stakes.
// Unstaked nodes will always appear at the very end.
fn shuffle_nodes<'a, R: Rng>(rng: &mut R, nodes: &[&'a Node]) -> Vec<&'a Node> {
// Nodes are sorted by (stake, pubkey) in descending order.
let stakes: Vec<u64> = nodes
.iter()
.map(|node| node.stake)
.take_while(|stake| *stake > 0)
.collect();
let num_staked = stakes.len();
let mut out: Vec<_> = WeightedShuffle::new(rng, &stakes)
.unwrap()
.map(|i| nodes[i])
.collect();
let weights = vec![1; nodes.len() - num_staked];
out.extend(
WeightedShuffle::new(rng, &weights)
.unwrap()
.map(|i| nodes[i + num_staked]),
);
out
}
impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
@@ -306,6 +509,7 @@ impl<T> Default for ClusterNodes<T> {
Self {
pubkey: Pubkey::default(),
nodes: Vec::default(),
cumulative_weights: Vec::default(),
index: Vec::default(),
_phantom: PhantomData::default(),
}
@@ -458,15 +662,15 @@ mod tests {
let (neighbors_indices, children_indices) =
compute_retransmit_peers(fanout, self_index, &shuffled_index);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader);
cluster_nodes.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
assert_eq!(children.len(), children_indices.len());
for (node, index) in children.into_iter().zip(children_indices) {
assert_eq!(*node, peers[index]);
assert_eq!(*node.contact_info().unwrap(), peers[index]);
}
assert_eq!(neighbors.len(), neighbors_indices.len());
assert_eq!(neighbors[0].id, peers[neighbors_indices[0]].id);
assert_eq!(neighbors[0].pubkey(), peers[neighbors_indices[0]].id);
for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) {
assert_eq!(*node, peers[index]);
assert_eq!(*node.contact_info().unwrap(), peers[index]);
}
}
}


@@ -17,7 +17,10 @@ use {
lru::LruCache,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_client::rpc_response::SlotUpdate,
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
solana_gossip::{
cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
contact_info::ContactInfo,
},
solana_ledger::{
shred::Shred,
{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
@@ -28,6 +31,7 @@ use {
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
solana_streamer::sendmmsg::{multi_target_send, SendPktsError},
std::{
collections::{BTreeSet, HashSet},
net::UdpSocket,
@@ -215,7 +219,6 @@ fn retransmit(
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
let my_id = cluster_info.id();
let socket_addr_space = cluster_info.socket_addr_space();
let retransmit_shred = |shred: Shred, socket: &UdpSocket| {
if should_skip_retransmit(&shred, shreds_received) {
@@ -253,37 +256,29 @@ fn retransmit(
};
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let shred_seed = shred.seed(slot_leader, &root_bank);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader);
let anchor_node = neighbors[0].id == my_id;
let addrs: Vec<_> = cluster_nodes
.get_retransmit_addrs(slot_leader, &shred, &root_bank, DATA_PLANE_FANOUT)
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
compute_turbine_peers.stop();
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
// children and also tvu_forward socket of its neighbors. Otherwise it
// should only forward to tvu_forward socket of its children.
if anchor_node {
// First neighbor is this node itself, so skip it.
ClusterInfo::retransmit_to(
&neighbors[1..],
&shred.payload,
socket,
true, // forward socket
socket_addr_space,
if let Err(SendPktsError::IoError(ioerr, num_failed)) =
multi_target_send(socket, &shred.payload, &addrs)
{
inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1);
inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
addrs.len(),
);
}
ClusterInfo::retransmit_to(
&children,
&shred.payload,
socket,
!anchor_node, // send to forward socket!
socket_addr_space,
);
retransmit_time.stop();
stats
.retransmit_total