From bbf9ea89c5e3f0dea9ba9726afb39ed00750cfbc Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Thu, 9 Aug 2018 18:15:05 -0700 Subject: [PATCH] add some flushing to ledger --- src/ledger.rs | 29 ++++++++++++++++++++++++----- src/write_stage.rs | 3 ++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/ledger.rs b/src/ledger.rs index 64bc7795dc..6657a0a126 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -119,8 +119,7 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { recover_ledger(ledger_path)?; } - let mut index = File::open(ledger_path.join("index"))?; - let mut data = File::open(ledger_path.join("data"))?; + let index = File::open(ledger_path.join("index"))?; let index_len = index.metadata()?.len(); @@ -130,6 +129,10 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { "expected back-to-back entries", ))?; } + let mut index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index); + + let data = File::open(ledger_path.join("data"))?; + let mut data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data); let mut last_data_offset = 0; let mut index_offset = 0; @@ -142,7 +145,8 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { if last_data_offset + last_len != data_offset { Err(io::Error::new( io::ErrorKind::Other, - "expected back-to-back entries", + format!("expected back-to-back entries... entry[{}] has a gap of {} bytes to the previous entry", + index_offset/SIZEOF_U64, data_offset as i64 - (last_data_offset as i64 + last_len as i64)) ))?; } @@ -153,6 +157,7 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { data_read += last_len; index_offset += SIZEOF_U64; } + let data = data.into_inner(); if data_read != data.metadata()?.len() { Err(io::Error::new( io::ErrorKind::Other, @@ -291,7 +296,7 @@ impl LedgerWriter { Ok(LedgerWriter { index, data }) } - pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { let len = serialized_size(&entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; @@ -318,13 +323,22 @@ impl LedgerWriter { Ok(()) } + pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + self.write_entry_noflush(&entry)?; + self.index.flush()?; + self.data.flush()?; + Ok(()) + } + pub fn write_entries(&mut self, entries: I) -> io::Result<()> where I: IntoIterator, { for entry in entries { - self.write_entry(&entry)?; + self.write_entry_noflush(&entry)?; } + self.index.flush()?; + self.data.flush()?; Ok(()) } } @@ -672,6 +686,7 @@ mod tests { writer.write_entries(entries.clone()).unwrap(); // drops writer, flushes buffers } + verify_ledger(&ledger_path, false).unwrap(); let mut read_entries = vec![]; for x in read_ledger(&ledger_path).unwrap() { @@ -703,6 +718,7 @@ mod tests { writer.write_entries(entries).unwrap(); writer.data.seek(SeekFrom::Current(0)).unwrap() }; + verify_ledger(&ledger_path, false).unwrap(); let data = OpenOptions::new() .write(true) @@ -758,6 +774,8 @@ mod tests { let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); writer.write_entry(&entries[entries.len() - 1]).unwrap(); } + verify_ledger(&ledger_path, false).unwrap(); + read_ledger_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len()); @@ -775,6 +793,7 @@ mod tests { let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); writer.write_entry(&entries[entries.len() - 1]).unwrap(); } + verify_ledger(&ledger_path, false).unwrap(); read_ledger_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len()); let _ignored = remove_dir_all(&ledger_path); diff --git a/src/write_stage.rs b/src/write_stage.rs index 2f39ca211b..b7fcf0df63 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -43,8 +43,9 @@ impl WriteStage { let votes = entries_to_votes(&entries); crdt.write().unwrap().insert_votes(&votes); + ledger_writer.write_entries(entries.clone())?; + for entry in entries.clone() { - ledger_writer.write_entry(&entry)?; if !entry.has_more { bank.register_entry_id(&entry.id); }