diff --git a/src/rpu.rs b/src/rpu.rs index 17187bec74..6c1889863d 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -20,12 +20,10 @@ use streamer; use write_stage::WriteStage; pub struct Rpu { - bank: Arc, pub thread_hdls: Vec>, } impl Rpu { - /// Create a new Rpu that wraps the given Bank. pub fn new1( bank: Bank, start_hash: Hash, @@ -38,40 +36,8 @@ impl Rpu { exit: Arc, writer: W, ) -> Self { - let mut rpu = Rpu { - bank: Arc::new(bank), - thread_hdls: vec![], - }; - let thread_hdls = rpu.serve( - start_hash, - tick_duration, - me, - requests_socket, - broadcast_socket, - respond_socket, - gossip, - exit, - writer, - ); - rpu.thread_hdls.extend(thread_hdls); - rpu - } + let bank = Arc::new(bank); - /// Create a UDP microservice that forwards messages the given Rpu. - /// This service is the network leader - /// Set `exit` to shutdown its threads. - pub fn serve( - &self, - start_hash: Hash, - tick_duration: Option, - me: ReplicatedData, - requests_socket: UdpSocket, - broadcast_socket: UdpSocket, - respond_socket: UdpSocket, - gossip: UdpSocket, - exit: Arc, - writer: W, - ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -84,7 +50,7 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.bank.clone()); + let request_processor = RequestProcessor::new(bank.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -97,7 +63,7 @@ impl Rpu { RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration); let write_stage = WriteStage::new( - self.bank.clone(), + bank.clone(), exit.clone(), blob_recycler.clone(), Mutex::new(writer), @@ -125,7 +91,7 @@ impl Rpu { request_stage.blob_receiver, ); - let mut threads = vec![ + let mut thread_hdls = vec![ t_receiver, t_responder, request_stage.thread_hdl, @@ -134,7 +100,8 @@ impl Rpu { t_listen, t_broadcast, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - threads + thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); + + Rpu { thread_hdls } } } diff --git a/src/tpu.rs b/src/tpu.rs index 94b8e0f256..9d8862424f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -19,12 +19,10 @@ use streamer; use write_stage::WriteStage; pub struct Tpu { - bank: Arc, pub thread_hdls: Vec>, } impl Tpu { - /// Create a new Tpu that wraps the given Bank. pub fn new1( bank: Bank, start_hash: Hash, @@ -36,38 +34,8 @@ impl Tpu { exit: Arc, writer: W, ) -> Self { - let mut tpu = Tpu { - bank: Arc::new(bank), - thread_hdls: vec![], - }; - let thread_hdls = tpu.serve( - start_hash, - tick_duration, - me, - requests_socket, - broadcast_socket, - gossip, - exit, - writer, - ); - tpu.thread_hdls.extend(thread_hdls); - tpu - } + let bank = Arc::new(bank); - /// Create a UDP microservice that forwards messages the given Tpu. - /// This service is the network leader - /// Set `exit` to shutdown its threads. - pub fn serve( - &self, - start_hash: Hash, - tick_duration: Option, - me: ReplicatedData, - requests_socket: UdpSocket, - broadcast_socket: UdpSocket, - gossip: UdpSocket, - exit: Arc, - writer: W, - ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -81,7 +49,7 @@ impl Tpu { let blob_recycler = packet::BlobRecycler::default(); let banking_stage = BankingStage::new( - self.bank.clone(), + bank.clone(), exit.clone(), sig_verify_stage.verified_receiver, packet_recycler.clone(), @@ -91,7 +59,7 @@ impl Tpu { RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration); let write_stage = WriteStage::new( - self.bank.clone(), + bank.clone(), exit.clone(), blob_recycler.clone(), Mutex::new(writer), @@ -112,7 +80,7 @@ impl Tpu { write_stage.blob_receiver, ); - let mut threads = vec![ + let mut thread_hdls = vec![ t_receiver, banking_stage.thread_hdl, write_stage.thread_hdl, @@ -120,7 +88,7 @@ impl Tpu { t_listen, t_broadcast, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - threads + thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); + Tpu { thread_hdls } } }