Ensure leader services are closed before starting new ones

This commit is contained in:
Carl
2019-02-16 00:48:02 -08:00
committed by Greg Fitzgerald
parent 0dbc33f781
commit a074cb78cd

View File

@ -31,11 +31,11 @@ pub enum TpuMode {
} }
pub struct LeaderServices { pub struct LeaderServices {
fetch_stage: FetchStage, fetch_stage: Option<FetchStage>,
sigverify_stage: SigVerifyStage, sigverify_stage: Option<SigVerifyStage>,
banking_stage: BankingStage, banking_stage: Option<BankingStage>,
cluster_info_vote_listener: ClusterInfoVoteListener, cluster_info_vote_listener: Option<ClusterInfoVoteListener>,
broadcast_service: BroadcastService, broadcast_service: Option<BroadcastService>,
} }
impl LeaderServices { impl LeaderServices {
@ -47,22 +47,60 @@ impl LeaderServices {
broadcast_service: BroadcastService, broadcast_service: BroadcastService,
) -> Self { ) -> Self {
LeaderServices { LeaderServices {
fetch_stage, fetch_stage: Some(fetch_stage),
sigverify_stage, sigverify_stage: Some(sigverify_stage),
banking_stage, banking_stage: Some(banking_stage),
cluster_info_vote_listener, cluster_info_vote_listener: Some(cluster_info_vote_listener),
broadcast_service, broadcast_service: Some(broadcast_service),
} }
} }
fn exit(&self) {
self.fetch_stage.as_ref().unwrap().close();
}
fn join(&mut self) -> thread::Result<()> {
let mut results = vec![];
results.push(self.fetch_stage.take().unwrap().join());
results.push(self.sigverify_stage.take().unwrap().join());
results.push(self.cluster_info_vote_listener.take().unwrap().join());
results.push(self.banking_stage.take().unwrap().join());
let broadcast_result = self.broadcast_service.take().unwrap().join();
for result in results {
result?;
}
let _ = broadcast_result?;
Ok(())
}
fn close(&mut self) -> thread::Result<()> {
self.exit();
self.join()
}
} }
pub struct ForwarderServices { pub struct ForwarderServices {
tpu_forwarder: TpuForwarder, tpu_forwarder: Option<TpuForwarder>,
} }
impl ForwarderServices { impl ForwarderServices {
fn new(tpu_forwarder: TpuForwarder) -> Self { fn new(tpu_forwarder: TpuForwarder) -> Self {
ForwarderServices { tpu_forwarder } ForwarderServices {
tpu_forwarder: Some(tpu_forwarder),
}
}
fn exit(&self) {
self.tpu_forwarder.as_ref().unwrap().close();
}
fn join(&mut self) -> thread::Result<()> {
self.tpu_forwarder.take().unwrap().join()
}
fn close(&mut self) -> thread::Result<()> {
self.exit();
self.join()
} }
} }
@ -83,13 +121,25 @@ impl Tpu {
} }
} }
fn mode_close(&self) { fn mode_exit(&mut self) {
match &self.tpu_mode { match &mut self.tpu_mode {
Some(TpuMode::Leader(svcs)) => { Some(TpuMode::Leader(svcs)) => {
svcs.fetch_stage.close(); svcs.exit();
} }
Some(TpuMode::Forwarder(svcs)) => { Some(TpuMode::Forwarder(svcs)) => {
svcs.tpu_forwarder.close(); svcs.exit();
}
None => (),
}
}
fn mode_close(&mut self) {
match &mut self.tpu_mode {
Some(TpuMode::Leader(svcs)) => {
let _ = svcs.close();
}
Some(TpuMode::Forwarder(svcs)) => {
let _ = svcs.close();
} }
None => (), None => (),
} }
@ -110,15 +160,19 @@ impl Tpu {
} }
fn close_and_forward_unprocessed_packets(&mut self) { fn close_and_forward_unprocessed_packets(&mut self) {
self.mode_close(); self.mode_exit();
let unprocessed_packets = match self.tpu_mode.take().as_mut() { let unprocessed_packets = match self.tpu_mode.take().as_mut() {
Some(TpuMode::Leader(svcs)) => { Some(TpuMode::Leader(svcs)) => svcs
svcs.banking_stage.join_and_collect_unprocessed_packets() .banking_stage
} .as_mut()
Some(TpuMode::Forwarder(svcs)) => { .unwrap()
svcs.tpu_forwarder.join_and_collect_unprocessed_packets() .join_and_collect_unprocessed_packets(),
} Some(TpuMode::Forwarder(svcs)) => svcs
.tpu_forwarder
.as_mut()
.unwrap()
.join_and_collect_unprocessed_packets(),
None => vec![], None => vec![],
}; };
@ -129,6 +183,8 @@ impl Tpu {
warn!("Failed to forward unprocessed transactions: {:?}", err) warn!("Failed to forward unprocessed transactions: {:?}", err)
}); });
} }
self.mode_close();
} }
pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec<UdpSocket>) { pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec<UdpSocket>) {
@ -223,7 +279,7 @@ impl Tpu {
self.exit.load(Ordering::Relaxed) self.exit.load(Ordering::Relaxed)
} }
pub fn close(self) -> thread::Result<()> { pub fn close(mut self) -> thread::Result<()> {
self.mode_close(); self.mode_close();
self.join() self.join()
} }
@ -234,16 +290,8 @@ impl Service for Tpu {
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
match self.tpu_mode { match self.tpu_mode {
Some(TpuMode::Leader(svcs)) => { Some(TpuMode::Leader(mut svcs)) => svcs.join()?,
svcs.broadcast_service.join()?; Some(TpuMode::Forwarder(mut svcs)) => svcs.join()?,
svcs.fetch_stage.join()?;
svcs.sigverify_stage.join()?;
svcs.cluster_info_vote_listener.join()?;
svcs.banking_stage.join()?;
}
Some(TpuMode::Forwarder(svcs)) => {
svcs.tpu_forwarder.join()?;
}
None => (), None => (),
} }
Ok(()) Ok(())