fix entry serialize

This commit is contained in:
Anatoly Yakovenko 2018-05-03 14:35:04 -07:00
parent 6f9285322d
commit 0aad71d46e

View File

@ -118,29 +118,27 @@ impl AccountantSkel {
trace!("notify_entry_info done"); trace!("notify_entry_info done");
} }
fn receive_to_list<W: Write>( fn receive_all<W: Write>(
obj: &SharedSkel, obj: &SharedSkel,
writer: &Arc<Mutex<W>>, writer: &Arc<Mutex<W>>,
max: usize, ) -> Result<Vec<Entry>> {
) -> Result<LinkedList<Entry>> {
//TODO implement a serialize for channel that does this without allocations //TODO implement a serialize for channel that does this without allocations
let mut num = 0; let mut l = vec![];
let mut l = LinkedList::new();
let entry = obj.historian let entry = obj.historian
.output .output
.lock() .lock()
.unwrap() .unwrap()
.recv_timeout(Duration::new(1, 0))?; .recv_timeout(Duration::new(1, 0))?;
trace!("obj.write 1 {:?}", entry);
Self::update_entry(obj, writer, &entry); Self::update_entry(obj, writer, &entry);
l.push_back(entry); trace!("obj.write 1.end");
l.push(entry);
while let Ok(entry) = obj.historian.receive() { while let Ok(entry) = obj.historian.receive() {
trace!("obj.write 2");
Self::update_entry(obj, writer, &entry); Self::update_entry(obj, writer, &entry);
l.push_back(entry); trace!("obj.write 2.end");
num += 1; l.push(entry);
if num == max { trace!("num: {}", num);
break;
}
trace!("receive_to_list entries num: {}", num);
} }
Ok(l) Ok(l)
} }
@ -154,24 +152,34 @@ impl AccountantSkel {
writer: &Arc<Mutex<W>>, writer: &Arc<Mutex<W>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<()> { ) -> Result<()> {
// TODO: should it be the serialized Entry size?
let max = BLOB_SIZE / size_of::<Entry>();
let mut q = VecDeque::new(); let mut q = VecDeque::new();
let mut count = 0;
trace!("max: {}", max); trace!("max: {}", max);
while let Ok(list) = Self::receive_to_list(&obj, writer, max) { while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {} {}", count, list.len()); trace!("New blobs? {}", list.len());
let mut start = 0;
let mut end = 0;
while start < list.len() {
let total = 0;
for i in list[start..] {
total += size_of::<Event>() * i.events.len();
total += size_of::<Entry>();
if total >= BLOB_SIZE {
break;
}
end += 1;
}
let b = blob_recycler.allocate(); let b = blob_recycler.allocate();
let pos = { let pos = {
let mut bd = b.write().unwrap(); let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut()); let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &list).expect("failed to serialize output"); serialize_into(&mut out, &list[start .. end]).expect("failed to serialize output");
out.position() as usize out.position() as usize
}; };
assert!(pos < BLOB_SIZE); assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos); b.write().unwrap().set_size(pos);
q.push_back(b); q.push_back(b);
count += 1; start = end;
}
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }