From ee0195d588f88130ebde994a5c84b4df715b659d Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 1 Aug 2018 18:01:22 +0000 Subject: [PATCH] Try to measure finality from time seen to when 2/3 of validator.. ..set has voted. Add a timestamp to last_ids and use that to see how long from when 2/3s validator set has voted on them. --- src/bank.rs | 39 +++++++++------ src/vote_stage.rs | 122 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 123 insertions(+), 38 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 31be14aa66..acd5d59eca 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; use std::time::Instant; use streamer::WINDOW_SIZE; -use timing::duration_as_us; +use timing::{duration_as_us, timestamp}; use transaction::{Instruction, Plan, Transaction}; /// The number of most recent `last_id` values that the bank will track the signatures @@ -79,9 +79,9 @@ pub struct Bank { /// values are so old that the `last_id` has been pulled out of the queue. last_ids: RwLock>, - // Mapping of hashes to signature sets. The bank uses this data to + /// Mapping of hashes to signature sets along with timestamp. The bank uses this data to /// reject transactions with signatures its seen before - last_ids_sigs: RwLock>>, + last_ids_sigs: RwLock, u64)>>, /// The number of transactions the bank has processed without error since the /// start of the ledger. @@ -155,14 +155,14 @@ impl Bank { .expect("'last_ids' read lock in forget_signature_with_last_id") .get_mut(last_id) { - Self::forget_signature(entry, signature); + Self::forget_signature(&mut entry.0, signature); } } /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { for (_, sigs) in self.last_ids_sigs.write().unwrap().iter_mut() { - sigs.clear(); + sigs.0.clear(); } } @@ -172,18 +172,25 @@ impl Bank { .expect("'last_ids' read lock in reserve_signature_with_last_id") .get_mut(last_id) { - return Self::reserve_signature(entry, signature); + return Self::reserve_signature(&mut entry.0, signature); } Err(BankError::LastIdNotFound(*last_id)) } /// Look through the last_ids and find all the valid ids /// This is batched to avoid holding the lock for a significant amount of time - pub fn count_valid_ids(&self, ids: &[Hash]) -> usize { + /// + /// Return a vec of tuple of (valid index, timestamp) + /// index is into the passed ids slice to avoid copying hashes + pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> { let last_ids = self.last_ids_sigs.read().unwrap(); - ids.iter() - .map(|id| last_ids.get(id).is_some() as usize) - .sum() + let mut ret = Vec::new(); + for (i, id) in ids.iter().enumerate() { + if let Some(entry) = last_ids.get(id) { + ret.push((i, entry.1)); + } + } + ret } /// Tell the bank which Entry IDs exist on the ledger. This function @@ -201,7 +208,7 @@ impl Bank { let id = last_ids.pop_front().unwrap(); last_ids_sigs.remove(&id); } - last_ids_sigs.insert(*last_id, HashSet::new()); + last_ids_sigs.insert(*last_id, (HashSet::new(), timestamp())); last_ids.push_back(*last_id); } @@ -540,7 +547,7 @@ impl Bank { .read() .expect("'last_ids_sigs' read lock"); for (_hash, signatures) in last_ids_sigs.iter() { - if signatures.contains(signature) { + if signatures.0.contains(signature) { return true; } } @@ -758,9 +765,11 @@ mod tests { last_id }) .collect(); - assert_eq!(bank.count_valid_ids(&[]), 0); - assert_eq!(bank.count_valid_ids(&[mint.last_id()]), 0); - assert_eq!(bank.count_valid_ids(&ids), ids.len()); + assert_eq!(bank.count_valid_ids(&[]).len(), 0); + assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0); + for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() { + assert_eq!(id.0, i); + } } #[test] diff --git a/src/vote_stage.rs b/src/vote_stage.rs index b664cf3707..9c0d03d9e9 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -4,11 +4,15 @@ use bank::Bank; use bincode::serialize; use counter::Counter; use crdt::Crdt; +use hash::Hash; +use influx_db_client as influxdb; +use metrics; use packet::{BlobRecycler, SharedBlob}; use result::Result; use service::Service; use signature::KeyPair; use std::collections::VecDeque; +use std::result; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{self, sleep, spawn, JoinHandle}; @@ -23,21 +27,25 @@ pub struct VoteStage { thread_hdl: JoinHandle<()>, } +#[derive(Debug, PartialEq, Eq)] +enum VoteError { + NoValidLastIdsToVoteOn, +} + pub fn create_vote_tx_and_blob( - bank: &Arc, + last_id: &Hash, keypair: &KeyPair, crdt: &Arc>, blob_recycler: &BlobRecycler, ) -> Result<(Transaction, SharedBlob)> { - let last_id = bank.last_id(); let shared_blob = blob_recycler.allocate(); let (vote, addr) = { let mut wcrdt = crdt.write().unwrap(); //TODO: doesn't seem like there is a synchronous call to get height and id - info!("voting on {:?}", &last_id.as_ref()[..8]); - wcrdt.new_vote(last_id) + debug!("voting on {:?}", &last_id.as_ref()[..8]); + wcrdt.new_vote(*last_id) }?; - let tx = Transaction::new_vote(&keypair, vote, last_id, 0); + let tx = Transaction::new_vote(&keypair, vote, *last_id, 0); { let mut blob = shared_blob.write().unwrap(); let bytes = serialize(&tx)?; @@ -49,6 +57,38 @@ pub fn create_vote_tx_and_blob( Ok((tx, shared_blob)) } +fn get_last_id_to_vote_on( + debug_id: u64, + ids: &[Hash], + bank: &Arc, + now: u64, + last_vote: &mut u64, +) -> result::Result<(Hash, u64), VoteError> { + let mut valid_ids = bank.count_valid_ids(&ids); + let super_majority_index = (2 * ids.len()) / 3; + + //TODO(anatoly): this isn't stake based voting + debug!( + "{:x}: valid_ids {}/{} {}", + debug_id, + valid_ids.len(), + ids.len(), + super_majority_index, + ); + + if valid_ids.len() > super_majority_index { + *last_vote = now; + + // Sort by timestamp + valid_ids.sort_by(|a, b| a.1.cmp(&b.1)); + + let last_id = ids[valid_ids[super_majority_index].0]; + return Ok((last_id, valid_ids[super_majority_index].1)); + } + + Err(VoteError::NoValidLastIdsToVoteOn) +} + pub fn send_leader_vote( debug_id: u64, keypair: &KeyPair, @@ -60,32 +100,32 @@ pub fn send_leader_vote( ) -> Result<()> { let now = timing::timestamp(); if now - *last_vote > VOTE_TIMEOUT_MS { - //TODO(anatoly): vote if the last id set is mostly valid let ids: Vec<_> = crdt.read() .unwrap() .table .values() .map(|x| x.ledger_state.last_id) .collect(); - let total = bank.count_valid_ids(&ids); - //TODO(anatoly): this isn't stake based voting - info!( - "{:x}: valid_ids {}/{} {}", - debug_id, - total, - ids.len(), - (2 * ids.len()) / 3 - ); - if total > (2 * ids.len()) / 3 { - *last_vote = now; - + if let Ok((last_id, super_majority_timestamp)) = + get_last_id_to_vote_on(debug_id, &ids, bank, now, last_vote) + { if let Ok((tx, shared_blob)) = - create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler) + create_vote_tx_and_blob(&last_id, keypair, crdt, blob_recycler) { bank.process_transaction(&tx)?; vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; - info!("{:x} leader_sent_vote", debug_id); - inc_new_counter!("write_stage-leader_sent_vote", 1); + let finality_ms = now - super_majority_timestamp; + debug!( + "{:x} leader_sent_vote finality: {} ms", + debug_id, finality_ms + ); + inc_new_counter!("vote_stage-leader_sent_vote", 1); + + metrics::submit( + influxdb::Point::new(&"leader-finality") + .add_field("duration_ms", influxdb::Value::Integer(finality_ms as i64)) + .to_owned(), + ); } } } @@ -99,7 +139,8 @@ fn send_validator_vote( blob_recycler: &BlobRecycler, vote_blob_sender: &BlobSender, ) -> Result<()> { - if let Ok((_, shared_blob)) = create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler) { + let last_id = bank.last_id(); + if let Ok((_, shared_blob)) = create_vote_tx_and_blob(&last_id, keypair, crdt, blob_recycler) { inc_new_counter!("replicate-vote_sent", 1); vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; @@ -165,7 +206,7 @@ pub mod tests { use bank::Bank; use crdt::{Crdt, NodeInfo, TestNode}; use entry::next_entry; - use hash::Hash; + use hash::{hash, Hash}; use logger; use mint::Mint; use packet::BlobRecycler; @@ -316,4 +357,39 @@ pub mod tests { assert!(vote_blob.is_ok()); } + #[test] + fn test_get_last_id_to_vote_on() { + logger::setup(); + + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let mut last_vote = 0; + + // generate 10 last_ids, register 6 with the bank + let ids: Vec<_> = (0..10) + .map(|i| { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + if i < 6 { + bank.register_entry_id(&last_id); + } + // sleep to get a different timestamp in the bank + sleep(Duration::from_millis(1)); + last_id + }) + .collect(); + + // see that we fail to have 2/3rds consensus + assert!(get_last_id_to_vote_on(1234, &ids, &bank, 0, &mut last_vote).is_err()); + + // register another, see passing + bank.register_entry_id(&ids[6]); + + let res = get_last_id_to_vote_on(1234, &ids, &bank, 0, &mut last_vote); + if let Ok((hash, timestamp)) = res { + assert!(hash == ids[6]); + assert!(timestamp != 0); + } else { + assert!(false, "get_last_id returned error!: {:?}", res); + } + } }