diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 787a7ef646..e10aa377ba 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -55,7 +55,7 @@ fn main() -> Result<(), Box> { let pkcs8: Vec = serde_json::from_str(&buffer)?; let mint = Mint::new_with_pkcs8(tokens, pkcs8); - let mut ledger_writer = LedgerWriter::new(&ledger_path, true)?; + let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?; ledger_writer.write_entries(mint.create_entries())?; Ok(()) diff --git a/src/bin/ledger-tool.rs b/src/bin/ledger-tool.rs index 53502535cb..38334d46f9 100644 --- a/src/bin/ledger-tool.rs +++ b/src/bin/ledger-tool.rs @@ -28,7 +28,7 @@ fn main() { .get_matches(); let ledger_path = matches.value_of("ledger").unwrap(); - let entries = match read_ledger(ledger_path) { + let entries = match read_ledger(ledger_path, true) { Ok(entries) => entries, Err(err) => { println!("Failed to open ledger at {}: {}", ledger_path, err); diff --git a/src/crdt.rs b/src/crdt.rs index fd3345081a..ee05a72306 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1211,7 +1211,7 @@ impl Crdt { ) -> JoinHandle<()> { let debug_id = obj.read().unwrap().debug_id(); - let mut ledger_window = ledger_path.map(|p| LedgerWindow::new(p).unwrap()); + let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap()); Builder::new() .name("solana-listen".to_string()) @@ -1914,7 +1914,7 @@ mod tests { let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey()); - let mut writer = LedgerWriter::new(&path, true).unwrap(); + let mut writer = LedgerWriter::open(&path, true).unwrap(); let zero = Hash::default(); let one = hash(&zero.as_ref()); writer @@ -1924,7 +1924,7 @@ mod tests { } let ledger_path = tmp_ledger("run_window_request"); - let mut ledger_window = LedgerWindow::new(&ledger_path).unwrap(); + let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap(); let rv = Crdt::run_window_request( &window, diff --git a/src/fullnode.rs b/src/fullnode.rs index 96de8484c2..e366f0de38 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -60,7 +60,7 @@ impl FullNode { info!("creating bank..."); let bank = Bank::new_default(leader); - let entries = read_ledger(ledger_path).expect("opening ledger"); + let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = entries.map(|e| e.expect("failed to parse entry")); diff --git a/src/ledger.rs b/src/ledger.rs index 6657a0a126..d76c745cfa 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -93,11 +93,10 @@ fn u64_at(file: &mut A, at: u64) -> io::Result { impl LedgerWindow { // opens a Ledger in directory, provides "infinite" window - pub fn new(ledger_path: &str) -> io::Result { + // + pub fn open(ledger_path: &str) -> io::Result { let ledger_path = Path::new(&ledger_path); - recover_ledger(ledger_path)?; - let index = File::open(ledger_path.join("index"))?; let index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index); let data = File::open(ledger_path.join("data"))?; @@ -112,13 +111,9 @@ impl LedgerWindow { } } -pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { +pub fn verify_ledger(ledger_path: &str) -> io::Result<()> { let ledger_path = Path::new(&ledger_path); - if recover { - recover_ledger(ledger_path)?; - } - let index = File::open(ledger_path.join("index"))?; let index_len = index.metadata()?.len(); @@ -150,10 +145,22 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { ))?; } - let entry = entry_at(&mut data, data_offset)?; - last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64; - last_data_offset = data_offset; + match entry_at(&mut data, data_offset) { + Err(e) => Err(io::Error::new( + io::ErrorKind::Other, + format!( + "entry[{}] deserialize() failed at offset {}, err: {}", + index_offset / SIZEOF_U64, + data_offset, + e.to_string(), + ), + ))?, + Ok(entry) => { + last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64 + } + } + last_data_offset = data_offset; data_read += last_len; index_offset += SIZEOF_U64; } @@ -167,7 +174,8 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { Ok(()) } -fn recover_ledger(ledger_path: &Path) -> io::Result<()> { +fn recover_ledger(ledger_path: &str) -> io::Result<()> { + let ledger_path = Path::new(ledger_path); let mut index = OpenOptions::new() .write(true) .read(true) @@ -261,16 +269,21 @@ pub struct LedgerWriter { } impl LedgerWriter { + // recover and open the ledger for writing + pub fn recover(ledger_path: &str) -> io::Result { + recover_ledger(ledger_path)?; + LedgerWriter::open(ledger_path, false) + } + // opens or creates a LedgerWriter in ledger_path directory - pub fn new(ledger_path: &str, create: bool) -> io::Result { + pub fn open(ledger_path: &str, create: bool) -> io::Result { let ledger_path = Path::new(&ledger_path); if create { let _ignored = remove_dir_all(ledger_path); create_dir_all(ledger_path)?; - } else { - recover_ledger(ledger_path)?; } + let index = OpenOptions::new() .create(create) .append(true) @@ -360,11 +373,15 @@ impl Iterator for LedgerReader { } /// Return an iterator for all the entries in the given file. -pub fn read_ledger(ledger_path: &str) -> io::Result>> { +pub fn read_ledger( + ledger_path: &str, + recover: bool, +) -> io::Result>> { + if recover { + recover_ledger(ledger_path)?; + } + let ledger_path = Path::new(&ledger_path); - - recover_ledger(ledger_path)?; - let data = File::open(ledger_path.join("data"))?; let data = BufReader::new(data); @@ -682,21 +699,21 @@ mod tests { let entries = make_tiny_test_entries(10); { - let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(entries.clone()).unwrap(); // drops writer, flushes buffers } - verify_ledger(&ledger_path, false).unwrap(); + verify_ledger(&ledger_path).unwrap(); let mut read_entries = vec![]; - for x in read_ledger(&ledger_path).unwrap() { + for x in read_ledger(&ledger_path, true).unwrap() { let entry = x.unwrap(); trace!("entry... {:?}", entry); read_entries.push(entry); } assert_eq!(read_entries, entries); - let mut window = LedgerWindow::new(&ledger_path).unwrap(); + let mut window = LedgerWindow::open(&ledger_path).unwrap(); for (i, entry) in entries.iter().enumerate() { let read_entry = window.get_entry(i as u64).unwrap(); @@ -706,19 +723,19 @@ mod tests { std::fs::remove_file(Path::new(&ledger_path).join("data")).unwrap(); // empty data file should fall over - assert!(LedgerWindow::new(&ledger_path).is_err()); - assert!(read_ledger(&ledger_path).is_err()); + assert!(LedgerWindow::open(&ledger_path).is_err()); + assert!(read_ledger(&ledger_path, false).is_err()); std::fs::remove_dir_all(ledger_path).unwrap(); } fn truncated_last_entry(ledger_path: &str, entries: Vec) { let len = { - let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(entries).unwrap(); writer.data.seek(SeekFrom::Current(0)).unwrap() }; - verify_ledger(&ledger_path, false).unwrap(); + verify_ledger(&ledger_path).unwrap(); let data = OpenOptions::new() .write(true) @@ -728,13 +745,13 @@ mod tests { } fn garbage_on_data(ledger_path: &str, entries: Vec) { - let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(entries).unwrap(); writer.data.write_all(b"hi there!").unwrap(); } fn read_ledger_check(ledger_path: &str, entries: Vec, len: usize) { - let read_entries = read_ledger(&ledger_path).unwrap(); + let read_entries = read_ledger(&ledger_path, true).unwrap(); let mut i = 0; for entry in read_entries { @@ -745,7 +762,7 @@ mod tests { } fn ledger_window_check(ledger_path: &str, entries: Vec, len: usize) { - let mut window = LedgerWindow::new(&ledger_path).unwrap(); + let mut window = LedgerWindow::open(&ledger_path).unwrap(); for i in 0..len { let entry = window.get_entry(i as u64); assert_eq!(entry.unwrap(), entries[i]); @@ -770,11 +787,14 @@ mod tests { // restore last entry, tests recover_ledger() inside LedgerWriter::new() truncated_last_entry(&ledger_path, entries.clone()); + // verify should fail at first + assert!(verify_ledger(&ledger_path).is_err()); { - let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); + let mut writer = LedgerWriter::recover(&ledger_path).unwrap(); writer.write_entry(&entries[entries.len() - 1]).unwrap(); } - verify_ledger(&ledger_path, false).unwrap(); + // and be fine after recover() + verify_ledger(&ledger_path).unwrap(); read_ledger_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len()); @@ -789,11 +809,12 @@ mod tests { // make it look like data is newer in time, check writer... garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec()); + assert!(verify_ledger(&ledger_path).is_err()); { - let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); + let mut writer = LedgerWriter::recover(&ledger_path).unwrap(); writer.write_entry(&entries[entries.len() - 1]).unwrap(); } - verify_ledger(&ledger_path, false).unwrap(); + verify_ledger(&ledger_path).unwrap(); read_ledger_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len()); let _ignored = remove_dir_all(&ledger_path); @@ -807,11 +828,13 @@ mod tests { let entries = make_tiny_test_entries(10); let ledger_path = tmp_ledger_path("test_verify_ledger"); { - let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(entries.clone()).unwrap(); } + // TODO more cases that make ledger_verify() fail + // assert!(verify_ledger(&ledger_path).is_err()); - assert!(verify_ledger(&ledger_path, false).is_ok()); + assert!(verify_ledger(&ledger_path).is_ok()); let _ignored = remove_dir_all(&ledger_path); } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 3a3ed7fbb6..744f5b4bab 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -98,7 +98,7 @@ impl ReplicateStage { exit, ); - let mut ledger_writer = ledger_path.map(|p| LedgerWriter::new(p, false).unwrap()); + let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) diff --git a/src/thin_client.rs b/src/thin_client.rs index ee5e0b8d04..5023652660 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -302,7 +302,7 @@ mod tests { let path = format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey()); - let mut writer = LedgerWriter::new(&path, true).unwrap(); + let mut writer = LedgerWriter::open(&path, true).unwrap(); writer.write_entries(mint.create_entries()).unwrap(); path diff --git a/src/write_stage.rs b/src/write_stage.rs index b7fcf0df63..c43aa4739c 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -86,7 +86,7 @@ impl WriteStage { vote_blob_receiver, ); let (blob_sender, blob_receiver) = channel(); - let mut ledger_writer = LedgerWriter::new(ledger_path, false).unwrap(); + let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap(); let thread_hdl = Builder::new() .name("solana-writer".to_string()) diff --git a/tests/multinode.rs b/tests/multinode.rs index b61f083e14..6b9cf569a9 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -88,7 +88,7 @@ fn genesis(name: &str, num: i64) -> (Mint, String) { let mint = Mint::new(num); let path = tmp_ledger_path(name); - let mut writer = LedgerWriter::new(&path, true).unwrap(); + let mut writer = LedgerWriter::open(&path, true).unwrap(); writer.write_entries(mint.create_entries()).unwrap(); @@ -141,7 +141,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // and force him to respond to repair from the ledger window { let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize * 2); - let mut writer = LedgerWriter::new(&leader_ledger_path, false).unwrap(); + let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); writer.write_entries(entries).unwrap(); }