This commit is contained in:
Anatoly Yakovenko
2018-07-02 14:46:23 -07:00
committed by Greg Fitzgerald
parent 430d9d9314
commit 2952027d04
3 changed files with 59 additions and 40 deletions

View File

@ -11,11 +11,11 @@ use solana::crdt::{ReplicatedData, TestNode};
use solana::fullnode::start; use solana::fullnode::start;
use std::env; use std::env;
use std::fs::File; use std::fs::File;
use std::io::{stdin, stdout, Write}; use std::io::stdin;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::process::exit; use std::process::exit;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
//use std::time::Duration; //use std::time::Duration;
fn print_usage(program: &str, opts: Options) { fn print_usage(program: &str, opts: Options) {
@ -79,24 +79,17 @@ fn main() -> () {
exit(1); exit(1);
} }
} }
let node = TestNode::new_with_bind_addr(repl_data, bind_addr); let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut reader = stdin().lock();
let threads = if matches.opt_present("t") { let threads = if matches.opt_present("t") {
let testnet_address_string = matches.opt_str("t").unwrap(); let testnet_address_string = matches.opt_str("t").unwrap();
let testnet_addr = testnet_address_string.parse().unwrap(); let testnet_addr = testnet_address_string.parse().unwrap();
start(node, false, &mut reader, Some(testnet_addr), None, exit) start(node, false, None, Some(testnet_addr), None, exit)
} else { } else {
repl_data.current_leader_id = repl_data.id.clone(); node.data.current_leader_id = node.data.id.clone();
let outfile: Write + Send + 'static = if matches.opt_present("o") { let outfile = matches.opt_str("o");
let path = matches.opt_str("o").unwrap(); start(node, true, None, None, outfile, exit)
let f = File::create(&path).expect(&format!("unable to open output file \"{}\"", path));
Mutex::new(f)
} else {
stdout().lock()
};
start(node, true, &mut reader, None, Some(&outfile), exit)
}; };
for t in threads { for t in threads {
t.join().expect("join"); t.join().expect("join");

View File

@ -2,32 +2,42 @@ use bank::Bank;
use crdt::{ReplicatedData, TestNode}; use crdt::{ReplicatedData, TestNode};
use entry_writer; use entry_writer;
use server::Server; use server::Server;
use std::io::{BufRead, Write}; use std::fs::File;
use std::io::{stdin, stdout, BufReader};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
//use std::time::Duration; //use std::time::Duration;
pub fn start<'a, R: BufRead, W: Write + Send + 'static>( pub fn start(
mut node: TestNode, mut node: TestNode,
leader: bool, leader: bool,
infile: &'a mut R, infile: Option<String>,
network_entry_for_validator: Option<SocketAddr>, network_entry_for_validator: Option<SocketAddr>,
outfile_for_leader: Option<W>, outfile_for_leader: Option<String>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
eprintln!("creating bank..."); info!("creating bank...");
let entries = entry_writer::read_entries(infile).map(|e| e.expect("failed to parse entry"));
let bank = Bank::default(); let bank = Bank::default();
let entry_height = if let Some(path) = infile {
let f = File::open(path).unwrap();
let mut r = BufReader::new(f);
let entries = entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
bank.process_ledger(entries).expect("process_ledger")
} else {
let mut r = BufReader::new(stdin());
let entries = entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
bank.process_ledger(entries).expect("process_ledger")
};
// entry_height is the network-wide agreed height of the ledger. // entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger // initialize it from the input ledger
eprintln!("processing ledger..."); info!("processed {} ledger...", entry_height);
let entry_height = bank.process_ledger(entries).expect("process_ledger");
eprintln!("processed {} ledger...", entry_height);
eprintln!("creating networking stack..."); info!("creating networking stack...");
let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap();
let local_requests_addr = node.sockets.requests.local_addr().unwrap(); let local_requests_addr = node.sockets.requests.local_addr().unwrap();
@ -58,22 +68,37 @@ pub fn start<'a, R: BufRead, W: Write + Send + 'static>(
s.thread_hdls s.thread_hdls
} else { } else {
node.data.current_leader_id = node.data.id.clone(); node.data.current_leader_id = node.data.id.clone();
let f = outfile_for_leader.expect("outfile is needed for leader"); let server = if let Some(file) = outfile_for_leader {
let outfile: Box<Write + Send + 'static> = Box::new(f); Server::new_leader(
let server = Server::new_leader( bank,
bank, entry_height,
entry_height, //Some(Duration::from_millis(1000)),
//Some(Duration::from_millis(1000)), None,
None, node.data.clone(),
node.data.clone(), node.sockets.requests,
node.sockets.requests, node.sockets.transaction,
node.sockets.transaction, node.sockets.broadcast,
node.sockets.broadcast, node.sockets.respond,
node.sockets.respond, node.sockets.gossip,
node.sockets.gossip, exit.clone(),
exit.clone(), File::create(file).expect("opening ledger file"),
outfile, )
); } else {
Server::new_leader(
bank,
entry_height,
//Some(Duration::from_millis(1000)),
None,
node.data.clone(),
node.sockets.requests,
node.sockets.transaction,
node.sockets.broadcast,
node.sockets.respond,
node.sockets.gossip,
exit.clone(),
stdout(),
)
};
info!( info!(
"leader ready... local request address: {} (advertising {})", "leader ready... local request address: {} (advertising {})",
local_requests_addr, node.data.requests_addr local_requests_addr, node.data.requests_addr

View File

@ -228,6 +228,7 @@ fn recv_window(
let leader_id = crdt.read() let leader_id = crdt.read()
.expect("'crdt' read lock in fn recv_window") .expect("'crdt' read lock in fn recv_window")
.leader_data() .leader_data()
.expect("leader not ready")
.id; .id;
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq) dq.append(&mut nq)