refactor wip

This commit is contained in:
Anatoly Yakovenko
2018-05-23 08:29:24 -07:00
committed by Greg Fitzgerald
parent 59be94a81f
commit 8646ff4927
4 changed files with 64 additions and 72 deletions

View File

@ -140,7 +140,7 @@ fn main() {
let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap();
eprintln!("starting server..."); eprintln!("starting server...");
let server = Server::new( let server = Server::leader(
bank, bank,
last_id, last_id,
Some(Duration::from_millis(1000)), Some(Duration::from_millis(1000)),

View File

@ -11,6 +11,7 @@ use std::sync::atomic::AtomicBool;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use tpu::Tpu; use tpu::Tpu;
use tvu::Tvu;
pub struct Server { pub struct Server {
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
@ -70,7 +71,7 @@ impl Server {
leader_repl_data, leader_repl_data,
exit.clone(), exit.clone(),
); );
thread_hdls.extend(tpu.thread_hdls); thread_hdls.extend(tvu.thread_hdls);
Server { thread_hdls } Server { thread_hdls }
} }
} }

View File

@ -192,7 +192,6 @@ mod tests {
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use streamer::default_window; use streamer::default_window;
use tvu::{self, Tvu};
#[test] #[test]
fn test_thin_client() { fn test_thin_client() {
@ -224,7 +223,7 @@ mod tests {
let broadcast_socket = UdpSocket::bind(local).unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let server = Server::new( let server = Server::leader(
bank, bank,
alice.last_id(), alice.last_id(),
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
@ -258,21 +257,20 @@ mod tests {
#[test] #[test]
fn test_bad_sig() { fn test_bad_sig() {
logger::setup(); logger::setup();
let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); let leader = TestNode::new();
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let serve_addr = leader_serve.local_addr().unwrap(); let serve_addr = leader.data.requests_addr;
let mut local = serve_addr.local_addr().unwrap();
let mut local = leader_serve.local_addr().unwrap();
local.set_port(0); local.set_port(0);
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast_socket = UdpSocket::bind(local).unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let events_addr = events_socket.local_addr().unwrap(); let events_addr = events_socket.local_addr().unwrap();
let server = Server::new( let server = Server::leader(
bank, bank,
alice.last_id(), alice.last_id(),
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
@ -313,23 +311,33 @@ mod tests {
t.join().unwrap(); t.join().unwrap();
} }
} }
struct TestNode {
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { data: ReplicatedData,
gossip: UdpSocket,
requests: UdpSocket,
replicate: UdpSocket,
event: UdpSocket,
respond: UdpSocket,
broadcast: UdpSocket,
}
impl TestNode {
fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); let event = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let leader = ReplicatedData::new( let data = ReplicatedData::new(
pubkey, pubkey,
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
serve.local_addr().unwrap(), requests.local_addr().unwrap(),
events_socket.local_addr().unwrap(), event.local_addr().unwrap(),
); );
(leader, gossip, serve, replicate, events_socket) TestNode {data, gossip, requests, replicate, event, respond, broadcast }
}
} }
fn replicant( fn replicant(
@ -338,21 +346,19 @@ mod tests {
alice: &Mint, alice: &Mint,
threads: &mut Vec<JoinHandle<()>>, threads: &mut Vec<JoinHandle<()>>,
) { ) {
let replicant = test_node(); let replicant = TestNode::new();
let replicant_bank = { let replicant_bank = Bank::new(&alice);
let bank = Bank::new(&alice); let mut ts = Server::validator(
Arc::new(Tvu::new(bank, alice.last_id(), None)) replicant_bank,
}; replicant.data.clone(),
let mut ts = Tvu::serve( replicant.requests,
&replicant_bank, replicant.respond,
replicant.0.clone(), replicant.replicate,
replicant.1, replicant.gossip,
replicant.2,
replicant.3,
leader.clone(), leader.clone(),
exit.clone(), exit.clone(),
).unwrap(); );
threads.append(&mut ts); threads.append(&mut ts.thread_hdls);
} }
fn converge( fn converge(
@ -362,7 +368,7 @@ mod tests {
threads: &mut Vec<JoinHandle<()>>, threads: &mut Vec<JoinHandle<()>>,
) -> Vec<SocketAddr> { ) -> Vec<SocketAddr> {
//lets spy on the network //lets spy on the network
let (mut spy, spy_gossip, _, _, _) = test_node(); let mut spy = test_node();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.id.clone(); let me = spy.id.clone();
spy.replicate_addr = daddr; spy.replicate_addr = daddr;
@ -418,7 +424,7 @@ mod tests {
let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let events_addr = leader.4.local_addr().unwrap(); let events_addr = leader.4.local_addr().unwrap();
let server = Server::new( let server = Server::leader(
leader_bank, leader_bank,
alice.last_id(), alice.last_id(),
None, None,

View File

@ -4,7 +4,8 @@
//! - Incoming blobs are picked up from the replicate socket. //! - Incoming blobs are picked up from the replicate socket.
//! 2. verifier //! 2. verifier
//! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified //! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified
//! along with the ecdsa signature for the blob and each signature in all the transactions. //! along with the ecdsa signature for the blob and each signature in all the transactions. Blobs
//! with errors are dropped, or marked for slashing.
//! 3.a retransmit //! 3.a retransmit
//! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the //! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the
//! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate //! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate
@ -20,22 +21,15 @@
//! - TODO Validation messages are sent back to the leader //! - TODO Validation messages are sent back to the leader
use bank::Bank; use bank::Bank;
use banking_stage::BankingStage;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use packet; use packet;
use record_stage::RecordStage;
use replicate_stage::ReplicateStage; use replicate_stage::ReplicateStage;
use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::Duration;
use streamer; use streamer;
use write_stage::WriteStage;
pub struct Tvu { pub struct Tvu {
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
@ -71,10 +65,11 @@ impl Tvu {
let window = streamer::default_window(); let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
// TODO pull this socket out through the public interface
// make sure we are on the same interface // make sure we are on the same interface
let mut local = replicate.local_addr()?; let mut local = replicate.local_addr().expect("tvu: get local address");
local.set_port(0); local.set_port(0);
let write = UdpSocket::bind(local)?; let write = UdpSocket::bind(local).expect("tvu: bind to local socket");
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
@ -83,7 +78,7 @@ impl Tvu {
blob_recycler.clone(), blob_recycler.clone(),
replicate, replicate,
blob_sender.clone(), blob_sender.clone(),
)?; ).expect("tvu: blob receiver creation");
let (window_sender, window_receiver) = channel(); let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -109,7 +104,7 @@ impl Tvu {
); );
let replicate_stage = ReplicateStage::new( let replicate_stage = ReplicateStage::new(
obj.bank.clone(), bank.clone(),
exit.clone(), exit.clone(),
window_receiver, window_receiver,
blob_recycler.clone(), blob_recycler.clone(),
@ -123,16 +118,14 @@ impl Tvu {
replicate_stage.thread_hdl, replicate_stage.thread_hdl,
t_gossip, t_gossip,
t_listen, t_listen,
//serve threads
t_packet_receiver,
banking_stage.thread_hdl,
write_stage.thread_hdl,
]; ];
threads.extend(sig_verify_stage.thread_hdls.into_iter()); Tvu{thread_hdls: threads}
Tvu{threads}
} }
} }
#[cfg(test)]
use std::time::Duration;
#[cfg(test)] #[cfg(test)]
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
@ -228,22 +221,17 @@ mod tests {
let starting_balance = 10_000; let starting_balance = 10_000;
let mint = Mint::new(starting_balance); let mint = Mint::new(starting_balance);
let bank = Bank::new(&mint);
let tvu = Arc::new(Tvu::new(
bank,
mint.last_id(),
Some(Duration::from_millis(30)),
));
let replicate_addr = target1_data.replicate_addr; let replicate_addr = target1_data.replicate_addr;
let threads = Tvu::serve( let bank = Arc::new(Bank::new(&mint));
&tvu, let tvu = Tvu::new(
bank,
target1_data, target1_data,
target1_gossip, target1_gossip,
target1_events, target1_events,
target1_replicate, target1_replicate,
leader_data, leader_data,
exit.clone(), exit.clone(),
).unwrap(); );
let mut alice_ref_balance = starting_balance; let mut alice_ref_balance = starting_balance;
let mut msgs = VecDeque::new(); let mut msgs = VecDeque::new();
@ -258,8 +246,6 @@ mod tests {
w.set_index(i).unwrap(); w.set_index(i).unwrap();
w.set_id(leader_id).unwrap(); w.set_id(leader_id).unwrap();
let bank = &tvu.bank;
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = Entry::new(&cur_hash, i, vec![tr0]); let entry0 = Entry::new(&cur_hash, i, vec![tr0]);
bank.register_entry_id(&cur_hash); bank.register_entry_id(&cur_hash);
@ -299,7 +285,6 @@ mod tests {
msgs.push(msg); msgs.push(msg);
} }
let bank = &tvu.bank;
let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap(); let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap();
assert_eq!(alice_balance, alice_ref_balance); assert_eq!(alice_balance, alice_ref_balance);
@ -307,7 +292,7 @@ mod tests {
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_balance - alice_ref_balance);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
for t in threads { for t in tvu.thread_hdls {
t.join().expect("join"); t.join().expect("join");
} }
t2_gossip.join().expect("join"); t2_gossip.join().expect("join");