remove trace! calls, re-arrange replicate_requests to have the bank earlier

This commit is contained in:
Rob Walker
2018-08-06 00:59:42 -07:00
parent 039ed01abf
commit 9fabd34156
2 changed files with 58 additions and 51 deletions

View File

@ -5,7 +5,7 @@
use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size};
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use log::Level::Trace; //use log::Level::Trace;
use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*; use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
@ -75,7 +75,7 @@ fn entry_at(file: &mut File, at: u64) -> io::Result<Entry> {
file.seek(SeekFrom::Start(at))?; file.seek(SeekFrom::Start(at))?;
let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?;
trace!("entry_at({}) len: {}", at, len); //trace!("entry_at({}) len: {}", at, len);
deserialize_from(file.take(len)).map_err(err_bincode_to_io) deserialize_from(file.take(len)).map_err(err_bincode_to_io)
} }
@ -174,7 +174,7 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
let len = index.metadata()?.len(); let len = index.metadata()?.len();
if len % SIZEOF_U64 != 0 { if len % SIZEOF_U64 != 0 {
trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); //trace!("recover: trimming index len to {}", len - len % SIZEOF_U64);
index.set_len(len - (len % SIZEOF_U64))?; index.set_len(len - (len % SIZEOF_U64))?;
} }
@ -182,48 +182,50 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
// to a valid entry deserialization offset... // to a valid entry deserialization offset...
loop { loop {
let len = index.metadata()?.len(); let len = index.metadata()?.len();
trace!("recover: index len:{}", len); //trace!("recover: index len:{}", len);
// should never happen // should never happen
if len < SIZEOF_U64 { if len < SIZEOF_U64 {
trace!("recover: error index len {} too small", len); //trace!("recover: error index len {} too small", len);
Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?; Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?;
} }
let offset = u64_at(&mut index, len - SIZEOF_U64)?; let offset = u64_at(&mut index, len - SIZEOF_U64)?;
trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); //trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset);
match entry_at(&mut data, offset) { match entry_at(&mut data, offset) {
Ok(entry) => { Ok(entry) => {
trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); //trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry);
let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?; let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?;
trace!("recover: entry_len: {}", entry_len); //trace!("recover: entry_len: {}", entry_len);
// now trim data file to size... // now trim data file to size...
data.set_len(offset + SIZEOF_U64 + entry_len)?; data.set_len(offset + SIZEOF_U64 + entry_len)?;
trace!( //trace!(
"recover: trimmed data file to {}", // "recover: trimmed data file to {}",
offset + SIZEOF_U64 + entry_len // offset + SIZEOF_U64 + entry_len
); //);
break; // all good break; // all good
} }
Err(err) => { Err(_err) => {
trace!( //trace!(
"recover: no entry recovered at {} {}", // "recover: no entry recovered at {} {}",
offset, // offset,
err.to_string() // _err.to_string()
); //);
index.set_len(len - SIZEOF_U64)?; index.set_len(len - SIZEOF_U64)?;
} }
} }
} }
let num_entries = index.metadata()?.len() / SIZEOF_U64; //if log_enabled!(Trace) {
trace!("recover: done. {} entries", num_entries); // let num_entries = index.metadata()?.len() / SIZEOF_U64;
// trace!("recover: done. {} entries", num_entries);
//}
// flush everything to disk... // flush everything to disk...
index.sync_all()?; index.sync_all()?;
@ -266,20 +268,20 @@ impl LedgerWriter {
.append(true) .append(true)
.open(ledger_path.join("index"))?; .open(ledger_path.join("index"))?;
if log_enabled!(Trace) { //if log_enabled!(Trace) {
let len = index.metadata()?.len(); // let len = index.metadata()?.len();
trace!("LedgerWriter::new: index fp:{}", len); // trace!("LedgerWriter::new: index fp:{}", len);
} //}
let data = OpenOptions::new() let data = OpenOptions::new()
.create(create) .create(create)
.append(true) .append(true)
.open(ledger_path.join("data"))?; .open(ledger_path.join("data"))?;
if log_enabled!(Trace) { //if log_enabled!(Trace) {
let len = data.metadata()?.len(); // let len = data.metadata()?.len();
trace!("LedgerWriter::new: data fp:{}", len); // trace!("LedgerWriter::new: data fp:{}", len);
} //}
Ok(LedgerWriter { index, data }) Ok(LedgerWriter { index, data })
} }
@ -288,28 +290,28 @@ impl LedgerWriter {
let len = serialized_size(&entry).map_err(err_bincode_to_io)?; let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) { //if log_enabled!(Trace) {
let offset = self.data.seek(SeekFrom::Current(0))?; // let offset = self.data.seek(SeekFrom::Current(0))?;
trace!("write_entry: after len data fp:{}", offset); // trace!("write_entry: after len data fp:{}", offset);
} //}
serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) { //if log_enabled!(Trace) {
let offset = self.data.seek(SeekFrom::Current(0))?; // let offset = self.data.seek(SeekFrom::Current(0))?;
trace!("write_entry: after entry data fp:{}", offset); // trace!("write_entry: after entry data fp:{}", offset);
} //}
self.data.sync_data()?; self.data.sync_data()?;
let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64; let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64;
trace!("write_entry: offset:{} len:{}", offset, len); //trace!("write_entry: offset:{} len:{}", offset, len);
serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?; serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) { //if log_enabled!(Trace) {
let offset = self.index.seek(SeekFrom::Current(0))?; // let offset = self.index.seek(SeekFrom::Current(0))?;
trace!("write_entry: end index fp:{}", offset); // trace!("write_entry: end index fp:{}", offset);
} //}
self.index.sync_data() self.index.sync_data()
} }

View File

@ -39,30 +39,35 @@ impl ReplicateStage {
while let Ok(mut more) = window_receiver.try_recv() { while let Ok(mut more) = window_receiver.try_recv() {
blobs.append(&mut more); blobs.append(&mut more);
} }
let blobs_len = blobs.len();
let entries = reconstruct_entries_from_blobs(blobs.clone())?; let entries = reconstruct_entries_from_blobs(blobs.clone())?;
while let Some(blob) = blobs.pop_front() {
blob_recycler.recycle(blob);
}
{ {
let votes = entries_to_votes(&entries); let votes = entries_to_votes(&entries);
let mut wcrdt = crdt.write().unwrap(); let mut wcrdt = crdt.write().unwrap();
wcrdt.insert_votes(&votes); wcrdt.insert_votes(&votes);
}; }
inc_new_counter!( inc_new_counter!(
"replicate-transactions", "replicate-transactions",
entries.iter().map(|x| x.transactions.len()).sum() entries.iter().map(|x| x.transactions.len()).sum()
); );
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(entries.clone())?;
}
let res = bank.process_entries(entries); let res = bank.process_entries(entries.clone());
// TODO: move this to another stage?
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(entries)?;
}
if res.is_err() { if res.is_err() {
error!("process_entries {} {:?}", blobs_len, res); error!("process_entries {:?}", res);
} }
let _ = res?; let _ = res?;
while let Some(blob) = blobs.pop_front() {
blob_recycler.recycle(blob);
}
Ok(()) Ok(())
} }
pub fn new( pub fn new(