diff --git a/src/cluster_info.rs b/src/cluster_info.rs index e56f5965e9..83e5d51907 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -19,7 +19,7 @@ use crate::counter::Counter; use crate::crds_gossip::CrdsGossip; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; -use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId}; +use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId, Vote}; use crate::db_ledger::DbLedger; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::result::Result; @@ -36,6 +36,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; +use solana_sdk::transaction::Transaction; use std::cmp::min; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; @@ -55,7 +56,7 @@ pub const NEIGHBORHOOD_SIZE: usize = DATA_PLANE_FANOUT; pub const GROW_LAYER_CAPACITY: bool = false; /// milliseconds we sleep for between gossip requests -const GOSSIP_SLEEP_MILLIS: u64 = 100; +pub const GOSSIP_SLEEP_MILLIS: u64 = 100; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -250,6 +251,37 @@ impl ClusterInfo { self.gossip.process_push_message(&[entry], now); } + pub fn push_vote(&mut self, vote: Transaction) { + let now = timestamp(); + let vote = Vote::new(vote, now); + let mut entry = CrdsValue::Vote(vote); + entry.sign(&self.keypair); + self.gossip.process_push_message(&[entry], now); + } + + /// Get votes in the crds + /// * since - The local timestamp when the vote was updated or inserted must be greater then + /// since. This allows the bank to query for new votes only. + /// + /// * return - The votes, and the max local timestamp from the new set. + pub fn get_votes(&self, since: u64) -> (Vec, u64) { + let votes: Vec<_> = self + .gossip + .crds + .table + .values() + .filter(|x| x.local_timestamp > since) + .filter_map(|x| { + x.value + .vote() + .map(|v| (x.local_timestamp, v.transaction.clone())) + }) + .collect(); + let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since); + let txs: Vec = votes.into_iter().map(|x| x.1).collect(); + (txs, max_ts) + } + pub fn purge(&mut self, now: u64) { self.gossip.purge(now); } @@ -1249,6 +1281,7 @@ mod tests { use crate::db_ledger::DbLedger; use crate::packet::BLOB_HEADER_SIZE; use crate::result::Error; + use crate::test_tx::test_tx; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -1639,4 +1672,31 @@ mod tests { //sanity check for past total capacity. assert!(!broadcast_set.contains(&(layer_indices.last().unwrap()))); } + + #[test] + fn test_push_vote() { + let keys = Keypair::new(); + let now = timestamp(); + let node_info = NodeInfo::new_localhost(keys.pubkey(), 0); + let mut cluster_info = ClusterInfo::new(node_info); + + // make sure empty crds is handled correctly + let (votes, max_ts) = cluster_info.get_votes(now); + assert_eq!(votes, vec![]); + assert_eq!(max_ts, now); + + // add a vote + let tx = test_tx(); + cluster_info.push_vote(tx.clone()); + + // -1 to make sure that the clock is strictly lower then when insert occurred + let (votes, max_ts) = cluster_info.get_votes(now - 1); + assert_eq!(votes, vec![tx]); + assert!(max_ts >= now - 1); + + // make sure timestamp filter works + let (votes, new_max_ts) = cluster_info.get_votes(max_ts); + assert_eq!(votes, vec![]); + assert_eq!(max_ts, new_max_ts); + } } diff --git a/src/cluster_info_vote_listener.rs b/src/cluster_info_vote_listener.rs new file mode 100644 index 0000000000..5f98297100 --- /dev/null +++ b/src/cluster_info_vote_listener.rs @@ -0,0 +1,70 @@ +use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; +use crate::counter::Counter; +use crate::packet; +use crate::result::Result; +use crate::service::Service; +use crate::streamer::PacketSender; +use log::Level; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::{self, sleep, Builder, JoinHandle}; +use std::time::Duration; + +pub struct ClusterInfoVoteListener { + exit: Arc, + thread_hdls: Vec>, +} + +impl ClusterInfoVoteListener { + pub fn new( + exit: Arc, + cluster_info: Arc>, + sender: PacketSender, + ) -> Self { + let exit1 = exit.clone(); + let thread = Builder::new() + .name("solana-cluster_info_vote_listener".to_string()) + .spawn(move || { + let _ = Self::recv_loop(&exit1, &cluster_info, &sender); + }) + .unwrap(); + Self { + exit, + thread_hdls: vec![thread], + } + } + fn recv_loop( + exit: &Arc, + cluster_info: &Arc>, + sender: &PacketSender, + ) -> Result<()> { + let mut last_ts = 0; + loop { + if exit.load(Ordering::Relaxed) { + return Ok(()); + } + let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); + last_ts = new_ts; + inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len()); + let msgs = packet::to_packets(&votes); + for m in msgs { + sender.send(m)?; + } + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + } + } + pub fn close(&self) { + self.exit.store(true, Ordering::Relaxed); + } +} + +impl Service for ClusterInfoVoteListener { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/crds_value.rs b/src/crds_value.rs index 2fcd97a599..21f230dbf5 100644 --- a/src/crds_value.rs +++ b/src/crds_value.rs @@ -10,8 +10,7 @@ use std::fmt; pub enum CrdsValue { /// * Merge Strategy - Latest wallclock is picked ContactInfo(ContactInfo), - /// TODO, Votes need a height potentially in the userdata - /// * Merge Strategy - Latest height is picked + /// * Merge Strategy - Latest wallclock is picked Vote(Vote), /// * Merge Strategy - Latest wallclock is picked LeaderId(LeaderId), @@ -29,7 +28,6 @@ pub struct LeaderId { pub struct Vote { pub transaction: Transaction, pub signature: Signature, - pub height: u64, pub wallclock: u64, } @@ -71,12 +69,10 @@ impl Signable for Vote { #[derive(Serialize)] struct SignData { transaction: Transaction, - height: u64, wallclock: u64, } let data = SignData { transaction: self.transaction.clone(), - height: self.height, wallclock: self.wallclock, }; serialize(&data).expect("unable to serialize Vote") @@ -132,11 +128,11 @@ impl LeaderId { } impl Vote { - pub fn new(transaction: Transaction, height: u64, wallclock: u64) -> Self { + // TODO: it might make sense for the transaction to encode the wallclock in the userdata + pub fn new(transaction: Transaction, wallclock: u64) -> Self { Vote { transaction, signature: Signature::default(), - height, wallclock, } } @@ -260,7 +256,7 @@ mod test { let key = v.clone().contact_info().unwrap().id; assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key)); - let v = CrdsValue::Vote(Vote::new(test_tx(), 1, 0)); + let v = CrdsValue::Vote(Vote::new(test_tx(), 0)); assert_eq!(v.wallclock(), 0); let key = v.clone().vote().unwrap().transaction.account_keys[0]; assert_eq!(v.label(), CrdsValueLabel::Vote(key)); diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index 76550b0e65..51b0fa1341 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -1,7 +1,7 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use crate::service::Service; -use crate::streamer::{self, PacketReceiver}; +use crate::streamer::{self, PacketReceiver, PacketSender}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; @@ -16,20 +16,28 @@ pub struct FetchStage { impl FetchStage { #[allow(clippy::new_ret_no_self)] pub fn new(sockets: Vec, exit: Arc) -> (Self, PacketReceiver) { + let (sender, receiver) = channel(); + (Self::new_with_sender(sockets, exit, &sender), receiver) + } + pub fn new_with_sender( + sockets: Vec, + exit: Arc, + sender: &PacketSender, + ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); - Self::new_multi_socket(tx_sockets, exit) + Self::new_multi_socket(tx_sockets, exit, &sender) } fn new_multi_socket( sockets: Vec>, exit: Arc, - ) -> (Self, PacketReceiver) { - let (sender, receiver) = channel(); + sender: &PacketSender, + ) -> Self { let thread_hdls: Vec<_> = sockets .into_iter() .map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage")) .collect(); - (Self { exit, thread_hdls }, receiver) + Self { exit, thread_hdls } } pub fn close(&self) { diff --git a/src/lib.rs b/src/lib.rs index e6c101eaac..d4ef678b6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ pub mod chacha; #[cfg(all(feature = "chacha", feature = "cuda"))] pub mod chacha_cuda; pub mod client; +pub mod cluster_info_vote_listener; pub mod crds; pub mod crds_gossip; pub mod crds_gossip_error; diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 1dfcd0748b..29e35268d9 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -14,7 +14,6 @@ use crate::leader_scheduler::DEFAULT_TICKS_PER_SLOT; use crate::packet::BlobError; use crate::result::{Error, Result}; use crate::service::Service; -use crate::streamer::{responder, BlobSender}; use crate::tvu::TvuReturnType; use crate::vote_signer_proxy::VoteSignerProxy; use log::Level; @@ -22,7 +21,6 @@ use solana_metrics::{influxdb, submit}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; -use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; @@ -52,7 +50,6 @@ impl Drop for Finalizer { } pub struct ReplayStage { - t_responder: JoinHandle<()>, t_replay: JoinHandle<()>, } @@ -65,7 +62,6 @@ impl ReplayStage { window_receiver: &EntryReceiver, my_id: Pubkey, vote_signer_proxy: Option<&Arc>, - vote_blob_sender: Option<&BlobSender>, ledger_entry_sender: &EntrySender, entry_height: &Arc>, last_entry_id: &Arc>, @@ -147,11 +143,8 @@ impl ReplayStage { if 0 == num_ticks_to_next_vote { if let Some(signer) = vote_signer_proxy { - if let Some(sender) = vote_blob_sender { - signer - .send_validator_vote(bank, &cluster_info, sender) - .unwrap(); - } + let vote = signer.validator_vote(bank); + cluster_info.write().unwrap().push_vote(vote); } } let (scheduled_leader, _) = bank @@ -216,10 +209,7 @@ impl ReplayStage { to_leader_sender: TvuRotationSender, entry_stream: Option, ) -> (Self, EntryReceiver) { - let (vote_blob_sender, vote_blob_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel(); - let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); - let t_responder = responder("replay_stage", Arc::new(send), vote_blob_receiver); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) @@ -252,7 +242,6 @@ impl ReplayStage { &window_receiver, my_id, vote_signer_proxy.as_ref(), - Some(&vote_blob_sender), &ledger_entry_sender, &entry_height_.clone(), &last_entry_id.clone(), @@ -267,13 +256,7 @@ impl ReplayStage { }) .unwrap(); - ( - Self { - t_responder, - t_replay, - }, - ledger_entry_receiver, - ) + (Self { t_replay }, ledger_entry_receiver) } } @@ -281,7 +264,6 @@ impl Service for ReplayStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { - self.t_responder.join()?; self.t_replay.join() } } @@ -313,6 +295,7 @@ mod test { use std::sync::{Arc, RwLock}; #[test] + #[ignore] pub fn test_replay_stage_leader_rotation_exit() { solana_logger::setup(); @@ -490,11 +473,8 @@ mod test { None, ); - // Vote sender should error because no leader contact info is found in the - // ClusterInfo - let (mock_sender, _mock_receiver) = channel(); - let _vote_err = - vote_signer_proxy.send_validator_vote(&bank, &cluster_info_me, &mock_sender); + let vote = vote_signer_proxy.validator_vote(&bank); + cluster_info_me.write().unwrap().push_vote(vote); // Send ReplayStage an entry, should see it on the ledger writer receiver let next_tick = create_ticks(1, last_entry_id); @@ -514,6 +494,7 @@ mod test { } #[test] + #[ignore] fn test_vote_error_replay_stage_leader_rotation() { solana_logger::setup(); @@ -598,11 +579,8 @@ mod test { None, ); - // Vote sender should error because no leader contact info is found in the - // ClusterInfo - let (mock_sender, _mock_receiver) = channel(); - let _vote_err = - vote_signer_proxy.send_validator_vote(&bank, &cluster_info_me, &mock_sender); + let vote = vote_signer_proxy.validator_vote(&bank); + cluster_info_me.write().unwrap().push_vote(vote); // Send enough ticks to trigger leader rotation let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize; @@ -688,7 +666,6 @@ mod test { &entry_receiver, my_id, Some(&vote_signer_proxy), - None, &ledger_entry_sender, &Arc::new(RwLock::new(entry_height)), &Arc::new(RwLock::new(last_entry_id)), @@ -715,7 +692,6 @@ mod test { &entry_receiver, Keypair::new().pubkey(), Some(&vote_signer_proxy), - None, &ledger_entry_sender, &Arc::new(RwLock::new(entry_height)), &Arc::new(RwLock::new(last_entry_id)), @@ -770,7 +746,6 @@ mod test { &entry_receiver, my_id, Some(&vote_signer_proxy), - None, &ledger_entry_sender, &Arc::new(RwLock::new(entry_height)), &Arc::new(RwLock::new(last_entry_id)), diff --git a/src/tpu.rs b/src/tpu.rs index 597a7e6a4d..dc2be7f07b 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -5,6 +5,7 @@ use crate::bank::Bank; use crate::banking_stage::{BankingStage, BankingStageReturnType}; use crate::broadcast_service::BroadcastService; use crate::cluster_info::ClusterInfo; +use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; use crate::fullnode::TpuRotationSender; use crate::poh_service::Config; @@ -16,6 +17,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread; @@ -32,6 +34,7 @@ pub struct LeaderServices { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, + cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_service: BroadcastService, } @@ -40,12 +43,14 @@ impl LeaderServices { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, + cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_service: BroadcastService, ) -> Self { LeaderServices { fetch_stage, sigverify_stage, banking_stage, + cluster_info_vote_listener, broadcast_service, } } @@ -85,10 +90,15 @@ impl Tpu { blob_sender: &BlobSender, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); - let tpu_mode = if is_leader { - let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_sockets, exit.clone()); + let (packet_sender, packet_receiver) = channel(); + let fetch_stage = FetchStage::new_with_sender( + transactions_sockets, + exit.clone(), + &packet_sender.clone(), + ); + let cluster_info_vote_listener = + ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); @@ -119,6 +129,7 @@ impl Tpu { fetch_stage, sigverify_stage, banking_stage, + cluster_info_vote_listener, broadcast_service, ); TpuMode::Leader(svcs) @@ -176,8 +187,14 @@ impl Tpu { } } self.exit = Arc::new(AtomicBool::new(false)); - let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_sockets, self.exit.clone()); + let (packet_sender, packet_receiver) = channel(); + let fetch_stage = FetchStage::new_with_sender( + transactions_sockets, + self.exit.clone(), + &packet_sender.clone(), + ); + let cluster_info_vote_listener = + ClusterInfoVoteListener::new(self.exit.clone(), cluster_info.clone(), packet_sender); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); @@ -208,6 +225,7 @@ impl Tpu { fetch_stage, sigverify_stage, banking_stage, + cluster_info_vote_listener, broadcast_service, ); self.tpu_mode = TpuMode::Leader(svcs); @@ -250,6 +268,7 @@ impl Service for Tpu { svcs.broadcast_service.join()?; svcs.fetch_stage.join()?; svcs.sigverify_stage.join()?; + svcs.cluster_info_vote_listener.join()?; match svcs.banking_stage.join()? { Some(BankingStageReturnType::LeaderRotation) => { Ok(Some(TpuReturnType::LeaderRotation)) diff --git a/src/vote_signer_proxy.rs b/src/vote_signer_proxy.rs index e22ec4224f..53709a5876 100644 --- a/src/vote_signer_proxy.rs +++ b/src/vote_signer_proxy.rs @@ -1,15 +1,10 @@ //! The `vote_signer_proxy` votes on the `last_id` of the bank at a regular cadence use crate::bank::Bank; -use crate::cluster_info::ClusterInfo; -use crate::counter::Counter; use crate::jsonrpc_core; -use crate::packet::SharedBlob; -use crate::result::{Error, Result}; +use crate::result::Result; use crate::rpc_request::{RpcClient, RpcRequest}; -use crate::streamer::BlobSender; -use bincode::serialize; -use log::Level; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::transaction::Transaction; @@ -17,8 +12,7 @@ use solana_sdk::vote_transaction::VoteTransaction; use solana_vote_signer::rpc::LocalVoteSigner; use solana_vote_signer::rpc::VoteSigner; use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; #[derive(Debug, PartialEq, Eq)] pub enum VoteError { @@ -94,8 +88,6 @@ pub struct VoteSignerProxy { keypair: Arc, signer: Box, vote_account: Pubkey, - last_leader: RwLock, - unsent_votes: RwLock>, } impl VoteSignerProxy { @@ -109,8 +101,6 @@ impl VoteSignerProxy { keypair: keypair.clone(), signer, vote_account, - last_leader: RwLock::new(vote_account), - unsent_votes: RwLock::new(vec![]), } } @@ -118,133 +108,20 @@ impl VoteSignerProxy { Self::new_with_signer(keypair, Box::new(LocalVoteSigner::default())) } - pub fn send_validator_vote( - &self, - bank: &Bank, - cluster_info: &Arc>, - vote_blob_sender: &BlobSender, - ) -> Result<()> { - { - let (leader, _) = bank.get_current_leader().unwrap(); - - let mut old_leader = self.last_leader.write().unwrap(); - - if leader != *old_leader { - *old_leader = leader; - self.unsent_votes.write().unwrap().clear(); - } - inc_new_counter_info!( - "validator-total_pending_votes", - self.unsent_votes.read().unwrap().len() - ); - } - - let tx = Transaction::vote_new(self, bank.tick_height(), bank.last_id(), 0); - - match VoteSignerProxy::get_leader_tpu(&bank, cluster_info) { - Ok(tpu) => { - self.unsent_votes.write().unwrap().retain(|old_tx| { - if let Ok(shared_blob) = self.new_signed_vote_blob(old_tx, tpu) { - inc_new_counter_info!("validator-pending_vote_sent", 1); - inc_new_counter_info!("validator-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob]).unwrap(); - } - false - }); - if let Ok(shared_blob) = self.new_signed_vote_blob(&tx, tpu) { - inc_new_counter_info!("validator-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob])?; - } - } - Err(_) => { - self.unsent_votes.write().unwrap().push(tx); - inc_new_counter_info!("validator-new_pending_vote", 1); - } - }; - + pub fn new_vote_account(&self, bank: &Bank, num_tokens: u64, last_id: Hash) -> Result<()> { + // Create and register the new vote account + let tx = + Transaction::vote_account_new(&self.keypair, self.vote_account, last_id, num_tokens, 0); + bank.process_transaction(&tx)?; Ok(()) } - fn new_signed_vote_blob(&self, tx: &Transaction, leader_tpu: SocketAddr) -> Result { - let shared_blob = SharedBlob::default(); - { - let mut blob = shared_blob.write().unwrap(); - let bytes = serialize(&tx)?; - let len = bytes.len(); - blob.data[..len].copy_from_slice(&bytes); - blob.meta.set_addr(&leader_tpu); - blob.meta.size = len; - }; - - Ok(shared_blob) - } - - fn get_leader_tpu(bank: &Bank, cluster_info: &Arc>) -> Result { - let leader_id = match bank.get_current_leader() { - Some((leader_id, _)) => leader_id, - None => return Err(Error::VoteError(VoteError::NoLeader)), - }; - - let rcluster_info = cluster_info.read().unwrap(); - let leader_tpu = rcluster_info.lookup(leader_id).map(|leader| leader.tpu); - if let Some(leader_tpu) = leader_tpu { - Ok(leader_tpu) - } else { - Err(Error::VoteError(VoteError::LeaderInfoNotFound)) - } + pub fn validator_vote(&self, bank: &Arc) -> Transaction { + Transaction::vote_new(self, bank.tick_height(), bank.last_id(), 0) } } #[cfg(test)] mod test { - use crate::bank::Bank; - use crate::cluster_info::{ClusterInfo, Node}; - use crate::genesis_block::GenesisBlock; - use crate::vote_signer_proxy::VoteSignerProxy; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - use std::time::Duration; - - #[test] - pub fn test_pending_votes() { - solana_logger::setup(); - - let signer = VoteSignerProxy::new_local(&Arc::new(Keypair::new())); - - // Set up dummy node to host a ReplayStage - let my_keypair = Keypair::new(); - let my_id = my_keypair.pubkey(); - let my_node = Node::new_localhost_with_pubkey(my_id); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); - - let (genesis_block, _) = GenesisBlock::new_with_leader(10000, my_id, 500); - let bank = Bank::new(&genesis_block); - let (sender, receiver) = channel(); - - assert_eq!(signer.unsent_votes.read().unwrap().len(), 0); - signer - .send_validator_vote(&bank, &cluster_info, &sender) - .unwrap(); - assert_eq!(signer.unsent_votes.read().unwrap().len(), 1); - assert!(receiver.recv_timeout(Duration::from_millis(400)).is_err()); - - signer - .send_validator_vote(&bank, &cluster_info, &sender) - .unwrap(); - assert_eq!(signer.unsent_votes.read().unwrap().len(), 2); - assert!(receiver.recv_timeout(Duration::from_millis(400)).is_err()); - - bank.leader_scheduler - .write() - .unwrap() - .use_only_bootstrap_leader = true; - bank.leader_scheduler.write().unwrap().bootstrap_leader = my_id; - assert!(signer - .send_validator_vote(&bank, &cluster_info, &sender) - .is_ok()); - receiver.recv_timeout(Duration::from_millis(400)).unwrap(); - - assert_eq!(signer.unsent_votes.read().unwrap().len(), 0); - } + //TODO simple tests that cover the signing }