* Extricate RpcCompletedSlotsService from RetransmitStage
(cherry picked from commit fa04531c7a
)
# Conflicts:
# core/src/replay_stage.rs
# core/src/retransmit_stage.rs
# core/src/tvu.rs
# core/src/validator.rs
* removes backport merge conflicts
Co-authored-by: Michael Vines <mvines@gmail.com>
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -19,18 +19,12 @@ use {
|
||||
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
|
||||
solana_ledger::{
|
||||
shred::Shred,
|
||||
{
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver},
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
},
|
||||
{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
|
||||
},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::inc_new_counter_error,
|
||||
solana_perf::packet::Packets,
|
||||
solana_rpc::{
|
||||
max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService,
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
},
|
||||
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
solana_sdk::{
|
||||
clock::Slot,
|
||||
@ -457,7 +451,6 @@ impl RetransmitStage {
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
verified_receiver: Receiver<Vec<Packets>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
rpc_completed_slots_receiver: CompletedSlotsReceiver,
|
||||
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
|
||||
epoch_schedule: EpochSchedule,
|
||||
cfg: Option<Arc<AtomicBool>>,
|
||||
@ -476,18 +469,16 @@ impl RetransmitStage {
|
||||
let _retransmit_sender = retransmit_sender.clone();
|
||||
|
||||
let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
|
||||
let t_retransmit = retransmitter(
|
||||
let thread_hdls = retransmitter(
|
||||
retransmit_sockets,
|
||||
bank_forks.clone(),
|
||||
leader_schedule_cache.clone(),
|
||||
cluster_info.clone(),
|
||||
retransmit_receiver,
|
||||
max_slots,
|
||||
rpc_subscriptions.clone(),
|
||||
rpc_subscriptions,
|
||||
);
|
||||
|
||||
let rpc_completed_slots_hdl =
|
||||
RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions);
|
||||
let cluster_slots_service = ClusterSlotsService::new(
|
||||
blockstore.clone(),
|
||||
cluster_slots.clone(),
|
||||
@ -534,11 +525,6 @@ impl RetransmitStage {
|
||||
duplicate_slots_sender,
|
||||
);
|
||||
|
||||
let mut thread_hdls = t_retransmit;
|
||||
if let Some(thread_hdl) = rpc_completed_slots_hdl {
|
||||
thread_hdls.push(thread_hdl);
|
||||
}
|
||||
|
||||
Self {
|
||||
thread_hdls,
|
||||
window_service,
|
||||
|
@ -25,8 +25,7 @@ use crate::{
|
||||
use crossbeam_channel::unbounded;
|
||||
use solana_gossip::cluster_info::ClusterInfo;
|
||||
use solana_ledger::{
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver},
|
||||
blockstore_processor::TransactionStatusSender,
|
||||
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
};
|
||||
use solana_poh::poh_recorder::PohRecorder;
|
||||
@ -115,7 +114,6 @@ impl Tvu {
|
||||
tower: Tower,
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
completed_slots_receiver: CompletedSlotsReceiver,
|
||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
cfg: Option<Arc<AtomicBool>>,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
@ -179,7 +177,6 @@ impl Tvu {
|
||||
repair_socket,
|
||||
verified_receiver,
|
||||
exit.clone(),
|
||||
completed_slots_receiver,
|
||||
cluster_slots_update_receiver,
|
||||
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
|
||||
cfg,
|
||||
@ -393,7 +390,6 @@ pub mod tests {
|
||||
let BlockstoreSignals {
|
||||
blockstore,
|
||||
ledger_signal_receiver,
|
||||
completed_slots_receiver,
|
||||
..
|
||||
} = Blockstore::open_with_signal(&blockstore_path, None, true)
|
||||
.expect("Expected to successfully open ledger");
|
||||
@ -437,7 +433,6 @@ pub mod tests {
|
||||
tower,
|
||||
&leader_schedule_cache,
|
||||
&exit,
|
||||
completed_slots_receiver,
|
||||
block_commitment_cache,
|
||||
None,
|
||||
None,
|
||||
|
@ -48,6 +48,7 @@ use {
|
||||
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
|
||||
},
|
||||
rpc::JsonRpcConfig,
|
||||
rpc_completed_slots_service::RpcCompletedSlotsService,
|
||||
rpc_pubsub_service::{PubSubConfig, PubSubService},
|
||||
rpc_service::JsonRpcService,
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
@ -84,7 +85,7 @@ use {
|
||||
mpsc::Receiver,
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
thread::{sleep, Builder},
|
||||
thread::{sleep, Builder, JoinHandle},
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
};
|
||||
@ -248,6 +249,7 @@ pub struct Validator {
|
||||
validator_exit: Arc<RwLock<Exit>>,
|
||||
json_rpc_service: Option<JsonRpcService>,
|
||||
pubsub_service: Option<PubSubService>,
|
||||
rpc_completed_slots_service: JoinHandle<()>,
|
||||
optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
|
||||
transaction_status_service: Option<TransactionStatusService>,
|
||||
rewards_recorder_service: Option<RewardsRecorderService>,
|
||||
@ -681,6 +683,10 @@ impl Validator {
|
||||
let (verified_vote_sender, verified_vote_receiver) = unbounded();
|
||||
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
|
||||
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();
|
||||
|
||||
let rpc_completed_slots_service =
|
||||
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
|
||||
|
||||
let tvu = Tvu::new(
|
||||
vote_account,
|
||||
authorized_voter_keypairs,
|
||||
@ -718,7 +724,6 @@ impl Validator {
|
||||
tower,
|
||||
&leader_schedule_cache,
|
||||
&exit,
|
||||
completed_slots_receiver,
|
||||
block_commitment_cache,
|
||||
config.enable_partition.clone(),
|
||||
transaction_status_sender.clone(),
|
||||
@ -784,6 +789,7 @@ impl Validator {
|
||||
serve_repair_service,
|
||||
json_rpc_service,
|
||||
pubsub_service,
|
||||
rpc_completed_slots_service,
|
||||
optimistically_confirmed_bank_tracker,
|
||||
transaction_status_service,
|
||||
rewards_recorder_service,
|
||||
@ -847,6 +853,10 @@ impl Validator {
|
||||
pubsub_service.join().expect("pubsub_service");
|
||||
}
|
||||
|
||||
self.rpc_completed_slots_service
|
||||
.join()
|
||||
.expect("rpc_completed_slots_service");
|
||||
|
||||
if let Some(optimistically_confirmed_bank_tracker) =
|
||||
self.optimistically_confirmed_bank_tracker
|
||||
{
|
||||
|
@ -13,23 +13,20 @@ pub struct RpcCompletedSlotsService;
|
||||
impl RpcCompletedSlotsService {
|
||||
pub fn spawn(
|
||||
completed_slots_receiver: CompletedSlotsReceiver,
|
||||
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
|
||||
) -> Option<JoinHandle<()>> {
|
||||
let rpc_subscriptions = rpc_subscriptions?;
|
||||
Some(
|
||||
Builder::new()
|
||||
.name("solana-rpc-completed-slots-service".to_string())
|
||||
.spawn(move || {
|
||||
for slots in completed_slots_receiver.iter() {
|
||||
for slot in slots {
|
||||
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
|
||||
slot,
|
||||
timestamp: timestamp(),
|
||||
});
|
||||
}
|
||||
rpc_subscriptions: Arc<RpcSubscriptions>,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-rpc-completed-slots-service".to_string())
|
||||
.spawn(move || {
|
||||
for slots in completed_slots_receiver.iter() {
|
||||
for slot in slots {
|
||||
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
|
||||
slot,
|
||||
timestamp: timestamp(),
|
||||
});
|
||||
}
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user