Serialize entries over multiple blobs

Stephen Akridge
2018-05-09 15:41:18 -07:00
parent 0601e05978
commit 900b4f2644

@@ -9,7 +9,7 @@ use ecdsa;
 use entry::Entry;
 use event::Event;
 use packet;
-use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
+use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use result::Result;
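
The only change in this hunk swaps the sizing constant: entries are now packed against `BLOB_DATA_SIZE`, the payload budget, while the post-serialization `assert!(pos < BLOB_SIZE)` further down still bounds the full blob. The constants' real definitions live in `packet.rs` and are not part of this diff; the sketch below is only an assumption about the relationship the new code relies on, with placeholder values.

// Illustrative only -- not the actual definitions or values in packet.rs.
// The diff assumes BLOB_DATA_SIZE < BLOB_SIZE: part of each blob is
// reserved for non-payload bytes, so a payload packed against
// BLOB_DATA_SIZE also satisfies the later assert!(pos < BLOB_SIZE).
const BLOB_SIZE: usize = 64 * 1024;          // placeholder value
const BLOB_HEADER_SIZE: usize = 256;         // placeholder value
const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;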
@@ -83,33 +83,49 @@ impl Tpu {
         let mut start = 0;
         let mut end = 0;
         while start < list.len() {
+            let mut entries: Vec<Vec<Entry>> = Vec::new();
             let mut total = 0;
             for i in &list[start..] {
                 total += size_of::<Event>() * i.events.len();
                 total += size_of::<Entry>();
-                if total >= BLOB_SIZE {
+                if total >= BLOB_DATA_SIZE {
                     break;
                 }
                 end += 1;
             }
-            // See that we made progress and a single
-            // vec of Events wasn't too big for a single packet
+            // See if we need to split the events
             if end <= start {
-                // Trust the recorder to not package more than we can
-                // serialize
+                trace!("splitting events");
+                let mut event_start = 0;
+                let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
+                let total_entry_chunks = list[end].events.len() / num_events_per_blob;
+                for _ in 0..total_entry_chunks {
+                    let event_end = event_start + num_events_per_blob;
+                    let mut entry = Entry {
+                        num_hashes: list[end].num_hashes,
+                        id: list[end].id,
+                        events: list[end].events[event_start..event_end].to_vec(),
+                    };
+                    entries.push(vec![entry]);
+                    event_start = event_end;
+                }
                 end += 1;
+            } else {
+                entries.push(list[start..end].to_vec());
             }
-            let b = blob_recycler.allocate();
-            let pos = {
-                let mut bd = b.write().unwrap();
-                let mut out = Cursor::new(bd.data_mut());
-                serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
-                out.position() as usize
-            };
-            assert!(pos < BLOB_SIZE);
-            b.write().unwrap().set_size(pos);
-            q.push_back(b);
+            for entry in entries {
+                let b = blob_recycler.allocate();
+                let pos = {
+                    let mut bd = b.write().unwrap();
+                    let mut out = Cursor::new(bd.data_mut());
+                    serialize_into(&mut out, &entry).expect("failed to serialize output");
+                    out.position() as usize
+                };
+                assert!(pos < BLOB_SIZE);
+                b.write().unwrap().set_size(pos);
+                q.push_back(b);
+            }
             start = end;
         }
     }
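
Reduced to its essentials, the new branch cuts an oversized entry's events into fixed-size chunks and wraps each chunk in a copy of the entry's `num_hashes`/`id` metadata. A minimal standalone sketch of that rule, with simplified stand-in types (illustrative only, not the crate's `Entry`/`Event`); it uses `slice::chunks`, which, unlike the truncating `events.len() / num_events_per_blob` above, also yields a trailing partial chunk:

use std::mem::size_of;

// Simplified stand-ins for the crate's types; illustrative only.
#[derive(Clone, Debug)]
struct Event([u8; 32]);

#[derive(Clone, Debug)]
struct Entry {
    num_hashes: u64,
    id: [u8; 32],
    events: Vec<Event>,
}

// Split one oversized entry into entries of at most num_events_per_blob
// events each, duplicating the metadata into every piece, as the loop
// in the diff does.
fn split_entry(entry: &Entry, blob_data_size: usize) -> Vec<Entry> {
    // max(1) guards the degenerate case of one event exceeding the budget.
    let num_events_per_blob = (blob_data_size / size_of::<Event>()).max(1);
    entry
        .events
        .chunks(num_events_per_blob)
        .map(|chunk| Entry {
            num_hashes: entry.num_hashes,
            id: entry.id,
            events: chunk.to_vec(),
        })
        .collect()
}

One behavioral note, stated as an observation of the shown code: because `total_entry_chunks` is computed with integer division, the loop packages only full chunks, and any `events.len() % num_events_per_blob` remainder is never pushed. The test below compares only the events that come back, so it would not catch a dropped tail.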
@@ -693,7 +709,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
 mod tests {
     use accountant::Accountant;
     use accounting_stage::AccountingStage;
-    use bincode::serialize;
+    use bincode::{deserialize, serialize};
     use chrono::prelude::*;
     use crdt::Crdt;
     use ecdsa;
@@ -883,10 +899,11 @@ mod tests {
         let zero = Hash::default();
         let keypair = KeyPair::new();
         let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
-        let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
-        let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]);
-        let entry_list = vec![e0; 1000];
+        let events = vec![tr0.clone(); 10000];
+        //let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
+        let e0 = entry::create_entry(&zero, 0, events);
+        let entry_list = vec![e0.clone(); 1];
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
         Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
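
The fixture change is what forces the new split path: instead of 1000 small two-event entries, the list is a single entry whose 10000 events alone exceed any plausible blob payload. A back-of-envelope check under assumed sizes (the real `size_of::<Event>()` depends on the `Transaction` layout and is not shown in this diff):

// Placeholder figures; illustrative only.
const ASSUMED_EVENT_SIZE: usize = 256;
const ASSUMED_BLOB_DATA_SIZE: usize = 63 * 1024;

fn main() {
    // The sizing loop's running total passes the payload budget while
    // still on the first (and only) entry, so end == start and the split
    // branch runs instead of the whole-entries else branch.
    let first_entry_bytes = 10_000 * ASSUMED_EVENT_SIZE; // ~2.5 MB of events
    assert!(first_entry_bytes >= ASSUMED_BLOB_DATA_SIZE);
}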
@@ -895,6 +912,18 @@ mod tests {
         if serialized_entry_list.len() % BLOB_SIZE != 0 {
             num_blobs_ref += 1
         }
+        let mut new_events = Vec::new();
+        for b in &blob_q {
+            let blob = b.read().unwrap();
+            let entries: Vec<entry::Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
+            assert_eq!(entries.len(), 1);
+            assert_eq!(entries[0].num_hashes, e0.num_hashes);
+            assert_eq!(entries[0].id, e0.id);
+            new_events.extend(entries[0].events.clone());
+        }
+        for (i, e) in new_events.iter().enumerate() {
+            assert_eq!(*e, e0.events[i]);
+        }
         trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
         assert!(blob_q.len() > num_blobs_ref);
     }
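
Two details of the assertions are worth spelling out. First, the `num_blobs_ref` computation (whose first half sits just above the shown hunk) is, in effect, a longhand ceiling division over the densely serialized list; a compact equivalent under the same `BLOB_SIZE` constant:

// The lower bound the test compares against: how many BLOB_SIZE-byte
// blobs the dense serialization of the whole entry list would need.
fn num_blobs_lower_bound(serialized_len: usize, blob_size: usize) -> usize {
    (serialized_len + blob_size - 1) / blob_size // ceiling division
}

Second, the final check is deliberately `assert!(blob_q.len() > num_blobs_ref)` rather than an equality: each blob now carries its own serialized `Vec<Entry>`, repeating the `num_hashes`/`id` metadata per chunk and closing each blob at a chunk boundary rather than packing bytes densely, so the produced blob count must strictly exceed the dense lower bound.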