* checks for authorized voter early on in the vote-listener pipeline (#22169)
Before votes are verified that they are signed by the authorized voter,
they might be dropped in verified-vote-packets code. If there are
enough many spam votes from unauthorized voters, this may potentially
drop valid votes but keep the false ones.
https://github.com/solana-labs/solana/blob/57986f982/core/src/verified_vote_packets.rs#L165-L168
(cherry picked from commit c0c6038654)
# Conflicts:
#	core/src/cluster_info_vote_listener.rs
* removes backport merge conflicts
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		@@ -13,7 +13,6 @@ use {
 | 
			
		||||
        unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select,
 | 
			
		||||
        Sender as CrossbeamSender,
 | 
			
		||||
    },
 | 
			
		||||
    itertools::izip,
 | 
			
		||||
    log::*,
 | 
			
		||||
    solana_gossip::{
 | 
			
		||||
        cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
 | 
			
		||||
@@ -294,10 +293,10 @@ pub struct ClusterInfoVoteListener {
 | 
			
		||||
impl ClusterInfoVoteListener {
 | 
			
		||||
    #[allow(clippy::too_many_arguments)]
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        exit: &Arc<AtomicBool>,
 | 
			
		||||
        exit: Arc<AtomicBool>,
 | 
			
		||||
        cluster_info: Arc<ClusterInfo>,
 | 
			
		||||
        verified_packets_sender: CrossbeamSender<Vec<PacketBatch>>,
 | 
			
		||||
        poh_recorder: &Arc<Mutex<PohRecorder>>,
 | 
			
		||||
        poh_recorder: Arc<Mutex<PohRecorder>>,
 | 
			
		||||
        vote_tracker: Arc<VoteTracker>,
 | 
			
		||||
        bank_forks: Arc<RwLock<BankForks>>,
 | 
			
		||||
        subscriptions: Arc<RpcSubscriptions>,
 | 
			
		||||
@@ -308,25 +307,26 @@ impl ClusterInfoVoteListener {
 | 
			
		||||
        bank_notification_sender: Option<BankNotificationSender>,
 | 
			
		||||
        cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        let exit_ = exit.clone();
 | 
			
		||||
 | 
			
		||||
        let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) =
 | 
			
		||||
            unbounded();
 | 
			
		||||
        let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded();
 | 
			
		||||
        let listen_thread = Builder::new()
 | 
			
		||||
            .name("solana-cluster_info_vote_listener".to_string())
 | 
			
		||||
            .spawn(move || {
 | 
			
		||||
                let _ = Self::recv_loop(
 | 
			
		||||
                    exit_,
 | 
			
		||||
                    &cluster_info,
 | 
			
		||||
                    verified_vote_label_packets_sender,
 | 
			
		||||
                    verified_vote_transactions_sender,
 | 
			
		||||
                );
 | 
			
		||||
            })
 | 
			
		||||
            .unwrap();
 | 
			
		||||
 | 
			
		||||
        let listen_thread = {
 | 
			
		||||
            let exit = exit.clone();
 | 
			
		||||
            let bank_forks = bank_forks.clone();
 | 
			
		||||
            Builder::new()
 | 
			
		||||
                .name("solana-cluster_info_vote_listener".to_string())
 | 
			
		||||
                .spawn(move || {
 | 
			
		||||
                    let _ = Self::recv_loop(
 | 
			
		||||
                        exit,
 | 
			
		||||
                        &cluster_info,
 | 
			
		||||
                        &bank_forks,
 | 
			
		||||
                        verified_vote_label_packets_sender,
 | 
			
		||||
                        verified_vote_transactions_sender,
 | 
			
		||||
                    );
 | 
			
		||||
                })
 | 
			
		||||
                .unwrap()
 | 
			
		||||
        };
 | 
			
		||||
        let exit_ = exit.clone();
 | 
			
		||||
        let poh_recorder = poh_recorder.clone();
 | 
			
		||||
        let bank_send_thread = Builder::new()
 | 
			
		||||
            .name("solana-cluster_info_bank_send".to_string())
 | 
			
		||||
            .spawn(move || {
 | 
			
		||||
@@ -339,12 +339,11 @@ impl ClusterInfoVoteListener {
 | 
			
		||||
            })
 | 
			
		||||
            .unwrap();
 | 
			
		||||
 | 
			
		||||
        let exit_ = exit.clone();
 | 
			
		||||
        let send_thread = Builder::new()
 | 
			
		||||
            .name("solana-cluster_info_process_votes".to_string())
 | 
			
		||||
            .spawn(move || {
 | 
			
		||||
                let _ = Self::process_votes_loop(
 | 
			
		||||
                    exit_,
 | 
			
		||||
                    exit,
 | 
			
		||||
                    verified_vote_transactions_receiver,
 | 
			
		||||
                    vote_tracker,
 | 
			
		||||
                    bank_forks,
 | 
			
		||||
@@ -364,16 +363,14 @@ impl ClusterInfoVoteListener {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn join(self) -> thread::Result<()> {
 | 
			
		||||
        for thread_hdl in self.thread_hdls {
 | 
			
		||||
            thread_hdl.join()?;
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    pub(crate) fn join(self) -> thread::Result<()> {
 | 
			
		||||
        self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn recv_loop(
 | 
			
		||||
        exit: Arc<AtomicBool>,
 | 
			
		||||
        cluster_info: &ClusterInfo,
 | 
			
		||||
        bank_forks: &RwLock<BankForks>,
 | 
			
		||||
        verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender,
 | 
			
		||||
        verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
@@ -382,7 +379,7 @@ impl ClusterInfoVoteListener {
 | 
			
		||||
            let votes = cluster_info.get_votes(&mut cursor);
 | 
			
		||||
            inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
 | 
			
		||||
            if !votes.is_empty() {
 | 
			
		||||
                let (vote_txs, packets) = Self::verify_votes(votes);
 | 
			
		||||
                let (vote_txs, packets) = Self::verify_votes(votes, bank_forks);
 | 
			
		||||
                verified_vote_transactions_sender.send(vote_txs)?;
 | 
			
		||||
                verified_vote_label_packets_sender.send(packets)?;
 | 
			
		||||
            }
 | 
			
		||||
@@ -392,43 +389,45 @@ impl ClusterInfoVoteListener {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[allow(clippy::type_complexity)]
 | 
			
		||||
    fn verify_votes(votes: Vec<Transaction>) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
 | 
			
		||||
    fn verify_votes(
 | 
			
		||||
        votes: Vec<Transaction>,
 | 
			
		||||
        bank_forks: &RwLock<BankForks>,
 | 
			
		||||
    ) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
 | 
			
		||||
        let mut packet_batches = packet::to_packet_batches(&votes, 1);
 | 
			
		||||
 | 
			
		||||
        // Votes should already be filtered by this point.
 | 
			
		||||
        let reject_non_vote = false;
 | 
			
		||||
        sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote);
 | 
			
		||||
 | 
			
		||||
        let (vote_txs, vote_metadata) = izip!(votes.into_iter(), packet_batches)
 | 
			
		||||
            .filter_map(|(vote_tx, packet_batch)| {
 | 
			
		||||
                let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx)
 | 
			
		||||
                    .and_then(|(vote_account_key, vote, _)| {
 | 
			
		||||
                    if vote.slots.is_empty() {
 | 
			
		||||
                        None
 | 
			
		||||
                    } else {
 | 
			
		||||
                        Some((vote, vote_account_key))
 | 
			
		||||
                    }
 | 
			
		||||
                })?;
 | 
			
		||||
 | 
			
		||||
        sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false);
 | 
			
		||||
        let root_bank = bank_forks.read().unwrap().root_bank();
 | 
			
		||||
        let epoch_schedule = root_bank.epoch_schedule();
 | 
			
		||||
        votes
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .zip(packet_batches)
 | 
			
		||||
            .filter(|(_, packet_batch)| {
 | 
			
		||||
                // to_packet_batches() above splits into 1 packet long batches
 | 
			
		||||
                assert_eq!(packet_batch.packets.len(), 1);
 | 
			
		||||
                if !packet_batch.packets[0].meta.discard {
 | 
			
		||||
                    if let Some(signature) = vote_tx.signatures.first().cloned() {
 | 
			
		||||
                        return Some((
 | 
			
		||||
                            vote_tx,
 | 
			
		||||
                            VerifiedVoteMetadata {
 | 
			
		||||
                                vote_account_key,
 | 
			
		||||
                                vote,
 | 
			
		||||
                                packet_batch,
 | 
			
		||||
                                signature,
 | 
			
		||||
                            },
 | 
			
		||||
                        ));
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                None
 | 
			
		||||
                !packet_batch.packets[0].meta.discard
 | 
			
		||||
            })
 | 
			
		||||
            .unzip();
 | 
			
		||||
        (vote_txs, vote_metadata)
 | 
			
		||||
            .filter_map(|(tx, packet_batch)| {
 | 
			
		||||
                let (vote_account_key, vote, _) = vote_transaction::parse_vote_transaction(&tx)?;
 | 
			
		||||
                let slot = vote.last_voted_slot()?;
 | 
			
		||||
                let epoch = epoch_schedule.get_epoch(slot);
 | 
			
		||||
                let authorized_voter = root_bank
 | 
			
		||||
                    .epoch_stakes(epoch)?
 | 
			
		||||
                    .epoch_authorized_voters()
 | 
			
		||||
                    .get(&vote_account_key)?;
 | 
			
		||||
                let mut keys = tx.message.account_keys.iter().enumerate();
 | 
			
		||||
                if !keys.any(|(i, key)| tx.message.is_signer(i) && key == authorized_voter) {
 | 
			
		||||
                    return None;
 | 
			
		||||
                }
 | 
			
		||||
                let verified_vote_metadata = VerifiedVoteMetadata {
 | 
			
		||||
                    vote_account_key,
 | 
			
		||||
                    vote,
 | 
			
		||||
                    packet_batch,
 | 
			
		||||
                    signature: *tx.signatures.first()?,
 | 
			
		||||
                };
 | 
			
		||||
                Some((tx, verified_vote_metadata))
 | 
			
		||||
            })
 | 
			
		||||
            .unzip()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn bank_send_loop(
 | 
			
		||||
@@ -555,7 +554,7 @@ impl ClusterInfoVoteListener {
 | 
			
		||||
                return Ok(());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            let root_bank = bank_forks.read().unwrap().root_bank().clone();
 | 
			
		||||
            let root_bank = bank_forks.read().unwrap().root_bank();
 | 
			
		||||
            if last_process_root.elapsed().as_millis() > DEFAULT_MS_PER_SLOT as u128 {
 | 
			
		||||
                let unrooted_optimistic_slots = confirmation_verifier
 | 
			
		||||
                    .verify_for_unrooted_optimistic_slots(&root_bank, &blockstore);
 | 
			
		||||
@@ -962,6 +961,7 @@ mod tests {
 | 
			
		||||
        solana_vote_program::vote_state::Vote,
 | 
			
		||||
        std::{
 | 
			
		||||
            collections::BTreeSet,
 | 
			
		||||
            iter::repeat_with,
 | 
			
		||||
            sync::{atomic::AtomicU64, Arc},
 | 
			
		||||
        },
 | 
			
		||||
    };
 | 
			
		||||
@@ -1813,8 +1813,11 @@ mod tests {
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_verify_votes_empty() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
 | 
			
		||||
        let bank = Bank::new_for_tests(&genesis_config);
 | 
			
		||||
        let bank_forks = RwLock::new(BankForks::new(bank));
 | 
			
		||||
        let votes = vec![];
 | 
			
		||||
        let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
 | 
			
		||||
        let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
 | 
			
		||||
        assert!(vote_txs.is_empty());
 | 
			
		||||
        assert!(packets.is_empty());
 | 
			
		||||
    }
 | 
			
		||||
@@ -1827,25 +1830,40 @@ mod tests {
 | 
			
		||||
        assert_eq!(num_packets, ref_value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn test_vote_tx(hash: Option<Hash>) -> Transaction {
 | 
			
		||||
        let node_keypair = Keypair::new();
 | 
			
		||||
        let vote_keypair = Keypair::new();
 | 
			
		||||
        let auth_voter_keypair = Keypair::new();
 | 
			
		||||
    fn test_vote_tx(
 | 
			
		||||
        validator_vote_keypairs: Option<&ValidatorVoteKeypairs>,
 | 
			
		||||
        hash: Option<Hash>,
 | 
			
		||||
    ) -> Transaction {
 | 
			
		||||
        let other = ValidatorVoteKeypairs::new_rand();
 | 
			
		||||
        let validator_vote_keypair = validator_vote_keypairs.unwrap_or(&other);
 | 
			
		||||
        // TODO authorized_voter_keypair should be different from vote-keypair
 | 
			
		||||
        // but that is what create_genesis_... currently generates.
 | 
			
		||||
        vote_transaction::new_vote_transaction(
 | 
			
		||||
            vec![0],
 | 
			
		||||
            Hash::default(),
 | 
			
		||||
            Hash::default(),
 | 
			
		||||
            &node_keypair,
 | 
			
		||||
            &vote_keypair,
 | 
			
		||||
            &auth_voter_keypair,
 | 
			
		||||
            &validator_vote_keypair.node_keypair,
 | 
			
		||||
            &validator_vote_keypair.vote_keypair,
 | 
			
		||||
            &validator_vote_keypair.vote_keypair, // authorized_voter_keypair
 | 
			
		||||
            hash,
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn run_test_verify_votes_1_pass(hash: Option<Hash>) {
 | 
			
		||||
        let vote_tx = test_vote_tx(hash);
 | 
			
		||||
        let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand)
 | 
			
		||||
            .take(10)
 | 
			
		||||
            .collect();
 | 
			
		||||
        let GenesisConfigInfo { genesis_config, .. } =
 | 
			
		||||
            genesis_utils::create_genesis_config_with_vote_accounts(
 | 
			
		||||
                10_000, // mint_lamports
 | 
			
		||||
                &voting_keypairs,
 | 
			
		||||
                vec![100; voting_keypairs.len()], // stakes
 | 
			
		||||
            );
 | 
			
		||||
        let bank = Bank::new_for_tests(&genesis_config);
 | 
			
		||||
        let bank_forks = RwLock::new(BankForks::new(bank));
 | 
			
		||||
        let vote_tx = test_vote_tx(voting_keypairs.first(), hash);
 | 
			
		||||
        let votes = vec![vote_tx];
 | 
			
		||||
        let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
 | 
			
		||||
        let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
 | 
			
		||||
        assert_eq!(vote_txs.len(), 1);
 | 
			
		||||
        verify_packets_len(&packets, 1);
 | 
			
		||||
    }
 | 
			
		||||
@@ -1857,11 +1875,22 @@ mod tests {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn run_test_bad_vote(hash: Option<Hash>) {
 | 
			
		||||
        let vote_tx = test_vote_tx(hash);
 | 
			
		||||
        let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand)
 | 
			
		||||
            .take(10)
 | 
			
		||||
            .collect();
 | 
			
		||||
        let GenesisConfigInfo { genesis_config, .. } =
 | 
			
		||||
            genesis_utils::create_genesis_config_with_vote_accounts(
 | 
			
		||||
                10_000, // mint_lamports
 | 
			
		||||
                &voting_keypairs,
 | 
			
		||||
                vec![100; voting_keypairs.len()], // stakes
 | 
			
		||||
            );
 | 
			
		||||
        let bank = Bank::new_for_tests(&genesis_config);
 | 
			
		||||
        let bank_forks = RwLock::new(BankForks::new(bank));
 | 
			
		||||
        let vote_tx = test_vote_tx(voting_keypairs.first(), hash);
 | 
			
		||||
        let mut bad_vote = vote_tx.clone();
 | 
			
		||||
        bad_vote.signatures[0] = Signature::default();
 | 
			
		||||
        let votes = vec![vote_tx.clone(), bad_vote, vote_tx];
 | 
			
		||||
        let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
 | 
			
		||||
        let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
 | 
			
		||||
        assert_eq!(vote_txs.len(), 2);
 | 
			
		||||
        verify_packets_len(&packets, 2);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -109,10 +109,10 @@ impl Tpu {
 | 
			
		||||
        let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) =
 | 
			
		||||
            unbounded();
 | 
			
		||||
        let cluster_info_vote_listener = ClusterInfoVoteListener::new(
 | 
			
		||||
            exit,
 | 
			
		||||
            exit.clone(),
 | 
			
		||||
            cluster_info.clone(),
 | 
			
		||||
            verified_gossip_vote_packets_sender,
 | 
			
		||||
            poh_recorder,
 | 
			
		||||
            poh_recorder.clone(),
 | 
			
		||||
            vote_tracker,
 | 
			
		||||
            bank_forks.clone(),
 | 
			
		||||
            subscriptions.clone(),
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user