From 2376dfc139dea6ae70eac438c4e728b3f0879cb2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 11 May 2018 23:45:57 -0600 Subject: [PATCH] Let thin client own the receiver channel --- src/thin_client_service.rs | 15 +++++---------- src/tpu.rs | 23 ++++++++++------------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/thin_client_service.rs b/src/thin_client_service.rs index 059d44c790..ef07bafddf 100644 --- a/src/thin_client_service.rs +++ b/src/thin_client_service.rs @@ -15,15 +15,14 @@ use signature::PublicKey; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; -use transaction::Transaction; -//use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::mpsc::Receiver; use std::time::Duration; use std::time::Instant; use streamer; use timing; +use transaction::Transaction; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[derive(Serialize, Deserialize, Debug, Clone)] @@ -62,8 +61,6 @@ pub enum Response { } pub struct RequestProcessor { - //pub output: Mutex>, - //response_sender: Mutex>, accountant: Arc, entry_info_subscribers: Mutex>, } @@ -71,10 +68,7 @@ pub struct RequestProcessor { impl RequestProcessor { /// Create a new Tpu that wraps the given Accountant. pub fn new(accountant: Arc) -> Self { - //let (response_sender, output) = channel(); RequestProcessor { - //output: Mutex::new(output), - //response_sender: Mutex::new(response_sender), accountant, entry_info_subscribers: Mutex::new(vec![]), } @@ -278,6 +272,7 @@ impl RequestProcessor { pub struct ThinClientService { pub thread_hdl: JoinHandle<()>, + pub output: streamer::BlobReceiver, } impl ThinClientService { @@ -286,10 +281,10 @@ impl ThinClientService { accounting_stage: Arc, exit: Arc, verified_receiver: Receiver)>>, - responder_sender: streamer::BlobSender, packet_recycler: packet::PacketRecycler, blob_recycler: packet::BlobRecycler, ) -> Self { + let (responder_sender, output) = channel(); let thread_hdl = spawn(move || loop { let e = request_processor.process_request_packets( &accounting_stage, @@ -304,7 +299,7 @@ impl ThinClientService { } } }); - ThinClientService { thread_hdl } + ThinClientService { thread_hdl, output } } } diff --git a/src/tpu.rs b/src/tpu.rs index f512148962..111ee34284 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -97,13 +97,11 @@ impl Tpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let (responder_sender, responder_receiver) = channel(); let thin_client_service = ThinClientService::new( obj.request_processor.clone(), obj.accounting_stage.clone(), exit.clone(), sig_verify_stage.output, - responder_sender, packet_recycler.clone(), blob_recycler.clone(), ); @@ -131,7 +129,7 @@ impl Tpu { respond_socket, exit.clone(), blob_recycler.clone(), - responder_receiver, + thin_client_service.output, ); let mut threads = vec![ @@ -265,28 +263,27 @@ impl Tpu { packet_recycler.clone(), packet_sender, )?; - let (responder_sender, responder_receiver) = channel(); - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - responder_receiver, - ); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let t_write = Self::drain_service(obj.clone(), exit.clone()); - let thin_client_service = ThinClientService::new( obj.request_processor.clone(), obj.accounting_stage.clone(), exit.clone(), sig_verify_stage.output, - responder_sender, packet_recycler.clone(), blob_recycler.clone(), ); + let t_write = Self::drain_service(obj.clone(), exit.clone()); + + let t_responder = streamer::responder( + respond_socket, + exit.clone(), + blob_recycler.clone(), + thin_client_service.output, + ); + let mut threads = vec![ //replicate threads t_blob_receiver,