From 70a16e91a5e63a0e6ac218f78d65b57d374a19b9 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 3 Jun 2019 20:38:05 -0700 Subject: [PATCH] Randomize avalanche broadcast peer table for each blob (#4529) * fix clippy warnings --- core/src/broadcast_stage.rs | 44 +++++----- core/src/cluster_info.rs | 152 +++++++-------------------------- core/src/packet.rs | 7 ++ core/src/retransmit_stage.rs | 21 ++--- core/tests/cluster_info.rs | 157 +++++++++++++++++++---------------- 5 files changed, 151 insertions(+), 230 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index bf599ee3f8..a869ed4e05 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,7 +1,7 @@ //! A stage to broadcast data from a leader node to validators //! use crate::blocktree::Blocktree; -use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; +use crate::cluster_info::{ClusterInfo, ClusterInfoError}; use crate::entry::EntrySlice; use crate::erasure::CodingGenerator; use crate::packet::index_blobs_with_genesis; @@ -9,13 +9,10 @@ use crate::poh_recorder::WorkingBankEntries; use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; -use rand::SeedableRng; -use rand_chacha::ChaChaRng; use rayon::prelude::*; use rayon::ThreadPool; use solana_metrics::{ datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info, - inc_new_counter_warn, }; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; @@ -86,18 +83,6 @@ impl Broadcast { } } - let bank_epoch = bank.get_stakers_epoch(bank.slot()); - let mut seed = [0; 32]; - seed[0..8].copy_from_slice(&bank.slot().to_le_bytes()); - let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers( - staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref(), - ChaChaRng::from_seed(seed), - ); - - inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table.len() + 1); - // Layer 1, leader nodes are limited to the fanout size. 
-        broadcast_table.truncate(DATA_PLANE_FANOUT);
-
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 
         let to_blobs_start = Instant::now();
@@ -127,9 +112,7 @@ impl Broadcast {
             bank.parent().map_or(0, |parent| parent.slot()),
         );
 
-        let contains_last_tick = last_tick == max_tick_height;
-
-        if contains_last_tick {
+        if last_tick == max_tick_height {
             blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
         }
 
@@ -141,13 +124,26 @@ impl Broadcast {
 
         let broadcast_start = Instant::now();
 
-        // Send out data
-        ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
+        let bank_epoch = bank.get_stakers_epoch(bank.slot());
+        let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
 
-        inc_new_counter_debug!("streamer-broadcast-sent", blobs.len());
+        if let Some(nodes) = stakes.as_ref() {
+            if nodes.len() > 1 {
+                // Send out data
+                cluster_info
+                    .read()
+                    .unwrap()
+                    .broadcast(sock, &blobs, stakes.as_ref())?;
 
-        // send out erasures
-        ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
+                inc_new_counter_debug!("streamer-broadcast-sent", blobs.len());
+
+                // send out erasures
+                cluster_info
+                    .read()
+                    .unwrap()
+                    .broadcast(sock, &coding, stakes.as_ref())?;
+            }
+        }
 
         self.update_broadcast_stats(
             duration_as_ms(&broadcast_start.elapsed()),
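Note on the change: the old path shuffled the peer table once per slot (ChaCha seeded with the slot) and truncated it to DATA_PLANE_FANOUT, so one fixed set of peers received every blob in the slot. The new path reshuffles per blob, so the layer-1 entry point rotates blob by blob. A minimal standalone sketch of the idea; the unweighted `shuffle` here stands in for the stake-weighted ordering that `sorted_tvu_peers` actually applies, and `Peer` is a placeholder type:

    use rand::seq::SliceRandom;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    type Peer = u64; // placeholder for ContactInfo

    // Mirrors Blob::seed() added below: mix (index, slot) into a 32-byte
    // seed so every node derives the same ChaCha stream for a given blob.
    fn blob_seed(index: u64, slot: u64) -> [u8; 32] {
        let mut seed = [0u8; 32];
        seed[0..8].copy_from_slice(&index.to_le_bytes());
        seed[8..16].copy_from_slice(&slot.to_le_bytes());
        seed
    }

    // Same seed, same shuffle, on every node: the leader sends the blob to
    // the first peer of this ordering, and validators can re-derive it.
    fn broadcast_table_for(mut peers: Vec<Peer>, index: u64, slot: u64) -> Vec<Peer> {
        peers.shuffle(&mut ChaChaRng::from_seed(blob_seed(index, slot)));
        peers
    }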
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 9bd3b41ad4..91c1364372 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -28,10 +28,13 @@ use crate::weighted_shuffle::weighted_shuffle;
 use bincode::{deserialize, serialize};
 use core::cmp;
 use itertools::Itertools;
+use rand::SeedableRng;
 use rand::{thread_rng, Rng};
 use rand_chacha::ChaChaRng;
 use rayon::prelude::*;
-use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
+use solana_metrics::{
+    datapoint_debug, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_warn,
+};
 use solana_netutil::{
     bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
 };
@@ -44,7 +47,6 @@ use solana_sdk::transaction::Transaction;
 use std::cmp::min;
 use std::collections::{BTreeSet, HashMap};
 use std::fmt;
-use std::io;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
@@ -526,7 +528,7 @@ impl ClusterInfo {
     }
 
     /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
-    fn shuffle_peers_and_index(
+    pub fn shuffle_peers_and_index(
         &self,
         stakes: Option<&HashMap<Pubkey, u64>>,
         rng: ChaChaRng,
@@ -538,13 +540,11 @@
         let peers: Vec<_> = contacts_and_stakes
             .into_iter()
             .enumerate()
-            .filter_map(|(i, (_, peer))| {
+            .map(|(i, (_, peer))| {
                 if peer.id == self.id() {
                     index = i;
-                    None
-                } else {
-                    Some(peer)
                 }
+                peer
             })
             .collect();
         (index, peers)

     /// broadcast messages from the leader to layer 1 nodes
     /// # Remarks
     pub fn broadcast(
-        id: &Pubkey,
-        contains_last_tick: bool,
-        broadcast_table: &[ContactInfo],
+        &self,
         s: &UdpSocket,
         blobs: &[SharedBlob],
+        stakes: Option<&HashMap<Pubkey, u64>>,
     ) -> Result<()> {
-        if broadcast_table.is_empty() {
-            debug!("{}:not enough peers in cluster_info table", id);
-            inc_new_counter_error!("cluster_info-broadcast-not_enough_peers_error", 1);
-            Err(ClusterInfoError::NoPeers)?;
-        }
+        let mut last_err = Ok(());
+        let mut broadcast_table_len = 0;
+        blobs.iter().for_each(|b| {
+            let blob = b.read().unwrap();
+            let broadcast_table =
+                self.sorted_tvu_peers(stakes, ChaChaRng::from_seed(blob.seed()));
+            broadcast_table_len = cmp::max(broadcast_table_len, broadcast_table.len());
 
-        let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table);
-
-        trace!("broadcast orders table {}", orders.len());
-
-        let errs = Self::send_orders(id, s, orders);
-
-        for e in errs {
-            if let Err(e) = &e {
-                trace!("{}: broadcast result {:?}", id, e);
+            if !broadcast_table.is_empty() {
+                if let Err(e) = s.send_to(&blob.data[..blob.meta.size], &broadcast_table[0].tvu) {
+                    trace!("{}: broadcast result {:?}", self.id(), e);
+                    last_err = Err(e);
+                }
             }
-            e?;
-        }
+        });
+
+        last_err?;
 
         inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs.len());
-
+        if broadcast_table_len != 0 {
+            inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table_len + 1);
+        }
         Ok(())
     }
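The rewritten `broadcast` also changes error handling: instead of collecting every send result and failing fast, it keeps transmitting after a failed `send_to` and only surfaces the most recent error once the whole batch has been attempted, so one unreachable peer cannot starve the remaining blobs. A self-contained sketch of that latching pattern (illustrative names, not the crate's API):

    use std::io;
    use std::net::{SocketAddr, UdpSocket};

    // Attempt every send; remember only the most recent failure and report
    // it after the loop, so a single bad peer cannot abort the batch.
    fn send_batch(sock: &UdpSocket, batch: &[(Vec<u8>, SocketAddr)]) -> io::Result<()> {
        let mut last_err = Ok(());
        for (payload, addr) in batch {
            if let Err(e) = sock.send_to(payload, addr) {
                last_err = Err(e);
            }
        }
        last_err
    }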
@@ -788,94 +787,6 @@
         Ok(())
     }
 
-    fn send_orders(
-        id: &Pubkey,
-        s: &UdpSocket,
-        orders: Vec<(SharedBlob, Vec<&ContactInfo>)>,
-    ) -> Vec<io::Result<usize>> {
-        orders
-            .into_iter()
-            .flat_map(|(b, vs)| {
-                let blob = b.read().unwrap();
-
-                let ids_and_tvus = if log_enabled!(log::Level::Trace) {
-                    let v_ids = vs.iter().map(|v| v.id);
-                    let tvus = vs.iter().map(|v| v.tvu);
-                    let ids_and_tvus = v_ids.zip(tvus).collect();
-
-                    trace!(
-                        "{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
-                        id,
-                        blob.index(),
-                        blob.meta.size,
-                        ids_and_tvus,
-                        blob.is_coding()
-                    );
-
-                    ids_and_tvus
-                } else {
-                    vec![]
-                };
-
-                assert!(blob.meta.size <= BLOB_SIZE);
-                let send_errs_for_blob: Vec<_> = vs
-                    .iter()
-                    .map(move |v| {
-                        let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu);
-                        trace!(
-                            "{}: done broadcast {} to {:?}",
-                            id,
-                            blob.meta.size,
-                            ids_and_tvus
-                        );
-                        e
-                    })
-                    .collect();
-                send_errs_for_blob
-            })
-            .collect()
-    }
-
-    pub fn create_broadcast_orders<'a, T>(
-        contains_last_tick: bool,
-        blobs: &[T],
-        broadcast_table: &'a [ContactInfo],
-    ) -> Vec<(T, Vec<&'a ContactInfo>)>
-    where
-        T: Clone,
-    {
-        // enumerate all the blobs in the window, those are the indices
-        // transmit them to nodes, starting from a different node.
-        if blobs.is_empty() {
-            return vec![];
-        }
-        let mut orders = Vec::with_capacity(blobs.len());
-
-        let x = thread_rng().gen_range(0, broadcast_table.len());
-        for (i, blob) in blobs.iter().enumerate() {
-            let br_idx = (x + i) % broadcast_table.len();
-
-            trace!("broadcast order data br_idx {}", br_idx);
-
-            orders.push((blob.clone(), vec![&broadcast_table[br_idx]]));
-        }
-
-        if contains_last_tick {
-            // Broadcast the last tick to everyone on the network so it doesn't get dropped
-            // (Need to maximize probability the next leader in line sees this handoff tick
-            // despite packet drops)
-            // If we had a tick at max_tick_height, then we know it must be the last
-            // Blob in the broadcast, There cannot be an entry that got sent after the
-            // last tick, guaranteed by the PohService).
-            orders.push((
-                blobs.last().unwrap().clone(),
-                broadcast_table.iter().collect(),
-            ));
-        }
-
-        orders
-    }
-
     pub fn window_index_request_bytes(&self, slot: u64, blob_index: u64) -> Result<Vec<u8>> {
         let req = Protocol::RequestWindowIndex(self.my_data().clone(), slot, blob_index);
         let out = serialize(&req)?;
@@ -1511,16 +1422,11 @@
 /// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic
 /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
-pub fn compute_retransmit_peers(
-    stakes: Option<&HashMap<Pubkey, u64>>,
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
+pub fn compute_retransmit_peers(
     fanout: usize,
-    rng: ChaChaRng,
+    my_index: usize,
+    peers: Vec<ContactInfo>,
 ) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
-    let (my_index, peers) = cluster_info
-        .read()
-        .unwrap()
-        .shuffle_peers_and_index(stakes, rng);
     //calc num_layers and num_neighborhoods using the total number of nodes
     let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout);
diff --git a/core/src/packet.rs b/core/src/packet.rs
index 79d43f8a7c..ba58d2a4e9 100644
--- a/core/src/packet.rs
+++ b/core/src/packet.rs
@@ -399,6 +399,13 @@ impl Blob {
         LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
     }
 
+    pub fn seed(&self) -> [u8; 32] {
+        let mut seed = [0; 32];
+        seed[0..8].copy_from_slice(&self.index().to_le_bytes());
+        seed[8..16].copy_from_slice(&self.slot().to_le_bytes());
+        seed
+    }
+
     /// sender id, we use this for identifying if its a blob from the leader that we should
     /// retransmit. eventually blobs should have a signature that we can use for spam filtering
     pub fn id(&self) -> Pubkey {
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 6a8b24442e..ec3b8b9b2b 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -44,18 +44,15 @@
     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
     for blob in &blobs {
-        let slot = blob.read().unwrap().slot();
-        let mut seed = [0; 32];
-        seed[0..8].copy_from_slice(&slot.to_le_bytes());
-        let (neighbors, children) = avalanche_topology_cache.entry(slot).or_insert_with(|| {
-            cache_history.push(slot);
-            compute_retransmit_peers(
-                staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
-                cluster_info,
-                DATA_PLANE_FANOUT,
-                ChaChaRng::from_seed(seed),
-            )
-        });
+        let (my_index, mut peers) = cluster_info.read().unwrap().shuffle_peers_and_index(
+            staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
+            ChaChaRng::from_seed(blob.read().unwrap().seed()),
+        );
+
+        peers.remove(my_index);
+
+        let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, peers);
+
         let leader = leader_schedule_cache
             .slot_leader_at(blob.read().unwrap().slot(), Some(r_bank.as_ref()));
         if blob.read().unwrap().meta.forward {
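With the shuffle hoisted out of it, `compute_retransmit_peers` is now a pure function of the fanout, this node's index, and an already-shuffled peer list, which is what lets the retransmit stage above and the tests below drive it deterministically per blob. For intuition, a rough sketch of how a fanout-bounded data plane grows layer by layer; this is a simplification, and the real `ClusterInfo::describe_data_plane` additionally carves layers into neighborhoods:

    // Exclusive end index of each layer over `n` shuffled peers: layer 1
    // holds at most `fanout` nodes, each later layer is `fanout` times wider.
    fn layer_ends(n: usize, fanout: usize) -> Vec<usize> {
        assert!(fanout >= 2, "sketch assumes a non-trivial fanout");
        let mut ends = Vec::new();
        let mut width = fanout;
        let mut end = width.min(n);
        loop {
            ends.push(end);
            if end >= n {
                break;
            }
            width *= fanout;
            end = (end + width).min(n);
        }
        ends
    }

    // e.g. layer_ends(100, 3) == vec![3, 12, 39, 100]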
diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs
index 5e83685c68..9c31e6758c 100644
--- a/core/tests/cluster_info.rs
+++ b/core/tests/cluster_info.rs
@@ -1,6 +1,6 @@
 use rand::SeedableRng;
 use rand_chacha::ChaChaRng;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::iter::ParallelIterator;
 use rayon::prelude::*;
 use solana::cluster_info::{compute_retransmit_peers, ClusterInfo};
 use solana::contact_info::ContactInfo;
@@ -9,11 +9,11 @@
 use std::sync::mpsc::channel;
 use std::sync::mpsc::TryRecvError;
 use std::sync::mpsc::{Receiver, Sender};
+use std::sync::Arc;
 use std::sync::Mutex;
-use std::sync::{Arc, RwLock};
 use std::time::Instant;
 
-type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<(i32, bool)>)>;
+type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<(i32, bool)>)>;
 
 fn num_threads() -> usize {
     sys_info::cpu_num().unwrap_or(10) as usize
@@ -23,11 +23,48 @@
 fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) {
     batches.par_iter_mut().for_each(|batch| {
         if batch.contains_key(id) {
-            let _ = batch.get_mut(id).unwrap().0.insert(blob);
+            let _ = batch.get_mut(id).unwrap().1.insert(blob);
         }
     });
 }
 
+fn retransmit(
+    mut shuffled_nodes: Vec<ContactInfo>,
+    senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
+    cluster: &ClusterInfo,
+    fanout: usize,
+    blob: i32,
+    retransmit: bool,
+) -> i32 {
+    let mut seed = [0; 32];
+    let mut my_index = 0;
+    let mut index = 0;
+    shuffled_nodes.retain(|c| {
+        if c.id == cluster.id() {
+            my_index = index;
+            false
+        } else {
+            index += 1;
+            true
+        }
+    });
+    seed[0..4].copy_from_slice(&blob.to_le_bytes());
+    let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_nodes);
+    children.iter().for_each(|p| {
+        let s = senders.get(&p.id).unwrap();
+        let _ = s.send((blob, retransmit));
+    });
+
+    if retransmit {
+        neighbors.iter().for_each(|p| {
+            let s = senders.get(&p.id).unwrap();
+            let _ = s.send((blob, false));
+        });
+    }
+
+    blob
+}
+
 fn run_simulation(stakes: &[u64], fanout: usize) {
     let num_threads = num_threads();
     // set timeout to 5 minutes
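Because each node seeds ChaCha from the blob id alone, any two parties that agree on the blob agree on the ordering; the simulation leans on this when every simulated node replays `shuffled_peers[blob]` for itself. A quick property check that could sit alongside these tests (illustrative, not part of the patch):

    #[test]
    fn shuffle_is_deterministic_per_blob() {
        use rand::seq::SliceRandom;
        use rand::SeedableRng;
        use rand_chacha::ChaChaRng;

        // Same 4-byte blob id -> same 32-byte seed -> identical shuffles,
        // exactly as two independent validators would compute them.
        let blob: i32 = 7;
        let mut seed = [0u8; 32];
        seed[0..4].copy_from_slice(&blob.to_le_bytes());

        let mut a: Vec<u32> = (0..100).collect();
        let mut b = a.clone();
        a.shuffle(&mut ChaChaRng::from_seed(seed));
        b.shuffle(&mut ChaChaRng::from_seed(seed));
        assert_eq!(a, b);
    }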
@@ -50,7 +87,7 @@
     batches
         .get_mut(0)
         .unwrap()
-        .insert(leader_info.id, (HashSet::new(), r));
+        .insert(leader_info.id, (false, HashSet::new(), r));
     let range: Vec<_> = (1..=stakes.len()).collect();
     let chunk_size = (stakes.len() + num_threads - 1) / num_threads;
     range.chunks(chunk_size).for_each(|chunk| {
@@ -64,99 +101,77 @@
             batches
                 .get_mut(batch_ix)
                 .unwrap()
-                .insert(node.id, (HashSet::new(), r));
+                .insert(node.id, (false, HashSet::new(), r));
             senders.lock().unwrap().insert(node.id, s);
         })
     });
     let c_info = cluster_info.clone();
 
-    // create some "blobs".
-    let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
-
-    // pretend to broadcast from leader - cluster_info::create_broadcast_orders
-    let mut broadcast_table =
-        cluster_info.sorted_tvu_peers(Some(&staked_nodes), ChaChaRng::from_seed([0x5a; 32]));
-    broadcast_table.truncate(fanout);
-    let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
-
-    // send blobs to layer 1 nodes
-    orders.iter().for_each(|(b, vc)| {
-        vc.iter().for_each(|c| {
-            find_insert_blob(&c.id, b.0, &mut batches);
+    let blobs_len = 100;
+    let shuffled_peers: Vec<Vec<ContactInfo>> = (0..blobs_len as i32)
+        .map(|i| {
+            let mut seed = [0; 32];
+            seed[0..4].copy_from_slice(&i.to_le_bytes());
+            let (_, peers) = cluster_info
+                .shuffle_peers_and_index(Some(&staked_nodes), ChaChaRng::from_seed(seed));
+            peers
         })
+        .collect();
+
+    // create some "blobs".
+    (0..blobs_len).into_iter().for_each(|i| {
+        let broadcast_table = &shuffled_peers[i];
+        find_insert_blob(&broadcast_table[0].id, i as i32, &mut batches);
     });
+
     assert!(!batches.is_empty());
 
     // start avalanche simulation
     let now = Instant::now();
     batches.par_iter_mut().for_each(|batch| {
         let mut cluster = c_info.clone();
-        let batch_size = batch.len();
-        let mut remaining = batch_size;
+        let mut remaining = batch.len();
         let senders: HashMap<_, _> = senders.lock().unwrap().clone();
-        // A map that holds neighbors and children senders for a given node
-        let mut mapped_peers: HashMap<
-            Pubkey,
-            (Vec<Sender<(i32, bool)>>, Vec<Sender<(i32, bool)>>),
-        > = HashMap::new();
         while remaining > 0 {
-            for (id, (recv, r)) in batch.iter_mut() {
+            for (id, (layer1_done, recv, r)) in batch.iter_mut() {
                 assert!(
                     now.elapsed().as_secs() < timeout,
                     "Timed out with {:?} remaining nodes",
                     remaining
                 );
                 cluster.gossip.set_self(&*id);
-                if !mapped_peers.contains_key(id) {
-                    let (neighbors, children) = compute_retransmit_peers(
-                        Some(&staked_nodes),
-                        &Arc::new(RwLock::new(cluster.clone())),
-                        fanout,
-                        ChaChaRng::from_seed([0x5a; 32]),
-                    );
-                    let vec_children: Vec<_> = children
-                        .iter()
-                        .map(|p| {
-                            let s = senders.get(&p.id).unwrap();
-                            recv.iter().for_each(|i| {
-                                let _ = s.send((*i, true));
-                            });
-                            s.clone()
-                        })
-                        .collect();
-
-                    let vec_neighbors: Vec<_> = neighbors
-                        .iter()
-                        .map(|p| {
-                            let s = senders.get(&p.id).unwrap();
-                            recv.iter().for_each(|i| {
-                                let _ = s.send((*i, false));
-                            });
-                            s.clone()
-                        })
-                        .collect();
-                    mapped_peers.insert(*id, (vec_neighbors, vec_children));
+                if !*layer1_done {
+                    recv.iter().for_each(|i| {
+                        retransmit(
+                            shuffled_peers[*i as usize].clone(),
+                            &senders,
+                            &cluster,
+                            fanout,
+                            *i,
+                            true,
+                        );
+                    });
+                    *layer1_done = true;
                 }
-                let (vec_neighbors, vec_children) = mapped_peers.get(id).unwrap();
 
                 //send and recv
-                if recv.len() < blobs.len() {
+                if recv.len() < blobs_len {
                     loop {
                         match r.try_recv() {
-                            Ok((data, retransmit)) => {
+                            Ok((data, retx)) => {
                                 if recv.insert(data) {
-                                    vec_children.iter().for_each(|s| {
-                                        let _ = s.send((data, retransmit));
-                                    });
-                                    if retransmit {
-                                        vec_neighbors.iter().for_each(|s| {
-                                            let _ = s.send((data, false));
-                                        })
-                                    }
-                                    if recv.len() == blobs.len() {
-                                        remaining -= 1;
-                                        break;
-                                    }
+                                    let _ = retransmit(
+                                        shuffled_peers[data as usize].clone(),
+                                        &senders,
+                                        &cluster,
+                                        fanout,
+                                        data,
+                                        retx,
+                                    );
+                                }
+                                if recv.len() == blobs_len {
+                                    remaining -= 1;
+                                    break;
                                 }
                             }
                             Err(TryRecvError::Disconnected) => break,