From b8fe5ae07691af29f967a89f27e40ca5ab32f7da Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Mon, 2 Jul 2018 15:24:40 -0700 Subject: [PATCH] rename server to fullnode --- src/bin/fullnode.rs | 10 +- src/crdt.rs | 8 +- src/drone.rs | 4 +- src/fullnode.rs | 384 +++++++++++++++++++++++++++++++++----------- src/lib.rs | 1 - src/server.rs | 209 ------------------------ src/thin_client.rs | 8 +- tests/multinode.rs | 8 +- 8 files changed, 313 insertions(+), 319 deletions(-) delete mode 100644 src/server.rs diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 159976ee00..9212814174 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -8,7 +8,7 @@ extern crate solana; use atty::{is, Stream}; use getopts::Options; use solana::crdt::{ReplicatedData, TestNode}; -use solana::fullnode::start; +use solana::fullnode::FullNode; use std::env; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -80,17 +80,17 @@ fn main() -> () { } let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr); let exit = Arc::new(AtomicBool::new(false)); - let threads = if matches.opt_present("t") { + let fullnode = if matches.opt_present("t") { let testnet_address_string = matches.opt_str("t").unwrap(); let testnet_addr = testnet_address_string.parse().unwrap(); - start(node, false, None, Some(testnet_addr), None, exit) + FullNode::new(node, false, None, Some(testnet_addr), None, exit) } else { node.data.current_leader_id = node.data.id.clone(); let outfile = matches.opt_str("o"); - start(node, true, None, None, outfile, exit) + FullNode::new(node, true, None, None, outfile, exit) }; - for t in threads { + for t in fullnode.thread_hdls { t.join().expect("join"); } } diff --git a/src/crdt.rs b/src/crdt.rs index cbf9d8959f..890db22c03 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -944,15 +944,15 @@ impl TestNode { let transaction = UdpSocket::bind(local_transactions_addr).unwrap(); let gossip = UdpSocket::bind(local_gossip_addr).unwrap(); let replicate = UdpSocket::bind(local_replicate_addr).unwrap(); - let requests = UdpSocket::bind(local_requests_addr).unwrap(); - let repair = UdpSocket::bind(local_repair_addr).unwrap(); - let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind(local_requests_addr).unwrap(); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the // response Udp packet correctly back to the requester. - let respond = requests_socket.try_clone().unwrap(); + let respond = requests.try_clone().unwrap(); + + let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); TestNode { diff --git a/src/drone.rs b/src/drone.rs index edf10392cf..98057d6612 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -133,9 +133,9 @@ mod tests { use bank::Bank; use crdt::{get_ip_addr, TestNode}; use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE}; + use fullnode::FullNode; use logger; use mint::Mint; - use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::net::{SocketAddr, UdpSocket}; @@ -246,7 +246,7 @@ mod tests { let carlos_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::new_leader( + let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), diff --git a/src/fullnode.rs b/src/fullnode.rs index 425cfea003..d0dc2718db 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,108 +1,312 @@ +//! The `fullnode` module hosts all the fullnode microservices. + use bank::Bank; -use crdt::{ReplicatedData, TestNode}; +use crdt::{Crdt, ReplicatedData, TestNode}; use entry_writer; -use server::Server; +use ncp::Ncp; +use packet::BlobRecycler; +use rpu::Rpu; use std::fs::File; +use std::io::Write; use std::io::{stdin, stdout, BufReader}; use std::net::SocketAddr; +use std::net::UdpSocket; use std::sync::atomic::AtomicBool; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; +use std::time::Duration; +use streamer; +use tpu::Tpu; +use tvu::Tvu; + //use std::time::Duration; +pub struct FullNode { + pub thread_hdls: Vec>, +} -pub fn start( - mut node: TestNode, - leader: bool, - infile: Option, - network_entry_for_validator: Option, - outfile_for_leader: Option, - exit: Arc, -) -> Vec> { - info!("creating bank..."); - let bank = Bank::default(); - let entry_height = if let Some(path) = infile { - let f = File::open(path).unwrap(); - let mut r = BufReader::new(f); - let entries = entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); - info!("processing ledger..."); - bank.process_ledger(entries).expect("process_ledger") - } else { - let mut r = BufReader::new(stdin()); - let entries = entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); - info!("processing ledger..."); - bank.process_ledger(entries).expect("process_ledger") - }; +impl FullNode { + pub fn new( + mut node: TestNode, + leader: bool, + infile: Option, + network_entry_for_validator: Option, + outfile_for_leader: Option, + exit: Arc, + ) -> FullNode { + info!("creating bank..."); + let bank = Bank::default(); + let entry_height = if let Some(path) = infile { + let f = File::open(path).unwrap(); + let mut r = BufReader::new(f); + let entries = + entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + } else { + let mut r = BufReader::new(stdin()); + let entries = + entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + }; - // entry_height is the network-wide agreed height of the ledger. - // initialize it from the input ledger - info!("processed {} ledger...", entry_height); + // entry_height is the network-wide agreed height of the ledger. + // initialize it from the input ledger + info!("processed {} ledger...", entry_height); - info!("creating networking stack..."); + info!("creating networking stack..."); - let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); - let local_requests_addr = node.sockets.requests.local_addr().unwrap(); - info!( - "starting... local gossip address: {} (advertising {})", - local_gossip_addr, node.data.gossip_addr - ); - if !leader { - let testnet_addr = network_entry_for_validator.expect("validator requires entry"); + let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); + let local_requests_addr = node.sockets.requests.local_addr().unwrap(); + info!( + "starting... local gossip address: {} (advertising {})", + local_gossip_addr, node.data.gossip_addr + ); + if !leader { + let testnet_addr = network_entry_for_validator.expect("validator requires entry"); - let network_entry_point = ReplicatedData::new_entry_point(testnet_addr); - let s = Server::new_validator( - bank, + let network_entry_point = ReplicatedData::new_entry_point(testnet_addr); + let server = FullNode::new_validator( + bank, + entry_height, + node.data.clone(), + node.sockets.requests, + node.sockets.respond, + node.sockets.replicate, + node.sockets.gossip, + node.sockets.repair, + network_entry_point, + exit.clone(), + ); + info!( + "validator ready... local request address: {} (advertising {}) connected to: {}", + local_requests_addr, node.data.requests_addr, testnet_addr + ); + server + } else { + node.data.current_leader_id = node.data.id.clone(); + let server = if let Some(file) = outfile_for_leader { + FullNode::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node.data.clone(), + node.sockets.requests, + node.sockets.transaction, + node.sockets.broadcast, + node.sockets.respond, + node.sockets.gossip, + exit.clone(), + File::create(file).expect("opening ledger file"), + ) + } else { + FullNode::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node.data.clone(), + node.sockets.requests, + node.sockets.transaction, + node.sockets.broadcast, + node.sockets.respond, + node.sockets.gossip, + exit.clone(), + stdout(), + ) + }; + info!( + "leader ready... local request address: {} (advertising {})", + local_requests_addr, node.data.requests_addr + ); + server + } + } + /// Create a server instance acting as a leader. + /// + /// ```text + /// .---------------------. + /// | Leader | + /// | | + /// .--------. | .-----. | + /// | |---->| | | + /// | Client | | | RPU | | + /// | |<----| | | + /// `----+---` | `-----` | + /// | | ^ | + /// | | | | + /// | | .--+---. | + /// | | | Bank | | + /// | | `------` | + /// | | ^ | + /// | | | | .------------. + /// | | .--+--. .-----. | | | + /// `-------->| TPU +-->| NCP +------>| Validators | + /// | `-----` `-----` | | | + /// | | `------------` + /// `---------------------` + /// ``` + pub fn new_leader( + bank: Bank, + entry_height: u64, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + transactions_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, + gossip_socket: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let bank = Arc::new(bank); + let mut thread_hdls = vec![]; + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + + let blob_recycler = BlobRecycler::default(); + let tpu = Tpu::new( + bank.clone(), + tick_duration, + transactions_socket, + blob_recycler.clone(), + exit.clone(), + writer, + ); + thread_hdls.extend(tpu.thread_hdls); + + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let window = streamer::default_window(); + let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let ncp = Ncp::new( + crdt.clone(), + window.clone(), + gossip_socket, + gossip_send_socket, + exit.clone(), + ).expect("Ncp::new"); + thread_hdls.extend(ncp.thread_hdls); + + let t_broadcast = streamer::broadcaster( + broadcast_socket, + exit.clone(), + crdt, + window, entry_height, - node.data.clone(), - node.sockets.requests, - node.sockets.respond, - node.sockets.replicate, - node.sockets.gossip, - node.sockets.repair, - network_entry_point, + blob_recycler.clone(), + tpu.blob_receiver, + ); + thread_hdls.extend(vec![t_broadcast]); + + FullNode { thread_hdls } + } + + /// Create a server instance acting as a validator. + /// + /// ```text + /// .-------------------------------. + /// | Validator | + /// | | + /// .--------. | .-----. | + /// | |-------------->| | | + /// | Client | | | RPU | | + /// | |<--------------| | | + /// `--------` | `-----` | + /// | ^ | + /// | | | + /// | .--+---. | + /// | | Bank | | + /// | `------` | + /// | ^ | + /// .--------. | | | .------------. + /// | | | .--+--. | | | + /// | Leader |<------------->| TVU +<--------------->| | + /// | | | `-----` | | Validators | + /// | | | ^ | | | + /// | | | | | | | + /// | | | .--+--. | | | + /// | |<------------->| NCP +<--------------->| | + /// | | | `-----` | | | + /// `--------` | | `------------` + /// `-------------------------------` + /// ``` + pub fn new_validator( + bank: Bank, + entry_height: u64, + me: ReplicatedData, + requests_socket: UdpSocket, + respond_socket: UdpSocket, + replicate_socket: UdpSocket, + gossip_listen_socket: UdpSocket, + repair_socket: UdpSocket, + entry_point: ReplicatedData, + exit: Arc, + ) -> Self { + let bank = Arc::new(bank); + let mut thread_hdls = vec![]; + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + crdt.write() + .expect("'crdt' write lock before insert() in pub fn replicate") + .insert(&entry_point); + let window = streamer::default_window(); + let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let ncp = Ncp::new( + crdt.clone(), + window.clone(), + gossip_listen_socket, + gossip_send_socket, + exit.clone(), + ).expect("Ncp::new"); + + let tvu = Tvu::new( + bank.clone(), + entry_height, + crdt.clone(), + window.clone(), + replicate_socket, + repair_socket, + retransmit_socket, exit.clone(), ); - info!( - "validator ready... local request address: {} (advertising {}) connected to: {}", - local_requests_addr, node.data.requests_addr, testnet_addr - ); - s.thread_hdls - } else { - node.data.current_leader_id = node.data.id.clone(); - let server = if let Some(file) = outfile_for_leader { - Server::new_leader( - bank, - entry_height, - //Some(Duration::from_millis(1000)), - None, - node.data.clone(), - node.sockets.requests, - node.sockets.transaction, - node.sockets.broadcast, - node.sockets.respond, - node.sockets.gossip, - exit.clone(), - File::create(file).expect("opening ledger file"), - ) - } else { - Server::new_leader( - bank, - entry_height, - //Some(Duration::from_millis(1000)), - None, - node.data.clone(), - node.sockets.requests, - node.sockets.transaction, - node.sockets.broadcast, - node.sockets.respond, - node.sockets.gossip, - exit.clone(), - stdout(), - ) - }; - info!( - "leader ready... local request address: {} (advertising {})", - local_requests_addr, node.data.requests_addr - ); - server.thread_hdls + thread_hdls.extend(tvu.thread_hdls); + thread_hdls.extend(ncp.thread_hdls); + FullNode { thread_hdls } + } +} +#[cfg(test)] +mod tests { + use bank::Bank; + use crdt::TestNode; + use fullnode::FullNode; + use mint::Mint; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + #[test] + fn validator_exit() { + let tn = TestNode::new(); + let alice = Mint::new(10_000); + let bank = Bank::new(&alice); + let exit = Arc::new(AtomicBool::new(false)); + let v = FullNode::new_validator( + bank, + 0, + tn.data.clone(), + tn.sockets.requests, + tn.sockets.respond, + tn.sockets.replicate, + tn.sockets.gossip, + tn.sockets.repair, + tn.data, + exit.clone(), + ); + exit.store(true, Ordering::Relaxed); + for t in v.thread_hdls { + t.join().unwrap(); + } } } diff --git a/src/lib.rs b/src/lib.rs index 06a6fcf55d..b7206934aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,6 @@ pub mod request_processor; pub mod request_stage; pub mod result; pub mod rpu; -pub mod server; pub mod signature; pub mod sigverify; pub mod sigverify_stage; diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 612f4be561..0000000000 --- a/src/server.rs +++ /dev/null @@ -1,209 +0,0 @@ -//! The `server` module hosts all the server microservices. - -use bank::Bank; -use crdt::{Crdt, ReplicatedData}; -use ncp::Ncp; -use packet::BlobRecycler; -use rpu::Rpu; -use std::io::Write; -use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; -use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; -use std::time::Duration; -use streamer; -use tpu::Tpu; -use tvu::Tvu; - -pub struct Server { - pub thread_hdls: Vec>, -} - -impl Server { - /// Create a server instance acting as a leader. - /// - /// ```text - /// .---------------------. - /// | Leader | - /// | | - /// .--------. | .-----. | - /// | |---->| | | - /// | Client | | | RPU | | - /// | |<----| | | - /// `----+---` | `-----` | - /// | | ^ | - /// | | | | - /// | | .--+---. | - /// | | | Bank | | - /// | | `------` | - /// | | ^ | - /// | | | | .------------. - /// | | .--+--. .-----. | | | - /// `-------->| TPU +-->| NCP +------>| Validators | - /// | `-----` `-----` | | | - /// | | `------------` - /// `---------------------` - /// ``` - pub fn new_leader( - bank: Bank, - entry_height: u64, - tick_duration: Option, - me: ReplicatedData, - requests_socket: UdpSocket, - transactions_socket: UdpSocket, - broadcast_socket: UdpSocket, - respond_socket: UdpSocket, - gossip_socket: UdpSocket, - exit: Arc, - writer: W, - ) -> Self { - let bank = Arc::new(bank); - let mut thread_hdls = vec![]; - let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); - thread_hdls.extend(rpu.thread_hdls); - - let blob_recycler = BlobRecycler::default(); - let (tpu, blob_receiver) = Tpu::new( - bank.clone(), - tick_duration, - transactions_socket, - blob_recycler.clone(), - exit.clone(), - writer, - ); - thread_hdls.extend(tpu.thread_hdls); - - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let window = streamer::default_window(); - let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new( - crdt.clone(), - window.clone(), - gossip_socket, - gossip_send_socket, - exit.clone(), - ).expect("Ncp::new"); - thread_hdls.extend(ncp.thread_hdls); - - let t_broadcast = streamer::broadcaster( - broadcast_socket, - exit.clone(), - crdt, - window, - entry_height, - blob_recycler.clone(), - blob_receiver, - ); - thread_hdls.extend(vec![t_broadcast]); - - Server { thread_hdls } - } - - /// Create a server instance acting as a validator. - /// - /// ```text - /// .-------------------------------. - /// | Validator | - /// | | - /// .--------. | .-----. | - /// | |-------------->| | | - /// | Client | | | RPU | | - /// | |<--------------| | | - /// `--------` | `-----` | - /// | ^ | - /// | | | - /// | .--+---. | - /// | | Bank | | - /// | `------` | - /// | ^ | - /// .--------. | | | .------------. - /// | | | .--+--. | | | - /// | Leader |<------------->| TVU +<--------------->| | - /// | | | `-----` | | Validators | - /// | | | ^ | | | - /// | | | | | | | - /// | | | .--+--. | | | - /// | |<------------->| NCP +<--------------->| | - /// | | | `-----` | | | - /// `--------` | | `------------` - /// `-------------------------------` - /// ``` - pub fn new_validator( - bank: Bank, - entry_height: u64, - me: ReplicatedData, - requests_socket: UdpSocket, - respond_socket: UdpSocket, - replicate_socket: UdpSocket, - gossip_listen_socket: UdpSocket, - repair_socket: UdpSocket, - entry_point: ReplicatedData, - exit: Arc, - ) -> Self { - let bank = Arc::new(bank); - let mut thread_hdls = vec![]; - let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); - thread_hdls.extend(rpu.thread_hdls); - - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write() - .expect("'crdt' write lock before insert() in pub fn replicate") - .insert(&entry_point); - let window = streamer::default_window(); - let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new( - crdt.clone(), - window.clone(), - gossip_listen_socket, - gossip_send_socket, - exit.clone(), - ).expect("Ncp::new"); - - let tvu = Tvu::new( - bank.clone(), - entry_height, - crdt.clone(), - window.clone(), - replicate_socket, - repair_socket, - retransmit_socket, - exit.clone(), - ); - thread_hdls.extend(tvu.thread_hdls); - thread_hdls.extend(ncp.thread_hdls); - Server { thread_hdls } - } -} -#[cfg(test)] -mod tests { - use bank::Bank; - use crdt::TestNode; - use mint::Mint; - use server::Server; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; - #[test] - fn validator_exit() { - let tn = TestNode::new(); - let alice = Mint::new(10_000); - let bank = Bank::new(&alice); - let exit = Arc::new(AtomicBool::new(false)); - let v = Server::new_validator( - bank, - 0, - tn.data.clone(), - tn.sockets.requests, - tn.sockets.respond, - tn.sockets.replicate, - tn.sockets.gossip, - tn.sockets.repair, - tn.data, - exit.clone(), - ); - exit.store(true, Ordering::Relaxed); - for t in v.thread_hdls { - t.join().unwrap(); - } - } -} diff --git a/src/thin_client.rs b/src/thin_client.rs index bdc25ea1ed..513fa41563 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -223,9 +223,9 @@ mod tests { use bank::Bank; use budget::Budget; use crdt::TestNode; + use fullnode::FullNode; use logger; use mint::Mint; - use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; @@ -244,7 +244,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::new_leader( + let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), @@ -289,7 +289,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::new_leader( + let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), @@ -347,7 +347,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::new_leader( + let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), diff --git a/tests/multinode.rs b/tests/multinode.rs index 9ef8916318..d53b0562d9 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,10 +6,10 @@ extern crate solana; use solana::bank::Bank; use solana::crdt::TestNode; use solana::crdt::{Crdt, ReplicatedData}; +use solana::fullnode::FullNode; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; -use solana::server::Server; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::streamer::default_window; use solana::thin_client::ThinClient; @@ -30,7 +30,7 @@ fn validator( ) { let validator = TestNode::new(); let replicant_bank = Bank::new(&alice); - let mut ts = Server::new_validator( + let mut ts = FullNode::new_validator( replicant_bank, 0, validator.data.clone(), @@ -104,7 +104,7 @@ fn test_multi_node_validator_catchup_from_zero() { let exit = Arc::new(AtomicBool::new(false)); let leader_bank = Bank::new(&alice); - let server = Server::new_leader( + let server = FullNode::new_leader( leader_bank, 0, None, @@ -194,7 +194,7 @@ fn test_multi_node_basic() { let exit = Arc::new(AtomicBool::new(false)); let leader_bank = Bank::new(&alice); - let server = Server::new_leader( + let server = FullNode::new_leader( leader_bank, 0, None,