crossbeam_channel::Select::ready_timeout might return with success spuriously.
(cherry picked from commit 7476dfeec0)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		@@ -97,11 +97,8 @@ impl AggregateCommitmentService {
 | 
				
			|||||||
                return Ok(());
 | 
					                return Ok(());
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
 | 
					            let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
 | 
				
			||||||
 | 
					            let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data);
 | 
				
			||||||
            while let Ok(new_data) = receiver.try_recv() {
 | 
					 | 
				
			||||||
                aggregation_data = new_data;
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            let ancestors = aggregation_data.bank.status_cache_ancestors();
 | 
					            let ancestors = aggregation_data.bank.status_cache_ancestors();
 | 
				
			||||||
            if ancestors.is_empty() {
 | 
					            if ancestors.is_empty() {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -164,12 +164,9 @@ impl LedgerCleanupService {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
 | 
					    fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
 | 
				
			||||||
        let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
 | 
					        let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
 | 
				
			||||||
        // Get the newest root
 | 
					        // Get the newest root
 | 
				
			||||||
        while let Ok(new_root) = new_root_receiver.try_recv() {
 | 
					        Ok(new_root_receiver.try_iter().last().unwrap_or(root))
 | 
				
			||||||
            root = new_root;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        Ok(root)
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn cleanup_ledger(
 | 
					    pub fn cleanup_ledger(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,6 +1,5 @@
 | 
				
			|||||||
use {
 | 
					use {
 | 
				
			||||||
    crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
 | 
					    crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
 | 
				
			||||||
    crossbeam_channel::Select,
 | 
					 | 
				
			||||||
    solana_perf::packet::PacketBatch,
 | 
					    solana_perf::packet::PacketBatch,
 | 
				
			||||||
    solana_runtime::bank::Bank,
 | 
					    solana_runtime::bank::Bank,
 | 
				
			||||||
    solana_sdk::{
 | 
					    solana_sdk::{
 | 
				
			||||||
@@ -141,10 +140,10 @@ impl VerifiedVotePackets {
 | 
				
			|||||||
        vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
 | 
					        vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
 | 
				
			||||||
        would_be_leader: bool,
 | 
					        would_be_leader: bool,
 | 
				
			||||||
    ) -> Result<()> {
 | 
					    ) -> Result<()> {
 | 
				
			||||||
        let mut sel = Select::new();
 | 
					        const RECV_TIMEOUT: Duration = Duration::from_millis(200);
 | 
				
			||||||
        sel.recv(vote_packets_receiver);
 | 
					        let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
 | 
				
			||||||
        let _ = sel.ready_timeout(Duration::from_millis(200))?;
 | 
					        let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
 | 
				
			||||||
        for gossip_votes in vote_packets_receiver.try_iter() {
 | 
					        for gossip_votes in vote_packets {
 | 
				
			||||||
            if would_be_leader {
 | 
					            if would_be_leader {
 | 
				
			||||||
                for verfied_vote_metadata in gossip_votes {
 | 
					                for verfied_vote_metadata in gossip_votes {
 | 
				
			||||||
                    let VerifiedVoteMetadata {
 | 
					                    let VerifiedVoteMetadata {
 | 
				
			||||||
@@ -283,7 +282,7 @@ mod tests {
 | 
				
			|||||||
        // No new messages, should time out
 | 
					        // No new messages, should time out
 | 
				
			||||||
        assert_matches!(
 | 
					        assert_matches!(
 | 
				
			||||||
            verified_vote_packets.receive_and_process_vote_packets(&r, true),
 | 
					            verified_vote_packets.receive_and_process_vote_packets(&r, true),
 | 
				
			||||||
            Err(Error::ReadyTimeout)
 | 
					            Err(Error::CrossbeamRecvTimeout(_))
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -233,14 +233,10 @@ fn run_check_duplicate(
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
    let timer = Duration::from_millis(200);
 | 
					    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
 | 
				
			||||||
    let shred = shred_receiver.recv_timeout(timer)?;
 | 
					    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
 | 
				
			||||||
    check_duplicate(shred)?;
 | 
					        .chain(shred_receiver.try_iter())
 | 
				
			||||||
    while let Ok(shred) = shred_receiver.try_recv() {
 | 
					        .try_for_each(check_duplicate)
 | 
				
			||||||
        check_duplicate(shred)?;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    Ok(())
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn verify_repair(
 | 
					fn verify_repair(
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user