Modify rpc_completed_slot_service to be non-blocking (#24007)
* timeout for validator exits
* clippy
* print backtrace on panic
* add backtrace package
* increase timeout to 30s
* debug logging
* make rpc complete service non-blocking
* reduce log level
* remove logging
* recv_timeout
* remove backtrace
* remove sleep
* remove unused variable
* add comments
* Update core/src/validator.rs
  Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
* Update core/src/validator.rs
  Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
* whitespace
* more whitespace
* fix build

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
This commit is contained in:
@ -872,8 +872,11 @@ impl Validator {
|
|||||||
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
|
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
|
||||||
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();
|
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();
|
||||||
|
|
||||||
let rpc_completed_slots_service =
|
let rpc_completed_slots_service = RpcCompletedSlotsService::spawn(
|
||||||
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
|
completed_slots_receiver,
|
||||||
|
rpc_subscriptions.clone(),
|
||||||
|
exit.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
let (replay_vote_sender, replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, replay_vote_receiver) = unbounded();
|
||||||
let tvu = Tvu::new(
|
let tvu = Tvu::new(
|
||||||
@ -1806,9 +1809,10 @@ pub fn is_snapshot_config_valid(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use {
|
use {
|
||||||
super::*,
|
super::*,
|
||||||
|
crossbeam_channel::{bounded, RecvTimeoutError},
|
||||||
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
|
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
|
||||||
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
|
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
|
||||||
std::fs::remove_dir_all,
|
std::{fs::remove_dir_all, thread, time::Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
fn validator_exit() {
|
fn validator_exit() {
|
||||||
@ -1926,12 +1930,22 @@ mod tests {
|
|||||||
|
|
||||||
// Each validator can exit in parallel to speed many sequential calls to join`
|
// Each validator can exit in parallel to speed many sequential calls to join`
|
||||||
validators.iter_mut().for_each(|v| v.exit());
|
validators.iter_mut().for_each(|v| v.exit());
|
||||||
// While join is called sequentially, the above exit call notified all the
|
|
||||||
// validators to exit from all their threads
|
// spawn a new thread to wait for the join of the validator
|
||||||
validators.into_iter().for_each(|validator| {
|
let (sender, receiver) = bounded(0);
|
||||||
validator.join();
|
let _ = thread::spawn(move || {
|
||||||
|
validators.into_iter().for_each(|validator| {
|
||||||
|
validator.join();
|
||||||
|
});
|
||||||
|
sender.send(()).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// timeout of 30s for shutting down the validators
|
||||||
|
let timeout = Duration::from_secs(30);
|
||||||
|
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
|
||||||
|
panic!("timeout for shutting down validators",);
|
||||||
|
}
|
||||||
|
|
||||||
for path in ledger_paths {
|
for path in ledger_paths {
|
||||||
remove_dir_all(path).unwrap();
|
remove_dir_all(path).unwrap();
|
||||||
}
|
}
|
||||||
|
@ -4,21 +4,35 @@ use {
|
|||||||
solana_ledger::blockstore::CompletedSlotsReceiver,
|
solana_ledger::blockstore::CompletedSlotsReceiver,
|
||||||
solana_sdk::timing::timestamp,
|
solana_sdk::timing::timestamp,
|
||||||
std::{
|
std::{
|
||||||
sync::Arc,
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
thread::{Builder, JoinHandle},
|
thread::{Builder, JoinHandle},
|
||||||
|
time::Duration,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100;
|
||||||
|
|
||||||
pub struct RpcCompletedSlotsService;
|
pub struct RpcCompletedSlotsService;
|
||||||
impl RpcCompletedSlotsService {
|
impl RpcCompletedSlotsService {
|
||||||
pub fn spawn(
|
pub fn spawn(
|
||||||
completed_slots_receiver: CompletedSlotsReceiver,
|
completed_slots_receiver: CompletedSlotsReceiver,
|
||||||
rpc_subscriptions: Arc<RpcSubscriptions>,
|
rpc_subscriptions: Arc<RpcSubscriptions>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-rpc-completed-slots-service".to_string())
|
.name("solana-rpc-completed-slots-service".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || loop {
|
||||||
for slots in completed_slots_receiver.iter() {
|
// shutdown the service
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(slots) = completed_slots_receiver
|
||||||
|
.recv_timeout(Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS))
|
||||||
|
{
|
||||||
for slot in slots {
|
for slot in slots {
|
||||||
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
|
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
|
||||||
slot,
|
slot,
|
||||||
|
Reference in New Issue
Block a user