diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 3ad6ac803a..7dfd7be9d0 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -7,9 +7,7 @@ use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::contact_info::ContactInfo; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; -use std::collections::HashMap; -use std::net::UdpSocket; -use std::sync::Arc; +use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant}; use test::Bencher; #[bench] @@ -36,7 +34,13 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { bencher.iter(move || { let shreds = shreds.clone(); cluster_info - .broadcast_shreds(&socket, shreds, &seeds, Some(stakes.clone())) + .broadcast_shreds( + &socket, + shreds, + &seeds, + Some(stakes.clone()), + &mut Instant::now(), + ) .unwrap(); }); } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index cd860a09c8..d292764055 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -104,7 +104,7 @@ trait BroadcastRun { blockstore_sender: &Sender>>, ) -> Result<()>; fn transmit( - &self, + &mut self, receiver: &Arc>>, cluster_info: &Arc>, sock: &UdpSocket, @@ -226,7 +226,7 @@ impl BroadcastStage { let socket_receiver = Arc::new(Mutex::new(socket_receiver)); for sock in socks.into_iter() { let socket_receiver = socket_receiver.clone(); - let bs_transmit = broadcast_stage_run.clone(); + let mut bs_transmit = broadcast_stage_run.clone(); let cluster_info = cluster_info.clone(); let t = Builder::new() .name("solana-broadcaster-transmit".to_string()) diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index f7333ef0cd..850ce6b809 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -96,7 +96,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { Ok(()) } fn transmit( - &self, + &mut self, receiver: &Arc>>, cluster_info: &Arc>, sock: &UdpSocket, diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index ed389465af..27a4f4d716 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -72,7 +72,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { Ok(()) } fn transmit( - &self, + &mut self, receiver: &Arc>>, cluster_info: &Arc>, sock: &UdpSocket, @@ -81,10 +81,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); // Broadcast data let all_shred_bufs: Vec> = shreds.to_vec().into_iter().map(|s| s.payload).collect(); - cluster_info - .write() - .unwrap() - .broadcast_shreds(sock, all_shred_bufs, &all_seeds, stakes)?; + cluster_info.read().unwrap().broadcast_shreds( + sock, + all_shred_bufs, + &all_seeds, + stakes, + &mut Instant::now(), + )?; Ok(()) } fn record( diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 8f82a4d0ce..ca6d7a197e 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -37,6 +37,7 @@ pub(super) struct StandardBroadcastRun { slot_broadcast_start: Option, keypair: Arc, shred_version: u16, + last_datapoint_submit: Instant, } impl StandardBroadcastRun { @@ -48,6 +49,7 @@ impl StandardBroadcastRun { slot_broadcast_start: None, keypair, shred_version, + last_datapoint_submit: Instant::now(), } } @@ -249,7 +251,7 @@ impl StandardBroadcastRun { } fn broadcast( - &self, + &mut self, sock: &UdpSocket, cluster_info: &Arc>, stakes: Option>>, @@ -264,10 +266,13 @@ impl StandardBroadcastRun { let shred_bufs: Vec> = shreds.to_vec().into_iter().map(|s| s.payload).collect(); trace!("Broadcasting {:?} shreds", shred_bufs.len()); - cluster_info - .write() - .unwrap() - .broadcast_shreds(sock, shred_bufs, &seeds, stakes)?; + cluster_info.read().unwrap().broadcast_shreds( + sock, + shred_bufs, + &seeds, + stakes, + &mut self.last_datapoint_submit, + )?; let broadcast_elapsed = broadcast_start.elapsed(); @@ -332,7 +337,7 @@ impl BroadcastRun for StandardBroadcastRun { ) } fn transmit( - &self, + &mut self, receiver: &Arc>>, cluster_info: &Arc>, sock: &UdpSocket, diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 543ccacb5a..3aecf08b34 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -101,7 +101,6 @@ pub struct ClusterInfo { pub(crate) keypair: Arc, /// The network entrypoint entrypoint: Option, - last_datapoint_submit: Instant, } #[derive(Default, Clone)] @@ -209,7 +208,6 @@ impl ClusterInfo { gossip: CrdsGossip::default(), keypair, entrypoint: None, - last_datapoint_submit: Instant::now(), }; let id = contact_info.id; me.gossip.set_self(&id); @@ -949,22 +947,23 @@ impl ClusterInfo { /// broadcast messages from the leader to layer 1 nodes /// # Remarks pub fn broadcast_shreds( - &mut self, + &self, s: &UdpSocket, shreds: Vec>, seeds: &[[u8; 32]], stakes: Option>>, + last_datapoint_submit: &mut Instant, ) -> Result<()> { let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes); let broadcast_len = peers_and_stakes.len(); if broadcast_len == 0 { - if duration_as_s(&Instant::now().duration_since(self.last_datapoint_submit)) >= 1.0 { + if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 { datapoint_info!( "cluster_info-num_nodes", ("live_count", 1, i64), ("broadcast_count", 1, i64) ); - self.last_datapoint_submit = Instant::now(); + *last_datapoint_submit = Instant::now(); } return Ok(()); } @@ -995,13 +994,13 @@ impl ClusterInfo { num_live_peers += 1; } }); - if duration_as_s(&Instant::now().duration_since(self.last_datapoint_submit)) >= 1.0 { + if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 { datapoint_info!( "cluster_info-num_nodes", ("live_count", num_live_peers, i64), ("broadcast_count", broadcast_len + 1, i64) ); - self.last_datapoint_submit = Instant::now(); + *last_datapoint_submit = Instant::now(); } Ok(()) }