From b87a1d2bc5c2c456452c513bc54b53dbc9cb7c0d Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 6 Apr 2020 19:11:23 -0700 Subject: [PATCH] Optimize broadcast cluster_info critical section (#9327) (#9344) automerge --- core/benches/cluster_info.rs | 28 +++--- core/src/broadcast_stage.rs | 85 ++++++++++++++++++ .../fail_entry_verification_broadcast_run.rs | 15 ++-- .../broadcast_stage/standard_broadcast_run.rs | 28 +++--- core/src/cluster_info.rs | 87 +++---------------- streamer/src/sendmmsg.rs | 32 +++---- 6 files changed, 151 insertions(+), 124 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 7dfd7be9d0..9f3a99351c 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -3,10 +3,13 @@ extern crate test; use rand::{thread_rng, Rng}; +use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers}; use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::contact_info::ContactInfo; +use solana_ledger::shred::Shred; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; +use std::sync::RwLock; use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant}; use test::Bencher; @@ -18,10 +21,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone()); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - const SHRED_SIZE: usize = 1024; const NUM_SHREDS: usize = 32; - let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS]; - let seeds = vec![[0u8; 32]; NUM_SHREDS]; + let shreds = vec![Shred::new_empty_data_shred(); NUM_SHREDS]; let mut stakes = HashMap::new(); const NUM_PEERS: usize = 200; for _ in 0..NUM_PEERS { @@ -31,16 +32,19 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } let stakes = Arc::new(stakes); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone())); + let shreds = Arc::new(shreds); bencher.iter(move || { let shreds = shreds.clone(); - cluster_info - .broadcast_shreds( - &socket, - shreds, - &seeds, - Some(stakes.clone()), - &mut Instant::now(), - ) - .unwrap(); + broadcast_shreds( + &socket, + &shreds, + &peers_and_stakes, + &peers, + &mut Instant::now(), + &mut 0, + ) + .unwrap(); }); } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index d292764055..071091b78c 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -4,6 +4,9 @@ use self::{ fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, standard_broadcast_run::StandardBroadcastRun, }; +use crate::contact_info::ContactInfo; +use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; +use crate::weighted_shuffle::weighted_best; use crate::{ cluster_info::{ClusterInfo, ClusterInfoError}, poh_recorder::WorkingBankEntry, @@ -14,9 +17,13 @@ use crossbeam_channel::{ Sender as CrossbeamSender, }; use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils}; +use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_runtime::bank::Bank; +use solana_sdk::timing::duration_as_s; +use solana_sdk::timing::timestamp; use solana_sdk::{clock::Slot, pubkey::Pubkey}; +use solana_streamer::sendmmsg::send_mmsg; use std::{ collections::HashMap, net::UdpSocket, @@ -328,6 +335,84 @@ impl BroadcastStage { } } +fn update_peer_stats(num_live_peers: i64, broadcast_len: i64, last_datapoint_submit: &mut Instant) { + if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 { + datapoint_info!( + "cluster_info-num_nodes", + ("live_count", num_live_peers, i64), + ("broadcast_count", broadcast_len, i64) + ); + *last_datapoint_submit = Instant::now(); + } +} + +pub fn get_broadcast_peers( + cluster_info: &Arc>, + stakes: Option>>, +) -> (Vec, Vec<(u64, usize)>) { + use crate::cluster_info; + let mut peers = cluster_info.read().unwrap().tvu_peers(); + let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes); + (peers, peers_and_stakes) +} + +/// broadcast messages from the leader to layer 1 nodes +/// # Remarks +pub fn broadcast_shreds( + s: &UdpSocket, + shreds: &Arc>, + peers_and_stakes: &[(u64, usize)], + peers: &[ContactInfo], + last_datapoint_submit: &mut Instant, + send_mmsg_total: &mut u64, +) -> Result<()> { + let broadcast_len = peers_and_stakes.len(); + if broadcast_len == 0 { + update_peer_stats(1, 1, last_datapoint_submit); + return Ok(()); + } + let packets: Vec<_> = shreds + .iter() + .map(|shred| { + let broadcast_index = weighted_best(&peers_and_stakes, shred.seed()); + + (&shred.payload, &peers[broadcast_index].tvu) + }) + .collect(); + + let mut sent = 0; + let mut send_mmsg_time = Measure::start("send_mmsg"); + while sent < packets.len() { + match send_mmsg(s, &packets[sent..]) { + Ok(n) => sent += n, + Err(e) => { + return Err(Error::IO(e)); + } + } + } + send_mmsg_time.stop(); + *send_mmsg_total += send_mmsg_time.as_us(); + + let num_live_peers = num_live_peers(&peers); + update_peer_stats( + num_live_peers, + broadcast_len as i64 + 1, + last_datapoint_submit, + ); + Ok(()) +} + +fn num_live_peers(peers: &[ContactInfo]) -> i64 { + let mut num_live_peers = 1i64; + peers.iter().for_each(|p| { + // A peer is considered live if they generated their contact info recently + if timestamp() - p.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS { + num_live_peers += 1; + } + }); + num_live_peers +} + #[cfg(test)] pub mod test { use super::*; diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 27a4f4d716..51b4f5f5dc 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -78,16 +78,19 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { sock: &UdpSocket, ) -> Result<()> { let (stakes, shreds) = receiver.lock().unwrap().recv()?; - let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); // Broadcast data - let all_shred_bufs: Vec> = shreds.to_vec().into_iter().map(|s| s.payload).collect(); - cluster_info.read().unwrap().broadcast_shreds( + let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); + + let mut send_mmsg_total = 0; + broadcast_shreds( sock, - all_shred_bufs, - &all_seeds, - stakes, + &shreds, + &peers_and_stakes, + &peers, &mut Instant::now(), + &mut send_mmsg_total, )?; + Ok(()) } fn record( diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ca6d7a197e..6b07108e86 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -17,20 +17,17 @@ struct BroadcastStats { broadcast_elapsed: u64, receive_elapsed: u64, seed_elapsed: u64, + send_mmsg_elapsed: u64, } impl BroadcastStats { fn reset(&mut self) { - self.insert_shreds_elapsed = 0; - self.shredding_elapsed = 0; - self.broadcast_elapsed = 0; - self.receive_elapsed = 0; - self.seed_elapsed = 0; + *self = Self::default(); } } #[derive(Clone)] -pub(super) struct StandardBroadcastRun { +pub struct StandardBroadcastRun { stats: Arc>, unfinished_slot: Option, current_slot_and_parent: Option<(u64, u64)>, @@ -258,20 +255,22 @@ impl StandardBroadcastRun { shreds: Arc>, ) -> Result<()> { let seed_start = Instant::now(); - let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); let seed_elapsed = seed_start.elapsed(); // Broadcast the shreds let broadcast_start = Instant::now(); - let shred_bufs: Vec> = shreds.to_vec().into_iter().map(|s| s.payload).collect(); - trace!("Broadcasting {:?} shreds", shred_bufs.len()); + trace!("Broadcasting {:?} shreds", shreds.len()); - cluster_info.read().unwrap().broadcast_shreds( + let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); + + let mut send_mmsg_total = 0; + broadcast_shreds( sock, - shred_bufs, - &seeds, - stakes, + &shreds, + &peers_and_stakes, + &peers, &mut self.last_datapoint_submit, + &mut send_mmsg_total, )?; let broadcast_elapsed = broadcast_start.elapsed(); @@ -279,6 +278,7 @@ impl StandardBroadcastRun { self.update_broadcast_stats(BroadcastStats { broadcast_elapsed: duration_as_us(&broadcast_elapsed), seed_elapsed: duration_as_us(&seed_elapsed), + send_mmsg_elapsed: send_mmsg_total, ..BroadcastStats::default() }); Ok(()) @@ -291,6 +291,7 @@ impl StandardBroadcastRun { wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed; wstats.broadcast_elapsed += stats.broadcast_elapsed; wstats.seed_elapsed += stats.seed_elapsed; + wstats.send_mmsg_elapsed += stats.send_mmsg_elapsed; } fn report_and_reset_stats(&mut self) { @@ -303,6 +304,7 @@ impl StandardBroadcastRun { ("insertion_time", stats.insert_shreds_elapsed as i64, i64), ("broadcast_time", stats.broadcast_elapsed as i64, i64), ("receive_time", stats.receive_elapsed as i64, i64), + ("send_mmsg", stats.send_mmsg_elapsed as i64, i64), ("seed", stats.seed_elapsed as i64, i64), ( "num_shreds", diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 3aecf08b34..96533074fa 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -22,7 +22,7 @@ use crate::{ }, epoch_slots::EpochSlots, result::{Error, Result}, - weighted_shuffle::{weighted_best, weighted_shuffle}, + weighted_shuffle::weighted_shuffle, }; use bincode::{serialize, serialized_size}; use core::cmp; @@ -43,7 +43,6 @@ use solana_perf::packet::{ }; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::hash::Hash; -use solana_sdk::timing::duration_as_s; use solana_sdk::{ clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH}, pubkey::Pubkey, @@ -51,7 +50,7 @@ use solana_sdk::{ timing::{duration_as_ms, timestamp}, transaction::Transaction, }; -use solana_streamer::sendmmsg::{multicast, send_mmsg}; +use solana_streamer::sendmmsg::multicast; use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, @@ -934,77 +933,6 @@ impl ClusterInfo { .collect() } - fn sorted_tvu_peers_and_stakes( - &self, - stakes: Option>>, - ) -> (Vec, Vec<(u64, usize)>) { - let mut peers = self.tvu_peers(); - peers.dedup(); - let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes); - (peers, stakes_and_index) - } - - /// broadcast messages from the leader to layer 1 nodes - /// # Remarks - pub fn broadcast_shreds( - &self, - s: &UdpSocket, - shreds: Vec>, - seeds: &[[u8; 32]], - stakes: Option>>, - last_datapoint_submit: &mut Instant, - ) -> Result<()> { - let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes); - let broadcast_len = peers_and_stakes.len(); - if broadcast_len == 0 { - if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 { - datapoint_info!( - "cluster_info-num_nodes", - ("live_count", 1, i64), - ("broadcast_count", 1, i64) - ); - *last_datapoint_submit = Instant::now(); - } - return Ok(()); - } - let mut packets: Vec<_> = shreds - .into_iter() - .zip(seeds) - .map(|(shred, seed)| { - let broadcast_index = weighted_best(&peers_and_stakes, *seed); - - (shred, &peers[broadcast_index].tvu) - }) - .collect(); - - let mut sent = 0; - while sent < packets.len() { - match send_mmsg(s, &mut packets[sent..]) { - Ok(n) => sent += n, - Err(e) => { - return Err(Error::IO(e)); - } - } - } - - let mut num_live_peers = 1i64; - peers.iter().for_each(|p| { - // A peer is considered live if they generated their contact info recently - if timestamp() - p.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS { - num_live_peers += 1; - } - }); - if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 { - datapoint_info!( - "cluster_info-num_nodes", - ("live_count", num_live_peers, i64), - ("broadcast_count", broadcast_len + 1, i64) - ); - *last_datapoint_submit = Instant::now(); - } - Ok(()) - } - /// retransmit messages to a list of nodes /// # Remarks /// We need to avoid having obj locked while doing a io, such as the `send_to` @@ -1942,6 +1870,14 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { } } +pub fn stake_weight_peers( + peers: &mut Vec, + stakes: Option>>, +) -> Vec<(u64, usize)> { + peers.dedup(); + ClusterInfo::sorted_stakes_with_index(peers, stakes) +} + #[cfg(test)] mod tests { use super::*; @@ -2480,7 +2416,8 @@ mod tests { stakes.insert(id4, 10); let stakes = Arc::new(stakes); - let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(stakes)); + let mut peers = cluster_info.tvu_peers(); + let peers_and_stakes = stake_weight_peers(&mut peers, Some(stakes)); assert_eq!(peers.len(), 2); assert_eq!(peers[0].id, id); assert_eq!(peers[1].id, id2); diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 4ad5b327e4..39d320ea50 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -4,7 +4,7 @@ use std::io; use std::net::{SocketAddr, UdpSocket}; #[cfg(not(target_os = "linux"))] -pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec, &SocketAddr)]) -> io::Result { +pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec, &SocketAddr)]) -> io::Result { let count = packets.len(); for (p, a) in packets { sock.send_to(p, *a)?; @@ -18,7 +18,7 @@ use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6}; #[cfg(target_os = "linux")] fn mmsghdr_for_packet( - packet: &mut [u8], + packet: &[u8], dest: &SocketAddr, index: usize, addr_in_len: u32, @@ -32,7 +32,7 @@ fn mmsghdr_for_packet( use std::mem; iovs.push(iovec { - iov_base: packet.as_mut_ptr() as *mut c_void, + iov_base: packet.as_ptr() as *mut c_void, iov_len: packet.len(), }); @@ -57,7 +57,7 @@ fn mmsghdr_for_packet( } #[cfg(target_os = "linux")] -pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec, &SocketAddr)]) -> io::Result { +pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec, &SocketAddr)]) -> io::Result { use libc::{sendmmsg, socklen_t}; use std::mem; use std::os::unix::io::AsRawFd; @@ -73,7 +73,7 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec, &SocketAddr)]) -> io let sock_fd = sock.as_raw_fd(); let mut hdrs: Vec = packets - .iter_mut() + .iter() .enumerate() .map(|(i, (packet, dest))| { mmsghdr_for_packet( @@ -160,11 +160,10 @@ mod tests { let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut packets: Vec<_> = (0..32) - .map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr)) - .collect(); + let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); + let packet_refs: Vec<_> = packets.iter().map(|p| (p, &addr)).collect(); - let sent = send_mmsg(&sender, &mut packets).ok(); + let sent = send_mmsg(&sender, &packet_refs).ok(); assert_eq!(sent, Some(32)); let mut packets = vec![Packet::default(); 32]; @@ -182,17 +181,14 @@ mod tests { let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut packets: Vec<_> = (0..32) - .map(|i| { - if i < 16 { - (vec![0u8; PACKET_DATA_SIZE], &addr) - } else { - (vec![0u8; PACKET_DATA_SIZE], &addr2) - } - }) + let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); + let packet_refs: Vec<_> = packets + .iter() + .enumerate() + .map(|(i, p)| if i < 16 { (p, &addr) } else { (p, &addr2) }) .collect(); - let sent = send_mmsg(&sender, &mut packets).ok(); + let sent = send_mmsg(&sender, &packet_refs).ok(); assert_eq!(sent, Some(32)); let mut packets = vec![Packet::default(); 32];