From 1511dc43d7eb35174235ce8ba4cef1e827c7c5d7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 12 May 2018 11:39:24 -0600 Subject: [PATCH] Move RequestProcessor out of Rpu/Tvu state --- src/request_stage.rs | 13 ++++++++++--- src/rpu.rs | 8 +++----- src/tvu.rs | 8 +++----- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/request_stage.rs b/src/request_stage.rs index 92ea3fc53e..30e6dae528 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -272,20 +272,23 @@ impl RequestProcessor { pub struct RequestStage { pub thread_hdl: JoinHandle<()>, pub output: streamer::BlobReceiver, + pub request_processor: Arc, } impl RequestStage { pub fn new( - request_processor: Arc, + request_processor: RequestProcessor, accounting_stage: Arc, exit: Arc, verified_receiver: Receiver)>>, packet_recycler: packet::PacketRecycler, blob_recycler: packet::BlobRecycler, ) -> Self { + let request_processor = Arc::new(request_processor); + let request_processor_ = request_processor.clone(); let (responder_sender, output) = channel(); let thread_hdl = spawn(move || loop { - let e = request_processor.process_request_packets( + let e = request_processor_.process_request_packets( &accounting_stage, &verified_receiver, &responder_sender, @@ -298,7 +301,11 @@ impl RequestStage { } } }); - RequestStage { thread_hdl, output } + RequestStage { + thread_hdl, + output, + request_processor, + } } } diff --git a/src/rpu.rs b/src/rpu.rs index 36dd636964..5e5bcb0379 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -18,16 +18,13 @@ use streamer; pub struct Rpu { accounting_stage: Arc, - request_processor: Arc, } impl Rpu { /// Create a new Rpu that wraps the given Accountant. pub fn new(accounting_stage: AccountingStage) -> Self { - let request_processor = RequestProcessor::new(accounting_stage.accountant.clone()); Rpu { accounting_stage: Arc::new(accounting_stage), - request_processor: Arc::new(request_processor), } } @@ -80,8 +77,9 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); + let request_processor = RequestProcessor::new(self.accounting_stage.accountant.clone()); let request_stage = RequestStage::new( - self.request_processor.clone(), + request_processor, self.accounting_stage.clone(), exit.clone(), sig_verify_stage.output, @@ -92,7 +90,7 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( self.accounting_stage.clone(), - self.request_processor.clone(), + request_stage.request_processor.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/tvu.rs b/src/tvu.rs index b4feb477bb..b085494cbc 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -19,16 +19,13 @@ use streamer; pub struct Tvu { accounting_stage: Arc, - request_processor: Arc, } impl Tvu { /// Create a new Tvu that wraps the given Accountant. pub fn new(accounting_stage: AccountingStage) -> Self { - let request_processor = RequestProcessor::new(accounting_stage.accountant.clone()); Tvu { accounting_stage: Arc::new(accounting_stage), - request_processor: Arc::new(request_processor), } } @@ -170,8 +167,9 @@ impl Tvu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + let request_processor = RequestProcessor::new(obj.accounting_stage.accountant.clone()); let request_stage = RequestStage::new( - obj.request_processor.clone(), + request_processor, obj.accounting_stage.clone(), exit.clone(), sig_verify_stage.output, @@ -181,7 +179,7 @@ impl Tvu { let t_write = Self::drain_service( obj.accounting_stage.clone(), - obj.request_processor.clone(), + request_stage.request_processor.clone(), exit.clone(), );