From 5e91d31ed39acfce1fe46cf28e71f5022a247e40 Mon Sep 17 00:00:00 2001
From: Rob Walker
Date: Wed, 20 Jun 2018 11:40:41 -0700
Subject: [PATCH] issue 309 part 1

  * limit the number of Entries per Blob to at most one
  * limit the number of Transactions per Entry such that an Entry
    will always fit in a Blob

With a one-to-one map of Entries to Blobs, recovery of a validator is a
simple fast-forward from the end of the initial genesis.log and
tx-*.logs Entries.

TODO: initialize validators' blob index with initial # of Entries.
---
 src/entry.rs  |  15 +++-
 src/ledger.rs | 214 ++++++++++++++++++++++++++------------------------
 src/tvu.rs    |  36 +++++----
 3 files changed, 142 insertions(+), 123 deletions(-)

diff --git a/src/entry.rs b/src/entry.rs
index 19e4d13d8c..44382cacf4 100644
--- a/src/entry.rs
+++ b/src/entry.rs
@@ -2,7 +2,9 @@
 //! unique ID that is the hash of the Entry before it, plus the hash of the
 //! transactions within it. Entries cannot be reordered, and its field `num_hashes`
 //! represents an approximate amount of time since the last Entry was created.
+use bincode::serialized_size;
 use hash::{extend_and_hash, hash, Hash};
+use packet::BLOB_DATA_SIZE;
 use rayon::prelude::*;
 use transaction::Transaction;
 
@@ -40,11 +42,13 @@ impl Entry {
     pub fn new(start_hash: &Hash, cur_hashes: u64, transactions: Vec<Transaction>) -> Self {
         let num_hashes = cur_hashes + if transactions.is_empty() { 0 } else { 1 };
         let id = next_hash(start_hash, 0, &transactions);
-        Entry {
+        let entry = Entry {
             num_hashes,
             id,
             transactions,
-        }
+        };
+        assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64);
+        entry
     }
 
     /// Creates the next Tick Entry `num_hashes` after `start_hash`.
@@ -56,6 +60,7 @@ impl Entry {
         let entry = Self::new(start_hash, *cur_hashes, transactions);
         *start_hash = entry.id;
         *cur_hashes = 0;
+        assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64);
         entry
     }
 
@@ -180,6 +185,12 @@
         let tick = next_entry(&zero, 0, vec![]);
         assert_eq!(tick.num_hashes, 0);
         assert_eq!(tick.id, zero);
+
+        let keypair = KeyPair::new();
+        let tx0 = Transaction::new_timestamp(&keypair, Utc::now(), zero);
+        let entry0 = next_entry(&zero, 1, vec![tx0.clone()]);
+        assert_eq!(entry0.num_hashes, 1);
+        assert_eq!(entry0.id, next_hash(&zero, 1, &vec![tx0]));
     }
 
     #[test]
diff --git a/src/ledger.rs b/src/ledger.rs
index 6eaf3b1f26..8b6e043953 100644
--- a/src/ledger.rs
+++ b/src/ledger.rs
@@ -1,18 +1,17 @@
 //! The `ledger` module provides functions for parallel verification of the
 //! Proof of History ledger.
-use bincode::{self, deserialize, serialize_into};
-use entry::{next_entry, Entry};
+use bincode::{self, deserialize, serialize_into, serialized_size};
+use entry::Entry;
 use hash::Hash;
-use packet;
-use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
+use packet::{self, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
 use rayon::prelude::*;
-use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::Cursor;
-use std::mem::size_of;
 use transaction::Transaction;
 
+// a Block is a slice of Entries
+
 pub trait Block {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
     fn verify(&self, start_hash: &Hash) -> bool;
@@ -27,111 +26,101 @@ impl Block for [Entry] {
     }
 
     fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
-        let mut start = 0;
-        let mut end = 0;
-        while start < self.len() {
-            let mut entries: Vec<Vec<Entry>> = Vec::new();
-            let mut total = 0;
-            for i in &self[start..] {
-                total += size_of::<Transaction>() * i.transactions.len();
-                total += size_of::<Entry>();
-                if total >= BLOB_DATA_SIZE {
-                    break;
-                }
-                end += 1;
-            }
-            // See if we need to split the transactions
-            if end <= start {
-                let mut transaction_start = 0;
-                let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
-                let total_entry_chunks = (self[end].transactions.len() + num_transactions_per_blob
-                    - 1) / num_transactions_per_blob;
-                trace!(
-                    "splitting transactions end: {} total_chunks: {}",
-                    end,
-                    total_entry_chunks
-                );
-                for _ in 0..total_entry_chunks {
-                    let transaction_end = min(
-                        transaction_start + num_transactions_per_blob,
-                        self[end].transactions.len(),
-                    );
-                    let mut entry = Entry {
-                        num_hashes: self[end].num_hashes,
-                        id: self[end].id,
-                        transactions: self[end].transactions[transaction_start..transaction_end]
-                            .to_vec(),
-                    };
-                    entries.push(vec![entry]);
-                    transaction_start = transaction_end;
-                }
-                end += 1;
-            } else {
-                entries.push(self[start..end].to_vec());
-            }
-
-            for entry in entries {
-                let b = blob_recycler.allocate();
-                let pos = {
-                    let mut bd = b.write().unwrap();
-                    let mut out = Cursor::new(bd.data_mut());
-                    serialize_into(&mut out, &entry).expect("failed to serialize output");
-                    out.position() as usize
-                };
-                assert!(pos < BLOB_SIZE);
-                b.write().unwrap().set_size(pos);
-                q.push_back(b);
-            }
-            start = end;
+        for entry in self {
+            let blob = blob_recycler.allocate();
+            let pos = {
+                let mut bd = blob.write().unwrap();
+                let mut out = Cursor::new(bd.data_mut());
+                serialize_into(&mut out, &entry).expect("failed to serialize output");
+                out.position() as usize
+            };
+            assert!(pos < BLOB_SIZE);
+            blob.write().unwrap().set_size(pos);
+            q.push_back(blob);
         }
     }
 }
 
-/// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`.
-pub fn next_entries(
-    start_hash: &Hash,
-    num_hashes: u64,
-    transaction_batches: Vec<Vec<Transaction>>,
-) -> Vec<Entry> {
-    let mut id = *start_hash;
-    let mut entries = vec![];
-    for transactions in transaction_batches {
-        let entry = next_entry(&id, num_hashes, transactions);
-        id = entry.id;
-        entries.push(entry);
-    }
-    entries
-}
-
 pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> bincode::Result<Vec<Entry>> {
-    let mut entries_to_apply: Vec<Entry> = Vec::new();
-    let mut last_id = Hash::default();
+    let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
     for msgs in blobs {
         let blob = msgs.read().unwrap();
-        let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size])?;
-        for entry in entries {
-            if entry.id == last_id {
-                if let Some(last_entry) = entries_to_apply.last_mut() {
-                    last_entry.transactions.extend(entry.transactions);
-                }
-            } else {
-                last_id = entry.id;
-                entries_to_apply.push(entry);
-            }
-        }
+        let entry: Entry = deserialize(&blob.data()[..blob.meta.size])?;
+        entries.push(entry);
     }
-    Ok(entries_to_apply)
+    Ok(entries)
+}
+
+/// Creates the next Entries for the given transactions; updates start_hash to
+/// the id of the last Entry and sets cur_hashes to 0
+pub fn next_entries_mut(
+    start_hash: &mut Hash,
+    cur_hashes: &mut u64,
+    transactions: Vec<Transaction>,
+) -> Vec<Entry> {
+    if transactions.is_empty() {
+        vec![Entry::new_mut(start_hash, cur_hashes, transactions)]
+    } else {
+        let mut chunk_len = transactions.len();
+
+        // check for fit, make sure they can be serialized
+        while serialized_size(&Entry {
+            num_hashes: 0,
+            id: Hash::default(),
+            transactions: transactions[0..chunk_len].to_vec(),
+        }).unwrap() > BLOB_DATA_SIZE as u64
+        {
+            chunk_len /= 2;
+        }
+
+        let mut entries = Vec::with_capacity(transactions.len() / chunk_len + 1);
+
+        for chunk in transactions.chunks(chunk_len) {
+            entries.push(Entry::new_mut(start_hash, cur_hashes, chunk.to_vec()));
+        }
+        entries
+    }
+}
+
+/// Creates the next Entries for given transactions
+pub fn next_entries(
+    start_hash: &Hash,
+    cur_hashes: u64,
+    transactions: Vec<Transaction>,
+) -> Vec<Entry> {
+    let mut id = *start_hash;
+    let mut num_hashes = cur_hashes;
+    next_entries_mut(&mut id, &mut num_hashes, transactions)
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use entry::{next_entry, Entry};
     use hash::hash;
     use packet::BlobRecycler;
     use signature::{KeyPair, KeyPairUtil};
     use std::net::{IpAddr, Ipv4Addr, SocketAddr};
     use transaction::Transaction;
 
+    /// Create a vector of Entries of length `transaction_batches.len()`
+    /// from `start_hash` hash, `num_hashes`, and `transaction_batches`.
+    fn next_entries_batched(
+        start_hash: &Hash,
+        cur_hashes: u64,
+        transaction_batches: Vec<Vec<Transaction>>,
+    ) -> Vec<Entry> {
+        let mut id = *start_hash;
+        let mut entries = vec![];
+        let mut num_hashes = cur_hashes;
+
+        for transactions in transaction_batches {
+            let mut entry_batch = next_entries_mut(&mut id, &mut num_hashes, transactions);
+            entries.append(&mut entry_batch);
+        }
+        entries
+    }
+
     #[test]
     fn test_verify_slice() {
         let zero = Hash::default();
@@ -139,23 +128,22 @@ mod tests {
         assert!(vec![][..].verify(&zero)); // base case
         assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1
         assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad
-        assert!(next_entries(&zero, 0, vec![vec![]; 2])[..].verify(&zero)); // inductive step
+        assert!(next_entries_batched(&zero, 0, vec![vec![]; 2])[..].verify(&zero)); // inductive step
 
-        let mut bad_ticks = next_entries(&zero, 0, vec![vec![]; 2]);
+        let mut bad_ticks = next_entries_batched(&zero, 0, vec![vec![]; 2]);
         bad_ticks[1].id = one;
         assert!(!bad_ticks.verify(&zero)); // inductive step, bad
     }
 
     #[test]
-    fn test_entry_to_blobs() {
+    fn test_entries_to_blobs() {
         let zero = Hash::default();
         let one = hash(&zero);
         let keypair = KeyPair::new();
         let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
-        let transactions = vec![tx0; 10000];
-        let e0 = Entry::new(&zero, 0, transactions);
+        let transactions = vec![tx0; 10_000];
+        let entries = next_entries(&zero, 0, transactions);
 
-        let entries = vec![e0];
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
         entries.to_blobs(&blob_recycler, &mut blob_q);
@@ -172,14 +160,16 @@
     }
 
     #[test]
-    fn test_next_entries() {
+    fn test_next_entries_batched() {
+        // this also tests next_entries; ugly, but an easy way to do vec of vec (batch)
         let mut id = Hash::default();
         let next_id = hash(&id);
         let keypair = KeyPair::new();
         let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, next_id);
+
         let transactions = vec![tx0; 5];
         let transaction_batches = vec![transactions.clone(); 5];
-        let entries0 = next_entries(&id, 1, transaction_batches);
+        let entries0 = next_entries_batched(&id, 0, transaction_batches);
 
         assert_eq!(entries0.len(), 5);
 
@@ -197,14 +187,30 @@ mod bench {
     extern crate test;
     use self::test::Bencher;
+    use hash::hash;
     use ledger::*;
+    use packet::BlobRecycler;
+    use signature::{KeyPair, KeyPairUtil};
+    use transaction::Transaction;
 
     #[bench]
-    fn bench_next_entries(bencher: &mut Bencher) {
-        let start_hash = Hash::default();
-        let entries = next_entries(&start_hash, 10_000, vec![vec![]; 8]);
+    fn bench_block_to_blobs_to_block(bencher: &mut Bencher) {
+        let zero = Hash::default();
+        let one = hash(&zero);
+        let keypair = KeyPair::new();
+        let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
+        let transactions = vec![tx0; 10];
+        let entries = next_entries(&zero, 1, transactions);
+
+        let blob_recycler = BlobRecycler::default();
         bencher.iter(|| {
-            assert!(entries.verify(&start_hash));
+            let mut blob_q = VecDeque::new();
+            entries.to_blobs(&blob_recycler, &mut blob_q);
+            assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries);
+            for blob in blob_q {
+                blob_recycler.recycle(blob);
+            }
         });
     }
+
 }
diff --git a/src/tvu.rs b/src/tvu.rs
index 466aefbc5b..f338d0b1e7 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -204,16 +204,11 @@ pub mod tests {
         let mut alice_ref_balance = starting_balance;
         let mut msgs = VecDeque::new();
         let mut cur_hash = Hash::default();
-        let num_blobs = 10;
+        let mut blob_id = 0;
+        let num_transfers = 10;
         let transfer_amount = 501;
         let bob_keypair = KeyPair::new();
-        for i in 0..num_blobs {
-            let b = resp_recycler.allocate();
-            let b_ = b.clone();
-            let mut w = b.write().unwrap();
-            w.set_index(i).unwrap();
-            w.set_id(leader_id).unwrap();
-
+        for i in 0..num_transfers {
             let entry0 = Entry::new(&cur_hash, i, vec![]);
             bank.register_entry_id(&cur_hash);
             cur_hash = hash(&cur_hash);
@@ -226,19 +221,28 @@ pub mod tests {
             );
             bank.register_entry_id(&cur_hash);
             cur_hash = hash(&cur_hash);
-            let entry1 = Entry::new(&cur_hash, i + num_blobs, vec![tx0]);
+            let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]);
             bank.register_entry_id(&cur_hash);
             cur_hash = hash(&cur_hash);
 
             alice_ref_balance -= transfer_amount;
 
-            let serialized_entry = serialize(&vec![entry0, entry1]).unwrap();
+            for entry in vec![entry0, entry1] {
+                let b = resp_recycler.allocate();
+                let b_ = b.clone();
+                let mut w = b.write().unwrap();
+                w.set_index(blob_id).unwrap();
+                blob_id += 1;
+                w.set_id(leader_id).unwrap();
 
-            w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
-            w.set_size(serialized_entry.len());
-            w.meta.set_addr(&replicate_addr);
-            drop(w);
-            msgs.push_back(b_);
+                let serialized_entry = serialize(&entry).unwrap();
+
+                w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
+                w.set_size(serialized_entry.len());
+                w.meta.set_addr(&replicate_addr);
+                drop(w);
+                msgs.push_back(b_);
+            }
         }
 
         // send the blobs into the socket
@@ -246,10 +250,8 @@ pub mod tests {
 
         // receive retransmitted messages
         let timer = Duration::new(1, 0);
-        let mut msgs: Vec<_> = Vec::new();
         while let Ok(msg) = r_reader.recv_timeout(timer) {
            trace!("msg: {:?}", msg);
-            msgs.push(msg);
         }
 
         let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap();
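
Usage sketch (not part of the patch): a minimal illustration of the one-to-one Entry-to-Blob round trip this change is meant to guarantee, written against the APIs shown above (next_entries, Block::to_blobs, reconstruct_entries_from_blobs). The function name and the blob_q.len() assertion are illustrative assumptions; the length check follows from to_blobs() now pushing exactly one Blob per Entry, but is not itself part of the patch.

// Sketch only: next_entries() chunks transactions so every Entry fits in a Blob,
// and serializing Entries to Blobs and back is lossless.
use std::collections::VecDeque;

use hash::{hash, Hash};
use ledger::{next_entries, reconstruct_entries_from_blobs, Block};
use packet::BlobRecycler;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;

fn entry_blob_round_trip() {
    let zero = Hash::default();
    let one = hash(&zero);
    let keypair = KeyPair::new();
    let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);

    // far more transactions than fit in one Blob; next_entries() splits them
    let entries = next_entries(&zero, 0, vec![tx0; 10_000]);

    let blob_recycler = BlobRecycler::default();
    let mut blob_q = VecDeque::new();
    entries.to_blobs(&blob_recycler, &mut blob_q);

    // one Blob per Entry, and deserializing the Blobs yields the original Entries
    assert_eq!(blob_q.len(), entries.len());
    assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries);

    for blob in blob_q {
        blob_recycler.recycle(blob);
    }
}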
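
A second hedged sketch of the size invariant behind that mapping: every Entry produced by next_entries() should satisfy the serialized_size() bound asserted in Entry::new(), i.e. it fits within BLOB_DATA_SIZE. The function name is illustrative only.

// Sketch only: each Entry emitted by next_entries() obeys the BLOB_DATA_SIZE bound,
// which is what lets to_blobs() map one Entry onto one Blob.
use bincode::serialized_size;
use hash::{hash, Hash};
use ledger::next_entries;
use packet::BLOB_DATA_SIZE;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;

fn entries_fit_in_blobs() {
    let zero = Hash::default();
    let one = hash(&zero);
    let keypair = KeyPair::new();
    let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);

    for entry in next_entries(&zero, 0, vec![tx0; 10_000]) {
        assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64);
    }
}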