From 0aad71d46e22fca05bc3e76beee752ccad3e6b1c Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 3 May 2018 14:35:04 -0700 Subject: [PATCH] fix entry serialize --- src/accountant_skel.rs | 64 ++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c08e86620d..7e548b8710 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -118,29 +118,27 @@ impl AccountantSkel { trace!("notify_entry_info done"); } - fn receive_to_list( + fn receive_all( obj: &SharedSkel, writer: &Arc>, - max: usize, - ) -> Result> { + ) -> Result> { //TODO implement a serialize for channel that does this without allocations - let mut num = 0; - let mut l = LinkedList::new(); + let mut l = vec![]; let entry = obj.historian .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; + trace!("obj.write 1 {:?}", entry); Self::update_entry(obj, writer, &entry); - l.push_back(entry); + trace!("obj.write 1.end"); + l.push(entry); while let Ok(entry) = obj.historian.receive() { + trace!("obj.write 2"); Self::update_entry(obj, writer, &entry); - l.push_back(entry); - num += 1; - if num == max { - break; - } - trace!("receive_to_list entries num: {}", num); + trace!("obj.write 2.end"); + l.push(entry); + trace!("num: {}", num); } Ok(l) } @@ -154,24 +152,34 @@ impl AccountantSkel { writer: &Arc>, exit: Arc, ) -> Result<()> { - // TODO: should it be the serialized Entry size? - let max = BLOB_SIZE / size_of::(); let mut q = VecDeque::new(); - let mut count = 0; trace!("max: {}", max); - while let Ok(list) = Self::receive_to_list(&obj, writer, max) { - trace!("New blobs? {} {}", count, list.len()); - let b = blob_recycler.allocate(); - let pos = { - let mut bd = b.write().unwrap(); - let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &list).expect("failed to serialize output"); - out.position() as usize - }; - assert!(pos < BLOB_SIZE); - b.write().unwrap().set_size(pos); - q.push_back(b); - count += 1; + while let Ok(list) = Self::receive_all(&obj, writer) { + trace!("New blobs? {}", list.len()); + let mut start = 0; + let mut end = 0; + while start < list.len() { + let total = 0; + for i in list[start..] { + total += size_of::() * i.events.len(); + total += size_of::(); + if total >= BLOB_SIZE { + break; + } + end += 1; + } + let b = blob_recycler.allocate(); + let pos = { + let mut bd = b.write().unwrap(); + let mut out = Cursor::new(bd.data_mut()); + serialize_into(&mut out, &list[start .. end]).expect("failed to serialize output"); + out.position() as usize + }; + assert!(pos < BLOB_SIZE); + b.write().unwrap().set_size(pos); + q.push_back(b); + start = end; + } if exit.load(Ordering::Relaxed) { break; }