diff --git a/.gitignore b/.gitignore index 94a99c0561..5e24c10d2f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,5 @@ Cargo.lock /target/ - **/*.rs.bk .cargo @@ -12,5 +11,5 @@ Cargo.lock /config-client/ /multinode-demo/test/config-client/ -# test temp files, ledgers, etc. -/farf/ +# Generated log files +/log-*.txt diff --git a/multinode-demo/leader.sh b/multinode-demo/leader.sh index b0b031e1fa..f58ac643d7 100755 --- a/multinode-demo/leader.sh +++ b/multinode-demo/leader.sh @@ -28,7 +28,7 @@ tune_networking trap 'kill "$pid" && wait "$pid"' INT TERM $program \ --identity "$SOLANA_CONFIG_DIR"/leader.json \ - --ledger "$SOLANA_CONFIG_DIR"/ledger \ + --ledger "$SOLANA_CONFIG_DIR"/ledger.log \ > >($leader_logger) 2>&1 & pid=$! wait "$pid" diff --git a/multinode-demo/setup.sh b/multinode-demo/setup.sh index 094d413aeb..e0a2d6b05a 100755 --- a/multinode-demo/setup.sh +++ b/multinode-demo/setup.sh @@ -91,8 +91,8 @@ if $node_type_leader; then echo "Creating $mint_path with $num_tokens tokens" $solana_keygen -o "$mint_path" - echo "Creating $SOLANA_CONFIG_DIR/ledger" - $solana_genesis --tokens="$num_tokens" --ledger "$SOLANA_CONFIG_DIR"/ledger < "$mint_path" + echo "Creating $SOLANA_CONFIG_DIR/ledger.log" + $solana_genesis --tokens="$num_tokens" < "$mint_path" > "$SOLANA_CONFIG_DIR"/ledger.log echo "Creating $SOLANA_CONFIG_DIR/leader.json" $solana_fullnode_config --keypair="$leader_id_path" "${leader_address_args[@]}" > "$SOLANA_CONFIG_DIR"/leader.json diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 35ac041bf4..7f0c51af65 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -71,8 +71,8 @@ SOLANA_LEADER_CONFIG_DIR="$SOLANA_CONFIG_DIR"/leader-config rm -rf "$SOLANA_LEADER_CONFIG_DIR" set -ex $rsync -vPrz --max-size=100M "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR" -[[ -d "$SOLANA_LEADER_CONFIG_DIR"/ledger ]] || { - echo "Unable to retrieve ledger from $rsync_leader_url" +[[ -r "$SOLANA_LEADER_CONFIG_DIR"/ledger.log ]] || { + echo "Unable to retrieve ledger.log from $rsync_leader_url" exit 1 } @@ -80,7 +80,7 @@ trap 'kill "$pid" && wait "$pid"' INT TERM $program \ --identity "$SOLANA_CONFIG_DIR"/validator.json \ --testnet "$leader_address:$leader_port" \ - --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger \ + --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log \ > >($validator_logger) 2>&1 & pid=$! wait "$pid" diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index f07e8c91cd..076b211615 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -8,7 +8,7 @@ use clap::{App, Arg}; use solana::client::mk_client; use solana::crdt::{NodeInfo, TestNode}; use solana::drone::DRONE_PORT; -use solana::fullnode::{Config, FullNode}; +use solana::fullnode::{Config, FullNode, LedgerFile}; use solana::logger; use solana::metrics::set_panic_hook; use solana::service::Service; @@ -41,12 +41,11 @@ fn main() -> () { ) .arg( Arg::with_name("ledger") - .short("l") + .short("L") .long("ledger") - .value_name("DIR") + .value_name("FILE") .takes_value(true) - .required(true) - .help("use DIR as persistent ledger location"), + .help("use FILE as persistent ledger (defaults to stdin/stdout)"), ) .get_matches(); @@ -73,7 +72,11 @@ fn main() -> () { let leader_pubkey = keypair.pubkey(); let repl_clone = repl_data.clone(); - let ledger_path = matches.value_of("ledger").unwrap(); + let ledger = if let Some(l) = matches.value_of("ledger") { + LedgerFile::Path(l.to_string()) + } else { + LedgerFile::StdInOut + }; let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr); let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT); @@ -82,11 +85,11 @@ fn main() -> () { let testnet_addr: SocketAddr = testnet_address_string.parse().unwrap(); drone_addr.set_ip(testnet_addr.ip()); - FullNode::new(node, false, ledger_path, keypair, Some(testnet_addr)) + FullNode::new(node, false, ledger, keypair, Some(testnet_addr)) } else { node.data.leader_id = node.data.id; - FullNode::new(node, true, ledger_path, keypair, None) + FullNode::new(node, true, ledger, keypair, None) }; let mut client = mk_client(&repl_clone); diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index b1a7e11337..8db6fe81a2 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -8,10 +8,10 @@ extern crate solana; use atty::{is, Stream}; use clap::{App, Arg}; -use solana::ledger::LedgerWriter; +use solana::entry_writer::EntryWriter; use solana::mint::Mint; use std::error; -use std::io::{stdin, Read}; +use std::io::{stdin, stdout, Read}; use std::process::exit; fn main() -> Result<(), Box> { @@ -25,19 +25,9 @@ fn main() -> Result<(), Box> { .required(true) .help("Number of tokens with which to initialize mint"), ) - .arg( - Arg::with_name("ledger") - .short("l") - .long("ledger") - .value_name("DIR") - .takes_value(true) - .required(true) - .help("use DIR as persistent ledger location"), - ) .get_matches(); let tokens = value_t_or_exit!(matches, "tokens", i64); - let ledger_path = matches.value_of("ledger").unwrap(); if is(Stream::Stdin) { eprintln!("nothing found on stdin, expected a json file"); @@ -54,8 +44,7 @@ fn main() -> Result<(), Box> { let pkcs8: Vec = serde_json::from_str(&buffer)?; let mint = Mint::new_with_pkcs8(tokens, pkcs8); - let mut ledger_writer = LedgerWriter::new(&ledger_path)?; - ledger_writer.write_entries(mint.create_entries())?; - + let mut writer = stdout(); + EntryWriter::write_entries(&mut writer, mint.create_entries())?; Ok(()) } diff --git a/src/drone.rs b/src/drone.rs index ce5a69cb6f..c0ec1acf19 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -164,7 +164,7 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; - use std::fs::remove_dir_all; + use std::io::sink; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -259,21 +259,6 @@ mod tests { assert_eq!(drone.request_cap, REQUEST_CAP); } - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - let id = { - let ids: Vec<_> = keypair - .pubkey() - .iter() - .map(|id| format!("{}", id)) - .collect(); - ids.join("") - }; - - format!("farf/{}-{}", name, id) - } - #[test] #[ignore] fn test_send_airdrop() { @@ -290,7 +275,6 @@ mod tests { let carlos_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); - let ledger_path = tmp_ledger_path("send_airdrop"); let server = FullNode::new_leader( leader_keypair, @@ -300,7 +284,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - &ledger_path, + sink(), false, ); //TODO: this seems unstable @@ -352,6 +336,5 @@ mod tests { exit.store(true, Ordering::Relaxed); server.join().unwrap(); - remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index fe45f68147..a87207f942 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,13 +3,17 @@ use bank::Bank; use crdt::{Crdt, NodeInfo, TestNode}; use entry::Entry; -use ledger::{read_ledger, Block}; +use entry_writer; +use ledger::Block; use ncp::Ncp; use packet::BlobRecycler; use rpu::Rpu; use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; +use std::fs::{File, OpenOptions}; +use std::io::{stdin, stdout, BufReader}; +use std::io::{Read, Write}; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -26,6 +30,11 @@ pub struct FullNode { thread_hdls: Vec>, } +pub enum LedgerFile { + StdInOut, + Path(String), +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] /// Fullnode configuration to be stored in file pub struct Config { @@ -52,17 +61,28 @@ impl FullNode { fn new_internal( mut node: TestNode, leader: bool, - ledger_path: &str, + ledger: LedgerFile, keypair: KeyPair, network_entry_for_validator: Option, sigverify_disabled: bool, ) -> FullNode { info!("creating bank..."); let bank = Bank::default(); - - let entries = read_ledger(ledger_path).expect("opening ledger"); - - let entries = entries.map(|e| e.expect("failed to parse entry")); + let (infile, outfile): (Box, Box) = match ledger { + LedgerFile::Path(path) => ( + Box::new(File::open(path.clone()).expect("opening ledger file")), + Box::new( + OpenOptions::new() + .create(true) + .append(true) + .open(path) + .expect("opening ledger file"), + ), + ), + LedgerFile::StdInOut => (Box::new(stdin()), Box::new(stdout())), + }; + let reader = BufReader::new(infile); + let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry")); info!("processing ledger..."); let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); @@ -92,7 +112,6 @@ impl FullNode { node, &network_entry_point, exit.clone(), - ledger_path, sigverify_disabled, ); info!( @@ -112,7 +131,7 @@ impl FullNode { None, node, exit.clone(), - ledger_path, + outfile, sigverify_disabled, ); info!( @@ -126,7 +145,7 @@ impl FullNode { pub fn new( node: TestNode, leader: bool, - ledger: &str, + ledger: LedgerFile, keypair: KeyPair, network_entry_for_validator: Option, ) -> FullNode { @@ -143,7 +162,7 @@ impl FullNode { pub fn new_without_sigverify( node: TestNode, leader: bool, - ledger: &str, + ledger: LedgerFile, keypair: KeyPair, network_entry_for_validator: Option, ) -> FullNode { @@ -201,7 +220,7 @@ impl FullNode { /// | | `------------` /// `---------------------` /// ``` - pub fn new_leader( + pub fn new_leader( keypair: KeyPair, bank: Bank, entry_height: u64, @@ -209,7 +228,7 @@ impl FullNode { tick_duration: Option, node: TestNode, exit: Arc, - ledger_path: &str, + writer: W, sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); @@ -226,9 +245,6 @@ impl FullNode { let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler); let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); - - // let mut ledger_writer = LedgerWriter::new(ledger_path); - let (tpu, blob_receiver) = Tpu::new( keypair, &bank, @@ -237,7 +253,7 @@ impl FullNode { node.sockets.transaction, &blob_recycler, exit.clone(), - ledger_path, + writer, sigverify_disabled, ); thread_hdls.extend(tpu.thread_hdls()); @@ -300,7 +316,6 @@ impl FullNode { node: TestNode, entry_point: &NodeInfo, exit: Arc, - ledger_path: &str, _sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); @@ -338,7 +353,6 @@ impl FullNode { node.sockets.replicate, node.sockets.repair, node.sockets.retransmit, - ledger_path, exit.clone(), ); thread_hdls.extend(tvu.thread_hdls()); @@ -377,25 +391,8 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; - use std::fs::remove_dir_all; use std::sync::atomic::AtomicBool; use std::sync::Arc; - - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - let id = { - let ids: Vec<_> = keypair - .pubkey() - .iter() - .map(|id| format!("{}", id)) - .collect(); - ids.join("") - }; - - format!("farf/{}-{}", name, id) - } - #[test] fn validator_exit() { let kp = KeyPair::new(); @@ -404,15 +401,13 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let lp = tmp_ledger_path("validator_exit"); - let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false); + let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false); v.exit(); v.join().unwrap(); - remove_dir_all(lp).unwrap(); } #[test] fn validator_parallel_exit() { - let vals: Vec<(FullNode, String)> = (0..2) + let vals: Vec = (0..2) .map(|_| { let kp = KeyPair::new(); let tn = TestNode::new_localhost_with_pubkey(kp.pubkey()); @@ -420,20 +415,13 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let lp = tmp_ledger_path("validator_parallel_exit"); - ( - FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false), - lp, - ) + FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false) }) .collect(); //each validator can exit in parallel to speed many sequential calls to `join` - vals.iter().for_each(|v| v.0.exit()); + vals.iter().for_each(|v| v.exit()); //while join is called sequentially, the above exit call notified all the //validators to exit from all their threads - vals.into_iter().for_each(|v| { - v.0.join().unwrap(); - remove_dir_all(v.1).unwrap() - }); + vals.into_iter().for_each(|v| v.join().unwrap()); } } diff --git a/src/ledger.rs b/src/ledger.rs index 10ad25d120..9a66f57628 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -39,15 +39,17 @@ fn entry_at(file: &mut File, at: u64) -> io::Result { deserialize_from(file.take(len)).map_err(err_bincode_to_io) } -//fn next_offset(file: &mut File) -> io::Result { -// deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) -//} - -fn next_entry(file: &mut File) -> io::Result { - let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?; - deserialize_from(file.take(len)).map_err(err_bincode_to_io) +fn next_offset(file: &mut File) -> io::Result { + deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) } +// unused, but would work for the iterator if we only have the data file... +// +//fn next_entry(file: &mut File) -> io::Result { +// let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?; +// deserialize_from(file.take(len)).map_err(err_bincode_to_io) +//} + fn u64_at(file: &mut File, at: u64) -> io::Result { file.seek(SeekFrom::Start(at))?; deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) @@ -110,7 +112,7 @@ impl LedgerWriter { Ok(LedgerWriter { index, data }) } - pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { let offset = self.data.seek(SeekFrom::Current(0))?; let len = serialized_size(&entry).map_err(err_bincode_to_io)?; @@ -144,8 +146,8 @@ impl Iterator for LedgerReader { type Item = io::Result; fn next(&mut self) -> Option> { - match next_entry(&mut self.data) { - Ok(entry) => Some(Ok(entry)), + match next_offset(&mut self.index) { + Ok(offset) => Some(entry_at(&mut self.data, offset)), Err(_) => None, } } @@ -161,7 +163,7 @@ pub fn read_ledger(directory: &str) -> io::Result io::Result<()> { +pub fn copy(from: &str, to: &str) -> io::Result<()> { let mut to = LedgerWriter::new(to)?; for entry in read_ledger(from)? { @@ -319,12 +321,6 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use transaction::{Transaction, Vote}; - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - format!("farf/{}-{}", name, keypair.pubkey()) - } - #[test] fn test_verify_slice() { let zero = Hash::default(); @@ -441,11 +437,18 @@ mod tests { assert!(entries0.verify(&id)); } - fn tmp_ledger_path() -> String {} + fn tmp_ledger_path() -> String { + let keypair = KeyPair::new(); + + format!( + "target/test_ledger_reader_writer_window-{}", + keypair.pubkey() + ) + } #[test] fn test_ledger_reader_writer() { - let ledger_path = tmp_ledger_path("test_ledger_reader_writer"); + let ledger_path = tmp_ledger_path(); let entries = make_test_entries(); let mut writer = LedgerWriter::new(&ledger_path).unwrap(); @@ -475,16 +478,16 @@ mod tests { std::fs::remove_dir_all(ledger_path).unwrap(); } #[test] - fn test_copy_ledger() { - let from = tmp_ledger_path("test_ledger_copy_from"); + fn test_ledger_copy() { + let from = tmp_ledger_path(); let entries = make_test_entries(); let mut writer = LedgerWriter::new(&from).unwrap(); writer.write_entries(entries.clone()).unwrap(); - let to = tmp_ledger_path("test_ledger_copy_to"); + let to = tmp_ledger_path(); - copy_ledger(&from, &to).unwrap(); + copy(&from, &to).unwrap(); let mut read_entries = vec![]; for x in read_ledger(&to).unwrap() { diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index e25f5ee0ad..dbe1051c0f 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -3,7 +3,7 @@ use bank::Bank; use counter::Counter; use crdt::Crdt; -use ledger::{reconstruct_entries_from_blobs, LedgerWriter}; +use ledger; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; @@ -31,7 +31,6 @@ impl ReplicateStage { crdt: &Arc>, blob_recycler: &BlobRecycler, window_receiver: &BlobReceiver, - ledger_writer: &mut LedgerWriter, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available blobs into a single vote @@ -40,7 +39,7 @@ impl ReplicateStage { blobs.append(&mut more); } let blobs_len = blobs.len(); - let entries = reconstruct_entries_from_blobs(blobs.clone())?; + let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?; { let votes = entries_to_votes(&entries); let mut wcrdt = crdt.write().unwrap(); @@ -50,11 +49,7 @@ impl ReplicateStage { "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); - - ledger_writer.write_entries(entries.clone())?; - let res = bank.process_entries(entries); - if res.is_err() { error!("process_entries {} {:?}", blobs_len, res); } @@ -70,7 +65,6 @@ impl ReplicateStage { crdt: Arc>, blob_recycler: BlobRecycler, window_receiver: BlobReceiver, - ledger_path: &str, exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); @@ -90,18 +84,13 @@ impl ReplicateStage { vote_blob_sender, exit, ); - let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap(); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) .spawn(move || loop { - if let Err(e) = Self::replicate_requests( - &bank, - &crdt, - &blob_recycler, - &window_receiver, - &mut ledger_writer, - ) { + if let Err(e) = + Self::replicate_requests(&bank, &crdt, &blob_recycler, &window_receiver) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), diff --git a/src/thin_client.rs b/src/thin_client.rs index df2bf47493..d09980dea4 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -289,26 +289,11 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; - use std::fs::remove_dir_all; + use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use transaction::{Instruction, Plan}; - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - let id = { - let ids: Vec<_> = keypair - .pubkey() - .iter() - .map(|id| format!("{}", id)) - .collect(); - ids.join("") - }; - - format!("farf/{}-{}", name, id) - } - #[test] fn test_thin_client() { logger::setup(); @@ -320,7 +305,6 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let ledger_path = tmp_ledger_path("thin_client"); let server = FullNode::new_leader( leader_keypair, @@ -330,7 +314,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - &ledger_path, + sink(), false, ); sleep(Duration::from_millis(900)); @@ -367,7 +351,6 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); - let ledger_path = tmp_ledger_path("bad_sig"); let server = FullNode::new_leader( leader_keypair, @@ -377,7 +360,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - &ledger_path, + sink(), false, ); //TODO: remove this sleep, or add a retry so CI is stable @@ -414,7 +397,6 @@ mod tests { assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); server.join().unwrap(); - remove_dir_all(ledger_path).unwrap(); } #[test] @@ -427,8 +409,6 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); - let ledger_path = tmp_ledger_path("client_check_signature"); - let server = FullNode::new_leader( leader_keypair, bank, @@ -437,7 +417,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - &ledger_path, + sink(), false, ); sleep(Duration::from_millis(300)); @@ -463,6 +443,5 @@ mod tests { exit.store(true, Ordering::Relaxed); server.join().unwrap(); - remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/tpu.rs b/src/tpu.rs index 90a4d2845c..d02f161cb4 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -34,6 +34,7 @@ use record_stage::RecordStage; use service::Service; use signature::KeyPair; use sigverify_stage::SigVerifyStage; +use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -51,7 +52,7 @@ pub struct Tpu { } impl Tpu { - pub fn new( + pub fn new( keypair: KeyPair, bank: &Arc, crdt: &Arc>, @@ -59,7 +60,7 @@ impl Tpu { transactions_socket: UdpSocket, blob_recycler: &BlobRecycler, exit: Arc, - ledger_path: &str, + writer: W, sigverify_disabled: bool, ) -> (Self, BlobReceiver) { let packet_recycler = PacketRecycler::default(); @@ -85,7 +86,7 @@ impl Tpu { bank.clone(), crdt.clone(), blob_recycler.clone(), - ledger_path, + writer, entry_receiver, ); diff --git a/src/tvu.rs b/src/tvu.rs index a3e96d631f..eb421f432e 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -77,7 +77,6 @@ impl Tvu { replicate_socket: UdpSocket, repair_socket: UdpSocket, retransmit_socket: UdpSocket, - ledger_path: &str, exit: Arc, ) -> Self { let blob_recycler = BlobRecycler::default(); @@ -104,7 +103,6 @@ impl Tvu { crdt, blob_recycler, blob_window_receiver, - ledger_path, exit, ); @@ -153,7 +151,6 @@ pub mod tests { use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; - use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -173,22 +170,6 @@ pub mod tests { let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?; Ok((ncp, window)) } - - fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - let id = { - let ids: Vec<_> = keypair - .pubkey() - .iter() - .map(|id| format!("{}", id)) - .collect(); - ids.join("") - }; - - format!("farf/{}-{}", name, id) - } - /// Test that message sent from leader to target1 and replicated to target2 #[test] fn test_replicate() { @@ -248,7 +229,6 @@ pub mod tests { let cref1 = Arc::new(RwLock::new(crdt1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap(); - let ledger_path = tmp_ledger_path("replicate"); let tvu = Tvu::new( target1_keypair, &bank, @@ -258,7 +238,6 @@ pub mod tests { target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, - &ledger_path, exit.clone(), ); @@ -328,6 +307,5 @@ pub mod tests { dr_1.0.join().expect("join"); t_receiver.join().expect("join"); t_responder.join().expect("join"); - remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/write_stage.rs b/src/write_stage.rs index 3436a2138a..34de2328ce 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -6,12 +6,14 @@ use bank::Bank; use counter::Counter; use crdt::Crdt; use entry::Entry; -use ledger::{Block, LedgerWriter}; +use entry_writer::EntryWriter; +use ledger::Block; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::KeyPair; use std::collections::VecDeque; +use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; @@ -29,30 +31,21 @@ pub struct WriteStage { impl WriteStage { /// Process any Entry items that have been published by the RecordStage. /// continuosly broadcast blobs of entries out - pub fn write_and_send_entries( + pub fn write_and_send_entries( crdt: &Arc>, - bank: &Arc, - ledger_writer: &mut LedgerWriter, + entry_writer: &mut EntryWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, entry_receiver: &Receiver>, ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; - let votes = entries_to_votes(&entries); crdt.write().unwrap().insert_votes(&votes); - for entry in entries.clone() { - ledger_writer.write_entry(&entry)?; - if !entry.has_more { - bank.register_entry_id(&entry.id); - } - } - //TODO(anatoly): real stake based voting needs to change this //leader simply votes if the current set of validators have voted //on a valid last id - + entry_writer.write_and_register_entries(&entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); entries.to_blobs(blob_recycler, &mut blobs); @@ -67,12 +60,12 @@ impl WriteStage { } /// Create a new WriteStage for writing and broadcasting entries. - pub fn new( + pub fn new( keypair: KeyPair, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, - ledger_path: &str, + writer: W, entry_receiver: Receiver>, ) -> (Self, BlobReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); @@ -84,18 +77,16 @@ impl WriteStage { vote_blob_receiver, ); let (blob_sender, blob_receiver) = channel(); - let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap(); - let thread_hdl = Builder::new() .name("solana-writer".to_string()) .spawn(move || { + let mut entry_writer = EntryWriter::new(&bank, writer); let mut last_vote = 0; let debug_id = crdt.read().unwrap().debug_id(); loop { if let Err(e) = Self::write_and_send_entries( &crdt, - &bank, - &mut ledger_writer, + &mut entry_writer, &blob_sender, &blob_recycler, &entry_receiver, diff --git a/tests/multinode.rs b/tests/multinode.rs index 1559eb22ad..ea5d792ff8 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -4,9 +4,10 @@ extern crate bincode; extern crate serde_json; extern crate solana; -use solana::crdt::{Crdt, NodeInfo, TestNode}; -use solana::fullnode::FullNode; -use solana::ledger::{copy_ledger, LedgerWriter}; +use solana::crdt::TestNode; +use solana::crdt::{Crdt, NodeInfo}; +use solana::entry_writer::EntryWriter; +use solana::fullnode::{FullNode, LedgerFile}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; @@ -17,7 +18,7 @@ use solana::thin_client::ThinClient; use solana::timing::duration_as_s; use std::cmp::max; use std::env; -use std::fs::remove_dir_all; +use std::fs::File; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -72,27 +73,16 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { rv } -fn tmp_ledger_path(name: &str) -> String { - let keypair = KeyPair::new(); - - format!("farf/{}-{}", name, keypair.pubkey()) -} - -fn genesis(name: &str, num: i64) -> (Mint, String) { +fn genesis(num: i64) -> (Mint, String) { let mint = Mint::new(num); + let path = format!( + "target/test_multi_node_dynamic_network-{}.log", + mint.pubkey() + ); + let mut writer = File::create(path.clone()).unwrap(); - let path = tmp_ledger_path(name); - let mut writer = LedgerWriter::new(&path).unwrap(); - - writer.write_entries(mint.create_entries()).unwrap(); - - (mint, path) -} - -fn tmp_copy_ledger(from: &str, name: &str) -> String { - let to = tmp_ledger_path(name); - copy_ledger(from, &to).unwrap(); - to + EntryWriter::write_entries(&mut writer, mint.create_entries()).unwrap(); + (mint, path.to_string()) } #[test] @@ -105,12 +95,15 @@ fn test_multi_node_validator_catchup_from_zero() { let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); - let mut ledger_paths = Vec::new(); - let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000); - ledger_paths.push(leader_ledger_path.clone()); - - let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); + let (alice, ledger_path) = genesis(10_000); + let server = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.clone()), + leader_keypair, + None, + ); // Send leader some tokens to vote let leader_balance = @@ -121,16 +114,10 @@ fn test_multi_node_validator_catchup_from_zero() { for _ in 0..N { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); - let ledger_path = tmp_copy_ledger( - &leader_ledger_path, - "multi_node_validator_catchup_from_zero_validator", - ); - ledger_paths.push(ledger_path.clone()); - let mut val = FullNode::new( validator, false, - &ledger_path, + LedgerFile::Path(ledger_path.clone()), keypair, Some(leader_data.contact_info.ncp), ); @@ -161,15 +148,10 @@ fn test_multi_node_validator_catchup_from_zero() { // start up another validator, converge and then check everyone's balances let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); - let ledger_path = tmp_copy_ledger( - &leader_ledger_path, - "multi_node_validator_catchup_from_zero", - ); - ledger_paths.push(ledger_path.clone()); let val = FullNode::new( validator, false, - &ledger_path, + LedgerFile::Path(ledger_path.clone()), keypair, Some(leader_data.contact_info.ncp), ); @@ -209,9 +191,6 @@ fn test_multi_node_validator_catchup_from_zero() { for node in nodes { node.close().unwrap(); } - for path in ledger_paths { - remove_dir_all(path).unwrap(); - } } #[test] @@ -225,11 +204,14 @@ fn test_multi_node_basic() { let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); - let mut ledger_paths = Vec::new(); - - let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000); - ledger_paths.push(leader_ledger_path.clone()); - let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); + let (alice, ledger_path) = genesis(10_000); + let server = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.clone()), + leader_keypair, + None, + ); // Send leader some tokens to vote let leader_balance = @@ -240,12 +222,10 @@ fn test_multi_node_basic() { for _ in 0..N { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); - let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic"); - ledger_paths.push(ledger_path.clone()); let val = FullNode::new( validator, false, - &ledger_path, + LedgerFile::Path(ledger_path.clone()), keypair, Some(leader_data.contact_info.ncp), ); @@ -274,9 +254,7 @@ fn test_multi_node_basic() { for node in nodes { node.close().unwrap(); } - for path in ledger_paths { - remove_dir_all(path).unwrap(); - } + std::fs::remove_file(ledger_path).unwrap(); } #[test] @@ -285,12 +263,15 @@ fn test_boot_validator_from_file() { let leader_keypair = KeyPair::new(); let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = KeyPair::new().pubkey(); - let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000); - let mut ledger_paths = Vec::new(); - ledger_paths.push(leader_ledger_path.clone()); - + let (alice, ledger_path) = genesis(100_000); let leader_data = leader.data.clone(); - let leader_fullnode = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); + let leader_fullnode = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.clone()), + leader_keypair, + None, + ); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); assert_eq!(leader_balance, 500); @@ -301,12 +282,10 @@ fn test_boot_validator_from_file() { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); - let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); - ledger_paths.push(ledger_path.clone()); let val_fullnode = FullNode::new( validator, false, - &ledger_path, + LedgerFile::Path(ledger_path.clone()), keypair, Some(leader_data.contact_info.ncp), ); @@ -316,16 +295,20 @@ fn test_boot_validator_from_file() { val_fullnode.close().unwrap(); leader_fullnode.close().unwrap(); - for path in ledger_paths { - remove_dir_all(path).unwrap(); - } + std::fs::remove_file(ledger_path).unwrap(); } fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) { let leader_keypair = KeyPair::new(); let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); - let leader_fullnode = FullNode::new(leader, true, &ledger_path, leader_keypair, None); + let leader_fullnode = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.to_string()), + leader_keypair, + None, + ); (leader_data, leader_fullnode) } @@ -336,7 +319,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { // ledger (currently up to WINDOW_SIZE entries) logger::setup(); - let (alice, ledger_path) = genesis("leader_restart_validator_start_from_old_ledger", 100_000); + let (alice, ledger_path) = genesis(100_000); let bob_pubkey = KeyPair::new().pubkey(); let (leader_data, leader_fullnode) = create_leader(&ledger_path); @@ -347,10 +330,11 @@ fn test_leader_restart_validator_start_from_old_ledger() { assert_eq!(leader_balance, 500); // create a "stale" ledger by copying current ledger - let stale_ledger_path = tmp_copy_ledger( - &ledger_path, - "leader_restart_validator_start_from_old_ledger", - ); + let mut stale_ledger_path = ledger_path.clone(); + stale_ledger_path.insert_str(ledger_path.rfind("/").unwrap() + 1, "stale_"); + + std::fs::copy(&ledger_path, &stale_ledger_path) + .expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str()); // restart the leader leader_fullnode.close().unwrap(); @@ -369,11 +353,10 @@ fn test_leader_restart_validator_start_from_old_ledger() { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); - let val_fullnode = FullNode::new( validator, false, - &stale_ledger_path, + LedgerFile::Path(stale_ledger_path.clone()), keypair, Some(leader_data.contact_info.ncp), ); @@ -399,8 +382,8 @@ fn test_leader_restart_validator_start_from_old_ledger() { val_fullnode.close().unwrap(); leader_fullnode.close().unwrap(); - remove_dir_all(ledger_path).unwrap(); - remove_dir_all(stale_ledger_path).unwrap(); + std::fs::remove_file(ledger_path).unwrap(); + std::fs::remove_file(stale_ledger_path).unwrap(); } //TODO: this test will run a long time so it's disabled for CI @@ -426,16 +409,16 @@ fn test_multi_node_dynamic_network() { let leader_pubkey = leader_keypair.pubkey().clone(); let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = KeyPair::new().pubkey(); - let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000); - - let mut ledger_paths = Vec::new(); - ledger_paths.push(leader_ledger_path.clone()); - + let (alice, ledger_path) = genesis(10_000_000); let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.data.clone(); - - let server = - FullNode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None); + let server = FullNode::new_without_sigverify( + leader, + true, + LedgerFile::Path(ledger_path.clone()), + leader_keypair, + None, + ); // Send leader some tokens to vote let leader_balance = send_tx_and_retry_get_balance( @@ -495,8 +478,7 @@ fn test_multi_node_dynamic_network() { .into_iter() .map(|keypair| { let leader_data = leader_data.clone(); - let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_dynamic_network"); - ledger_paths.push(ledger_path.clone()); + let ledger_path = ledger_path.clone(); Builder::new() .name("validator-launch-thread".to_string()) .spawn(move || { @@ -506,7 +488,7 @@ fn test_multi_node_dynamic_network() { let val = FullNode::new_without_sigverify( validator, false, - &ledger_path, + LedgerFile::Path(ledger_path.clone()), keypair, Some(leader_data.contact_info.ncp), ); @@ -615,9 +597,7 @@ fn test_multi_node_dynamic_network() { } server.join().unwrap(); - for path in ledger_paths { - remove_dir_all(path).unwrap(); - } + std::fs::remove_file(ledger_path).unwrap(); } fn mk_client(leader: &NodeInfo) -> ThinClient {