diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index b1a7e11337..c51a059a5f 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -54,7 +54,7 @@ fn main() -> Result<(), Box> { let pkcs8: Vec = serde_json::from_str(&buffer)?; let mint = Mint::new_with_pkcs8(tokens, pkcs8); - let mut ledger_writer = LedgerWriter::new(&ledger_path)?; + let mut ledger_writer = LedgerWriter::new(&ledger_path, true)?; ledger_writer.write_entries(mint.create_entries())?; Ok(()) diff --git a/src/bin/ledger-tool.rs b/src/bin/ledger-tool.rs index 11e05a8db7..9cc7f92034 100644 --- a/src/bin/ledger-tool.rs +++ b/src/bin/ledger-tool.rs @@ -23,9 +23,11 @@ fn main() { let entries = read_ledger(ledger_path).expect("opening ledger"); + stdout().write_all(b"{\"ledger\":[\n").expect("open array"); for entry in entries { let entry = entry.unwrap(); serde_json::to_writer(stdout(), &entry).expect("serialize"); - stdout().write_all(b"\n").expect("newline"); + stdout().write_all(b",\n").expect("newline"); } + stdout().write_all(b"\n]}\n").expect("close array"); } diff --git a/src/drone.rs b/src/drone.rs index be1d3ea3ae..1b6b70fa17 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -262,7 +262,7 @@ mod tests { fn tmp_ledger_path(name: &str) -> String { let keypair = KeyPair::new(); - format!("farf/{}-{}", name, keypair.pubkey()) + format!("/tmp/farf/{}-{}", name, keypair.pubkey()) } #[test] diff --git a/src/fullnode.rs b/src/fullnode.rs index e046cb3c87..d1e6fb7795 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -92,7 +92,7 @@ impl FullNode { node, &network_entry_point, exit.clone(), - ledger_path, + Some(ledger_path), sigverify_disabled, ); info!( @@ -300,7 +300,7 @@ impl FullNode { node: TestNode, entry_point: &NodeInfo, exit: Arc, - ledger_path: &str, + ledger_path: Option<&str>, _sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); @@ -377,16 +377,9 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; - use std::fs::remove_dir_all; use std::sync::atomic::AtomicBool; use std::sync::Arc; - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - format!("farf/{}-{}", name, keypair.pubkey()) - } - #[test] fn validator_exit() { let kp = KeyPair::new(); @@ -395,15 +388,13 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let lp = tmp_ledger_path("validator_exit"); - let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false); + let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, None, false); v.exit(); v.join().unwrap(); - remove_dir_all(lp).unwrap(); } #[test] fn validator_parallel_exit() { - let vals: Vec<(FullNode, String)> = (0..2) + let vals: Vec = (0..2) .map(|_| { let kp = KeyPair::new(); let tn = TestNode::new_localhost_with_pubkey(kp.pubkey()); @@ -411,20 +402,15 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let lp = tmp_ledger_path("validator_parallel_exit"); - ( - FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false), - lp, - ) + FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, None, false) }) .collect(); //each validator can exit in parallel to speed many sequential calls to `join` - vals.iter().for_each(|v| v.0.exit()); + vals.iter().for_each(|v| v.exit()); //while join is called sequentially, the above exit call notified all the //validators to exit from all their threads vals.into_iter().for_each(|v| { - v.0.join().unwrap(); - remove_dir_all(v.1).unwrap() + v.join().unwrap(); }); } } diff --git a/src/ledger.rs b/src/ledger.rs index 0760f92945..c1db511dc8 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -5,54 +5,56 @@ use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; use entry::Entry; use hash::Hash; +use log::Level::Trace; use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; use std::collections::VecDeque; -use std::fs::{create_dir_all, File, OpenOptions}; +use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, Cursor, Seek, SeekFrom}; use std::mem::size_of; use std::path::Path; use transaction::Transaction; -/// -/// A persistent ledger is 2 files: -/// ledger_path/ --+ -/// +-- index <== an array of u64 offsets into data, -/// | each offset points to the first bytes -/// | of a usize that contains the length of -/// | the entry -/// +-- data <== concatenated usize length + entry data -/// -/// When opening a ledger, we have the ability to "audit" it, which means -/// we need to pick which file to use as "truth", and correct the other -/// file as necessary, if possible. -/// -/// The protocol for writing the ledger is to append to the data file first, -/// the index file 2nd. If interupted while writing, there are 2 -/// possibilities we need to cover: -/// -/// 1. a partial write of data, which might be a partial write of length -/// or a partial write entry data. -/// 2. a partial or missing write to index for that entry -/// -/// There is also the possibility of "unsynchronized" reading of the ledger -/// during transfer across nodes via rsync (or whatever). In this case, -/// if the transfer of the data file is done before the transfer of the -/// index file, it's possible (likely?) that the index file will be far -/// ahead of the data file in time. -/// -/// The quickest and most reliable strategy for recovery is therefore to -/// treat the data file as nearest to the "truth". -/// -/// The logic for "recovery/audit" is to open index, reading backwards -/// from the last u64-aligned entry to get to where index and data -/// agree (i.e. where a successful deserialization of an entry can -/// be performed). If index is ahead in time, truncate the index file -/// to match data. If index is behind in time, truncate data to the -/// last entry listed in index. -/// +// +// A persistent ledger is 2 files: +// ledger_path/ --+ +// +-- index <== an array of u64 offsets into data, +// | each offset points to the first bytes +// | of a u64 that contains the length of +// | the entry. To make the code smaller, +// | index[0] is set to 0, TODO: this field +// | could later be used for other stuff... +// +-- data <== concatenated instances of +// u64 length +// entry data +// +// When opening a ledger, we have the ability to "audit" it, which means we need +// to pick which file to use as "truth", and correct the other file as +// necessary, if possible. +// +// The protocol for writing the ledger is to append to the data file first, the +// index file 2nd. If the writing node is interupted while appending to the +// ledger, there are some possibilities we need to cover: +// +// 1. a partial write of data, which might be a partial write of length +// or a partial write entry data +// 2. a partial or missing write to index for that entry +// +// There is also the possibility of "unsynchronized" reading of the ledger +// during transfer across nodes via rsync (or whatever). In this case, if the +// transfer of the data file is done before the transfer of the index file, +// it's likely that the index file will be far ahead of the data file in time. +// +// The quickest and most reliable strategy for recovery is therefore to treat +// the data file as nearest to the "truth". +// +// The logic for "recovery/audit" is to open index and read backwards from the +// last u64-aligned entry to get to where index and data agree (i.e. where a +// successful deserialization of an entry can be performed), then truncate +// both files to this syncrhonization point. +// // ledger window #[derive(Debug)] @@ -63,7 +65,6 @@ pub struct LedgerWindow { // use a CONST because there's a cast, and we don't want "sizeof:: as u64"... const SIZEOF_U64: u64 = size_of::() as u64; -const SIZEOF_USIZE: u64 = size_of::() as u64; #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] fn err_bincode_to_io(e: Box) -> io::Error { @@ -73,13 +74,14 @@ fn err_bincode_to_io(e: Box) -> io::Error { fn entry_at(file: &mut File, at: u64) -> io::Result { file.seek(SeekFrom::Start(at))?; - let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?; + let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; + trace!("entry_at({}) len: {}", at, len); deserialize_from(file.take(len)).map_err(err_bincode_to_io) } fn next_entry(file: &mut File) -> io::Result { - let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?; + let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; deserialize_from(file.take(len)).map_err(err_bincode_to_io) } @@ -90,11 +92,13 @@ fn u64_at(file: &mut File, at: u64) -> io::Result { impl LedgerWindow { // opens a Ledger in directory, provides "infinite" window - pub fn new(directory: &str) -> io::Result { - let directory = Path::new(&directory); + pub fn new(ledger_path: &str) -> io::Result { + let ledger_path = Path::new(&ledger_path); - let index = File::open(directory.join("index"))?; - let data = File::open(directory.join("data"))?; + recover_ledger(ledger_path)?; + + let index = File::open(ledger_path.join("index"))?; + let data = File::open(ledger_path.join("data"))?; Ok(LedgerWindow { index, data }) } @@ -105,6 +109,124 @@ impl LedgerWindow { } } +pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> { + let ledger_path = Path::new(&ledger_path); + + if recover { + recover_ledger(ledger_path)?; + } + + let mut index = File::open(ledger_path.join("index"))?; + let mut data = File::open(ledger_path.join("data"))?; + + let index_len = index.metadata()?.len(); + + if index_len % SIZEOF_U64 != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + "expected back-to-back entries", + ))?; + } + + let mut last_data_offset = 0; + let mut index_offset = 0; + let mut data_read = 0; + let mut last_len = 0; + + while index_offset < index_len { + let data_offset = u64_at(&mut index, index_offset)?; + + if last_data_offset + last_len != data_offset { + Err(io::Error::new( + io::ErrorKind::Other, + "expected back-to-back entries", + ))?; + } + + let entry = entry_at(&mut data, data_offset)?; + last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64; + last_data_offset = data_offset; + + data_read += last_len; + index_offset += SIZEOF_U64; + } + if data_read != data.metadata()?.len() { + Err(io::Error::new( + io::ErrorKind::Other, + "garbage on end of data file", + ))?; + } + Ok(()) +} + +fn recover_ledger(ledger_path: &Path) -> io::Result<()> { + let mut index = OpenOptions::new() + .write(true) + .read(true) + .open(ledger_path.join("index"))?; + + let mut data = OpenOptions::new() + .write(true) + .read(true) + .open(ledger_path.join("data"))?; + + // first, truncate to a multiple of SIZEOF_U64 + let len = index.metadata()?.len(); + + if len % SIZEOF_U64 != 0 { + trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); + index.set_len(len - (len % SIZEOF_U64))?; + } + + // next, pull index offsets off one at a time until the last one points + // to a valid entry deserialization offset... + loop { + let len = index.metadata()?.len(); + trace!("recover: index len:{}", len); + + // should never happen + if len < SIZEOF_U64 { + trace!("recover: error index len {} too small", len); + + Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?; + } + + let offset = u64_at(&mut index, len - SIZEOF_U64)?; + trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); + + match entry_at(&mut data, offset) { + Ok(entry) => { + trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); + + let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?; + + trace!("recover: entry_len: {}", entry_len); + + // now trim data file to size... + data.set_len(offset + SIZEOF_U64 + entry_len)?; + + trace!( + "recover: trimmed data file to {}", + offset + SIZEOF_U64 + entry_len + ); + + break; // all good + } + Err(err) => { + trace!( + "recover: no entry recovered at {} {}", + offset, + err.to_string() + ); + index.set_len(len - SIZEOF_U64)?; + } + } + } + // flush everything to disk... + index.sync_all()?; + data.sync_all() +} + // TODO?? ... we could open the files on demand to support [], but today // LedgerWindow needs "&mut self" // @@ -126,36 +248,67 @@ pub struct LedgerWriter { } impl LedgerWriter { - // opens or creates a LedgerWriter in directory - pub fn new(directory: &str) -> io::Result { - let directory = Path::new(&directory); + // opens or creates a LedgerWriter in ledger_path directory + pub fn new(ledger_path: &str, create: bool) -> io::Result { + let ledger_path = Path::new(&ledger_path); - create_dir_all(directory)?; - - let index = OpenOptions::new() - .create(true) + if create { + let _ignored = remove_dir_all(ledger_path); + create_dir_all(ledger_path)?; + } else { + recover_ledger(ledger_path)?; + } + let mut index = OpenOptions::new() + .create(create) .append(true) - .open(directory.join("index"))?; + .open(ledger_path.join("index"))?; - let data = OpenOptions::new() - .create(true) + if log_enabled!(Trace) { + let offset = index.seek(SeekFrom::Current(0))?; + trace!("LedgerWriter::new: index fp:{}", offset); + } + + let mut data = OpenOptions::new() + .create(create) .append(true) - .open(directory.join("data"))?; + .open(ledger_path.join("data"))?; + + if log_enabled!(Trace) { + let offset = data.seek(SeekFrom::Current(0))?; + trace!("LedgerWriter::new: data fp:{}", offset); + } Ok(LedgerWriter { index, data }) } pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { - let offset = self.data.seek(SeekFrom::Current(0))?; - let len = serialized_size(&entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; + if log_enabled!(Trace) { + let offset = self.data.seek(SeekFrom::Current(0))?; + trace!("write_entry: after len data fp:{}", offset); + } + serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?; - self.data.flush()?; + if log_enabled!(Trace) { + let offset = self.data.seek(SeekFrom::Current(0))?; + trace!("write_entry: after entry data fp:{}", offset); + } + + self.data.sync_data()?; + + let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64; + trace!("write_entry: offset:{} len:{}", offset, len); serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?; - self.index.flush() + + if log_enabled!(Trace) { + let offset = self.index.seek(SeekFrom::Current(0))?; + trace!("write_entry: end index fp:{}", offset); + } + + self.index.sync_data() } pub fn write_entries(&mut self, entries: I) -> io::Result<()> @@ -171,7 +324,6 @@ impl LedgerWriter { #[derive(Debug)] pub struct LedgerReader { - // index: File, data: File, } @@ -187,27 +339,16 @@ impl Iterator for LedgerReader { } /// Return an iterator for all the entries in the given file. -pub fn read_ledger(directory: &str) -> io::Result>> { - let directory = Path::new(&directory); +pub fn read_ledger(ledger_path: &str) -> io::Result>> { + let ledger_path = Path::new(&ledger_path); - // let index = OpenOptions::new().write(true).open(directory.join("index")); - let data = File::open(directory.join("data"))?; + recover_ledger(ledger_path)?; - // audit_ledger(index, data)?; + let data = File::open(ledger_path.join("data"))?; Ok(LedgerReader { data }) } -pub fn copy_ledger(from: &str, to: &str) -> io::Result<()> { - let mut to = LedgerWriter::new(to)?; - - for entry in read_ledger(from)? { - let entry = entry?; - to.write_entry(&entry)?; - } - Ok(()) -} - // a Block is a slice of Entries pub trait Block { /// Verifies the hashes and counts of a slice of transactions are all consistent. @@ -359,11 +500,13 @@ mod tests { fn tmp_ledger_path(name: &str) -> String { let keypair = KeyPair::new(); - format!("farf/{}-{}", name, keypair.pubkey()) + format!("/tmp/farf/{}-{}", name, keypair.pubkey()) } #[test] fn test_verify_slice() { + use logger; + logger::setup(); let zero = Hash::default(); let one = hash(&zero.as_ref()); assert!(vec![][..].verify(&zero)); // base case @@ -376,6 +519,25 @@ mod tests { assert!(!bad_ticks.verify(&zero)); // inductive step, bad } + fn make_tiny_test_entries(num: usize) -> Vec { + let zero = Hash::default(); + let one = hash(&zero.as_ref()); + let keypair = KeyPair::new(); + + let mut id = one; + let mut num_hashes = 0; + (0..num) + .map(|_| { + Entry::new_mut( + &mut id, + &mut num_hashes, + vec![Transaction::new_timestamp(&keypair, Utc::now(), one)], + false, + ) + }) + .collect() + } + fn make_test_entries() -> Vec { let zero = Hash::default(); let one = hash(&zero.as_ref()); @@ -407,6 +569,8 @@ mod tests { #[test] fn test_entries_to_blobs() { + use logger; + logger::setup(); let entries = make_test_entries(); let blob_recycler = BlobRecycler::default(); @@ -418,6 +582,8 @@ mod tests { #[test] fn test_bad_blobs_attack() { + use logger; + logger::setup(); let blob_recycler = BlobRecycler::default(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack! @@ -480,10 +646,12 @@ mod tests { #[test] fn test_ledger_reader_writer() { + use logger; + logger::setup(); let ledger_path = tmp_ledger_path("test_ledger_reader_writer"); - let entries = make_test_entries(); + let entries = make_tiny_test_entries(10); - let mut writer = LedgerWriter::new(&ledger_path).unwrap(); + let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); writer.write_entries(entries.clone()).unwrap(); let mut read_entries = vec![]; @@ -509,28 +677,91 @@ mod tests { std::fs::remove_dir_all(ledger_path).unwrap(); } - #[test] - fn test_copy_ledger() { - let from = tmp_ledger_path("test_ledger_copy_from"); - let entries = make_test_entries(); - let mut writer = LedgerWriter::new(&from).unwrap(); + fn truncated_last_entry(ledger_path: &str, entries: Vec) { + let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + writer.write_entries(entries).unwrap(); + let len = writer.data.seek(SeekFrom::Current(0)).unwrap(); + writer.data.set_len(len - 4).unwrap(); + } + + fn garbage_on_data(ledger_path: &str, entries: Vec) { + let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + writer.write_entries(entries).unwrap(); + writer.data.write_all(b"hi there!").unwrap(); + } + + fn read_ledger_check(ledger_path: &str, entries: Vec, len: usize) { + let read_entries = read_ledger(&ledger_path).unwrap(); + let mut i = 0; + + for entry in read_entries { + assert_eq!(entry.unwrap(), entries[i]); + i += 1; + } + assert_eq!(i, len); + } + + fn ledger_window_check(ledger_path: &str, entries: Vec, len: usize) { + let mut window = LedgerWindow::new(&ledger_path).unwrap(); + for i in 0..len { + let entry = window.get_entry(i as u64); + assert_eq!(entry.unwrap(), entries[i]); + } + } + + #[test] + fn test_recover_ledger() { + use logger; + logger::setup(); + + let entries = make_tiny_test_entries(10); + let ledger_path = tmp_ledger_path("test_recover_ledger"); + + // truncate data file, tests recover inside read_ledger_check() + truncated_last_entry(&ledger_path, entries.clone()); + read_ledger_check(&ledger_path, entries.clone(), entries.len() - 1); + + // truncate data file, tests recover inside LedgerWindow::new() + truncated_last_entry(&ledger_path, entries.clone()); + ledger_window_check(&ledger_path, entries.clone(), entries.len() - 1); + + // restore last entry, tests recover_ledger() inside LedgerWriter::new() + truncated_last_entry(&ledger_path, entries.clone()); + let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); + writer.write_entry(&entries[entries.len() - 1]).unwrap(); + read_ledger_check(&ledger_path, entries.clone(), entries.len()); + ledger_window_check(&ledger_path, entries.clone(), entries.len()); + + // make it look like data is newer in time, check reader... + garbage_on_data(&ledger_path, entries.clone()); + read_ledger_check(&ledger_path, entries.clone(), entries.len()); + + // make it look like data is newer in time, check window... + garbage_on_data(&ledger_path, entries.clone()); + ledger_window_check(&ledger_path, entries.clone(), entries.len()); + + // make it look like data is newer in time, check writer... + garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec()); + let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); + writer.write_entry(&entries[entries.len() - 1]).unwrap(); + read_ledger_check(&ledger_path, entries.clone(), entries.len()); + ledger_window_check(&ledger_path, entries.clone(), entries.len()); + let _ignored = remove_dir_all(&ledger_path); + } + + #[test] + fn test_verify_ledger() { + use logger; + logger::setup(); + + let entries = make_tiny_test_entries(10); + let ledger_path = tmp_ledger_path("test_verify_ledger"); + let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); writer.write_entries(entries.clone()).unwrap(); - let to = tmp_ledger_path("test_ledger_copy_to"); - - copy_ledger(&from, &to).unwrap(); - - let mut read_entries = vec![]; - for x in read_ledger(&to).unwrap() { - let entry = x.unwrap(); - trace!("entry... {:?}", entry); - read_entries.push(entry); - } - assert_eq!(read_entries, entries); - - std::fs::remove_dir_all(from).unwrap(); - std::fs::remove_dir_all(to).unwrap(); + assert!(verify_ledger(&ledger_path, false).is_ok()); + let _ignored = remove_dir_all(&ledger_path); } } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index e25f5ee0ad..a8efb1712b 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -31,7 +31,7 @@ impl ReplicateStage { crdt: &Arc>, blob_recycler: &BlobRecycler, window_receiver: &BlobReceiver, - ledger_writer: &mut LedgerWriter, + ledger_writer: Option<&mut LedgerWriter>, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available blobs into a single vote @@ -50,8 +50,9 @@ impl ReplicateStage { "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); - - ledger_writer.write_entries(entries.clone())?; + if let Some(ledger_writer) = ledger_writer { + ledger_writer.write_entries(entries.clone())?; + } let res = bank.process_entries(entries); @@ -70,7 +71,7 @@ impl ReplicateStage { crdt: Arc>, blob_recycler: BlobRecycler, window_receiver: BlobReceiver, - ledger_path: &str, + ledger_path: Option<&str>, exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); @@ -90,7 +91,8 @@ impl ReplicateStage { vote_blob_sender, exit, ); - let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap(); + + let mut ledger_writer = ledger_path.map(|p| LedgerWriter::new(p, false).unwrap()); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) @@ -100,7 +102,7 @@ impl ReplicateStage { &crdt, &blob_recycler, &window_receiver, - &mut ledger_writer, + ledger_writer.as_mut(), ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, diff --git a/src/thin_client.rs b/src/thin_client.rs index d5bee03b75..70bc388e74 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -285,6 +285,7 @@ mod tests { use budget::Budget; use crdt::TestNode; use fullnode::FullNode; + use ledger::LedgerWriter; use logger; use mint::Mint; use service::Service; @@ -294,10 +295,15 @@ mod tests { use std::sync::Arc; use transaction::{Instruction, Plan}; - fn tmp_ledger_path(name: &str) -> String { + fn tmp_ledger(name: &str, mint: &Mint) -> String { let keypair = KeyPair::new(); - format!("farf/{}-{}", name, keypair.pubkey()) + let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey()); + + let mut writer = LedgerWriter::new(&path, true).unwrap(); + writer.write_entries(mint.create_entries()).unwrap(); + + path } #[test] @@ -311,7 +317,7 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let ledger_path = tmp_ledger_path("thin_client"); + let ledger_path = tmp_ledger("thin_client", &alice); let server = FullNode::new_leader( leader_keypair, @@ -358,7 +364,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); - let ledger_path = tmp_ledger_path("bad_sig"); + let ledger_path = tmp_ledger("bad_sig", &alice); let server = FullNode::new_leader( leader_keypair, @@ -418,7 +424,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); - let ledger_path = tmp_ledger_path("client_check_signature"); + let ledger_path = tmp_ledger("client_check_signature", &alice); let server = FullNode::new_leader( leader_keypair, diff --git a/src/tvu.rs b/src/tvu.rs index 56d8290676..c1406cd796 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -78,7 +78,7 @@ impl Tvu { replicate_socket: UdpSocket, repair_socket: UdpSocket, retransmit_socket: UdpSocket, - ledger_path: &str, + ledger_path: Option<&str>, exit: Arc, ) -> Self { let blob_recycler = BlobRecycler::default(); @@ -154,7 +154,6 @@ pub mod tests { use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; - use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -175,12 +174,6 @@ pub mod tests { Ok((ncp, window)) } - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - format!("farf/{}-{}", name, keypair.pubkey()) - } - /// Test that message sent from leader to target1 and replicated to target2 #[test] fn test_replicate() { @@ -240,7 +233,6 @@ pub mod tests { let cref1 = Arc::new(RwLock::new(crdt1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap(); - let ledger_path = tmp_ledger_path("replicate"); let tvu = Tvu::new( target1_keypair, &bank, @@ -250,7 +242,7 @@ pub mod tests { target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, - &ledger_path, + None, exit.clone(), ); @@ -320,6 +312,5 @@ pub mod tests { dr_1.0.join().expect("join"); t_receiver.join().expect("join"); t_responder.join().expect("join"); - remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/write_stage.rs b/src/write_stage.rs index 3436a2138a..92035a52d7 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -84,7 +84,7 @@ impl WriteStage { vote_blob_receiver, ); let (blob_sender, blob_receiver) = channel(); - let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap(); + let mut ledger_writer = LedgerWriter::new(ledger_path, false).unwrap(); let thread_hdl = Builder::new() .name("solana-writer".to_string()) diff --git a/tests/multinode.rs b/tests/multinode.rs index 27a09efa47..249c566b33 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,7 +6,7 @@ extern crate solana; use solana::crdt::{Crdt, NodeInfo, TestNode}; use solana::fullnode::FullNode; -use solana::ledger::{copy_ledger, LedgerWriter}; +use solana::ledger::{read_ledger, LedgerWriter}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; @@ -18,6 +18,7 @@ use solana::timing::duration_as_s; use std::cmp::max; use std::env; use std::fs::remove_dir_all; +use std::io; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -75,20 +76,54 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { fn tmp_ledger_path(name: &str) -> String { let keypair = KeyPair::new(); - format!("farf/{}-{}", name, keypair.pubkey()) + format!("/tmp/farf/{}-{}", name, keypair.pubkey()) } fn genesis(name: &str, num: i64) -> (Mint, String) { let mint = Mint::new(num); let path = tmp_ledger_path(name); - let mut writer = LedgerWriter::new(&path).unwrap(); + let mut writer = LedgerWriter::new(&path, true).unwrap(); writer.write_entries(mint.create_entries()).unwrap(); (mint, path) } +//#[test] +//fn test_copy_ledger() { +// let from = tmp_ledger_path("test_ledger_copy_from"); +// let entries = make_tiny_test_entries(10); +// +// let mut writer = LedgerWriter::new(&from, true).unwrap(); +// writer.write_entries(entries.clone()).unwrap(); +// +// let to = tmp_ledger_path("test_ledger_copy_to"); +// +// copy_ledger(&from, &to).unwrap(); +// +// let mut read_entries = vec![]; +// for x in read_ledger(&to).unwrap() { +// let entry = x.unwrap(); +// trace!("entry... {:?}", entry); +// read_entries.push(entry); +// } +// assert_eq!(read_entries, entries); +// +// std::fs::remove_dir_all(from).unwrap(); +// std::fs::remove_dir_all(to).unwrap(); +//} +// +fn copy_ledger(from: &str, to: &str) -> io::Result<()> { + let mut to = LedgerWriter::new(to, true)?; + + for entry in read_ledger(from)? { + let entry = entry?; + to.write_entry(&entry)?; + } + Ok(()) +} + fn tmp_copy_ledger(from: &str, name: &str) -> String { let to = tmp_ledger_path(name); copy_ledger(from, &to).unwrap(); @@ -110,6 +145,12 @@ fn test_multi_node_validator_catchup_from_zero() { let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000); ledger_paths.push(leader_ledger_path.clone()); + let zero_ledger_path = tmp_copy_ledger( + &leader_ledger_path, + "multi_node_validator_catchup_from_zero", + ); + ledger_paths.push(zero_ledger_path.clone()); + let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); // Send leader some tokens to vote @@ -158,18 +199,14 @@ fn test_multi_node_validator_catchup_from_zero() { assert_eq!(success, servers.len()); success = 0; - // start up another validator, converge and then check everyone's balances + // start up another validator from zero, converge and then check everyone's + // balances let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); - let ledger_path = tmp_copy_ledger( - &leader_ledger_path, - "multi_node_validator_catchup_from_zero", - ); - ledger_paths.push(ledger_path.clone()); let val = FullNode::new( validator, false, - &ledger_path, + &zero_ledger_path, keypair, Some(leader_data.contact_info.ncp), );