diff --git a/src/ledger.rs b/src/ledger.rs index 71fb20df84..ce136c970d 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -5,7 +5,7 @@ use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; use entry::Entry; use hash::Hash; -use log::Level::Trace; +//use log::Level::Trace; use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; @@ -75,7 +75,7 @@ fn entry_at(file: &mut File, at: u64) -> io::Result { file.seek(SeekFrom::Start(at))?; let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; - trace!("entry_at({}) len: {}", at, len); + //trace!("entry_at({}) len: {}", at, len); deserialize_from(file.take(len)).map_err(err_bincode_to_io) } @@ -174,7 +174,7 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> { let len = index.metadata()?.len(); if len % SIZEOF_U64 != 0 { - trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); + //trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); index.set_len(len - (len % SIZEOF_U64))?; } @@ -182,48 +182,50 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> { // to a valid entry deserialization offset... loop { let len = index.metadata()?.len(); - trace!("recover: index len:{}", len); + //trace!("recover: index len:{}", len); // should never happen if len < SIZEOF_U64 { - trace!("recover: error index len {} too small", len); + //trace!("recover: error index len {} too small", len); Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?; } let offset = u64_at(&mut index, len - SIZEOF_U64)?; - trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); + //trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); match entry_at(&mut data, offset) { Ok(entry) => { - trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); + //trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?; - trace!("recover: entry_len: {}", entry_len); + //trace!("recover: entry_len: {}", entry_len); // now trim data file to size... data.set_len(offset + SIZEOF_U64 + entry_len)?; - trace!( - "recover: trimmed data file to {}", - offset + SIZEOF_U64 + entry_len - ); + //trace!( + // "recover: trimmed data file to {}", + // offset + SIZEOF_U64 + entry_len + //); break; // all good } - Err(err) => { - trace!( - "recover: no entry recovered at {} {}", - offset, - err.to_string() - ); + Err(_err) => { + //trace!( + // "recover: no entry recovered at {} {}", + // offset, + // _err.to_string() + //); index.set_len(len - SIZEOF_U64)?; } } } - let num_entries = index.metadata()?.len() / SIZEOF_U64; - trace!("recover: done. {} entries", num_entries); + //if log_enabled!(Trace) { + // let num_entries = index.metadata()?.len() / SIZEOF_U64; + // trace!("recover: done. {} entries", num_entries); + //} // flush everything to disk... index.sync_all()?; @@ -266,20 +268,20 @@ impl LedgerWriter { .append(true) .open(ledger_path.join("index"))?; - if log_enabled!(Trace) { - let len = index.metadata()?.len(); - trace!("LedgerWriter::new: index fp:{}", len); - } + //if log_enabled!(Trace) { + // let len = index.metadata()?.len(); + // trace!("LedgerWriter::new: index fp:{}", len); + //} let data = OpenOptions::new() .create(create) .append(true) .open(ledger_path.join("data"))?; - if log_enabled!(Trace) { - let len = data.metadata()?.len(); - trace!("LedgerWriter::new: data fp:{}", len); - } + //if log_enabled!(Trace) { + // let len = data.metadata()?.len(); + // trace!("LedgerWriter::new: data fp:{}", len); + //} Ok(LedgerWriter { index, data }) } @@ -288,28 +290,28 @@ impl LedgerWriter { let len = serialized_size(&entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; - if log_enabled!(Trace) { - let offset = self.data.seek(SeekFrom::Current(0))?; - trace!("write_entry: after len data fp:{}", offset); - } + //if log_enabled!(Trace) { + // let offset = self.data.seek(SeekFrom::Current(0))?; + // trace!("write_entry: after len data fp:{}", offset); + //} serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?; - if log_enabled!(Trace) { - let offset = self.data.seek(SeekFrom::Current(0))?; - trace!("write_entry: after entry data fp:{}", offset); - } + //if log_enabled!(Trace) { + // let offset = self.data.seek(SeekFrom::Current(0))?; + // trace!("write_entry: after entry data fp:{}", offset); + //} self.data.sync_data()?; let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64; - trace!("write_entry: offset:{} len:{}", offset, len); + //trace!("write_entry: offset:{} len:{}", offset, len); serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?; - if log_enabled!(Trace) { - let offset = self.index.seek(SeekFrom::Current(0))?; - trace!("write_entry: end index fp:{}", offset); - } + //if log_enabled!(Trace) { + // let offset = self.index.seek(SeekFrom::Current(0))?; + // trace!("write_entry: end index fp:{}", offset); + //} self.index.sync_data() } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index a8efb1712b..afbbf7b7e0 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -39,30 +39,35 @@ impl ReplicateStage { while let Ok(mut more) = window_receiver.try_recv() { blobs.append(&mut more); } - let blobs_len = blobs.len(); let entries = reconstruct_entries_from_blobs(blobs.clone())?; + + while let Some(blob) = blobs.pop_front() { + blob_recycler.recycle(blob); + } + { let votes = entries_to_votes(&entries); let mut wcrdt = crdt.write().unwrap(); wcrdt.insert_votes(&votes); - }; + } + inc_new_counter!( "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); - if let Some(ledger_writer) = ledger_writer { - ledger_writer.write_entries(entries.clone())?; - } - let res = bank.process_entries(entries); + let res = bank.process_entries(entries.clone()); + + // TODO: move this to another stage? + if let Some(ledger_writer) = ledger_writer { + ledger_writer.write_entries(entries)?; + } if res.is_err() { - error!("process_entries {} {:?}", blobs_len, res); + error!("process_entries {:?}", res); } let _ = res?; - while let Some(blob) = blobs.pop_front() { - blob_recycler.recycle(blob); - } + Ok(()) } pub fn new(