From a074cb78cd405861df251f75c5734490a3b141e9 Mon Sep 17 00:00:00 2001 From: Carl Date: Sat, 16 Feb 2019 00:48:02 -0800 Subject: [PATCH] Ensure leader services are closed before starting new ones --- src/tpu.rs | 116 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 4cc0b29324..b3f00cead3 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -31,11 +31,11 @@ pub enum TpuMode { } pub struct LeaderServices { - fetch_stage: FetchStage, - sigverify_stage: SigVerifyStage, - banking_stage: BankingStage, - cluster_info_vote_listener: ClusterInfoVoteListener, - broadcast_service: BroadcastService, + fetch_stage: Option, + sigverify_stage: Option, + banking_stage: Option, + cluster_info_vote_listener: Option, + broadcast_service: Option, } impl LeaderServices { @@ -47,22 +47,60 @@ impl LeaderServices { broadcast_service: BroadcastService, ) -> Self { LeaderServices { - fetch_stage, - sigverify_stage, - banking_stage, - cluster_info_vote_listener, - broadcast_service, + fetch_stage: Some(fetch_stage), + sigverify_stage: Some(sigverify_stage), + banking_stage: Some(banking_stage), + cluster_info_vote_listener: Some(cluster_info_vote_listener), + broadcast_service: Some(broadcast_service), } } + + fn exit(&self) { + self.fetch_stage.as_ref().unwrap().close(); + } + + fn join(&mut self) -> thread::Result<()> { + let mut results = vec![]; + results.push(self.fetch_stage.take().unwrap().join()); + results.push(self.sigverify_stage.take().unwrap().join()); + results.push(self.cluster_info_vote_listener.take().unwrap().join()); + results.push(self.banking_stage.take().unwrap().join()); + let broadcast_result = self.broadcast_service.take().unwrap().join(); + for result in results { + result?; + } + let _ = broadcast_result?; + Ok(()) + } + + fn close(&mut self) -> thread::Result<()> { + self.exit(); + self.join() + } } pub struct ForwarderServices { - tpu_forwarder: TpuForwarder, + tpu_forwarder: Option, } impl ForwarderServices { fn new(tpu_forwarder: TpuForwarder) -> Self { - ForwarderServices { tpu_forwarder } + ForwarderServices { + tpu_forwarder: Some(tpu_forwarder), + } + } + + fn exit(&self) { + self.tpu_forwarder.as_ref().unwrap().close(); + } + + fn join(&mut self) -> thread::Result<()> { + self.tpu_forwarder.take().unwrap().join() + } + + fn close(&mut self) -> thread::Result<()> { + self.exit(); + self.join() } } @@ -83,13 +121,25 @@ impl Tpu { } } - fn mode_close(&self) { - match &self.tpu_mode { + fn mode_exit(&mut self) { + match &mut self.tpu_mode { Some(TpuMode::Leader(svcs)) => { - svcs.fetch_stage.close(); + svcs.exit(); } Some(TpuMode::Forwarder(svcs)) => { - svcs.tpu_forwarder.close(); + svcs.exit(); + } + None => (), + } + } + + fn mode_close(&mut self) { + match &mut self.tpu_mode { + Some(TpuMode::Leader(svcs)) => { + let _ = svcs.close(); + } + Some(TpuMode::Forwarder(svcs)) => { + let _ = svcs.close(); } None => (), } @@ -110,15 +160,19 @@ impl Tpu { } fn close_and_forward_unprocessed_packets(&mut self) { - self.mode_close(); + self.mode_exit(); let unprocessed_packets = match self.tpu_mode.take().as_mut() { - Some(TpuMode::Leader(svcs)) => { - svcs.banking_stage.join_and_collect_unprocessed_packets() - } - Some(TpuMode::Forwarder(svcs)) => { - svcs.tpu_forwarder.join_and_collect_unprocessed_packets() - } + Some(TpuMode::Leader(svcs)) => svcs + .banking_stage + .as_mut() + .unwrap() + .join_and_collect_unprocessed_packets(), + Some(TpuMode::Forwarder(svcs)) => svcs + .tpu_forwarder + .as_mut() + .unwrap() + .join_and_collect_unprocessed_packets(), None => vec![], }; @@ -129,6 +183,8 @@ impl Tpu { warn!("Failed to forward unprocessed transactions: {:?}", err) }); } + + self.mode_close(); } pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec) { @@ -223,7 +279,7 @@ impl Tpu { self.exit.load(Ordering::Relaxed) } - pub fn close(self) -> thread::Result<()> { + pub fn close(mut self) -> thread::Result<()> { self.mode_close(); self.join() } @@ -234,16 +290,8 @@ impl Service for Tpu { fn join(self) -> thread::Result<()> { match self.tpu_mode { - Some(TpuMode::Leader(svcs)) => { - svcs.broadcast_service.join()?; - svcs.fetch_stage.join()?; - svcs.sigverify_stage.join()?; - svcs.cluster_info_vote_listener.join()?; - svcs.banking_stage.join()?; - } - Some(TpuMode::Forwarder(svcs)) => { - svcs.tpu_forwarder.join()?; - } + Some(TpuMode::Leader(mut svcs)) => svcs.join()?, + Some(TpuMode::Forwarder(mut svcs)) => svcs.join()?, None => (), } Ok(())