Try to measure finality from time seen to when 2/3 of validator..
..set has voted. Add a timestamp to last_ids and use that to see how long from when 2/3s validator set has voted on them.
This commit is contained in:
committed by
sakridge
parent
448b8b1c17
commit
ee0195d588
39
src/bank.rs
39
src/bank.rs
@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::RwLock;
|
||||
use std::time::Instant;
|
||||
use streamer::WINDOW_SIZE;
|
||||
use timing::duration_as_us;
|
||||
use timing::{duration_as_us, timestamp};
|
||||
use transaction::{Instruction, Plan, Transaction};
|
||||
|
||||
/// The number of most recent `last_id` values that the bank will track the signatures
|
||||
@ -79,9 +79,9 @@ pub struct Bank {
|
||||
/// values are so old that the `last_id` has been pulled out of the queue.
|
||||
last_ids: RwLock<VecDeque<Hash>>,
|
||||
|
||||
// Mapping of hashes to signature sets. The bank uses this data to
|
||||
/// Mapping of hashes to signature sets along with timestamp. The bank uses this data to
|
||||
/// reject transactions with signatures its seen before
|
||||
last_ids_sigs: RwLock<HashMap<Hash, HashSet<Signature>>>,
|
||||
last_ids_sigs: RwLock<HashMap<Hash, (HashSet<Signature>, u64)>>,
|
||||
|
||||
/// The number of transactions the bank has processed without error since the
|
||||
/// start of the ledger.
|
||||
@ -155,14 +155,14 @@ impl Bank {
|
||||
.expect("'last_ids' read lock in forget_signature_with_last_id")
|
||||
.get_mut(last_id)
|
||||
{
|
||||
Self::forget_signature(entry, signature);
|
||||
Self::forget_signature(&mut entry.0, signature);
|
||||
}
|
||||
}
|
||||
|
||||
/// Forget all signatures. Useful for benchmarking.
|
||||
pub fn clear_signatures(&self) {
|
||||
for (_, sigs) in self.last_ids_sigs.write().unwrap().iter_mut() {
|
||||
sigs.clear();
|
||||
sigs.0.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@ -172,18 +172,25 @@ impl Bank {
|
||||
.expect("'last_ids' read lock in reserve_signature_with_last_id")
|
||||
.get_mut(last_id)
|
||||
{
|
||||
return Self::reserve_signature(entry, signature);
|
||||
return Self::reserve_signature(&mut entry.0, signature);
|
||||
}
|
||||
Err(BankError::LastIdNotFound(*last_id))
|
||||
}
|
||||
|
||||
/// Look through the last_ids and find all the valid ids
|
||||
/// This is batched to avoid holding the lock for a significant amount of time
|
||||
pub fn count_valid_ids(&self, ids: &[Hash]) -> usize {
|
||||
///
|
||||
/// Return a vec of tuple of (valid index, timestamp)
|
||||
/// index is into the passed ids slice to avoid copying hashes
|
||||
pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> {
|
||||
let last_ids = self.last_ids_sigs.read().unwrap();
|
||||
ids.iter()
|
||||
.map(|id| last_ids.get(id).is_some() as usize)
|
||||
.sum()
|
||||
let mut ret = Vec::new();
|
||||
for (i, id) in ids.iter().enumerate() {
|
||||
if let Some(entry) = last_ids.get(id) {
|
||||
ret.push((i, entry.1));
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
/// Tell the bank which Entry IDs exist on the ledger. This function
|
||||
@ -201,7 +208,7 @@ impl Bank {
|
||||
let id = last_ids.pop_front().unwrap();
|
||||
last_ids_sigs.remove(&id);
|
||||
}
|
||||
last_ids_sigs.insert(*last_id, HashSet::new());
|
||||
last_ids_sigs.insert(*last_id, (HashSet::new(), timestamp()));
|
||||
last_ids.push_back(*last_id);
|
||||
}
|
||||
|
||||
@ -540,7 +547,7 @@ impl Bank {
|
||||
.read()
|
||||
.expect("'last_ids_sigs' read lock");
|
||||
for (_hash, signatures) in last_ids_sigs.iter() {
|
||||
if signatures.contains(signature) {
|
||||
if signatures.0.contains(signature) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -758,9 +765,11 @@ mod tests {
|
||||
last_id
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(bank.count_valid_ids(&[]), 0);
|
||||
assert_eq!(bank.count_valid_ids(&[mint.last_id()]), 0);
|
||||
assert_eq!(bank.count_valid_ids(&ids), ids.len());
|
||||
assert_eq!(bank.count_valid_ids(&[]).len(), 0);
|
||||
assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0);
|
||||
for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() {
|
||||
assert_eq!(id.0, i);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -4,11 +4,15 @@ use bank::Bank;
|
||||
use bincode::serialize;
|
||||
use counter::Counter;
|
||||
use crdt::Crdt;
|
||||
use hash::Hash;
|
||||
use influx_db_client as influxdb;
|
||||
use metrics;
|
||||
use packet::{BlobRecycler, SharedBlob};
|
||||
use result::Result;
|
||||
use service::Service;
|
||||
use signature::KeyPair;
|
||||
use std::collections::VecDeque;
|
||||
use std::result;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, sleep, spawn, JoinHandle};
|
||||
@ -23,21 +27,25 @@ pub struct VoteStage {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum VoteError {
|
||||
NoValidLastIdsToVoteOn,
|
||||
}
|
||||
|
||||
pub fn create_vote_tx_and_blob(
|
||||
bank: &Arc<Bank>,
|
||||
last_id: &Hash,
|
||||
keypair: &KeyPair,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
) -> Result<(Transaction, SharedBlob)> {
|
||||
let last_id = bank.last_id();
|
||||
let shared_blob = blob_recycler.allocate();
|
||||
let (vote, addr) = {
|
||||
let mut wcrdt = crdt.write().unwrap();
|
||||
//TODO: doesn't seem like there is a synchronous call to get height and id
|
||||
info!("voting on {:?}", &last_id.as_ref()[..8]);
|
||||
wcrdt.new_vote(last_id)
|
||||
debug!("voting on {:?}", &last_id.as_ref()[..8]);
|
||||
wcrdt.new_vote(*last_id)
|
||||
}?;
|
||||
let tx = Transaction::new_vote(&keypair, vote, last_id, 0);
|
||||
let tx = Transaction::new_vote(&keypair, vote, *last_id, 0);
|
||||
{
|
||||
let mut blob = shared_blob.write().unwrap();
|
||||
let bytes = serialize(&tx)?;
|
||||
@ -49,6 +57,38 @@ pub fn create_vote_tx_and_blob(
|
||||
Ok((tx, shared_blob))
|
||||
}
|
||||
|
||||
fn get_last_id_to_vote_on(
|
||||
debug_id: u64,
|
||||
ids: &[Hash],
|
||||
bank: &Arc<Bank>,
|
||||
now: u64,
|
||||
last_vote: &mut u64,
|
||||
) -> result::Result<(Hash, u64), VoteError> {
|
||||
let mut valid_ids = bank.count_valid_ids(&ids);
|
||||
let super_majority_index = (2 * ids.len()) / 3;
|
||||
|
||||
//TODO(anatoly): this isn't stake based voting
|
||||
debug!(
|
||||
"{:x}: valid_ids {}/{} {}",
|
||||
debug_id,
|
||||
valid_ids.len(),
|
||||
ids.len(),
|
||||
super_majority_index,
|
||||
);
|
||||
|
||||
if valid_ids.len() > super_majority_index {
|
||||
*last_vote = now;
|
||||
|
||||
// Sort by timestamp
|
||||
valid_ids.sort_by(|a, b| a.1.cmp(&b.1));
|
||||
|
||||
let last_id = ids[valid_ids[super_majority_index].0];
|
||||
return Ok((last_id, valid_ids[super_majority_index].1));
|
||||
}
|
||||
|
||||
Err(VoteError::NoValidLastIdsToVoteOn)
|
||||
}
|
||||
|
||||
pub fn send_leader_vote(
|
||||
debug_id: u64,
|
||||
keypair: &KeyPair,
|
||||
@ -60,32 +100,32 @@ pub fn send_leader_vote(
|
||||
) -> Result<()> {
|
||||
let now = timing::timestamp();
|
||||
if now - *last_vote > VOTE_TIMEOUT_MS {
|
||||
//TODO(anatoly): vote if the last id set is mostly valid
|
||||
let ids: Vec<_> = crdt.read()
|
||||
.unwrap()
|
||||
.table
|
||||
.values()
|
||||
.map(|x| x.ledger_state.last_id)
|
||||
.collect();
|
||||
let total = bank.count_valid_ids(&ids);
|
||||
//TODO(anatoly): this isn't stake based voting
|
||||
info!(
|
||||
"{:x}: valid_ids {}/{} {}",
|
||||
debug_id,
|
||||
total,
|
||||
ids.len(),
|
||||
(2 * ids.len()) / 3
|
||||
);
|
||||
if total > (2 * ids.len()) / 3 {
|
||||
*last_vote = now;
|
||||
|
||||
if let Ok((last_id, super_majority_timestamp)) =
|
||||
get_last_id_to_vote_on(debug_id, &ids, bank, now, last_vote)
|
||||
{
|
||||
if let Ok((tx, shared_blob)) =
|
||||
create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler)
|
||||
create_vote_tx_and_blob(&last_id, keypair, crdt, blob_recycler)
|
||||
{
|
||||
bank.process_transaction(&tx)?;
|
||||
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
|
||||
info!("{:x} leader_sent_vote", debug_id);
|
||||
inc_new_counter!("write_stage-leader_sent_vote", 1);
|
||||
let finality_ms = now - super_majority_timestamp;
|
||||
debug!(
|
||||
"{:x} leader_sent_vote finality: {} ms",
|
||||
debug_id, finality_ms
|
||||
);
|
||||
inc_new_counter!("vote_stage-leader_sent_vote", 1);
|
||||
|
||||
metrics::submit(
|
||||
influxdb::Point::new(&"leader-finality")
|
||||
.add_field("duration_ms", influxdb::Value::Integer(finality_ms as i64))
|
||||
.to_owned(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -99,7 +139,8 @@ fn send_validator_vote(
|
||||
blob_recycler: &BlobRecycler,
|
||||
vote_blob_sender: &BlobSender,
|
||||
) -> Result<()> {
|
||||
if let Ok((_, shared_blob)) = create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler) {
|
||||
let last_id = bank.last_id();
|
||||
if let Ok((_, shared_blob)) = create_vote_tx_and_blob(&last_id, keypair, crdt, blob_recycler) {
|
||||
inc_new_counter!("replicate-vote_sent", 1);
|
||||
|
||||
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
|
||||
@ -165,7 +206,7 @@ pub mod tests {
|
||||
use bank::Bank;
|
||||
use crdt::{Crdt, NodeInfo, TestNode};
|
||||
use entry::next_entry;
|
||||
use hash::Hash;
|
||||
use hash::{hash, Hash};
|
||||
use logger;
|
||||
use mint::Mint;
|
||||
use packet::BlobRecycler;
|
||||
@ -316,4 +357,39 @@ pub mod tests {
|
||||
assert!(vote_blob.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_last_id_to_vote_on() {
|
||||
logger::setup();
|
||||
|
||||
let mint = Mint::new(1234);
|
||||
let bank = Arc::new(Bank::new(&mint));
|
||||
let mut last_vote = 0;
|
||||
|
||||
// generate 10 last_ids, register 6 with the bank
|
||||
let ids: Vec<_> = (0..10)
|
||||
.map(|i| {
|
||||
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
||||
if i < 6 {
|
||||
bank.register_entry_id(&last_id);
|
||||
}
|
||||
// sleep to get a different timestamp in the bank
|
||||
sleep(Duration::from_millis(1));
|
||||
last_id
|
||||
})
|
||||
.collect();
|
||||
|
||||
// see that we fail to have 2/3rds consensus
|
||||
assert!(get_last_id_to_vote_on(1234, &ids, &bank, 0, &mut last_vote).is_err());
|
||||
|
||||
// register another, see passing
|
||||
bank.register_entry_id(&ids[6]);
|
||||
|
||||
let res = get_last_id_to_vote_on(1234, &ids, &bank, 0, &mut last_vote);
|
||||
if let Ok((hash, timestamp)) = res {
|
||||
assert!(hash == ids[6]);
|
||||
assert!(timestamp != 0);
|
||||
} else {
|
||||
assert!(false, "get_last_id returned error!: {:?}", res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user