diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs
index 0f3b836444..c2302fcabc 100644
--- a/core/benches/cluster_nodes.rs
+++ b/core/benches/cluster_nodes.rs
@@ -9,7 +9,12 @@ use {
         retransmit_stage::RetransmitStage,
     },
     solana_gossip::contact_info::ContactInfo,
-    solana_sdk::{clock::Slot, hash::hashv, pubkey::Pubkey, signature::Signature},
+    solana_ledger::{
+        genesis_utils::{create_genesis_config, GenesisConfigInfo},
+        shred::Shred,
+    },
+    solana_runtime::bank::Bank,
+    solana_sdk::pubkey::Pubkey,
     test::Bencher,
 };
@@ -26,87 +31,48 @@ fn make_cluster_nodes(
 fn get_retransmit_peers_deterministic(
     cluster_nodes: &ClusterNodes<RetransmitStage>,
-    slot: &Slot,
+    shred: &mut Shred,
     slot_leader: &Pubkey,
+    root_bank: &Bank,
     num_simulated_shreds: usize,
 ) {
     for i in 0..num_simulated_shreds {
-        // see Shred::seed
-        let shred_seed = hashv(&[
-            &slot.to_le_bytes(),
-            &(i as u32).to_le_bytes(),
-            &slot_leader.to_bytes(),
-        ])
-        .to_bytes();
-
-        let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_deterministic(
-            shred_seed,
-            solana_gossip::cluster_info::DATA_PLANE_FANOUT,
+        shred.common_header.index = i as u32;
+        let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
             *slot_leader,
-        );
-    }
-}
-
-fn get_retransmit_peers_compat(
-    cluster_nodes: &ClusterNodes<RetransmitStage>,
-    slot_leader: &Pubkey,
-    signatures: &[Signature],
-) {
-    for signature in signatures.iter() {
-        // see Shred::seed
-        let signature = signature.as_ref();
-        let offset = signature.len().checked_sub(32).unwrap();
-        let shred_seed = signature[offset..].try_into().unwrap();
-
-        let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_compat(
-            shred_seed,
+            shred,
+            root_bank,
             solana_gossip::cluster_info::DATA_PLANE_FANOUT,
-            *slot_leader,
         );
     }
 }
 
 fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
     let mut rng = rand::thread_rng();
+    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+    let bank = Bank::new_for_benches(&genesis_config);
     let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
     let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
     let slot = rand::random::<u64>();
+    let mut shred = Shred::new_empty_data_shred();
+    shred.common_header.slot = slot;
     b.iter(|| {
         get_retransmit_peers_deterministic(
             &cluster_nodes,
-            &slot,
+            &mut shred,
             &slot_leader,
+            &bank,
             NUM_SIMULATED_SHREDS,
         )
     });
 }
 
-fn get_retransmit_peers_compat_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
-    let mut rng = rand::thread_rng();
-    let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
-    let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
-    let signatures: Vec<_> = std::iter::repeat_with(Signature::new_unique)
-        .take(NUM_SIMULATED_SHREDS)
-        .collect();
-    b.iter(|| get_retransmit_peers_compat(&cluster_nodes, &slot_leader, &signatures));
-}
-
 #[bench]
 fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_2(b: &mut Bencher) {
     get_retransmit_peers_deterministic_wrapper(b, Some((1, 2)));
 }
 
-#[bench]
-fn bench_get_retransmit_peers_compat_unstaked_ratio_1_2(b: &mut Bencher) {
-    get_retransmit_peers_compat_wrapper(b, Some((1, 2)));
-}
-
 #[bench]
 fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_32(b: &mut Bencher) {
     get_retransmit_peers_deterministic_wrapper(b, Some((1, 32)));
 }
