Process votes from gossip only in leader node (#3707)
This commit is contained in:
		@@ -1,4 +1,5 @@
 | 
				
			|||||||
use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
 | 
					use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
 | 
				
			||||||
 | 
					use crate::poh_recorder::PohRecorder;
 | 
				
			||||||
use crate::result::Result;
 | 
					use crate::result::Result;
 | 
				
			||||||
use crate::service::Service;
 | 
					use crate::service::Service;
 | 
				
			||||||
use crate::sigverify_stage::VerifiedPackets;
 | 
					use crate::sigverify_stage::VerifiedPackets;
 | 
				
			||||||
@@ -6,7 +7,7 @@ use crate::{packet, sigverify};
 | 
				
			|||||||
use solana_metrics::counter::Counter;
 | 
					use solana_metrics::counter::Counter;
 | 
				
			||||||
use std::sync::atomic::{AtomicBool, Ordering};
 | 
					use std::sync::atomic::{AtomicBool, Ordering};
 | 
				
			||||||
use std::sync::mpsc::Sender;
 | 
					use std::sync::mpsc::Sender;
 | 
				
			||||||
use std::sync::{Arc, RwLock};
 | 
					use std::sync::{Arc, Mutex, RwLock};
 | 
				
			||||||
use std::thread::{self, sleep, Builder, JoinHandle};
 | 
					use std::thread::{self, sleep, Builder, JoinHandle};
 | 
				
			||||||
use std::time::Duration;
 | 
					use std::time::Duration;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -20,12 +21,20 @@ impl ClusterInfoVoteListener {
 | 
				
			|||||||
        cluster_info: Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        sigverify_disabled: bool,
 | 
					        sigverify_disabled: bool,
 | 
				
			||||||
        sender: Sender<VerifiedPackets>,
 | 
					        sender: Sender<VerifiedPackets>,
 | 
				
			||||||
 | 
					        poh_recorder: &Arc<Mutex<PohRecorder>>,
 | 
				
			||||||
    ) -> Self {
 | 
					    ) -> Self {
 | 
				
			||||||
        let exit = exit.clone();
 | 
					        let exit = exit.clone();
 | 
				
			||||||
 | 
					        let poh_recorder = poh_recorder.clone();
 | 
				
			||||||
        let thread = Builder::new()
 | 
					        let thread = Builder::new()
 | 
				
			||||||
            .name("solana-cluster_info_vote_listener".to_string())
 | 
					            .name("solana-cluster_info_vote_listener".to_string())
 | 
				
			||||||
            .spawn(move || {
 | 
					            .spawn(move || {
 | 
				
			||||||
                let _ = Self::recv_loop(exit, &cluster_info, sigverify_disabled, &sender);
 | 
					                let _ = Self::recv_loop(
 | 
				
			||||||
 | 
					                    exit,
 | 
				
			||||||
 | 
					                    &cluster_info,
 | 
				
			||||||
 | 
					                    sigverify_disabled,
 | 
				
			||||||
 | 
					                    &sender,
 | 
				
			||||||
 | 
					                    poh_recorder,
 | 
				
			||||||
 | 
					                );
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
            .unwrap();
 | 
					            .unwrap();
 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
@@ -37,6 +46,7 @@ impl ClusterInfoVoteListener {
 | 
				
			|||||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
					        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
				
			||||||
        sigverify_disabled: bool,
 | 
					        sigverify_disabled: bool,
 | 
				
			||||||
        sender: &Sender<VerifiedPackets>,
 | 
					        sender: &Sender<VerifiedPackets>,
 | 
				
			||||||
 | 
					        poh_recorder: Arc<Mutex<PohRecorder>>,
 | 
				
			||||||
    ) -> Result<()> {
 | 
					    ) -> Result<()> {
 | 
				
			||||||
        let mut last_ts = 0;
 | 
					        let mut last_ts = 0;
 | 
				
			||||||
        loop {
 | 
					        loop {
 | 
				
			||||||
@@ -44,6 +54,7 @@ impl ClusterInfoVoteListener {
 | 
				
			|||||||
                return Ok(());
 | 
					                return Ok(());
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
 | 
					            let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
 | 
				
			||||||
 | 
					            if poh_recorder.lock().unwrap().bank().is_some() {
 | 
				
			||||||
                last_ts = new_ts;
 | 
					                last_ts = new_ts;
 | 
				
			||||||
                inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len());
 | 
					                inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len());
 | 
				
			||||||
                let msgs = packet::to_packets(&votes);
 | 
					                let msgs = packet::to_packets(&votes);
 | 
				
			||||||
@@ -53,6 +64,7 @@ impl ClusterInfoVoteListener {
 | 
				
			|||||||
                    sigverify::ed25519_verify_cpu(&msgs)
 | 
					                    sigverify::ed25519_verify_cpu(&msgs)
 | 
				
			||||||
                };
 | 
					                };
 | 
				
			||||||
                sender.send(msgs.into_iter().zip(r).collect())?;
 | 
					                sender.send(msgs.into_iter().zip(r).collect())?;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
            sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
 | 
					            sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -60,6 +60,7 @@ impl Tpu {
 | 
				
			|||||||
            cluster_info.clone(),
 | 
					            cluster_info.clone(),
 | 
				
			||||||
            sigverify_disabled,
 | 
					            sigverify_disabled,
 | 
				
			||||||
            verified_sender,
 | 
					            verified_sender,
 | 
				
			||||||
 | 
					            &poh_recorder,
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver);
 | 
					        let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver);
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user