@@ -7,9 +7,7 @@ use solana_core::cluster_info::{ClusterInfo, Node};
 | 
				
			|||||||
use solana_core::contact_info::ContactInfo;
 | 
					use solana_core::contact_info::ContactInfo;
 | 
				
			||||||
use solana_sdk::pubkey::Pubkey;
 | 
					use solana_sdk::pubkey::Pubkey;
 | 
				
			||||||
use solana_sdk::timing::timestamp;
 | 
					use solana_sdk::timing::timestamp;
 | 
				
			||||||
use std::collections::HashMap;
 | 
					use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant};
 | 
				
			||||||
use std::net::UdpSocket;
 | 
					 | 
				
			||||||
use std::sync::Arc;
 | 
					 | 
				
			||||||
use test::Bencher;
 | 
					use test::Bencher;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[bench]
 | 
					#[bench]
 | 
				
			||||||
@@ -36,7 +34,13 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
 | 
				
			|||||||
    bencher.iter(move || {
 | 
					    bencher.iter(move || {
 | 
				
			||||||
        let shreds = shreds.clone();
 | 
					        let shreds = shreds.clone();
 | 
				
			||||||
        cluster_info
 | 
					        cluster_info
 | 
				
			||||||
            .broadcast_shreds(&socket, shreds, &seeds, Some(stakes.clone()))
 | 
					            .broadcast_shreds(
 | 
				
			||||||
 | 
					                &socket,
 | 
				
			||||||
 | 
					                shreds,
 | 
				
			||||||
 | 
					                &seeds,
 | 
				
			||||||
 | 
					                Some(stakes.clone()),
 | 
				
			||||||
 | 
					                &mut Instant::now(),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
            .unwrap();
 | 
					            .unwrap();
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -104,7 +104,7 @@ trait BroadcastRun {
 | 
				
			|||||||
        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
 | 
					        blockstore_sender: &Sender<Arc<Vec<Shred>>>,
 | 
				
			||||||
    ) -> Result<()>;
 | 
					    ) -> Result<()>;
 | 
				
			||||||
    fn transmit(
 | 
					    fn transmit(
 | 
				
			||||||
        &self,
 | 
					        &mut self,
 | 
				
			||||||
        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
					        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
				
			||||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        sock: &UdpSocket,
 | 
					        sock: &UdpSocket,
 | 
				
			||||||
@@ -226,7 +226,7 @@ impl BroadcastStage {
 | 
				
			|||||||
        let socket_receiver = Arc::new(Mutex::new(socket_receiver));
 | 
					        let socket_receiver = Arc::new(Mutex::new(socket_receiver));
 | 
				
			||||||
        for sock in socks.into_iter() {
 | 
					        for sock in socks.into_iter() {
 | 
				
			||||||
            let socket_receiver = socket_receiver.clone();
 | 
					            let socket_receiver = socket_receiver.clone();
 | 
				
			||||||
            let bs_transmit = broadcast_stage_run.clone();
 | 
					            let mut bs_transmit = broadcast_stage_run.clone();
 | 
				
			||||||
            let cluster_info = cluster_info.clone();
 | 
					            let cluster_info = cluster_info.clone();
 | 
				
			||||||
            let t = Builder::new()
 | 
					            let t = Builder::new()
 | 
				
			||||||
                .name("solana-broadcaster-transmit".to_string())
 | 
					                .name("solana-broadcaster-transmit".to_string())
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -96,7 +96,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
 | 
				
			|||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn transmit(
 | 
					    fn transmit(
 | 
				
			||||||
        &self,
 | 
					        &mut self,
 | 
				
			||||||
        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
					        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
				
			||||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        sock: &UdpSocket,
 | 
					        sock: &UdpSocket,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -72,7 +72,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
 | 
				
			|||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn transmit(
 | 
					    fn transmit(
 | 
				
			||||||
        &self,
 | 
					        &mut self,
 | 
				
			||||||
        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
					        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
				
			||||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        sock: &UdpSocket,
 | 
					        sock: &UdpSocket,
 | 
				
			||||||
@@ -81,10 +81,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
 | 
				
			|||||||
        let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
 | 
					        let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
 | 
				
			||||||
        // Broadcast data
 | 
					        // Broadcast data
 | 
				
			||||||
        let all_shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
 | 
					        let all_shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
 | 
				
			||||||
        cluster_info
 | 
					        cluster_info.read().unwrap().broadcast_shreds(
 | 
				
			||||||
            .write()
 | 
					            sock,
 | 
				
			||||||
            .unwrap()
 | 
					            all_shred_bufs,
 | 
				
			||||||
            .broadcast_shreds(sock, all_shred_bufs, &all_seeds, stakes)?;
 | 
					            &all_seeds,
 | 
				
			||||||
 | 
					            stakes,
 | 
				
			||||||
 | 
					            &mut Instant::now(),
 | 
				
			||||||
 | 
					        )?;
 | 
				
			||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn record(
 | 
					    fn record(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -37,6 +37,7 @@ pub(super) struct StandardBroadcastRun {
 | 
				
			|||||||
    slot_broadcast_start: Option<Instant>,
 | 
					    slot_broadcast_start: Option<Instant>,
 | 
				
			||||||
    keypair: Arc<Keypair>,
 | 
					    keypair: Arc<Keypair>,
 | 
				
			||||||
    shred_version: u16,
 | 
					    shred_version: u16,
 | 
				
			||||||
 | 
					    last_datapoint_submit: Instant,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl StandardBroadcastRun {
 | 
					impl StandardBroadcastRun {
 | 
				
			||||||
@@ -48,6 +49,7 @@ impl StandardBroadcastRun {
 | 
				
			|||||||
            slot_broadcast_start: None,
 | 
					            slot_broadcast_start: None,
 | 
				
			||||||
            keypair,
 | 
					            keypair,
 | 
				
			||||||
            shred_version,
 | 
					            shred_version,
 | 
				
			||||||
 | 
					            last_datapoint_submit: Instant::now(),
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -249,7 +251,7 @@ impl StandardBroadcastRun {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn broadcast(
 | 
					    fn broadcast(
 | 
				
			||||||
        &self,
 | 
					        &mut self,
 | 
				
			||||||
        sock: &UdpSocket,
 | 
					        sock: &UdpSocket,
 | 
				
			||||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
 | 
					        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
 | 
				
			||||||
@@ -264,10 +266,13 @@ impl StandardBroadcastRun {
 | 
				
			|||||||
        let shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
 | 
					        let shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
 | 
				
			||||||
        trace!("Broadcasting {:?} shreds", shred_bufs.len());
 | 
					        trace!("Broadcasting {:?} shreds", shred_bufs.len());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        cluster_info
 | 
					        cluster_info.read().unwrap().broadcast_shreds(
 | 
				
			||||||
            .write()
 | 
					            sock,
 | 
				
			||||||
            .unwrap()
 | 
					            shred_bufs,
 | 
				
			||||||
            .broadcast_shreds(sock, shred_bufs, &seeds, stakes)?;
 | 
					            &seeds,
 | 
				
			||||||
 | 
					            stakes,
 | 
				
			||||||
 | 
					            &mut self.last_datapoint_submit,
 | 
				
			||||||
 | 
					        )?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let broadcast_elapsed = broadcast_start.elapsed();
 | 
					        let broadcast_elapsed = broadcast_start.elapsed();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -332,7 +337,7 @@ impl BroadcastRun for StandardBroadcastRun {
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn transmit(
 | 
					    fn transmit(
 | 
				
			||||||
        &self,
 | 
					        &mut self,
 | 
				
			||||||
        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
					        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
 | 
				
			||||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        sock: &UdpSocket,
 | 
					        sock: &UdpSocket,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -101,7 +101,6 @@ pub struct ClusterInfo {
 | 
				
			|||||||
    pub(crate) keypair: Arc<Keypair>,
 | 
					    pub(crate) keypair: Arc<Keypair>,
 | 
				
			||||||
    /// The network entrypoint
 | 
					    /// The network entrypoint
 | 
				
			||||||
    entrypoint: Option<ContactInfo>,
 | 
					    entrypoint: Option<ContactInfo>,
 | 
				
			||||||
    last_datapoint_submit: Instant,
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Default, Clone)]
 | 
					#[derive(Default, Clone)]
 | 
				
			||||||
@@ -209,7 +208,6 @@ impl ClusterInfo {
 | 
				
			|||||||
            gossip: CrdsGossip::default(),
 | 
					            gossip: CrdsGossip::default(),
 | 
				
			||||||
            keypair,
 | 
					            keypair,
 | 
				
			||||||
            entrypoint: None,
 | 
					            entrypoint: None,
 | 
				
			||||||
            last_datapoint_submit: Instant::now(),
 | 
					 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
        let id = contact_info.id;
 | 
					        let id = contact_info.id;
 | 
				
			||||||
        me.gossip.set_self(&id);
 | 
					        me.gossip.set_self(&id);
 | 
				
			||||||
@@ -949,22 +947,23 @@ impl ClusterInfo {
 | 
				
			|||||||
    /// broadcast messages from the leader to layer 1 nodes
 | 
					    /// broadcast messages from the leader to layer 1 nodes
 | 
				
			||||||
    /// # Remarks
 | 
					    /// # Remarks
 | 
				
			||||||
    pub fn broadcast_shreds(
 | 
					    pub fn broadcast_shreds(
 | 
				
			||||||
        &mut self,
 | 
					        &self,
 | 
				
			||||||
        s: &UdpSocket,
 | 
					        s: &UdpSocket,
 | 
				
			||||||
        shreds: Vec<Vec<u8>>,
 | 
					        shreds: Vec<Vec<u8>>,
 | 
				
			||||||
        seeds: &[[u8; 32]],
 | 
					        seeds: &[[u8; 32]],
 | 
				
			||||||
        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
 | 
					        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
 | 
				
			||||||
 | 
					        last_datapoint_submit: &mut Instant,
 | 
				
			||||||
    ) -> Result<()> {
 | 
					    ) -> Result<()> {
 | 
				
			||||||
        let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
 | 
					        let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
 | 
				
			||||||
        let broadcast_len = peers_and_stakes.len();
 | 
					        let broadcast_len = peers_and_stakes.len();
 | 
				
			||||||
        if broadcast_len == 0 {
 | 
					        if broadcast_len == 0 {
 | 
				
			||||||
            if duration_as_s(&Instant::now().duration_since(self.last_datapoint_submit)) >= 1.0 {
 | 
					            if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
 | 
				
			||||||
                datapoint_info!(
 | 
					                datapoint_info!(
 | 
				
			||||||
                    "cluster_info-num_nodes",
 | 
					                    "cluster_info-num_nodes",
 | 
				
			||||||
                    ("live_count", 1, i64),
 | 
					                    ("live_count", 1, i64),
 | 
				
			||||||
                    ("broadcast_count", 1, i64)
 | 
					                    ("broadcast_count", 1, i64)
 | 
				
			||||||
                );
 | 
					                );
 | 
				
			||||||
                self.last_datapoint_submit = Instant::now();
 | 
					                *last_datapoint_submit = Instant::now();
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            return Ok(());
 | 
					            return Ok(());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@@ -995,13 +994,13 @@ impl ClusterInfo {
 | 
				
			|||||||
                num_live_peers += 1;
 | 
					                num_live_peers += 1;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
        if duration_as_s(&Instant::now().duration_since(self.last_datapoint_submit)) >= 1.0 {
 | 
					        if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
 | 
				
			||||||
            datapoint_info!(
 | 
					            datapoint_info!(
 | 
				
			||||||
                "cluster_info-num_nodes",
 | 
					                "cluster_info-num_nodes",
 | 
				
			||||||
                ("live_count", num_live_peers, i64),
 | 
					                ("live_count", num_live_peers, i64),
 | 
				
			||||||
                ("broadcast_count", broadcast_len + 1, i64)
 | 
					                ("broadcast_count", broadcast_len + 1, i64)
 | 
				
			||||||
            );
 | 
					            );
 | 
				
			||||||
            self.last_datapoint_submit = Instant::now();
 | 
					            *last_datapoint_submit = Instant::now();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user