-
-#[bench]
-fn bench_get_retransmit_peers_compat_unstaked_ratio_1_32(b: &mut Bencher) {
-    get_retransmit_peers_compat_wrapper(b, Some((1, 32)));
-}
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index eba9bf179c..f52183ba75 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -10,13 +10,12 @@ use {
         crds::GossipRoute,
         crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
         crds_value::{CrdsData, CrdsValue},
-        weighted_shuffle::{weighted_best, weighted_shuffle, WeightedShuffle},
+        weighted_shuffle::WeightedShuffle,
     },
     solana_ledger::shred::Shred,
     solana_runtime::bank::Bank,
     solana_sdk::{
         clock::{Epoch, Slot},
-        feature_set,
         pubkey::Pubkey,
         signature::Keypair,
         timing::timestamp,
@@ -56,10 +55,6 @@ pub struct ClusterNodes<T> {
     // Reverse index from nodes pubkey to their index in self.nodes.
     index: HashMap<Pubkey, usize>,
     weighted_shuffle: WeightedShuffle<u64>,
-    // Weights and indices for sampling peers. weighted_{shuffle,best} expect
-    // weights >= 1. For backward compatibility we use max(1, stake) for
-    // weights and exclude nodes with no contact-info.
-    compat_index: Vec<(/*weight:*/ u64, /*index:*/ usize)>,
     _phantom: PhantomData<T>,
 }
@@ -92,14 +87,15 @@ impl Node {
 impl<T> ClusterNodes<T> {
     pub(crate) fn num_peers(&self) -> usize {
-        self.compat_index.len()
+        self.nodes.len().saturating_sub(1)
     }
 
     // A peer is considered live if they generated their contact info recently.
     pub(crate) fn num_peers_live(&self, now: u64) -> usize {
-        self.compat_index
+        self.nodes
             .iter()
-            .filter_map(|(_, index)| self.nodes[*index].contact_info())
+            .filter(|node| node.pubkey() != self.pubkey)
+            .filter_map(|node| node.contact_info())
             .filter(|node| {
                 let elapsed = if node.wallclock < now {
                     now - node.wallclock
@@ -120,20 +116,12 @@ impl ClusterNodes<BroadcastStage> {
     pub(crate) fn get_broadcast_addrs(
         &self,
         shred: &Shred,
-        root_bank: &Bank,
+        _root_bank: &Bank,
         fanout: usize,
         socket_addr_space: &SocketAddrSpace,
     ) -> Vec<SocketAddr> {
         const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
         let shred_seed = shred.seed(self.pubkey);
-        if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
-            if let Some(node) = self.get_broadcast_peer(shred_seed) {
-                if socket_addr_space.check(&node.tvu) {
-                    return vec![node.tvu];
-                }
-            }
-            return Vec::default();
-        }
         let mut rng = ChaChaRng::from_seed(shred_seed);
         let index = match self.weighted_shuffle.first(&mut rng) {
             None => return Vec::default(),
@@ -175,20 +163,6 @@ impl ClusterNodes<BroadcastStage> {
             .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
             .collect()
     }
-
-    /// Returns the root of turbine broadcast tree, which the leader sends the
-    /// shred to.
-    fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
-        if self.compat_index.is_empty() {
-            None
-        } else {
-            let index = weighted_best(&self.compat_index, shred_seed);
-            match &self.nodes[index].node {
-                NodeId::ContactInfo(node) => Some(node),
-                NodeId::Pubkey(_) => panic!("this should not happen!"),
-            }
-        }
-    }
 }
 
 impl ClusterNodes<RetransmitStage> {
@@ -223,32 +197,17 @@
             .collect()
     }
 
-    fn get_retransmit_peers(
+    pub fn get_retransmit_peers(
         &self,
         slot_leader: Pubkey,
         shred: &Shred,
-        root_bank: &Bank,
+        _root_bank: &Bank,
         fanout: usize,
     ) -> (
         Vec<&Node>, // neighbors
         Vec<&Node>, // children
     ) {
         let shred_seed = shred.seed(slot_leader);
-        if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
-            return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
-        }
-        self.get_retransmit_peers_deterministic(shred_seed, fanout, slot_leader)
-    }
-
-    pub fn get_retransmit_peers_deterministic(
-        &self,
-        shred_seed: [u8; 32],
-        fanout: usize,
-        slot_leader: Pubkey,
-    ) -> (
-        Vec<&Node>, // neighbors
-        Vec<&Node>, // children
-    ) {
         let mut weighted_shuffle = self.weighted_shuffle.clone();
         // Exclude slot leader from list of nodes.
         if slot_leader == self.pubkey {
@@ -271,46 +230,6 @@
         debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
         (neighbors, children)
     }
-
-    pub fn get_retransmit_peers_compat(
-        &self,
-        shred_seed: [u8; 32],
-        fanout: usize,
-        slot_leader: Pubkey,
-    ) -> (
-        Vec<&Node>, // neighbors
-        Vec<&Node>, // children
-    ) {
-        // Exclude leader from list of nodes.
-        let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
-            error!("retransmit from slot leader: {}", slot_leader);
-            self.compat_index.iter().copied().unzip()
-        } else {
-            self.compat_index
-                .iter()
-                .filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
-                .copied()
-                .unzip()
-        };
-        let index: Vec<_> = {
-            let shuffle = weighted_shuffle(weights.into_iter(), shred_seed);
-            shuffle.into_iter().map(|i| index[i]).collect()
-        };
-        let self_index = index
-            .iter()
-            .position(|i| self.nodes[*i].pubkey() == self.pubkey)
-            .unwrap();
-        let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &index);
-        // Assert that the node itself is included in the set of neighbors, at
-        // the right offset.
-        debug_assert_eq!(
-            self.nodes[neighbors[self_index % fanout]].pubkey(),
-            self.pubkey
-        );
-        let neighbors = neighbors.into_iter().map(|i| &self.nodes[i]).collect();
-        let children = children.into_iter().map(|i| &self.nodes[i]).collect();
-        (neighbors, children)
-    }
 }
 
 pub fn new_cluster_nodes(
@@ -330,26 +249,11 @@
     if broadcast {
         weighted_shuffle.remove_index(index[&self_pubkey]);
     }
-    // For backward compatibility:
-    // * nodes which do not have contact-info are excluded.
-    // * stakes are floored at 1.
-    // The sorting key here should be equivalent to
-    // solana_gossip::deprecated::sorted_stakes_with_index.
-    // Leader itself is excluded when sampling broadcast peers.
-    let compat_index = nodes
-        .iter()
-        .enumerate()
-        .filter(|(_, node)| node.contact_info().is_some())
-        .filter(|(_, node)| !broadcast || node.pubkey() != self_pubkey)
-        .sorted_by_key(|(_, node)| Reverse((node.stake.max(1), node.pubkey())))
-        .map(|(index, node)| (node.stake.max(1), index))
-        .collect();
     ClusterNodes {
         pubkey: self_pubkey,
         nodes,
         index,
         weighted_shuffle,
-        compat_index,
         _phantom: PhantomData::default(),
     }
 }
@@ -387,21 +291,6 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
         .collect()
 }
 
-fn enable_turbine_peers_shuffle_patch(shred_slot: Slot, root_bank: &Bank) -> bool {
-    let feature_slot = root_bank
-        .feature_set
-        .activated_slot(&feature_set::turbine_peers_shuffle::id());
-    match feature_slot {
-        None => false,
-        Some(feature_slot) => {
-            let epoch_schedule = root_bank.epoch_schedule();
-            let feature_epoch = epoch_schedule.get_epoch(feature_slot);
-            let shred_epoch = epoch_schedule.get_epoch(shred_slot);
-            feature_epoch < shred_epoch
-        }
-    }
-}
-
 impl<T: 'static> ClusterNodesCache<T> {
     pub fn new(
         // Capacity of underlying LRU-cache in terms of number of epochs.
@@ -563,7 +452,6 @@
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
         let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
         // All nodes with contact-info should be in the index.
-        assert_eq!(cluster_nodes.compat_index.len(), nodes.len());
         // Staked nodes with no contact-info should be included.
         assert!(cluster_nodes.nodes.len() > nodes.len());
         // Assert that all nodes keep their contact-info.
@@ -586,22 +474,6 @@
         let (peers, stakes_and_index) =
             sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes));
         assert_eq!(stakes_and_index.len(), peers.len());
-        assert_eq!(cluster_nodes.compat_index.len(), peers.len());
-        for (i, node) in cluster_nodes
-            .compat_index
-            .iter()
-            .map(|(_, i)| &cluster_nodes.nodes[*i])
-            .enumerate()
-        {
-            let (stake, index) = stakes_and_index[i];
-            // Wallclock may be update by ClusterInfo::push_self.
-            if node.pubkey() == this_node.id {
-                assert_eq!(this_node.id, peers[index].id)
-            } else {
-                assert_eq!(node.contact_info().unwrap(), &peers[index]);
-            }
-            assert_eq!(node.stake.max(1), stake);
-        }
         let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
         // Remove slot leader from peers indices.
         let stakes_and_index: Vec<_> = stakes_and_index
@@ -618,21 +490,6 @@
             .map(|(_, index)| index)
             .collect();
         assert_eq!(this_node.id, peers[shuffled_index[self_index]].id);
-        for fanout in 1..200 {
-            let (neighbors_indices, children_indices) =
-                compute_retransmit_peers(fanout, self_index, &shuffled_index);
-            let (neighbors, children) =
-                cluster_nodes.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
-            assert_eq!(children.len(), children_indices.len());
-            for (node, index) in children.into_iter().zip(children_indices) {
-                assert_eq!(*node.contact_info().unwrap(), peers[index]);
-            }
-            assert_eq!(neighbors.len(), neighbors_indices.len());
-            assert_eq!(neighbors[0].pubkey(), peers[neighbors_indices[0]].id);
-            for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) {
-                assert_eq!(*node.contact_info().unwrap(), peers[index]);
-            }
-        }
     }
 
     #[test]
@@ -644,7 +501,6 @@
         let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
         // All nodes with contact-info should be in the index.
         // Excluding this node itself.
-        assert_eq!(cluster_nodes.compat_index.len() + 1, nodes.len());
         // Staked nodes with no contact-info should be included.
         assert!(cluster_nodes.nodes.len() > nodes.len());
         // Assert that all nodes keep their contact-info.
@@ -666,23 +522,5 @@
         }
         let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
         assert_eq!(peers_and_stakes.len(), peers.len());
