Move RequestProcessor out of Rpu/Tvu state

This commit is contained in:
Greg Fitzgerald
2018-05-12 11:39:24 -06:00
parent 3d82807965
commit 1511dc43d7
3 changed files with 16 additions and 13 deletions

View File

@ -272,20 +272,23 @@ impl RequestProcessor {
pub struct RequestStage {
pub thread_hdl: JoinHandle<()>,
pub output: streamer::BlobReceiver,
pub request_processor: Arc<RequestProcessor>,
}
impl RequestStage {
pub fn new(
request_processor: Arc<RequestProcessor>,
request_processor: RequestProcessor,
accounting_stage: Arc<AccountingStage>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler,
blob_recycler: packet::BlobRecycler,
) -> Self {
let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone();
let (responder_sender, output) = channel();
let thread_hdl = spawn(move || loop {
let e = request_processor.process_request_packets(
let e = request_processor_.process_request_packets(
&accounting_stage,
&verified_receiver,
&responder_sender,
@ -298,7 +301,11 @@ impl RequestStage {
}
}
});
RequestStage { thread_hdl, output }
RequestStage {
thread_hdl,
output,
request_processor,
}
}
}

View File

@ -18,16 +18,13 @@ use streamer;
pub struct Rpu {
accounting_stage: Arc<AccountingStage>,
request_processor: Arc<RequestProcessor>,
}
impl Rpu {
/// Create a new Rpu that wraps the given Accountant.
pub fn new(accounting_stage: AccountingStage) -> Self {
let request_processor = RequestProcessor::new(accounting_stage.accountant.clone());
Rpu {
accounting_stage: Arc::new(accounting_stage),
request_processor: Arc::new(request_processor),
}
}
@ -80,8 +77,9 @@ impl Rpu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let blob_recycler = packet::BlobRecycler::default();
let request_processor = RequestProcessor::new(self.accounting_stage.accountant.clone());
let request_stage = RequestStage::new(
self.request_processor.clone(),
request_processor,
self.accounting_stage.clone(),
exit.clone(),
sig_verify_stage.output,
@ -92,7 +90,7 @@ impl Rpu {
let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service(
self.accounting_stage.clone(),
self.request_processor.clone(),
request_stage.request_processor.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),

View File

@ -19,16 +19,13 @@ use streamer;
pub struct Tvu {
accounting_stage: Arc<AccountingStage>,
request_processor: Arc<RequestProcessor>,
}
impl Tvu {
/// Create a new Tvu that wraps the given Accountant.
pub fn new(accounting_stage: AccountingStage) -> Self {
let request_processor = RequestProcessor::new(accounting_stage.accountant.clone());
Tvu {
accounting_stage: Arc::new(accounting_stage),
request_processor: Arc::new(request_processor),
}
}
@ -170,8 +167,9 @@ impl Tvu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let request_processor = RequestProcessor::new(obj.accounting_stage.accountant.clone());
let request_stage = RequestStage::new(
obj.request_processor.clone(),
request_processor,
obj.accounting_stage.clone(),
exit.clone(),
sig_verify_stage.output,
@ -181,7 +179,7 @@ impl Tvu {
let t_write = Self::drain_service(
obj.accounting_stage.clone(),
obj.request_processor.clone(),
request_stage.request_processor.clone(),
exit.clone(),
);