From fa247196c062513e2bd468d84c390e4998048874 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Mon, 2 Jul 2018 11:20:35 -0700 Subject: [PATCH] fullnode lib --- src/bin/fullnode.rs | 111 ++++++-------------------------------------- src/fullnode.rs | 83 +++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 97 insertions(+), 98 deletions(-) create mode 100644 src/fullnode.rs diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 1a886eb3ab..8ccb0cf621 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -7,17 +7,15 @@ extern crate solana; use atty::{is, Stream}; use getopts::Options; -use solana::bank::Bank; -use solana::crdt::ReplicatedData; -use solana::entry_writer; -use solana::server::Server; +use solana::crdt::{ReplicatedData, TestNode}; +use solana::fullnode::start; use std::env; use std::fs::File; -use std::io::{stdin, stdout, BufRead, Write}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::io::{stdin, stdout, Write}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::process::exit; use std::sync::atomic::AtomicBool; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; //use std::time::Duration; fn print_usage(program: &str, opts: Options) { @@ -65,27 +63,6 @@ fn main() { exit(1); } - eprintln!("Initializing..."); - let stdin = stdin(); - let entries = stdin.lock().lines().map(|line| { - entry_writer::read_entry(line.unwrap()).unwrap_or_else(|e| { - eprintln!("failed to parse json: {}", e); - exit(1); - }) - }); - eprintln!("done parsing..."); - - eprintln!("creating bank..."); - let bank = Bank::default(); - - // entry_height is the network-wide agreed height of the ledger. - // initialize it from the input ledger - eprintln!("processing ledger..."); - let entry_height = bank.process_ledger(entries).expect("process_ledger"); - eprintln!("processed {} ledger...", entry_height); - - eprintln!("creating networking stack..."); - let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let mut repl_data = ReplicatedData::new_leader(&bind_addr); if matches.opt_present("l") { @@ -102,87 +79,25 @@ fn main() { exit(1); } } - - let mut local_gossip_addr = bind_addr.clone(); - local_gossip_addr.set_port(repl_data.gossip_addr.port()); - - let mut local_replicate_addr = bind_addr.clone(); - local_replicate_addr.set_port(repl_data.replicate_addr.port()); - - let mut local_requests_addr = bind_addr.clone(); - local_requests_addr.set_port(repl_data.requests_addr.port()); - - let mut local_transactions_addr = bind_addr.clone(); - local_transactions_addr.set_port(repl_data.transactions_addr.port()); - - let mut local_repair_addr = bind_addr.clone(); - local_repair_addr.set_port(repl_data.repair_addr.port()); - + let node = TestNode::new_with_bind_addr(repl_data, bind_addr); let exit = Arc::new(AtomicBool::new(false)); + let mut reader = stdin().lock(); let threads = if matches.opt_present("t") { let testnet_address_string = matches.opt_str("t").unwrap(); - eprintln!( - "starting validator... {} (advertising {}) connecting to {}", - local_requests_addr, repl_data.requests_addr, testnet_address_string - ); let testnet_addr = testnet_address_string.parse().unwrap(); - let newtwork_entry_point = ReplicatedData::new_entry_point(testnet_addr); - let s = Server::new_validator( - bank, - entry_height, - repl_data.clone(), - UdpSocket::bind(local_requests_addr).unwrap(), - UdpSocket::bind("0.0.0.0:0").unwrap(), - UdpSocket::bind(local_replicate_addr).unwrap(), - UdpSocket::bind(local_gossip_addr).unwrap(), - UdpSocket::bind(local_repair_addr).unwrap(), - newtwork_entry_point, - exit.clone(), - ); - s.thread_hdls + start(node, false, &mut reader, Some(testnet_addr), None, exit) } else { - eprintln!( - "starting leader... {} (advertising {})", - local_requests_addr, repl_data.requests_addr - ); repl_data.current_leader_id = repl_data.id.clone(); - let outfile: Box = if matches.opt_present("o") { + let outfile: Write + Send + 'static = if matches.opt_present("o") { let path = matches.opt_str("o").unwrap(); - Box::new( - File::create(&path).expect(&format!("unable to open output file \"{}\"", path)), - ) + let f = File::create(&path).expect(&format!("unable to open output file \"{}\"", path)); + Mutex::new(f) } else { - Box::new(stdout()) + stdout().lock() }; - - let requests_socket = UdpSocket::bind(local_requests_addr).unwrap(); - // Responses are sent from the same Udp port as requests are received - // from, in hopes that a NAT sitting in the middle will route the - // response Udp packet correctly back to the requester. - let respond_socket = requests_socket.try_clone().unwrap(); - - let server = Server::new_leader( - bank, - entry_height, - //Some(Duration::from_millis(1000)), - None, - repl_data.clone(), - requests_socket, - UdpSocket::bind(local_transactions_addr).unwrap(), - UdpSocket::bind("0.0.0.0:0").unwrap(), - respond_socket, - UdpSocket::bind(local_gossip_addr).unwrap(), - exit.clone(), - outfile, - ); - server.thread_hdls + start(node, true, &mut reader, None, Some(&outfile), exit) }; - eprintln!( - "Ready. Listening on {} (advertising {})", - local_transactions_addr, repl_data.transactions_addr - ); - for t in threads { t.join().expect("join"); } diff --git a/src/fullnode.rs b/src/fullnode.rs new file mode 100644 index 0000000000..2e2b09d016 --- /dev/null +++ b/src/fullnode.rs @@ -0,0 +1,83 @@ +use bank::Bank; +use crdt::{ReplicatedData, TestNode}; +use entry_writer; +use server::Server; +use std::io::{BufRead, Write}; +use std::net::SocketAddr; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::thread::JoinHandle; +//use std::time::Duration; + +pub fn start<'a, R: BufRead, W: Write + Send + 'static>( + mut node: TestNode, + leader: bool, + infile: &'a mut R, + network_entry_for_validator: Option, + outfile_for_leader: Option, + exit: Arc, +) -> Vec> { + eprintln!("creating bank..."); + let entries = entry_writer::read_entries(infile).map(|e| e.expect("failed to parse entry")); + let bank = Bank::default(); + + // entry_height is the network-wide agreed height of the ledger. + // initialize it from the input ledger + eprintln!("processing ledger..."); + let entry_height = bank.process_ledger(entries).expect("process_ledger"); + eprintln!("processed {} ledger...", entry_height); + + eprintln!("creating networking stack..."); + + let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); + let local_requests_addr = node.sockets.requests.local_addr().unwrap(); + info!( + "starting... local gossip address: {} (advertising {})", + local_gossip_addr, node.data.gossip_addr + ); + if !leader { + let testnet_addr = network_entry_for_validator.expect("validator requires entry"); + + let network_entry_point = ReplicatedData::new_entry_point(testnet_addr); + let s = Server::new_validator( + bank, + entry_height, + node.data.clone(), + node.sockets.requests, + node.sockets.respond, + node.sockets.replicate, + node.sockets.gossip, + node.sockets.repair, + network_entry_point, + exit.clone(), + ); + info!( + "validator ready... local request address: {} (advertising {}) connected to: {}", + local_requests_addr, node.data.requests_addr, testnet_addr + ); + s.thread_hdls + } else { + node.data.current_leader_id = node.data.id.clone(); + let f = outfile_for_leader.expect("outfile is needed for leader"); + let outfile: Box = Box::new(f); + let server = Server::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node.data.clone(), + node.sockets.requests, + node.sockets.transaction, + node.sockets.broadcast, + node.sockets.respond, + node.sockets.gossip, + exit.clone(), + outfile, + ); + info!( + "leader ready... local request address: {} (advertising {})", + local_requests_addr, node.data.requests_addr + ); + server.thread_hdls + } +} diff --git a/src/lib.rs b/src/lib.rs index 956f056af9..06a6fcf55d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ pub mod entry_writer; #[cfg(feature = "erasure")] pub mod erasure; pub mod fetch_stage; +pub mod fullnode; pub mod hash; pub mod ledger; pub mod logger;