-        assert_eq!(cluster_nodes.compat_index.len(), peers.len());
-        for (i, node) in cluster_nodes
-            .compat_index
-            .iter()
-            .map(|(_, i)| &cluster_nodes.nodes[*i])
-            .enumerate()
-        {
-            let (stake, index) = peers_and_stakes[i];
-            assert_eq!(node.contact_info().unwrap(), &peers[index]);
-            assert_eq!(node.stake.max(1), stake);
-        }
-        for _ in 0..100 {
-            let mut shred_seed = [0u8; 32];
-            rng.fill(&mut shred_seed[..]);
-            let index = weighted_best(&peers_and_stakes, shred_seed);
-            let peer = cluster_nodes.get_broadcast_peer(shred_seed).unwrap();
-            assert_eq!(*peer, peers[index]);
-        }
     }
 }
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index 77e1f9feea..9347f43e22 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -211,10 +211,6 @@ pub mod send_to_tpu_vote_port {
     solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo");
 }
 
-pub mod turbine_peers_shuffle {
-    solana_sdk::declare_id!("4VvpgRD6UsHvkXwpuQhtR5NG1G4esMaExeWuSEpsYRUa");
-}
-
 pub mod requestable_heap_size {
     solana_sdk::declare_id!("CCu4boMmfLuqcmfTLPHQiUo22ZdUsXjgzPAURYaWt1Bw");
 }
@@ -386,7 +382,6 @@ lazy_static! {
         (optimize_epoch_boundary_updates::id(), "optimize epoch boundary updates"),
         (remove_native_loader::id(), "remove support for the native loader"),
         (send_to_tpu_vote_port::id(), "send votes to the tpu vote port"),
-        (turbine_peers_shuffle::id(), "turbine peers shuffle patch"),
         (requestable_heap_size::id(), "Requestable heap frame size"),
         (disable_fee_calculator::id(), "deprecate fee calculator"),
         (add_compute_budget_program::id(), "Add compute_budget_program"),