From f0be595e4c164e3f75e5538587275a3b2c0264f0 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 11 May 2018 17:58:27 -0600 Subject: [PATCH] Create function for thin client thread --- src/tpu.rs | 77 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 394282c566..7865e6b400 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -16,7 +16,7 @@ use std::io::Write; use std::io::sink; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Sender}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -196,6 +196,30 @@ impl Tpu { Ok(()) } + fn thin_client_service( + obj: SharedTpu, + exit: Arc, + verified_receiver: Receiver)>>, + responder_sender: streamer::BlobSender, + packet_recycler: packet::PacketRecycler, + blob_recycler: packet::BlobRecycler, + ) -> JoinHandle<()> { + spawn(move || loop { + let e = obj.thin_client_service.process_request_packets( + &obj.accounting_stage, + &verified_receiver, + &responder_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } + } + }) + } + /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -270,26 +294,19 @@ impl Tpu { Mutex::new(writer), ); - let tpu = obj.clone(); - let t_server = spawn(move || loop { - let e = tpu.thin_client_service.process_request_packets( - &tpu.accounting_stage, - &verified_receiver, - &responder_sender, - &packet_recycler, - &blob_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; - } - } - }); + let t_thin_client = Self::thin_client_service( + obj.clone(), + exit.clone(), + verified_receiver, + responder_sender, + packet_recycler.clone(), + blob_recycler.clone(), + ); let mut threads = vec![ t_receiver, t_responder, - t_server, + t_thin_client, t_sync, t_gossip, t_listen, @@ -423,22 +440,14 @@ impl Tpu { } let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); - let tpu = obj.clone(); - let s_exit = exit.clone(); - let t_server = spawn(move || loop { - let e = tpu.thin_client_service.process_request_packets( - &tpu.accounting_stage, - &verified_receiver, - &responder_sender, - &packet_recycler, - &blob_recycler, - ); - if e.is_err() { - if s_exit.load(Ordering::Relaxed) { - break; - } - } - }); + let t_thin_client = Self::thin_client_service( + obj.clone(), + exit.clone(), + verified_receiver, + responder_sender, + packet_recycler.clone(), + blob_recycler.clone(), + ); let mut threads = vec![ //replicate threads @@ -451,7 +460,7 @@ impl Tpu { //serve threads t_packet_receiver, t_responder, - t_server, + t_thin_client, t_sync, ]; threads.extend(verify_threads.into_iter());