From 9afc5da2e162151b070865cf01523b696d307c0d Mon Sep 17 00:00:00 2001 From: carllin Date: Sun, 15 Mar 2020 20:31:05 -0700 Subject: [PATCH] Fix vote polling (#8829) Co-authored-by: Carl --- core/src/cluster_info.rs | 39 +++-- core/src/cluster_info_vote_listener.rs | 191 +++++++++++++++++-------- core/src/lib.rs | 1 + core/src/verified_vote_packets.rs | 164 +++++++++++++++++++++ 4 files changed, 323 insertions(+), 72 deletions(-) create mode 100644 core/src/verified_vote_packets.rs diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 94ce8584dc..113a7dfdd7 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -450,23 +450,23 @@ impl ClusterInfo { /// since. This allows the bank to query for new votes only. /// /// * return - The votes, and the max timestamp from the new set. - pub fn get_votes(&self, since: u64) -> (Vec, u64) { - let votes: Vec<_> = self + pub fn get_votes(&self, since: u64) -> (Vec, Vec, u64) { + let mut max_ts = since; + let (labels, txs): (Vec, Vec) = self .gossip .crds .table - .values() - .filter(|x| x.insert_timestamp > since) - .filter_map(|x| { + .iter() + .filter(|(_, x)| x.insert_timestamp > since) + .filter_map(|(label, x)| { + max_ts = std::cmp::max(x.insert_timestamp, max_ts); x.value .vote() - .map(|v| (x.insert_timestamp, v.transaction.clone())) + .map(|v| (label.clone(), v.transaction.clone())) }) - .collect(); - let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since); - let txs: Vec = votes.into_iter().map(|x| x.1).collect(); + .unzip(); inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); - (txs, max_ts) + (labels, txs, max_ts) } pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { @@ -2242,26 +2242,35 @@ mod tests { #[test] fn test_push_vote() { let keys = Keypair::new(); - let now = timestamp(); let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); // make sure empty crds is handled correctly - let (votes, max_ts) = cluster_info.get_votes(now); + let now = timestamp(); + let (_, votes, max_ts) = cluster_info.get_votes(now); assert_eq!(votes, vec![]); assert_eq!(max_ts, now); // add a vote let tx = test_tx(); - cluster_info.push_vote(0, tx.clone()); + let index = 1; + cluster_info.push_vote(index, tx.clone()); // -1 to make sure that the clock is strictly lower then when insert occurred - let (votes, max_ts) = cluster_info.get_votes(now - 1); + let (labels, votes, max_ts) = cluster_info.get_votes(now - 1); assert_eq!(votes, vec![tx]); + assert_eq!(labels.len(), 1); + match labels[0] { + CrdsValueLabel::Vote(_, pubkey) => { + assert_eq!(pubkey, keys.pubkey()); + } + + _ => panic!("Bad match"), + } assert!(max_ts >= now - 1); // make sure timestamp filter works - let (votes, new_max_ts) = cluster_info.get_votes(max_ts); + let (_, votes, new_max_ts) = cluster_info.get_votes(max_ts); assert_eq!(votes, vec![]); assert_eq!(max_ts, new_max_ts); } diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 3660424a66..02e81cf832 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,11 +1,16 @@ -use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; -use crate::packet::Packets; -use crate::poh_recorder::PohRecorder; -use crate::result::{Error, Result}; -use crate::{packet, sigverify}; +use crate::{ + cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + crds_value::CrdsValueLabel, + packet::{self, Packets}, + poh_recorder::PohRecorder, + result::{Error, Result}, + sigverify, + verified_vote_packets::VerifiedVotePackets, +}; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, }; +use itertools::izip; use log::*; use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; @@ -19,15 +24,23 @@ use solana_sdk::{ transaction::Transaction, }; use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::VoteState}; -use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Duration; +use std::{ + collections::{HashMap, HashSet}, + sync::{ + atomic::{AtomicBool, Ordering}, + {Arc, Mutex, RwLock}, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::{Duration, Instant}, +}; // Map from a vote account to the authorized voter for an epoch pub type EpochAuthorizedVoters = HashMap, Arc>; pub type NodeIdToVoteAccounts = HashMap>>; +pub type VerifiedVotePacketsSender = CrossbeamSender>; +pub type VerifiedVotePacketsReceiver = CrossbeamReceiver>; +pub type VerifiedVoteTransactionsSender = CrossbeamSender>; +pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub struct SlotVoteTracker { voted: HashSet>, @@ -276,8 +289,9 @@ impl ClusterInfoVoteListener { bank_forks: Arc>, ) -> Self { let exit_ = exit.clone(); - let poh_recorder = poh_recorder.clone(); - let (vote_txs_sender, vote_txs_receiver) = unbounded(); + + let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded(); + let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded(); let listen_thread = Builder::new() .name("solana-cluster_info_vote_listener".to_string()) .spawn(move || { @@ -285,9 +299,22 @@ impl ClusterInfoVoteListener { exit_, &cluster_info, sigverify_disabled, - &sender, - vote_txs_sender, + verified_vote_packets_sender, + verified_vote_transactions_sender, + ); + }) + .unwrap(); + + let exit_ = exit.clone(); + let poh_recorder = poh_recorder.clone(); + let bank_send_thread = Builder::new() + .name("solana-cluster_info_bank_send".to_string()) + .spawn(move || { + let _ = Self::bank_send_loop( + exit_, + verified_vote_packets_receiver, poh_recorder, + &sender, ); }) .unwrap(); @@ -296,13 +323,17 @@ impl ClusterInfoVoteListener { let send_thread = Builder::new() .name("solana-cluster_info_process_votes".to_string()) .spawn(move || { - let _ = - Self::process_votes_loop(exit_, vote_txs_receiver, vote_tracker, &bank_forks); + let _ = Self::process_votes_loop( + exit_, + verified_vote_transactions_receiver, + vote_tracker, + &bank_forks, + ); }) .unwrap(); Self { - thread_hdls: vec![listen_thread, send_thread], + thread_hdls: vec![listen_thread, send_thread, bank_send_thread], } } @@ -317,57 +348,104 @@ impl ClusterInfoVoteListener { exit: Arc, cluster_info: &Arc>, sigverify_disabled: bool, - packets_sender: &CrossbeamSender>, - vote_txs_sender: CrossbeamSender>, - poh_recorder: Arc>, + verified_vote_packets_sender: VerifiedVotePacketsSender, + verified_vote_transactions_sender: VerifiedVoteTransactionsSender, ) -> Result<()> { + let mut last_ts = 0; loop { if exit.load(Ordering::Relaxed) { return Ok(()); } - let poh_bank = poh_recorder.lock().unwrap().bank(); - if let Some(bank) = poh_bank { - let last_ts = bank.last_vote_sync.load(Ordering::Relaxed); - let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); - bank.last_vote_sync - .compare_and_swap(last_ts, new_ts, Ordering::Relaxed); - inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); - let mut msgs = packet::to_packets(&votes); - if !msgs.is_empty() { - let r = if sigverify_disabled { - sigverify::ed25519_verify_disabled(&msgs) + let (labels, votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); + inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); + + last_ts = new_ts; + let msgs = packet::to_packets(&votes); + if !msgs.is_empty() { + let r = if sigverify_disabled { + sigverify::ed25519_verify_disabled(&msgs) + } else { + sigverify::ed25519_verify_cpu(&msgs) + }; + + assert_eq!( + r.iter() + .map(|packets_results| packets_results.len()) + .sum::(), + votes.len() + ); + + let (vote_txs, packets) = izip!( + labels.into_iter(), + votes.into_iter(), + r.iter().flatten(), + msgs + ) + .filter_map(|(label, vote, verify_result, packet)| { + if *verify_result != 0 { + Some((vote, (label, packet))) } else { - sigverify::ed25519_verify_cpu(&msgs) - }; - assert_eq!( - r.iter() - .map(|packets_results| packets_results.len()) - .sum::(), - votes.len() - ); - let valid_votes: Vec<_> = votes - .into_iter() - .zip(r.iter().flatten()) - .filter_map(|(vote, verify_result)| { - if *verify_result != 0 { - Some(vote) - } else { - None - } - }) - .collect(); - vote_txs_sender.send(valid_votes)?; - sigverify::mark_disabled(&mut msgs, &r); - packets_sender.send(msgs)?; + None + } + }) + .unzip(); + + verified_vote_transactions_sender.send(vote_txs)?; + verified_vote_packets_sender.send(packets)?; + } + + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + } + } + + fn bank_send_loop( + exit: Arc, + verified_vote_packets_receiver: VerifiedVotePacketsReceiver, + poh_recorder: Arc>, + packets_sender: &CrossbeamSender>, + ) -> Result<()> { + let mut verified_vote_packets = VerifiedVotePackets::default(); + let mut time_since_lock = Instant::now(); + let mut update_version = 0; + loop { + if exit.load(Ordering::Relaxed) { + return Ok(()); + } + + if let Err(e) = verified_vote_packets + .get_and_process_vote_packets(&verified_vote_packets_receiver, &mut update_version) + { + match e { + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { + return Ok(()); + } + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => { + error!("thread {:?} error {:?}", thread::current().name(), e); + } + } + } + + if time_since_lock.elapsed().as_millis() > GOSSIP_SLEEP_MILLIS as u128 { + let bank = poh_recorder.lock().unwrap().bank(); + if let Some(bank) = bank { + let last_version = bank.last_vote_sync.load(Ordering::Relaxed); + let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version); + packets_sender.send(msgs)?; + bank.last_vote_sync.compare_and_swap( + last_version, + new_version, + Ordering::Relaxed, + ); + time_since_lock = Instant::now(); } } - sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } } fn process_votes_loop( exit: Arc, - vote_txs_receiver: CrossbeamReceiver>, + vote_txs_receiver: VerifiedVoteTransactionsReceiver, vote_tracker: Arc, bank_forks: &RwLock, ) -> Result<()> { @@ -425,7 +503,7 @@ impl ClusterInfoVoteListener { } fn get_and_process_votes( - vote_txs_receiver: &CrossbeamReceiver>, + vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &Arc, last_root: Slot, ) -> Result<()> { @@ -434,7 +512,6 @@ impl ClusterInfoVoteListener { while let Ok(new_txs) = vote_txs_receiver.try_recv() { vote_txs.extend(new_txs); } - Self::process_votes(vote_tracker, vote_txs, last_root); Ok(()) } diff --git a/core/src/lib.rs b/core/src/lib.rs index 1d13b3dcf9..7139e3ddf3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -58,6 +58,7 @@ pub mod tpu; pub mod transaction_status_service; pub mod tvu; pub mod validator; +pub mod verified_vote_packets; pub mod weighted_shuffle; pub mod window_service; diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs new file mode 100644 index 0000000000..8178f55956 --- /dev/null +++ b/core/src/verified_vote_packets.rs @@ -0,0 +1,164 @@ +use crate::{ + cluster_info_vote_listener::VerifiedVotePacketsReceiver, crds_value::CrdsValueLabel, + packet::Packets, result::Result, +}; +use std::{collections::HashMap, ops::Deref, time::Duration}; + +#[derive(Default)] +pub struct VerifiedVotePackets(HashMap); + +impl Deref for VerifiedVotePackets { + type Target = HashMap; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl VerifiedVotePackets { + pub fn get_and_process_vote_packets( + &mut self, + vote_packets_receiver: &VerifiedVotePacketsReceiver, + last_update_version: &mut u64, + ) -> Result<()> { + let timer = Duration::from_millis(200); + let vote_packets = vote_packets_receiver.recv_timeout(timer)?; + *last_update_version += 1; + for (label, packet) in vote_packets { + self.0.insert(label, (*last_update_version, packet)); + } + while let Ok(vote_packets) = vote_packets_receiver.try_recv() { + for (label, packet) in vote_packets { + self.0.insert(label, (*last_update_version, packet)); + } + } + Ok(()) + } + + pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Vec) { + let mut new_update_version = last_update_version; + let msgs: Vec<_> = self + .iter() + .filter_map(|(_, (msg_update_version, msg))| { + if *msg_update_version > last_update_version { + new_update_version = std::cmp::max(*msg_update_version, new_update_version); + Some(msg) + } else { + None + } + }) + .cloned() + .collect(); + (new_update_version, msgs) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + packet::{Meta, Packet}, + result::Error, + }; + use crossbeam_channel::{unbounded, RecvTimeoutError}; + use solana_sdk::pubkey::Pubkey; + + #[test] + fn test_get_latest_votes() { + let pubkey = Pubkey::new_rand(); + let label1 = CrdsValueLabel::Vote(0 as u8, pubkey); + let label2 = CrdsValueLabel::Vote(1 as u8, pubkey); + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + + let data = Packet { + meta: Meta { + repair: true, + ..Meta::default() + }, + ..Packet::default() + }; + + let none_empty_packets = Packets::new(vec![data, Packet::default()]); + + verified_vote_packets + .0 + .insert(label1, (2, none_empty_packets)); + verified_vote_packets + .0 + .insert(label2, (1, Packets::default())); + + // Both updates have timestamps greater than 0, so both should be returned + let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0); + assert_eq!(new_update_version, 2); + assert_eq!(updates.len(), 2); + + // Only the nonempty packet had a timestamp greater than 1 + let (new_update_version, updates) = verified_vote_packets.get_latest_votes(1); + assert_eq!(new_update_version, 2); + assert_eq!(updates.len(), 1); + assert!(updates[0].packets.len() > 0); + + // If the given timestamp is greater than all timestamps in any update, + // returned timestamp should be the same as the given timestamp, and + // no updates should be returned + let (new_update_version, updates) = verified_vote_packets.get_latest_votes(3); + assert_eq!(new_update_version, 3); + assert!(updates.is_empty()); + } + + #[test] + fn test_get_and_process_vote_packets() { + let (s, r) = unbounded(); + let pubkey = Pubkey::new_rand(); + let label1 = CrdsValueLabel::Vote(0 as u8, pubkey); + let label2 = CrdsValueLabel::Vote(1 as u8, pubkey); + let mut update_version = 0; + s.send(vec![(label1.clone(), Packets::default())]).unwrap(); + s.send(vec![(label2.clone(), Packets::default())]).unwrap(); + + let data = Packet { + meta: Meta { + repair: true, + ..Meta::default() + }, + ..Packet::default() + }; + + let later_packets = Packets::new(vec![data, Packet::default()]); + s.send(vec![(label1.clone(), later_packets.clone())]) + .unwrap(); + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + verified_vote_packets + .get_and_process_vote_packets(&r, &mut update_version) + .unwrap(); + + // Test timestamps for same batch are the same + let update_version1 = verified_vote_packets.get(&label1).unwrap().0; + assert_eq!( + update_version1, + verified_vote_packets.get(&label2).unwrap().0 + ); + + // Test the later value overwrote the earlier one for this label + assert!(verified_vote_packets.get(&label1).unwrap().1.packets.len() > 1); + assert_eq!( + verified_vote_packets.get(&label2).unwrap().1.packets.len(), + 0 + ); + + // Test timestamp for next batch overwrites the original + s.send(vec![(label2.clone(), Packets::default())]).unwrap(); + verified_vote_packets + .get_and_process_vote_packets(&r, &mut update_version) + .unwrap(); + let update_version2 = verified_vote_packets.get(&label2).unwrap().0; + assert!(update_version2 > update_version1); + + // Test empty doesn't bump the version + let before = update_version; + assert_matches!( + verified_vote_packets.get_and_process_vote_packets(&r, &mut update_version), + Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) + ); + assert_eq!(before, update_version); + } +}