diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 1907589cbd..659b84ea89 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -131,6 +131,7 @@ fn main() { gossip_sock.local_addr().unwrap(), replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), + events_sock.local_addr().unwrap(), ); let mut local = serve_sock.local_addr().unwrap(); diff --git a/src/crdt.rs b/src/crdt.rs index 7029e79b21..eef6981f47 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -31,7 +31,7 @@ use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; /// Structure to be replicated by the network -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ReplicatedData { pub id: PublicKey, sig: Signature, @@ -42,7 +42,9 @@ pub struct ReplicatedData { /// address to connect to for replication pub replicate_addr: SocketAddr, /// address to connect to when this node is leader - pub serve_addr: SocketAddr, + pub requests_addr: SocketAddr, + /// events address + pub events_addr: SocketAddr, /// current leader identity current_leader_id: PublicKey, /// last verified hash that was submitted to the leader @@ -56,7 +58,8 @@ impl ReplicatedData { id: PublicKey, gossip_addr: SocketAddr, replicate_addr: SocketAddr, - serve_addr: SocketAddr, + requests_addr: SocketAddr, + events_addr: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, @@ -64,7 +67,8 @@ impl ReplicatedData { version: 0, gossip_addr, replicate_addr, - serve_addr, + requests_addr, + events_addr, current_leader_id: PublicKey::default(), last_verified_hash: Hash::default(), last_verified_count: 0, @@ -515,12 +519,14 @@ mod test { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + events.local_addr().unwrap(), ); let crdt = Crdt::new(d); trace!( @@ -632,6 +638,7 @@ mod test { "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); diff --git a/src/streamer.rs b/src/streamer.rs index ee83cd25d0..9af0552ce1 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -650,12 +650,14 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let event = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let rep_data = ReplicatedData::new( pubkey_me, read.local_addr().unwrap(), send.local_addr().unwrap(), serve.local_addr().unwrap(), + event.local_addr().unwrap(), ); let mut crdt_me = Crdt::new(rep_data); let me_id = crdt_me.my_data().id; @@ -712,21 +714,17 @@ mod test { let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); + let event = UdpSocket::bind("127.0.0.1:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + event.local_addr().unwrap(), ); let crdt = Crdt::new(d); - trace!( - "id: {} gossip: {} replicate: {} serve: {}", - crdt.my_data().id[0], - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - ); + trace!("data: {:?}", d); (Arc::new(RwLock::new(crdt)), gossip, replicate, serve) } diff --git a/src/thin_client.rs b/src/thin_client.rs index b5632c9ae3..142bb46ac9 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -211,6 +211,7 @@ mod tests { gossip.local_addr().unwrap(), "0.0.0.0:0".parse().unwrap(), requests_socket.local_addr().unwrap(), + events_addr, ); let alice = Mint::new(10_000); @@ -326,6 +327,7 @@ mod tests { gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + events_socket.local_addr().unwrap(), ); (leader, gossip, serve, replicate, events_socket) } @@ -364,7 +366,7 @@ mod tests { let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.id.clone(); spy.replicate_addr = daddr; - spy.serve_addr = daddr; + spy.requests_addr = daddr; let mut spy_crdt = Crdt::new(spy); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); @@ -393,7 +395,7 @@ mod tests { .values() .into_iter() .filter(|x| x.id != me) - .map(|x| x.serve_addr) + .map(|x| x.requests_addr) .collect(); v.clone() } @@ -446,7 +448,7 @@ mod tests { let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.0.serve_addr, + leader.0.requests_addr, requests_socket, events_addr, events_socket, diff --git a/src/tvu.rs b/src/tvu.rs index 18d09d08b1..92e35ffd85 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,5 +1,23 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. +//! 1. streamer +//! - Incoming blobs are picked up from the replicate socket. +//! 2. verifier +//! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified +//! along with the ecdsa signature for the blob and each signature in all the transactions. +//! 3.a retransmit +//! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the +//! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate +//! address. +//! 3.b window +//! - Verified blobs are placed into a window, indexed by the counter set by the leader. This could +//! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to +//! recover any missing packets, and requests are made at random to peers and parents to retransmit +//! a missing packet. +//! 4. accountant +//! - Contigous blobs are sent to the accountant for processing transactions +//! 5. validator +//! - TODO Validation messages are sent back to the leader use bank::Bank; use banking_stage::BankingStage; @@ -20,48 +38,27 @@ use streamer; use write_stage::WriteStage; pub struct Tvu { - bank: Arc, - start_hash: Hash, - tick_duration: Option, + pub thread_hdls: Vec>, } impl Tvu { - /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { - Tvu { - bank: Arc::new(bank), - start_hash, - tick_duration, - } - } - /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. /// # Arguments - /// * `obj` - The bank state. + /// * `bank` - The bank state. /// * `me` - my configuration + /// * `gossip` - my gosisp socket + /// * `replicte` - my replicte socket /// * `leader` - leader configuration /// * `exit` - The exit signal. - /// # Remarks - /// The pipeline is constructed as follows: - /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures (TODO) - /// 3. reconstruct contiguous window - /// a. order the blobs - /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs, if erasure coding is insufficient - /// d. make sure that the blobs PoH sequences connect (TODO) - /// 4. process the transaction state machine - /// 5. respond with the hash of the state back to the leader - pub fn serve( - obj: &Arc, + pub fn new( + bank: Arc, me: ReplicatedData, gossip: UdpSocket, - requests_socket: UdpSocket, replicate: UdpSocket, leader: ReplicatedData, exit: Arc, - ) -> Result>> { + ) -> Self { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); crdt.write() @@ -118,39 +115,7 @@ impl Tvu { blob_recycler.clone(), ); - //serve pipeline - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - - let packet_recycler = packet::PacketRecycler::default(); - let (packet_sender, packet_receiver) = channel(); - let t_packet_receiver = streamer::receiver( - requests_socket, - exit.clone(), - packet_recycler.clone(), - packet_sender, - ); - - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - - let banking_stage = BankingStage::new( - obj.bank.clone(), - exit.clone(), - sig_verify_stage.verified_receiver, - packet_recycler.clone(), - ); - - let record_stage = RecordStage::new( - banking_stage.signal_receiver, - &obj.start_hash, - obj.tick_duration, - ); - - let write_stage = - WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); - - let mut threads = vec![ + let threads = vec![ //replicate threads t_blob_receiver, t_retransmit, @@ -164,7 +129,7 @@ impl Tvu { write_stage.thread_hdl, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + Tvu{threads} } } @@ -185,6 +150,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests_socket.local_addr().unwrap(), + events_socket.local_addr().unwrap(), ); (d, gossip, replicate, requests_socket, events_socket) } @@ -215,7 +181,7 @@ mod tests { fn test_replicate() { logger::setup(); let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); - let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node(); + let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node(); let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); let exit = Arc::new(AtomicBool::new(false)); @@ -273,7 +239,7 @@ mod tests { &tvu, target1_data, target1_gossip, - target1_serve, + target1_events, target1_replicate, leader_data, exit.clone(),