From 2f2948f9987adde089fb82198d4910765bc65446 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 28 Sep 2021 18:25:51 +0000 Subject: [PATCH] Extricate RpcCompletedSlotsService from RetransmitStage (backport #18017) (#20294) * Extricate RpcCompletedSlotsService from RetransmitStage (cherry picked from commit fa04531c7a2175a1e61d7af3bf42117c5e33cd63) # Conflicts: # core/src/replay_stage.rs # core/src/retransmit_stage.rs # core/src/tvu.rs # core/src/validator.rs * removes backport merge conflicts Co-authored-by: Michael Vines Co-authored-by: behzad nouri --- core/src/retransmit_stage.rs | 22 ++++-------------- core/src/tvu.rs | 7 +----- core/src/validator.rs | 14 ++++++++++-- rpc/src/rpc_completed_slots_service.rs | 31 ++++++++++++-------------- 4 files changed, 31 insertions(+), 43 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 0bbb179282..43f9388543 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -19,18 +19,12 @@ use { solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, solana_ledger::{ shred::Shred, - { - blockstore::{Blockstore, CompletedSlotsReceiver}, - leader_schedule_cache::LeaderScheduleCache, - }, + {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, solana_perf::packet::Packets, - solana_rpc::{ - max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService, - rpc_subscriptions::RpcSubscriptions, - }, + solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ clock::Slot, @@ -457,7 +451,6 @@ impl RetransmitStage { repair_socket: Arc, verified_receiver: Receiver>, exit: Arc, - rpc_completed_slots_receiver: CompletedSlotsReceiver, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, cfg: Option>, @@ -476,18 +469,16 @@ impl RetransmitStage { let _retransmit_sender = retransmit_sender.clone(); let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver)); - let t_retransmit = retransmitter( + let thread_hdls = retransmitter( retransmit_sockets, bank_forks.clone(), leader_schedule_cache.clone(), cluster_info.clone(), retransmit_receiver, max_slots, - rpc_subscriptions.clone(), + rpc_subscriptions, ); - let rpc_completed_slots_hdl = - RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions); let cluster_slots_service = ClusterSlotsService::new( blockstore.clone(), cluster_slots.clone(), @@ -534,11 +525,6 @@ impl RetransmitStage { duplicate_slots_sender, ); - let mut thread_hdls = t_retransmit; - if let Some(thread_hdl) = rpc_completed_slots_hdl { - thread_hdls.push(thread_hdl); - } - Self { thread_hdls, window_service, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d83c9051e8..b7487bc3d2 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,8 +25,7 @@ use crate::{ use crossbeam_channel::unbounded; use solana_gossip::cluster_info::ClusterInfo; use solana_ledger::{ - blockstore::{Blockstore, CompletedSlotsReceiver}, - blockstore_processor::TransactionStatusSender, + blockstore::Blockstore, blockstore_processor::TransactionStatusSender, leader_schedule_cache::LeaderScheduleCache, }; use solana_poh::poh_recorder::PohRecorder; @@ -115,7 +114,6 @@ impl Tvu { tower: Tower, leader_schedule_cache: &Arc, exit: &Arc, - completed_slots_receiver: CompletedSlotsReceiver, block_commitment_cache: Arc>, cfg: Option>, transaction_status_sender: Option, @@ -179,7 +177,6 @@ impl Tvu { repair_socket, verified_receiver, exit.clone(), - completed_slots_receiver, cluster_slots_update_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), cfg, @@ -393,7 +390,6 @@ pub mod tests { let BlockstoreSignals { blockstore, ledger_signal_receiver, - completed_slots_receiver, .. } = Blockstore::open_with_signal(&blockstore_path, None, true) .expect("Expected to successfully open ledger"); @@ -437,7 +433,6 @@ pub mod tests { tower, &leader_schedule_cache, &exit, - completed_slots_receiver, block_commitment_cache, None, None, diff --git a/core/src/validator.rs b/core/src/validator.rs index 95e641cb40..d2a8b31daf 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -48,6 +48,7 @@ use { OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, }, rpc::JsonRpcConfig, + rpc_completed_slots_service::RpcCompletedSlotsService, rpc_pubsub_service::{PubSubConfig, PubSubService}, rpc_service::JsonRpcService, rpc_subscriptions::RpcSubscriptions, @@ -84,7 +85,7 @@ use { mpsc::Receiver, Arc, Mutex, RwLock, }, - thread::{sleep, Builder}, + thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }, }; @@ -248,6 +249,7 @@ pub struct Validator { validator_exit: Arc>, json_rpc_service: Option, pubsub_service: Option, + rpc_completed_slots_service: JoinHandle<()>, optimistically_confirmed_bank_tracker: Option, transaction_status_service: Option, rewards_recorder_service: Option, @@ -681,6 +683,10 @@ impl Validator { let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); + + let rpc_completed_slots_service = + RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone()); + let tvu = Tvu::new( vote_account, authorized_voter_keypairs, @@ -718,7 +724,6 @@ impl Validator { tower, &leader_schedule_cache, &exit, - completed_slots_receiver, block_commitment_cache, config.enable_partition.clone(), transaction_status_sender.clone(), @@ -784,6 +789,7 @@ impl Validator { serve_repair_service, json_rpc_service, pubsub_service, + rpc_completed_slots_service, optimistically_confirmed_bank_tracker, transaction_status_service, rewards_recorder_service, @@ -847,6 +853,10 @@ impl Validator { pubsub_service.join().expect("pubsub_service"); } + self.rpc_completed_slots_service + .join() + .expect("rpc_completed_slots_service"); + if let Some(optimistically_confirmed_bank_tracker) = self.optimistically_confirmed_bank_tracker { diff --git a/rpc/src/rpc_completed_slots_service.rs b/rpc/src/rpc_completed_slots_service.rs index 83ccd28d2e..0f9d393e61 100644 --- a/rpc/src/rpc_completed_slots_service.rs +++ b/rpc/src/rpc_completed_slots_service.rs @@ -13,23 +13,20 @@ pub struct RpcCompletedSlotsService; impl RpcCompletedSlotsService { pub fn spawn( completed_slots_receiver: CompletedSlotsReceiver, - rpc_subscriptions: Option>, - ) -> Option> { - let rpc_subscriptions = rpc_subscriptions?; - Some( - Builder::new() - .name("solana-rpc-completed-slots-service".to_string()) - .spawn(move || { - for slots in completed_slots_receiver.iter() { - for slot in slots { - rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { - slot, - timestamp: timestamp(), - }); - } + rpc_subscriptions: Arc, + ) -> JoinHandle<()> { + Builder::new() + .name("solana-rpc-completed-slots-service".to_string()) + .spawn(move || { + for slots in completed_slots_receiver.iter() { + for slot in slots { + rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { + slot, + timestamp: timestamp(), + }); } - }) - .unwrap(), - ) + } + }) + .unwrap() } }