diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 659b84ea89..4b82899a95 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -140,7 +140,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let server = Server::new( + let server = Server::leader( bank, last_id, Some(Duration::from_millis(1000)), diff --git a/src/server.rs b/src/server.rs index 016d5ab57e..94e3ffbc11 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,6 +11,7 @@ use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; use std::time::Duration; use tpu::Tpu; +use tvu::Tvu; pub struct Server { pub thread_hdls: Vec>, @@ -70,7 +71,7 @@ impl Server { leader_repl_data, exit.clone(), ); - thread_hdls.extend(tpu.thread_hdls); + thread_hdls.extend(tvu.thread_hdls); Server { thread_hdls } } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 142bb46ac9..a2cc3f2bb8 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -192,7 +192,6 @@ mod tests { use std::thread::sleep; use std::time::Duration; use streamer::default_window; - use tvu::{self, Tvu}; #[test] fn test_thin_client() { @@ -224,7 +223,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let server = Server::new( + let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -258,21 +257,20 @@ mod tests { #[test] fn test_bad_sig() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let serve_addr = leader_serve.local_addr().unwrap(); - - let mut local = leader_serve.local_addr().unwrap(); + let serve_addr = leader.data.requests_addr; + let mut local = serve_addr.local_addr().unwrap(); local.set_port(0); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let events_addr = events_socket.local_addr().unwrap(); - let server = Server::new( + let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -313,23 +311,33 @@ mod tests { t.join().unwrap(); } } - - fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let leader = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - events_socket.local_addr().unwrap(), - ); - (leader, gossip, serve, replicate, events_socket) + struct TestNode { + data: ReplicatedData, + gossip: UdpSocket, + requests: UdpSocket, + replicate: UdpSocket, + event: UdpSocket, + respond: UdpSocket, + broadcast: UdpSocket, + } + impl TestNode { + fn new() -> TestNode { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let event = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let data = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests.local_addr().unwrap(), + event.local_addr().unwrap(), + ); + TestNode {data, gossip, requests, replicate, event, respond, broadcast } + } } fn replicant( @@ -338,21 +346,19 @@ mod tests { alice: &Mint, threads: &mut Vec>, ) { - let replicant = test_node(); - let replicant_bank = { - let bank = Bank::new(&alice); - Arc::new(Tvu::new(bank, alice.last_id(), None)) - }; - let mut ts = Tvu::serve( - &replicant_bank, - replicant.0.clone(), - replicant.1, - replicant.2, - replicant.3, + let replicant = TestNode::new(); + let replicant_bank = Bank::new(&alice); + let mut ts = Server::validator( + replicant_bank, + replicant.data.clone(), + replicant.requests, + replicant.respond, + replicant.replicate, + replicant.gossip, leader.clone(), exit.clone(), - ).unwrap(); - threads.append(&mut ts); + ); + threads.append(&mut ts.thread_hdls); } fn converge( @@ -362,7 +368,7 @@ mod tests { threads: &mut Vec>, ) -> Vec { //lets spy on the network - let (mut spy, spy_gossip, _, _, _) = test_node(); + let mut spy = test_node(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.id.clone(); spy.replicate_addr = daddr; @@ -418,7 +424,7 @@ mod tests { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let events_addr = leader.4.local_addr().unwrap(); - let server = Server::new( + let server = Server::leader( leader_bank, alice.last_id(), None, diff --git a/src/tvu.rs b/src/tvu.rs index 92e35ffd85..f640428839 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -4,7 +4,8 @@ //! - Incoming blobs are picked up from the replicate socket. //! 2. verifier //! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified -//! along with the ecdsa signature for the blob and each signature in all the transactions. +//! along with the ecdsa signature for the blob and each signature in all the transactions. Blobs +//! with errors are dropped, or marked for slashing. //! 3.a retransmit //! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the //! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate @@ -20,22 +21,15 @@ //! - TODO Validation messages are sent back to the leader use bank::Bank; -use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use packet; -use record_stage::RecordStage; use replicate_stage::ReplicateStage; -use result::Result; -use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct Tvu { pub thread_hdls: Vec>, @@ -71,10 +65,11 @@ impl Tvu { let window = streamer::default_window(); let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + // TODO pull this socket out through the public interface // make sure we are on the same interface - let mut local = replicate.local_addr()?; + let mut local = replicate.local_addr().expect("tvu: get local address"); local.set_port(0); - let write = UdpSocket::bind(local)?; + let write = UdpSocket::bind(local).expect("tvu: bind to local socket"); let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); @@ -83,7 +78,7 @@ impl Tvu { blob_recycler.clone(), replicate, blob_sender.clone(), - )?; + ).expect("tvu: blob receiver creation"); let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -109,7 +104,7 @@ impl Tvu { ); let replicate_stage = ReplicateStage::new( - obj.bank.clone(), + bank.clone(), exit.clone(), window_receiver, blob_recycler.clone(), @@ -123,16 +118,14 @@ impl Tvu { replicate_stage.thread_hdl, t_gossip, t_listen, - //serve threads - t_packet_receiver, - banking_stage.thread_hdl, - write_stage.thread_hdl, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Tvu{threads} + Tvu{thread_hdls: threads} } } +#[cfg(test)] +use std::time::Duration; + #[cfg(test)] pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; @@ -228,22 +221,17 @@ mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let bank = Bank::new(&mint); - let tvu = Arc::new(Tvu::new( - bank, - mint.last_id(), - Some(Duration::from_millis(30)), - )); let replicate_addr = target1_data.replicate_addr; - let threads = Tvu::serve( - &tvu, + let bank = Arc::new(Bank::new(&mint)); + let tvu = Tvu::new( + bank, target1_data, target1_gossip, target1_events, target1_replicate, leader_data, exit.clone(), - ).unwrap(); + ); let mut alice_ref_balance = starting_balance; let mut msgs = VecDeque::new(); @@ -258,8 +246,6 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let bank = &tvu.bank; - let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = Entry::new(&cur_hash, i, vec![tr0]); bank.register_entry_id(&cur_hash); @@ -299,7 +285,6 @@ mod tests { msgs.push(msg); } - let bank = &tvu.bank; let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); @@ -307,7 +292,7 @@ mod tests { assert_eq!(bob_balance, starting_balance - alice_ref_balance); exit.store(true, Ordering::Relaxed); - for t in threads { + for t in tvu.thread_hdls { t.join().expect("join"); } t2_gossip.join().expect("join");