From 3dbbb398df207cc5198923e02ca225392524ebd0 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Fri, 22 Jun 2018 12:20:29 -0700 Subject: [PATCH] use next_entries() in recorder, recycle blobs in reconstruct_from_blobs --- src/ledger.rs | 39 ++++++++++++++++++++++++++++----------- src/record_stage.rs | 11 +++++++++-- src/recorder.rs | 11 ++++++++--- src/replicate_stage.rs | 8 +++----- 4 files changed, 48 insertions(+), 21 deletions(-) diff --git a/src/ledger.rs b/src/ledger.rs index 8b6e043953..22b0e729ab 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -41,12 +41,26 @@ impl Block for [Entry] { } } -pub fn reconstruct_entries_from_blobs(blobs: &VecDeque) -> bincode::Result> { +pub fn reconstruct_entries_from_blobs( + blobs: VecDeque, + blob_recycler: &packet::BlobRecycler, +) -> bincode::Result> { let mut entries: Vec = Vec::with_capacity(blobs.len()); - for msgs in blobs { - let blob = msgs.read().unwrap(); - let entry: Entry = deserialize(&blob.data()[..blob.meta.size])?; - entries.push(entry); + + for blob in blobs { + let entry = { + let msg = blob.read().unwrap(); + deserialize(&msg.data()[..msg.meta.size]) + }; + blob_recycler.recycle(blob); + + match entry { + Ok(entry) => entries.push(entry), + Err(err) => { + trace!("reconstruct_entry_from_blobs: {}", err); + return Err(err); + } + } } Ok(entries) } @@ -148,7 +162,10 @@ mod tests { let mut blob_q = VecDeque::new(); entries.to_blobs(&blob_recycler, &mut blob_q); - assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries); + assert_eq!( + reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(), + entries + ); } #[test] @@ -156,7 +173,7 @@ mod tests { let blob_recycler = BlobRecycler::default(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack! - assert!(reconstruct_entries_from_blobs(&blobs_q).is_err()); + assert!(reconstruct_entries_from_blobs(blobs_q, &blob_recycler).is_err()); } #[test] @@ -206,10 +223,10 @@ mod bench { bencher.iter(|| { let mut blob_q = VecDeque::new(); entries.to_blobs(&blob_recycler, &mut blob_q); - assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries); - for blob in blob_q { - blob_recycler.recycle(blob); - } + assert_eq!( + reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(), + entries + ); }); } diff --git a/src/record_stage.rs b/src/record_stage.rs index ae8ae09d40..b4e3970d45 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -90,8 +90,15 @@ impl RecordStage { } else { vec![] }; - let entry = recorder.record(txs); - sender.send(entry).map_err(|_| ()) + let entries = recorder.record(txs); + let mut result = Ok(()); + for entry in entries { + result = sender.send(entry).map_err(|_| ()); + if result.is_err() { + break; + } + } + result } fn process_signals( diff --git a/src/recorder.rs b/src/recorder.rs index f457df8edc..1056cab3c3 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -3,6 +3,7 @@ use entry::Entry; use hash::{hash, Hash}; +use ledger::next_entries_mut; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -26,15 +27,19 @@ impl Recorder { self.num_hashes += 1; } - pub fn record(&mut self, transactions: Vec) -> Entry { - Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, transactions) + pub fn record(&mut self, transactions: Vec) -> Vec { + next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions) } pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option { if start_time.elapsed() > tick_duration * (self.num_ticks + 1) { // TODO: don't let this overflow u32 self.num_ticks += 1; - Some(self.record(vec![])) + Some(Entry::new_mut( + &mut self.last_hash, + &mut self.num_hashes, + vec![], + )) } else { None } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index fbb86b4feb..f0564fa41d 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -23,15 +23,13 @@ impl ReplicateStage { ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = blob_receiver.recv_timeout(timer)?; - let entries = ledger::reconstruct_entries_from_blobs(&blobs)?; + let blobs_len = blobs.len(); + let entries = ledger::reconstruct_entries_from_blobs(blobs, &blob_recycler)?; let res = bank.process_entries(entries); if res.is_err() { - error!("process_entries {} {:?}", blobs.len(), res); + error!("process_entries {} {:?}", blobs_len, res); } res?; - for blob in blobs { - blob_recycler.recycle(blob); - } Ok(()) }