From 7672506b45c4e5f0a880a593a7d57887c51a3c48 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 19 Jul 2018 21:27:35 -0700 Subject: [PATCH] Validators now vote once a second regardless --- src/lib.rs | 1 + src/replicate_stage.rs | 81 +++++++--------------- src/tvu.rs | 35 +++++----- src/vote_stage.rs | 153 +++++++++++++++++++++++++++++++++++++++++ tests/multinode.rs | 2 +- 5 files changed, 199 insertions(+), 73 deletions(-) create mode 100644 src/vote_stage.rs diff --git a/src/lib.rs b/src/lib.rs index 3577591c81..58526729ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ pub mod timing; pub mod tpu; pub mod transaction; pub mod tvu; +pub mod vote_stage; pub mod voting; pub mod window_stage; pub mod write_stage; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 5b217af6dd..dbe1051c0f 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -1,7 +1,6 @@ //! The `replicate_stage` replicates transactions broadcast by the leader. use bank::Bank; -use bincode::serialize; use counter::Counter; use crdt::Crdt; use ledger; @@ -9,35 +8,29 @@ use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::KeyPair; -use std::collections::VecDeque; use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::{responder, BlobReceiver, BlobSender}; -use timing; -use transaction::Transaction; +use streamer::{responder, BlobReceiver}; +use vote_stage::VoteStage; use voting::entries_to_votes; pub struct ReplicateStage { thread_hdls: Vec>, } -const VOTE_TIMEOUT_MS: u64 = 1000; - impl ReplicateStage { /// Process entry blobs, already in order fn replicate_requests( - keypair: &Arc, bank: &Arc, crdt: &Arc>, blob_recycler: &BlobRecycler, window_receiver: &BlobReceiver, - vote_blob_sender: &BlobSender, - last_vote: &mut u64, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available blobs into a single vote @@ -61,30 +54,6 @@ impl ReplicateStage { error!("process_entries {} {:?}", blobs_len, res); } let _ = res?; - let now = timing::timestamp(); - if now - *last_vote > VOTE_TIMEOUT_MS { - let last_id = bank.last_id(); - let shared_blob = blob_recycler.allocate(); - let (vote, addr) = { - let mut wcrdt = crdt.write().unwrap(); - //TODO: doesn't seem like there is a synchronous call to get height and id - info!("replicate_stage {:?}", &last_id[..8]); - wcrdt.new_vote(last_id) - }?; - { - let mut blob = shared_blob.write().unwrap(); - let tx = Transaction::new_vote(&keypair, vote, last_id, 0); - let bytes = serialize(&tx)?; - let len = bytes.len(); - blob.data[..len].copy_from_slice(&bytes); - blob.meta.set_addr(&addr); - blob.meta.size = len; - } - inc_new_counter!("replicate-vote_sent", 1); - *last_vote = now; - - vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; - } while let Some(blob) = blobs.pop_front() { blob_recycler.recycle(blob); } @@ -96,6 +65,7 @@ impl ReplicateStage { crdt: Arc>, blob_recycler: BlobRecycler, window_receiver: BlobReceiver, + exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -105,34 +75,35 @@ impl ReplicateStage { blob_recycler.clone(), vote_blob_receiver, ); - let skeypair = Arc::new(keypair); + + let vote_stage = VoteStage::new( + Arc::new(keypair), + bank.clone(), + crdt.clone(), + blob_recycler.clone(), + vote_blob_sender, + exit, + ); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) - .spawn(move || { - let mut timestamp: u64 = 0; - loop { - if let Err(e) = Self::replicate_requests( - &skeypair, - &bank, - &crdt, - &blob_recycler, - &window_receiver, - &vote_blob_sender, - &mut timestamp, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), - } + .spawn(move || loop { + if let Err(e) = + Self::replicate_requests(&bank, &crdt, &blob_recycler, &window_receiver) + { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), } } }) .unwrap(); - ReplicateStage { - thread_hdls: vec![t_responder, t_replicate], - } + + let mut thread_hdls = vec![t_responder, t_replicate]; + thread_hdls.extend(vote_stage.thread_hdls()); + + ReplicateStage { thread_hdls } } } diff --git a/src/tvu.rs b/src/tvu.rs index 3c9699fb25..8221430dc6 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -4,22 +4,22 @@ //! ```text //! .--------------------------------------------. //! | | -//! | .--------------------------------+---------. -//! | | TVU | | -//! | | | | -//! | | | | .------------. -//! | | .------------+----------->| Validators | -//! v | .-------. | | | `------------` -//! .----+---. | | | .----+---. .----+------. | -//! | Leader |--------->| Blob | | Window | | Replicate | | -//! `--------` | | Fetch |-->| Stage |-->| Stage | | -//! .------------. | | Stage | | | | | | -//! | Validators |----->| | `--------` `----+------` | -//! `------------` | `-------` | | -//! | | | -//! | | | -//! | | | -//! `--------------------------------|---------` +//! | .--------------------------------+------------. +//! | | TVU | | +//! | | | | +//! | | | | .------------. +//! | | .------------+-------------->| Validators | +//! v | .-------. | | | `------------` +//! .----+---. | | | .----+---. .----+---------. | +//! | Leader |--------->| Blob | | Window | | Replicate | | +//! `--------` | | Fetch |-->| Stage |-->| Stage / | | +//! .------------. | | Stage | | | | Vote Stage | | +//! | Validators |----->| | `--------` `----+---------` | +//! `------------` | `-------` | | +//! | | | +//! | | | +//! | | | +//! `--------------------------------|------------` //! | //! v //! .------. @@ -82,7 +82,7 @@ impl Tvu { let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( vec![replicate_socket, repair_socket], - exit, + exit.clone(), &blob_recycler, ); //TODO @@ -103,6 +103,7 @@ impl Tvu { crdt, blob_recycler, blob_window_receiver, + exit, ); Tvu { diff --git a/src/vote_stage.rs b/src/vote_stage.rs new file mode 100644 index 0000000000..7f1567a170 --- /dev/null +++ b/src/vote_stage.rs @@ -0,0 +1,153 @@ +//! The `vote_stage` votes on the `last_id` of the bank at a regular cadence + +use bank::Bank; +use bincode::serialize; +use counter::Counter; +use crdt::Crdt; +use hash::Hash; +use packet::BlobRecycler; +use result::Result; +use service::Service; +use signature::KeyPair; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::{self, sleep, spawn, JoinHandle}; +use std::time::Duration; +use streamer::BlobSender; +use transaction::Transaction; + +const VOTE_TIMEOUT_MS: u64 = 1000; + +pub struct VoteStage { + thread_hdl: JoinHandle<()>, +} + +impl VoteStage { + pub fn new( + keypair: Arc, + bank: Arc, + crdt: Arc>, + blob_recycler: BlobRecycler, + vote_blob_sender: BlobSender, + exit: Arc, + ) -> Self { + let thread_hdl = spawn(move || { + Self::run( + &keypair, + &bank, + &crdt, + &blob_recycler, + &vote_blob_sender, + &exit, + ); + }); + VoteStage { thread_hdl } + } + + fn run( + keypair: &Arc, + bank: &Arc, + crdt: &Arc>, + blob_recycler: &BlobRecycler, + vote_blob_sender: &BlobSender, + exit: &Arc, + ) { + while !exit.load(Ordering::Relaxed) { + let last_id = bank.last_id(); + + if let Err(err) = Self::vote(&last_id, keypair, crdt, blob_recycler, vote_blob_sender) { + info!("Vote failed: {:?}", err); + } + sleep(Duration::from_millis(VOTE_TIMEOUT_MS)); + } + } + + fn vote( + last_id: &Hash, + keypair: &Arc, + crdt: &Arc>, + blob_recycler: &BlobRecycler, + vote_blob_sender: &BlobSender, + ) -> Result<()> { + let shared_blob = blob_recycler.allocate(); + let (vote, addr) = { + let mut wcrdt = crdt.write().unwrap(); + //TODO: doesn't seem like there is a synchronous call to get height and id + info!("replicate_stage {:?}", &last_id[..8]); + wcrdt.new_vote(*last_id) + }?; + { + let mut blob = shared_blob.write().unwrap(); + let tx = Transaction::new_vote(&keypair, vote, *last_id, 0); + let bytes = serialize(&tx)?; + let len = bytes.len(); + blob.data[..len].copy_from_slice(&bytes); + blob.meta.set_addr(&addr); + blob.meta.size = len; + } + inc_new_counter!("replicate-vote_sent", 1); + + vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; + Ok(()) + } +} + +impl Service for VoteStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join()?; + Ok(()) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use bank::Bank; + use crdt::{Crdt, TestNode}; + use mint::Mint; + use packet::BlobRecycler; + use service::Service; + use signature::{KeyPair, KeyPairUtil}; + use std::sync::atomic::AtomicBool; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + + /// Ensure the VoteStage issues votes at the expected cadence + #[test] + fn test_vote_cadence() { + let keypair = KeyPair::new(); + + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + + let node = TestNode::new_localhost(); + let mut crdt = Crdt::new(node.data.clone()).expect("Crdt::new"); + crdt.set_leader(node.data.id); + let blob_recycler = BlobRecycler::default(); + let (sender, receiver) = channel(); + let exit = Arc::new(AtomicBool::new(false)); + + let vote_stage = VoteStage::new( + Arc::new(keypair), + bank.clone(), + Arc::new(RwLock::new(crdt)), + blob_recycler.clone(), + sender, + exit.clone(), + ); + + receiver.recv().unwrap(); + + let timeout = Duration::from_millis(VOTE_TIMEOUT_MS * 2); + receiver.recv_timeout(timeout).unwrap(); + receiver.recv_timeout(timeout).unwrap(); + + exit.store(true, Ordering::Relaxed); + vote_stage.join().expect("join"); + } +} diff --git a/tests/multinode.rs b/tests/multinode.rs index f067dace4f..0e0b7c25fc 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -360,8 +360,8 @@ fn test_leader_restart_validator_start_from_old_ledger() { let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); assert_eq!(getbal, Some(expected)); - leader_fullnode.close().unwrap(); val_fullnode.close().unwrap(); + leader_fullnode.close().unwrap(); std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(stale_ledger_path).unwrap(); }