Batch joins on entire tpumode struct instead of individual services

This commit is contained in:
Carl
2019-02-16 18:03:55 -08:00
committed by Greg Fitzgerald
parent a074cb78cd
commit 4e3d71c2c9
2 changed files with 45 additions and 47 deletions

View File

@ -31,11 +31,11 @@ pub enum TpuMode {
} }
pub struct LeaderServices { pub struct LeaderServices {
fetch_stage: Option<FetchStage>, fetch_stage: FetchStage,
sigverify_stage: Option<SigVerifyStage>, sigverify_stage: SigVerifyStage,
banking_stage: Option<BankingStage>, banking_stage: BankingStage,
cluster_info_vote_listener: Option<ClusterInfoVoteListener>, cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_service: Option<BroadcastService>, broadcast_service: BroadcastService,
} }
impl LeaderServices { impl LeaderServices {
@ -47,25 +47,25 @@ impl LeaderServices {
broadcast_service: BroadcastService, broadcast_service: BroadcastService,
) -> Self { ) -> Self {
LeaderServices { LeaderServices {
fetch_stage: Some(fetch_stage), fetch_stage,
sigverify_stage: Some(sigverify_stage), sigverify_stage,
banking_stage: Some(banking_stage), banking_stage,
cluster_info_vote_listener: Some(cluster_info_vote_listener), cluster_info_vote_listener,
broadcast_service: Some(broadcast_service), broadcast_service,
} }
} }
fn exit(&self) { fn exit(&self) {
self.fetch_stage.as_ref().unwrap().close(); self.fetch_stage.close();
} }
fn join(&mut self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
let mut results = vec![]; let mut results = vec![];
results.push(self.fetch_stage.take().unwrap().join()); results.push(self.fetch_stage.join());
results.push(self.sigverify_stage.take().unwrap().join()); results.push(self.sigverify_stage.join());
results.push(self.cluster_info_vote_listener.take().unwrap().join()); results.push(self.cluster_info_vote_listener.join());
results.push(self.banking_stage.take().unwrap().join()); results.push(self.banking_stage.join());
let broadcast_result = self.broadcast_service.take().unwrap().join(); let broadcast_result = self.broadcast_service.join();
for result in results { for result in results {
result?; result?;
} }
@ -73,32 +73,30 @@ impl LeaderServices {
Ok(()) Ok(())
} }
fn close(&mut self) -> thread::Result<()> { fn close(self) -> thread::Result<()> {
self.exit(); self.exit();
self.join() self.join()
} }
} }
pub struct ForwarderServices { pub struct ForwarderServices {
tpu_forwarder: Option<TpuForwarder>, tpu_forwarder: TpuForwarder,
} }
impl ForwarderServices { impl ForwarderServices {
fn new(tpu_forwarder: TpuForwarder) -> Self { fn new(tpu_forwarder: TpuForwarder) -> Self {
ForwarderServices { ForwarderServices { tpu_forwarder }
tpu_forwarder: Some(tpu_forwarder),
}
} }
fn exit(&self) { fn exit(&self) {
self.tpu_forwarder.as_ref().unwrap().close(); self.tpu_forwarder.close();
} }
fn join(&mut self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.tpu_forwarder.take().unwrap().join() self.tpu_forwarder.join()
} }
fn close(&mut self) -> thread::Result<()> { fn close(self) -> thread::Result<()> {
self.exit(); self.exit();
self.join() self.join()
} }
@ -134,14 +132,16 @@ impl Tpu {
} }
fn mode_close(&mut self) { fn mode_close(&mut self) {
match &mut self.tpu_mode { let tpu_mode = self.tpu_mode.take();
Some(TpuMode::Leader(svcs)) => { if let Some(tpu_mode) = tpu_mode {
let _ = svcs.close(); match tpu_mode {
TpuMode::Leader(svcs) => {
let _ = svcs.close();
}
TpuMode::Forwarder(svcs) => {
let _ = svcs.close();
}
} }
Some(TpuMode::Forwarder(svcs)) => {
let _ = svcs.close();
}
None => (),
} }
} }
@ -162,17 +162,13 @@ impl Tpu {
fn close_and_forward_unprocessed_packets(&mut self) { fn close_and_forward_unprocessed_packets(&mut self) {
self.mode_exit(); self.mode_exit();
let unprocessed_packets = match self.tpu_mode.take().as_mut() { let unprocessed_packets = match self.tpu_mode.as_mut() {
Some(TpuMode::Leader(svcs)) => svcs Some(TpuMode::Leader(svcs)) => {
.banking_stage svcs.banking_stage.join_and_collect_unprocessed_packets()
.as_mut() }
.unwrap() Some(TpuMode::Forwarder(svcs)) => {
.join_and_collect_unprocessed_packets(), svcs.tpu_forwarder.join_and_collect_unprocessed_packets()
Some(TpuMode::Forwarder(svcs)) => svcs }
.tpu_forwarder
.as_mut()
.unwrap()
.join_and_collect_unprocessed_packets(),
None => vec![], None => vec![],
}; };
@ -290,8 +286,8 @@ impl Service for Tpu {
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
match self.tpu_mode { match self.tpu_mode {
Some(TpuMode::Leader(mut svcs)) => svcs.join()?, Some(TpuMode::Leader(svcs)) => svcs.join()?,
Some(TpuMode::Forwarder(mut svcs)) => svcs.join()?, Some(TpuMode::Forwarder(svcs)) => svcs.join()?,
None => (), None => (),
} }
Ok(()) Ok(())

View File

@ -129,7 +129,9 @@ impl Service for TpuForwarder {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
self.forwarder_thread.unwrap().join()?; if let Some(forwarder_thread) = self.forwarder_thread {
forwarder_thread.join()?;
}
Ok(()) Ok(())
} }
} }