issue 309 part 1

* limit the number of Entries per Blob to at most one
* limit the number of Transactions per Entry such that an Entry will
    always fit in a Blob

With a one-to-one map of Entries to Blobs, recovery of a validator
  is a simple fast-forward from the end of the initial genesis.log
  and tx-*.logs Entries.

TODO: initialize validators' blob index with initial # of Entries.
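
The fit rule above (every Entry must serialize into a single Blob) is what `next_entries_mut` enforces in the diff below by halving the chunk length until a chunk of transactions fits. A rough standalone sketch of that rule, using an illustrative `BLOB_DATA_SIZE` and a plain byte-count stand-in for `serialized_size` (not the crate's real types or constants):

    // Standalone sketch: names and sizes are illustrative, not the crate's
    // real Entry/Transaction types or its actual BLOB_DATA_SIZE value.
    const BLOB_DATA_SIZE: usize = 65_535;

    /// Halve the chunk length until one chunk's transactions fit in a blob,
    /// approximating serialized size as the sum of per-transaction byte sizes.
    fn fit_chunk_len(tx_sizes: &[usize]) -> usize {
        let mut chunk_len = tx_sizes.len();
        while chunk_len > 1 && tx_sizes[..chunk_len].iter().sum::<usize>() > BLOB_DATA_SIZE {
            chunk_len /= 2;
        }
        chunk_len
    }

    fn main() {
        // 10_000 transactions of ~200 bytes cannot fit in one blob, so the
        // batch is split into chunks: one Entry (and one Blob) per chunk.
        let tx_sizes = vec![200usize; 10_000];
        let chunk_len = fit_chunk_len(&tx_sizes);
        let num_entries = (tx_sizes.len() + chunk_len - 1) / chunk_len;
        println!("{} transactions per Entry, {} Entries", chunk_len, num_entries);
        assert!(tx_sizes[..chunk_len].iter().sum::<usize>() <= BLOB_DATA_SIZE);
    }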
commit 5e91d31ed3
parent fad9d20820
Author: Rob Walker
Date:   2018-06-20 11:40:41 -07:00

3 changed files with 142 additions and 123 deletions

View File

@@ -2,7 +2,9 @@
 //! unique ID that is the hash of the Entry before it, plus the hash of the
 //! transactions within it. Entries cannot be reordered, and its field `num_hashes`
 //! represents an approximate amount of time since the last Entry was created.
+use bincode::serialized_size;
 use hash::{extend_and_hash, hash, Hash};
+use packet::BLOB_DATA_SIZE;
 use rayon::prelude::*;
 use transaction::Transaction;
@@ -40,11 +42,13 @@ impl Entry {
     pub fn new(start_hash: &Hash, cur_hashes: u64, transactions: Vec<Transaction>) -> Self {
         let num_hashes = cur_hashes + if transactions.is_empty() { 0 } else { 1 };
         let id = next_hash(start_hash, 0, &transactions);
-        Entry {
+        let entry = Entry {
             num_hashes,
             id,
             transactions,
-        }
+        };
+        assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64);
+        entry
     }

     /// Creates the next Tick Entry `num_hashes` after `start_hash`.
@@ -56,6 +60,7 @@ impl Entry {
         let entry = Self::new(start_hash, *cur_hashes, transactions);
         *start_hash = entry.id;
         *cur_hashes = 0;
+        assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64);
         entry
     }
@@ -180,6 +185,12 @@ mod tests {
         let tick = next_entry(&zero, 0, vec![]);
         assert_eq!(tick.num_hashes, 0);
         assert_eq!(tick.id, zero);
+
+        let keypair = KeyPair::new();
+        let tx0 = Transaction::new_timestamp(&keypair, Utc::now(), zero);
+        let entry0 = next_entry(&zero, 1, vec![tx0.clone()]);
+        assert_eq!(entry0.num_hashes, 1);
+        assert_eq!(entry0.id, next_hash(&zero, 1, &vec![tx0]));
     }

     #[test]

View File

@@ -1,18 +1,17 @@
 //! The `ledger` module provides functions for parallel verification of the
 //! Proof of History ledger.

-use bincode::{self, deserialize, serialize_into};
-use entry::{next_entry, Entry};
+use bincode::{self, deserialize, serialize_into, serialized_size};
+use entry::Entry;
 use hash::Hash;
-use packet;
-use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
+use packet::{self, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
 use rayon::prelude::*;
-use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::Cursor;
-use std::mem::size_of;
 use transaction::Transaction;

+// a Block is a slice of Entries
 pub trait Block {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
     fn verify(&self, start_hash: &Hash) -> bool;
@@ -27,111 +26,101 @@ impl Block for [Entry] {
     }

     fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
-        let mut start = 0;
-        let mut end = 0;
-        while start < self.len() {
-            let mut entries: Vec<Vec<Entry>> = Vec::new();
-            let mut total = 0;
-            for i in &self[start..] {
-                total += size_of::<Transaction>() * i.transactions.len();
-                total += size_of::<Entry>();
-                if total >= BLOB_DATA_SIZE {
-                    break;
-                }
-                end += 1;
-            }
-            // See if we need to split the transactions
-            if end <= start {
-                let mut transaction_start = 0;
-                let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
-                let total_entry_chunks = (self[end].transactions.len() + num_transactions_per_blob
-                    - 1) / num_transactions_per_blob;
-                trace!(
-                    "splitting transactions end: {} total_chunks: {}",
-                    end,
-                    total_entry_chunks
-                );
-                for _ in 0..total_entry_chunks {
-                    let transaction_end = min(
-                        transaction_start + num_transactions_per_blob,
-                        self[end].transactions.len(),
-                    );
-                    let mut entry = Entry {
-                        num_hashes: self[end].num_hashes,
-                        id: self[end].id,
-                        transactions: self[end].transactions[transaction_start..transaction_end]
-                            .to_vec(),
-                    };
-                    entries.push(vec![entry]);
-                    transaction_start = transaction_end;
-                }
-                end += 1;
-            } else {
-                entries.push(self[start..end].to_vec());
-            }
-            for entry in entries {
-                let b = blob_recycler.allocate();
-                let pos = {
-                    let mut bd = b.write().unwrap();
-                    let mut out = Cursor::new(bd.data_mut());
-                    serialize_into(&mut out, &entry).expect("failed to serialize output");
-                    out.position() as usize
-                };
-                assert!(pos < BLOB_SIZE);
-                b.write().unwrap().set_size(pos);
-                q.push_back(b);
-            }
-            start = end;
+        for entry in self {
+            let blob = blob_recycler.allocate();
+            let pos = {
+                let mut bd = blob.write().unwrap();
+                let mut out = Cursor::new(bd.data_mut());
+                serialize_into(&mut out, &entry).expect("failed to serialize output");
+                out.position() as usize
+            };
+            assert!(pos < BLOB_SIZE);
+            blob.write().unwrap().set_size(pos);
+            q.push_back(blob);
         }
     }
 }
-/// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`.
-pub fn next_entries(
-    start_hash: &Hash,
-    num_hashes: u64,
-    transaction_batches: Vec<Vec<Transaction>>,
-) -> Vec<Entry> {
-    let mut id = *start_hash;
-    let mut entries = vec![];
-    for transactions in transaction_batches {
-        let entry = next_entry(&id, num_hashes, transactions);
-        id = entry.id;
-        entries.push(entry);
-    }
-    entries
-}

 pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> bincode::Result<Vec<Entry>> {
-    let mut entries_to_apply: Vec<Entry> = Vec::new();
-    let mut last_id = Hash::default();
+    let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
     for msgs in blobs {
         let blob = msgs.read().unwrap();
-        let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size])?;
-        for entry in entries {
-            if entry.id == last_id {
-                if let Some(last_entry) = entries_to_apply.last_mut() {
-                    last_entry.transactions.extend(entry.transactions);
-                }
-            } else {
-                last_id = entry.id;
-                entries_to_apply.push(entry);
-            }
-        }
+        let entry: Entry = deserialize(&blob.data()[..blob.meta.size])?;
+        entries.push(entry);
     }
-    Ok(entries_to_apply)
+    Ok(entries)
+}
+
+/// Creates the next entries for given transactions, outputs
+/// updates start_hash to id of last Entry, sets cur_hashes to 0
+pub fn next_entries_mut(
+    start_hash: &mut Hash,
+    cur_hashes: &mut u64,
+    transactions: Vec<Transaction>,
+) -> Vec<Entry> {
+    if transactions.is_empty() {
+        vec![Entry::new_mut(start_hash, cur_hashes, transactions)]
+    } else {
+        let mut chunk_len = transactions.len();
+
+        // check for fit, make sure they can be serialized
+        while serialized_size(&Entry {
+            num_hashes: 0,
+            id: Hash::default(),
+            transactions: transactions[0..chunk_len].to_vec(),
+        }).unwrap() > BLOB_DATA_SIZE as u64
+        {
+            chunk_len /= 2;
+        }
+
+        let mut entries = Vec::with_capacity(transactions.len() / chunk_len + 1);
+
+        for chunk in transactions.chunks(chunk_len) {
+            entries.push(Entry::new_mut(start_hash, cur_hashes, chunk.to_vec()));
+        }
+        entries
+    }
+}
+
+/// Creates the next Entries for given transactions
+pub fn next_entries(
+    start_hash: &Hash,
+    cur_hashes: u64,
+    transactions: Vec<Transaction>,
+) -> Vec<Entry> {
+    let mut id = *start_hash;
+    let mut num_hashes = cur_hashes;
+    next_entries_mut(&mut id, &mut num_hashes, transactions)
 }
 #[cfg(test)]
 mod tests {
     use super::*;
+    use entry::{next_entry, Entry};
     use hash::hash;
     use packet::BlobRecycler;
     use signature::{KeyPair, KeyPairUtil};
     use std::net::{IpAddr, Ipv4Addr, SocketAddr};
     use transaction::Transaction;

+    /// Create a vector of Entries of length `transaction_batches.len()`
+    /// from `start_hash` hash, `num_hashes`, and `transaction_batches`.
+    fn next_entries_batched(
+        start_hash: &Hash,
+        cur_hashes: u64,
+        transaction_batches: Vec<Vec<Transaction>>,
+    ) -> Vec<Entry> {
+        let mut id = *start_hash;
+        let mut entries = vec![];
+        let mut num_hashes = cur_hashes;
+        for transactions in transaction_batches {
+            let mut entry_batch = next_entries_mut(&mut id, &mut num_hashes, transactions);
+            entries.append(&mut entry_batch);
+        }
+        entries
+    }

     #[test]
     fn test_verify_slice() {
         let zero = Hash::default();
@@ -139,23 +128,22 @@ mod tests {
         assert!(vec![][..].verify(&zero)); // base case
         assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1
         assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad
-        assert!(next_entries(&zero, 0, vec![vec![]; 2])[..].verify(&zero)); // inductive step
+        assert!(next_entries_batched(&zero, 0, vec![vec![]; 2])[..].verify(&zero)); // inductive step

-        let mut bad_ticks = next_entries(&zero, 0, vec![vec![]; 2]);
+        let mut bad_ticks = next_entries_batched(&zero, 0, vec![vec![]; 2]);
         bad_ticks[1].id = one;
         assert!(!bad_ticks.verify(&zero)); // inductive step, bad
     }
     #[test]
-    fn test_entry_to_blobs() {
+    fn test_entries_to_blobs() {
         let zero = Hash::default();
         let one = hash(&zero);
         let keypair = KeyPair::new();
         let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
-        let transactions = vec![tx0; 10000];
-        let e0 = Entry::new(&zero, 0, transactions);
-        let entries = vec![e0];
+        let transactions = vec![tx0; 10_000];
+        let entries = next_entries(&zero, 0, transactions);
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
         entries.to_blobs(&blob_recycler, &mut blob_q);
@@ -172,14 +160,16 @@ mod tests {
     }

     #[test]
-    fn test_next_entries() {
+    fn test_next_entries_batched() {
+        // this also tests next_entries, ugly, but is an easy way to do vec of vec (batch)
         let mut id = Hash::default();
         let next_id = hash(&id);
         let keypair = KeyPair::new();
         let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, next_id);
         let transactions = vec![tx0; 5];
         let transaction_batches = vec![transactions.clone(); 5];
-        let entries0 = next_entries(&id, 1, transaction_batches);
+        let entries0 = next_entries_batched(&id, 0, transaction_batches);

         assert_eq!(entries0.len(), 5);
@@ -197,14 +187,30 @@
 mod bench {
     extern crate test;
     use self::test::Bencher;
+    use hash::hash;
     use ledger::*;
+    use packet::BlobRecycler;
+    use signature::{KeyPair, KeyPairUtil};
+    use transaction::Transaction;

     #[bench]
-    fn bench_next_entries(bencher: &mut Bencher) {
-        let start_hash = Hash::default();
-        let entries = next_entries(&start_hash, 10_000, vec![vec![]; 8]);
+    fn bench_block_to_blobs_to_block(bencher: &mut Bencher) {
+        let zero = Hash::default();
+        let one = hash(&zero);
+        let keypair = KeyPair::new();
+        let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
+        let transactions = vec![tx0; 10];
+        let entries = next_entries(&zero, 1, transactions);
+
+        let blob_recycler = BlobRecycler::default();
         bencher.iter(|| {
-            assert!(entries.verify(&start_hash));
+            let mut blob_q = VecDeque::new();
+            entries.to_blobs(&blob_recycler, &mut blob_q);
+            assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries);
+            for blob in blob_q {
+                blob_recycler.recycle(blob);
+            }
         });
     }
 }

View File

@@ -204,16 +204,11 @@ pub mod tests {
         let mut alice_ref_balance = starting_balance;
         let mut msgs = VecDeque::new();
         let mut cur_hash = Hash::default();
-        let num_blobs = 10;
+        let mut blob_id = 0;
+        let num_transfers = 10;
         let transfer_amount = 501;
         let bob_keypair = KeyPair::new();
-        for i in 0..num_blobs {
-            let b = resp_recycler.allocate();
-            let b_ = b.clone();
-            let mut w = b.write().unwrap();
-            w.set_index(i).unwrap();
-            w.set_id(leader_id).unwrap();
+        for i in 0..num_transfers {
             let entry0 = Entry::new(&cur_hash, i, vec![]);
             bank.register_entry_id(&cur_hash);
             cur_hash = hash(&cur_hash);
@@ -226,13 +221,21 @@
             );
             bank.register_entry_id(&cur_hash);
             cur_hash = hash(&cur_hash);
-            let entry1 = Entry::new(&cur_hash, i + num_blobs, vec![tx0]);
+            let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]);
             bank.register_entry_id(&cur_hash);
             cur_hash = hash(&cur_hash);

             alice_ref_balance -= transfer_amount;

-            let serialized_entry = serialize(&vec![entry0, entry1]).unwrap();
-            w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
-            w.set_size(serialized_entry.len());
+            for entry in vec![entry0, entry1] {
+                let b = resp_recycler.allocate();
+                let b_ = b.clone();
+                let mut w = b.write().unwrap();
+                w.set_index(blob_id).unwrap();
+                blob_id += 1;
+                w.set_id(leader_id).unwrap();
+
+                let serialized_entry = serialize(&entry).unwrap();
+                w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
+                w.set_size(serialized_entry.len());
@@ -240,16 +243,15 @@
                 drop(w);
                 msgs.push_back(b_);
             }
+        }

         // send the blobs into the socket
         s_responder.send(msgs).expect("send");

         // receive retransmitted messages
         let timer = Duration::new(1, 0);
-        let mut msgs: Vec<_> = Vec::new();
         while let Ok(msg) = r_reader.recv_timeout(timer) {
             trace!("msg: {:?}", msg);
-            msgs.push(msg);
         }

         let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap();