From 5e824b39ddba9b379b5d8570f2ea63bb21acd11f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 29 May 2018 11:02:58 -0600 Subject: [PATCH] Move multinode communication outside TPU --- src/server.rs | 28 +++++++++++++++++++++++----- src/tpu.rs | 32 ++++++++------------------------ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/server.rs b/src/server.rs index ab4f5be8bb..5c28a6282f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,15 +1,17 @@ //! The `server` module hosts all the server microservices. use bank::Bank; -use crdt::ReplicatedData; +use crdt::{Crdt, ReplicatedData}; use hash::Hash; +use packet; use rpu::Rpu; use std::io::Write; use std::net::UdpSocket; -use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; use std::time::Duration; +use streamer; use tpu::Tpu; use tvu::Tvu; @@ -35,18 +37,34 @@ impl Server { let mut thread_hdls = vec![]; let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); thread_hdls.extend(rpu.thread_hdls); + + let blob_recycler = packet::BlobRecycler::default(); let tpu = Tpu::new( bank.clone(), start_hash, tick_duration, - me, transactions_socket, - broadcast_socket, - gossip_socket, + blob_recycler.clone(), exit.clone(), writer, ); thread_hdls.extend(tpu.thread_hdls); + + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone()); + + let t_broadcast = streamer::broadcaster( + broadcast_socket, + exit.clone(), + crdt.clone(), + window, + blob_recycler.clone(), + tpu.blob_receiver, + ); + thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]); + Server { thread_hdls } } pub fn new_validator( diff --git a/src/tpu.rs b/src/tpu.rs index b813bd3275..b7758b4460 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -3,7 +3,6 @@ use bank::Bank; use banking_stage::BankingStage; -use crdt::{Crdt, ReplicatedData}; use hash::Hash; use packet; use record_stage::RecordStage; @@ -12,13 +11,14 @@ use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::time::Duration; use streamer; use write_stage::WriteStage; pub struct Tpu { + pub blob_receiver: streamer::BlobReceiver, pub thread_hdls: Vec>, } @@ -27,10 +27,8 @@ impl Tpu { bank: Arc, start_hash: Hash, tick_duration: Option, - me: ReplicatedData, transactions_socket: UdpSocket, - broadcast_socket: UdpSocket, - gossip: UdpSocket, + blob_recycler: packet::BlobRecycler, exit: Arc, writer: W, ) -> Self { @@ -45,7 +43,6 @@ impl Tpu { let sigverify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let blob_recycler = packet::BlobRecycler::default(); let banking_stage = BankingStage::new( bank.clone(), exit.clone(), @@ -64,30 +61,17 @@ impl Tpu { record_stage.entry_receiver, ); - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - - let t_broadcast = streamer::broadcaster( - broadcast_socket, - exit.clone(), - crdt.clone(), - window, - blob_recycler.clone(), - write_stage.blob_receiver, - ); - + let blob_receiver = write_stage.blob_receiver; let mut thread_hdls = vec![ t_receiver, banking_stage.thread_hdl, record_stage.thread_hdl, write_stage.thread_hdl, - t_gossip, - t_listen, - t_broadcast, ]; thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); - Tpu { thread_hdls } + Tpu { + blob_receiver, + thread_hdls, + } } }