More idiomatic Rust
@@ -2,7 +2,7 @@
 use bank::Bank;
 use entry::Entry;
-use ledger;
+use ledger::Block;
 use packet;
 use result::Result;
 use serde_json;
@@ -63,7 +63,7 @@ impl<'a> EntryWriter<'a> {
         let mut q = VecDeque::new();
         let list = self.write_entries(writer, entry_receiver)?;
         trace!("New blobs? {}", list.len());
-        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
+        list.to_blobs(blob_recycler, &mut q);
         if !q.is_empty() {
             trace!("broadcasting {}", q.len());
             broadcast.send(q)?;
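Note on the call-site change above: `to_blobs` is defined on the `Block` trait and implemented for the slice type `[Entry]` (see the src/ledger.rs hunks below), so a `Vec<Entry>` such as `list` reaches it through deref coercion. A minimal standalone sketch of that pattern, using made-up names (`Describe`/`describe`) rather than the real ledger types:

trait Describe {
    fn describe(&self) -> String;
}

// Implementing on the unsized slice type [u64] makes the method available to
// Vec<u64>, arrays, and borrowed slices alike.
impl Describe for [u64] {
    fn describe(&self) -> String {
        format!("{} items", self.len())
    }
}

fn main() {
    let list: Vec<u64> = vec![1, 2, 3];
    // Vec<u64> derefs to [u64], so the trait method is called directly.
    println!("{}", list.describe());
}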
src/ledger.rs (132 changed lines)
@@ -16,6 +16,7 @@ use transaction::Transaction;
 pub trait Block {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
     fn verify(&self, start_hash: &Hash) -> bool;
+    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>);
 }
 
 impl Block for [Entry] {
@@ -24,6 +25,66 @@ impl Block for [Entry] {
         let entry_pairs = genesis.par_iter().chain(self).zip(self);
         entry_pairs.all(|(x0, x1)| x1.verify(&x0.id))
     }
+
+    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
+        let mut start = 0;
+        let mut end = 0;
+        while start < self.len() {
+            let mut entries: Vec<Vec<Entry>> = Vec::new();
+            let mut total = 0;
+            for i in &self[start..] {
+                total += size_of::<Transaction>() * i.transactions.len();
+                total += size_of::<Entry>();
+                if total >= BLOB_DATA_SIZE {
+                    break;
+                }
+                end += 1;
+            }
+            // See if we need to split the transactions
+            if end <= start {
+                let mut transaction_start = 0;
+                let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
+                let total_entry_chunks = (self[end].transactions.len() + num_transactions_per_blob
+                    - 1) / num_transactions_per_blob;
+                trace!(
+                    "splitting transactions end: {} total_chunks: {}",
+                    end,
+                    total_entry_chunks
+                );
+                for _ in 0..total_entry_chunks {
+                    let transaction_end = min(
+                        transaction_start + num_transactions_per_blob,
+                        self[end].transactions.len(),
+                    );
+                    let mut entry = Entry {
+                        num_hashes: self[end].num_hashes,
+                        id: self[end].id,
+                        transactions: self[end].transactions[transaction_start..transaction_end]
+                            .to_vec(),
+                    };
+                    entries.push(vec![entry]);
+                    transaction_start = transaction_end;
+                }
+                end += 1;
+            } else {
+                entries.push(self[start..end].to_vec());
+            }
+
+            for entry in entries {
+                let b = blob_recycler.allocate();
+                let pos = {
+                    let mut bd = b.write().unwrap();
+                    let mut out = Cursor::new(bd.data_mut());
+                    serialize_into(&mut out, &entry).expect("failed to serialize output");
+                    out.position() as usize
+                };
+                assert!(pos < BLOB_SIZE);
+                b.write().unwrap().set_size(pos);
+                q.push_back(b);
+            }
+            start = end;
+        }
+    }
 }
 
 /// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`.
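The `total_entry_chunks` computation above is integer ceiling division: `(n + k - 1) / k` rounds `n / k` up so a partially filled final blob still gets counted. A tiny self-contained check of the idiom, with a hypothetical per-blob capacity standing in for `BLOB_DATA_SIZE / size_of::<Transaction>()`:

// Ceiling division: how many chunks of size k are needed to hold n items.
fn ceil_div(n: usize, k: usize) -> usize {
    (n + k - 1) / k
}

fn main() {
    let num_transactions_per_blob = 100; // hypothetical capacity, for illustration
    assert_eq!(ceil_div(250, num_transactions_per_blob), 3); // 250 txs overflow into 3 blobs
    assert_eq!(ceil_div(200, num_transactions_per_blob), 2); // exact fit stays at 2
    assert_eq!(ceil_div(1, num_transactions_per_blob), 1);   // one tx still needs a blob
}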
@@ -43,70 +104,6 @@ pub fn next_entries(
     entries
 }
 
-pub fn process_entry_list_into_blobs(
-    list: &Vec<Entry>,
-    blob_recycler: &packet::BlobRecycler,
-    q: &mut VecDeque<SharedBlob>,
-) {
-    let mut start = 0;
-    let mut end = 0;
-    while start < list.len() {
-        let mut entries: Vec<Vec<Entry>> = Vec::new();
-        let mut total = 0;
-        for i in &list[start..] {
-            total += size_of::<Transaction>() * i.transactions.len();
-            total += size_of::<Entry>();
-            if total >= BLOB_DATA_SIZE {
-                break;
-            }
-            end += 1;
-        }
-        // See if we need to split the transactions
-        if end <= start {
-            let mut transaction_start = 0;
-            let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
-            let total_entry_chunks = (list[end].transactions.len() + num_transactions_per_blob - 1)
-                / num_transactions_per_blob;
-            trace!(
-                "splitting transactions end: {} total_chunks: {}",
-                end,
-                total_entry_chunks
-            );
-            for _ in 0..total_entry_chunks {
-                let transaction_end = min(
-                    transaction_start + num_transactions_per_blob,
-                    list[end].transactions.len(),
-                );
-                let mut entry = Entry {
-                    num_hashes: list[end].num_hashes,
-                    id: list[end].id,
-                    transactions: list[end].transactions[transaction_start..transaction_end]
-                        .to_vec(),
-                };
-                entries.push(vec![entry]);
-                transaction_start = transaction_end;
-            }
-            end += 1;
-        } else {
-            entries.push(list[start..end].to_vec());
-        }
-
-        for entry in entries {
-            let b = blob_recycler.allocate();
-            let pos = {
-                let mut bd = b.write().unwrap();
-                let mut out = Cursor::new(bd.data_mut());
-                serialize_into(&mut out, &entry).expect("failed to serialize output");
-                out.position() as usize
-            };
-            assert!(pos < BLOB_SIZE);
-            b.write().unwrap().set_size(pos);
-            q.push_back(b);
-        }
-        start = end;
-    }
-}
-
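Beyond moving the body onto the trait, note that the removed signature took `list: &Vec<Entry>`, while the replacement is a method on `[Entry]`; borrowing a slice rather than a `Vec` is part of the idiom the commit title alludes to. A small sketch of the difference, with illustrative functions that are not part of this codebase:

// Old style: &Vec<T> only accepts a Vec.
fn total_len_vec(list: &Vec<String>) -> usize {
    list.iter().map(|s| s.len()).sum()
}

// Idiomatic style: &[T] accepts Vecs, arrays, and slices alike.
fn total_len_slice(list: &[String]) -> usize {
    list.iter().map(|s| s.len()).sum()
}

fn main() {
    let v = vec!["a".to_string(), "bc".to_string()];
    assert_eq!(total_len_vec(&v), total_len_slice(&v)); // &Vec coerces to &[T]
    let arr = ["x".to_string()];
    assert_eq!(total_len_slice(&arr), 1); // only the slice version takes an array borrow
}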
 pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
     let mut entries_to_apply: Vec<Entry> = Vec::new();
     let mut last_id = Hash::default();
@@ -159,13 +156,12 @@ mod tests {
         let transactions = vec![tx0.clone(); 10000];
         let e0 = Entry::new(&zero, 0, transactions);
 
-        let entry_list = vec![e0.clone(); 1];
+        let entries = vec![e0.clone(); 1];
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
-        process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
-        let entries = reconstruct_entries_from_blobs(&blob_q);
+        entries.to_blobs(&blob_recycler, &mut blob_q);
 
-        assert_eq!(entry_list, entries);
+        assert_eq!(reconstruct_entries_from_blobs(&blob_q), entries);
     }
 
     #[test]
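The rewritten test now states the round-trip property directly: entries converted to blobs and reconstructed must equal the originals. A minimal sketch of the same property without the blob machinery, assuming serde and bincode 1.x as dependencies and a simplified stand-in for `Entry` (the real type holds a `Hash` and a `Vec<Transaction>`):

use serde::{Deserialize, Serialize};

// Simplified stand-in for Entry, for illustration only.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct FakeEntry {
    num_hashes: u64,
    id: u64,                // stands in for the real Hash
    transactions: Vec<u64>, // stands in for Vec<Transaction>
}

fn main() {
    let entries = vec![FakeEntry { num_hashes: 0, id: 7, transactions: vec![1, 2, 3] }; 2];
    // Serialize to bytes and back; the round trip must preserve the value.
    let bytes = bincode::serialize(&entries).unwrap();
    let round_trip: Vec<FakeEntry> = bincode::deserialize(&bytes).unwrap();
    assert_eq!(round_trip, entries);
}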