diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 9ded7d2f9e..0d9a7ad799 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -11,11 +11,11 @@ use solana::crdt::{ReplicatedData, TestNode}; use solana::fullnode::start; use std::env; use std::fs::File; -use std::io::{stdin, stdout, Write}; +use std::io::stdin; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::process::exit; use std::sync::atomic::AtomicBool; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; //use std::time::Duration; fn print_usage(program: &str, opts: Options) { @@ -79,24 +79,17 @@ fn main() -> () { exit(1); } } - let node = TestNode::new_with_bind_addr(repl_data, bind_addr); + let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr); let exit = Arc::new(AtomicBool::new(false)); - let mut reader = stdin().lock(); let threads = if matches.opt_present("t") { let testnet_address_string = matches.opt_str("t").unwrap(); let testnet_addr = testnet_address_string.parse().unwrap(); - start(node, false, &mut reader, Some(testnet_addr), None, exit) + start(node, false, None, Some(testnet_addr), None, exit) } else { - repl_data.current_leader_id = repl_data.id.clone(); + node.data.current_leader_id = node.data.id.clone(); - let outfile: Write + Send + 'static = if matches.opt_present("o") { - let path = matches.opt_str("o").unwrap(); - let f = File::create(&path).expect(&format!("unable to open output file \"{}\"", path)); - Mutex::new(f) - } else { - stdout().lock() - }; - start(node, true, &mut reader, None, Some(&outfile), exit) + let outfile = matches.opt_str("o"); + start(node, true, None, None, outfile, exit) }; for t in threads { t.join().expect("join"); diff --git a/src/fullnode.rs b/src/fullnode.rs index 2e2b09d016..425cfea003 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -2,32 +2,42 @@ use bank::Bank; use crdt::{ReplicatedData, TestNode}; use entry_writer; use server::Server; -use std::io::{BufRead, Write}; +use std::fs::File; +use std::io::{stdin, stdout, BufReader}; use std::net::SocketAddr; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread::JoinHandle; //use std::time::Duration; -pub fn start<'a, R: BufRead, W: Write + Send + 'static>( +pub fn start( mut node: TestNode, leader: bool, - infile: &'a mut R, + infile: Option, network_entry_for_validator: Option, - outfile_for_leader: Option, + outfile_for_leader: Option, exit: Arc, ) -> Vec> { - eprintln!("creating bank..."); - let entries = entry_writer::read_entries(infile).map(|e| e.expect("failed to parse entry")); + info!("creating bank..."); let bank = Bank::default(); + let entry_height = if let Some(path) = infile { + let f = File::open(path).unwrap(); + let mut r = BufReader::new(f); + let entries = entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + } else { + let mut r = BufReader::new(stdin()); + let entries = entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + }; // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger - eprintln!("processing ledger..."); - let entry_height = bank.process_ledger(entries).expect("process_ledger"); - eprintln!("processed {} ledger...", entry_height); + info!("processed {} ledger...", entry_height); - eprintln!("creating networking stack..."); + info!("creating networking stack..."); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); let local_requests_addr = node.sockets.requests.local_addr().unwrap(); @@ -58,22 +68,37 @@ pub fn start<'a, R: BufRead, W: Write + Send + 'static>( s.thread_hdls } else { node.data.current_leader_id = node.data.id.clone(); - let f = outfile_for_leader.expect("outfile is needed for leader"); - let outfile: Box = Box::new(f); - let server = Server::new_leader( - bank, - entry_height, - //Some(Duration::from_millis(1000)), - None, - node.data.clone(), - node.sockets.requests, - node.sockets.transaction, - node.sockets.broadcast, - node.sockets.respond, - node.sockets.gossip, - exit.clone(), - outfile, - ); + let server = if let Some(file) = outfile_for_leader { + Server::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node.data.clone(), + node.sockets.requests, + node.sockets.transaction, + node.sockets.broadcast, + node.sockets.respond, + node.sockets.gossip, + exit.clone(), + File::create(file).expect("opening ledger file"), + ) + } else { + Server::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node.data.clone(), + node.sockets.requests, + node.sockets.transaction, + node.sockets.broadcast, + node.sockets.respond, + node.sockets.gossip, + exit.clone(), + stdout(), + ) + }; info!( "leader ready... local request address: {} (advertising {})", local_requests_addr, node.data.requests_addr diff --git a/src/streamer.rs b/src/streamer.rs index c76b6b1348..739489e91e 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -228,6 +228,7 @@ fn recv_window( let leader_id = crdt.read() .expect("'crdt' read lock in fn recv_window") .leader_data() + .expect("leader not ready") .id; while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq)