diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs
index 1f059521b6..5b455fa5f1 100644
--- a/core/src/cluster_slots_service.rs
+++ b/core/src/cluster_slots_service.rs
@@ -1,29 +1,32 @@
 use crate::cluster_slots::ClusterSlots;
+use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
 use solana_gossip::cluster_info::ClusterInfo;
-use solana_ledger::blockstore::{Blockstore, CompletedSlotsReceiver};
+use solana_ledger::blockstore::Blockstore;
 use solana_measure::measure::Measure;
 use solana_runtime::bank_forks::BankForks;
 use solana_sdk::{clock::Slot, pubkey::Pubkey};
 use std::{
     sync::{
         atomic::{AtomicBool, Ordering},
-        mpsc::RecvTimeoutError,
         {Arc, RwLock},
     },
     thread::{self, Builder, JoinHandle},
     time::{Duration, Instant},
 };
 
+pub type ClusterSlotsUpdateReceiver = Receiver<Vec<Slot>>;
+pub type ClusterSlotsUpdateSender = Sender<Vec<Slot>>;
+
 #[derive(Default, Debug)]
 struct ClusterSlotsServiceTiming {
     pub lowest_slot_elapsed: u64,
-    pub update_completed_slots_elapsed: u64,
+    pub process_cluster_slots_updates_elapsed: u64,
 }
 
 impl ClusterSlotsServiceTiming {
-    fn update(&mut self, lowest_slot_elapsed: u64, update_completed_slots_elapsed: u64) {
+    fn update(&mut self, lowest_slot_elapsed: u64, process_cluster_slots_updates_elapsed: u64) {
         self.lowest_slot_elapsed += lowest_slot_elapsed;
-        self.update_completed_slots_elapsed += update_completed_slots_elapsed;
+        self.process_cluster_slots_updates_elapsed += process_cluster_slots_updates_elapsed;
     }
 }
 
@@ -37,12 +40,12 @@ impl ClusterSlotsService {
         cluster_slots: Arc<ClusterSlots>,
         bank_forks: Arc<RwLock<BankForks>>,
         cluster_info: Arc<ClusterInfo>,
-        completed_slots_receiver: CompletedSlotsReceiver,
+        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         exit: Arc<AtomicBool>,
     ) -> Self {
         let id = cluster_info.id();
         Self::initialize_lowest_slot(id, &blockstore, &cluster_info);
-        Self::initialize_epoch_slots(&blockstore, &cluster_info, &completed_slots_receiver);
+        Self::initialize_epoch_slots(&bank_forks, &cluster_info);
         let t_cluster_slots_service = Builder::new()
             .name("solana-cluster-slots-service".to_string())
             .spawn(move || {
@@ -51,7 +54,7 @@ impl ClusterSlotsService {
                     cluster_slots,
                     bank_forks,
                     cluster_info,
-                    completed_slots_receiver,
+                    cluster_slots_update_receiver,
                     exit,
                 )
             })
@@ -71,7 +74,7 @@ impl ClusterSlotsService {
         cluster_slots: Arc<ClusterSlots>,
         bank_forks: Arc<RwLock<BankForks>>,
         cluster_info: Arc<ClusterInfo>,
-        completed_slots_receiver: CompletedSlotsReceiver,
+        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         exit: Arc<AtomicBool>,
     ) {
         let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
@@ -80,7 +83,8 @@
             if exit.load(Ordering::Relaxed) {
                 break;
             }
-            let slots = match completed_slots_receiver.recv_timeout(Duration::from_millis(200)) {
+            let slots = match cluster_slots_update_receiver.recv_timeout(Duration::from_millis(200))
+            {
                 Ok(slots) => Some(slots),
                 Err(RecvTimeoutError::Timeout) => None,
                 Err(RecvTimeoutError::Disconnected) => {
@@ -94,17 +98,21 @@
             let lowest_slot = blockstore.lowest_slot();
             Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
             lowest_slot_elapsed.stop();
-            let mut update_completed_slots_elapsed =
-                Measure::start("update_completed_slots_elapsed");
+            let mut process_cluster_slots_updates_elapsed =
+                Measure::start("process_cluster_slots_updates_elapsed");
             if let Some(slots) = slots {
-                Self::update_completed_slots(slots, &completed_slots_receiver, &cluster_info);
+                Self::process_cluster_slots_updates(
+                    slots,
+                    &cluster_slots_update_receiver,
+                    &cluster_info,
+                );
             }
             cluster_slots.update(new_root, &cluster_info, &bank_forks);
-            update_completed_slots_elapsed.stop();
+            process_cluster_slots_updates_elapsed.stop();
             cluster_slots_service_timing.update(
                 lowest_slot_elapsed.as_us(),
-                update_completed_slots_elapsed.as_us(),
+                process_cluster_slots_updates_elapsed.as_us(),
             );
 
             if last_stats.elapsed().as_secs() > 2 {
@@ -116,8 +124,8 @@
                         i64
                     ),
                     (
-                        "update_completed_slots_elapsed",
-                        cluster_slots_service_timing.update_completed_slots_elapsed,
+                        "process_cluster_slots_updates_elapsed",
+                        cluster_slots_service_timing.process_cluster_slots_updates_elapsed,
                         i64
                     ),
                 );
@@ -127,12 +135,12 @@
         }
     }
 
-    fn update_completed_slots(
+    fn process_cluster_slots_updates(
         mut slots: Vec<Slot>,
-        completed_slots_receiver: &CompletedSlotsReceiver,
+        cluster_slots_update_receiver: &ClusterSlotsUpdateReceiver,
         cluster_info: &ClusterInfo,
     ) {
-        while let Ok(mut more) = completed_slots_receiver.try_recv() {
+        while let Ok(mut more) = cluster_slots_update_receiver.try_recv() {
             slots.append(&mut more);
         }
         #[allow(clippy::stable_sort_primitive)]
@@ -155,30 +163,16 @@
         cluster_info.push_lowest_slot(*id, lowest_slot);
     }
 
-    fn initialize_epoch_slots(
-        blockstore: &Blockstore,
-        cluster_info: &ClusterInfo,
-        completed_slots_receiver: &CompletedSlotsReceiver,
-    ) {
-        let root = blockstore.last_root();
-        let mut slots: Vec<_> = blockstore
-            .live_slots_iterator(root)
-            .filter_map(|(slot, slot_meta)| {
-                if slot_meta.is_full() {
-                    Some(slot)
-                } else {
-                    None
-                }
-            })
-            .collect();
+    fn initialize_epoch_slots(bank_forks: &RwLock<BankForks>, cluster_info: &ClusterInfo) {
+        // TODO: Should probably incorporate slots that were replayed on startup,
+        // and maybe some that were frozen < snapshot root in case validators restart
+        // from newer snapshots and lose history.
+        let frozen_banks = bank_forks.read().unwrap().frozen_banks();
+        let mut frozen_bank_slots: Vec<Slot> = frozen_banks.keys().cloned().collect();
+        frozen_bank_slots.sort_unstable();
 
-        while let Ok(mut more) = completed_slots_receiver.try_recv() {
-            slots.append(&mut more);
-        }
-        slots.sort_unstable();
-        slots.dedup();
-        if !slots.is_empty() {
-            cluster_info.push_epoch_slots(&slots);
+        if !frozen_bank_slots.is_empty() {
+            cluster_info.push_epoch_slots(&frozen_bank_slots);
         }
     }
 }
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index a916f62b23..ceb0b2c65f 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -5,6 +5,7 @@ use crate::{
     cluster_slots::ClusterSlots,
     outstanding_requests::OutstandingRequests,
     repair_weight::RepairWeight,
+    replay_stage::DUPLICATE_THRESHOLD,
     result::Result,
     serve_repair::{RepairType, ServeRepair},
 };
@@ -15,9 +16,7 @@ use solana_ledger::{
     shred::Nonce,
 };
 use solana_measure::measure::Measure;
-use solana_runtime::{
-    bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE, contains::Contains,
-};
+use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains};
 use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
 use std::{
     collections::{HashMap, HashSet},
@@ -33,6 +32,8 @@ use std::{
 
 pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
+pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
+pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
 
 pub type OutstandingRepairs = OutstandingRequests<RepairType>;
 
@@ -569,7 +570,7 @@ impl RepairService {
     ) {
         for slot in new_duplicate_slots {
             warn!(
-                "Cluster completed slot: {}, dumping our current version and repairing",
+                "Cluster confirmed slot: {}, dumping our current version and repairing",
                 slot
             );
             // Clear the slot signatures from status cache for this slot
@@ -641,7 +642,7 @@
                 })
                 .sum();
             if total_completed_slot_stake as f64 / total_stake as f64
-                > VOTE_THRESHOLD_SIZE
+                > DUPLICATE_THRESHOLD
             {
                 Some(dead_slot)
             } else {
@@ -1059,7 +1060,7 @@ mod test {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let valid_repair_peer = Node::new_localhost().info;
 
-        // Signal that this peer has completed the dead slot, and is thus
+        // Signal that this peer has confirmed the dead slot, and is thus
         // a valid target for repair
         let dead_slot = 9;
         let cluster_slots = ClusterSlots::default();
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 401434f2b2..d0abec248e 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -8,6 +8,7 @@ use crate::{
     },
     cluster_slot_state_verifier::*,
     cluster_slots::ClusterSlots,
+    cluster_slots_service::ClusterSlotsUpdateSender,
     commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
     consensus::{
         ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD,
@@ -292,6 +293,7 @@ impl ReplayStage {
         replay_vote_sender: ReplayVoteSender,
         gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
         gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
+        cluster_slots_update_sender: ClusterSlotsUpdateSender,
     ) -> Self {
         let ReplayStageConfig {
             my_pubkey,
@@ -387,6 +389,7 @@
                         &descendants,
                         &mut unfrozen_gossip_verified_vote_hashes,
                         &mut latest_validator_votes_for_frozen_banks,
+                        &cluster_slots_update_sender,
                     );
                     replay_active_banks_time.stop();
 
@@ -1156,9 +1159,8 @@
             // Signal retransmit
             if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
                 datapoint_info!("replay_stage-retransmit", ("slot", bank.slot(), i64),);
-                retransmit_slots_sender
-                    .send(vec![(bank.slot(), bank.clone())].into_iter().collect())
-                    .unwrap();
+                let _ = retransmit_slots_sender
+                    .send(vec![(bank.slot(), bank.clone())].into_iter().collect());
             }
             return;
         }
@@ -1638,6 +1640,7 @@
         descendants: &HashMap<Slot, HashSet<Slot>>,
         unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
         latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
+        cluster_slots_update_sender: &ClusterSlotsUpdateSender,
     ) -> bool {
         let mut did_complete_bank = false;
         let mut tx_count = 0;
@@ -1725,6 +1728,7 @@
                 );
                 did_complete_bank = true;
                 info!("bank frozen: {}", bank.slot());
+                let _ = cluster_slots_update_sender.send(vec![*bank_slot]);
                 if let Some(transaction_status_sender) = transaction_status_sender {
                     transaction_status_sender.send_transaction_status_freeze_message(&bank);
                 }
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 3ebfaf56a1..647c829dea 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -4,10 +4,9 @@
 use crate::{
     cluster_info_vote_listener::VerifiedVoteReceiver,
     cluster_slots::ClusterSlots,
-    cluster_slots_service::ClusterSlotsService,
+    cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
     completed_data_sets_service::CompletedDataSetsSender,
-    repair_service::DuplicateSlotsResetSender,
-    repair_service::RepairInfo,
+    repair_service::{DuplicateSlotsResetSender, RepairInfo},
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
 };
@@ -592,7 +591,8 @@ impl RetransmitStage {
         repair_socket: Arc<UdpSocket>,
         verified_receiver: Receiver<Vec<Packets>>,
         exit: &Arc<AtomicBool>,
-        completed_slots_receivers: [CompletedSlotsReceiver; 2],
+        rpc_completed_slots_receiver: CompletedSlotsReceiver,
+        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         epoch_schedule: EpochSchedule,
         cfg: Option<Arc<AtomicBool>>,
         shred_version: u16,
@@ -618,8 +618,6 @@ impl RetransmitStage {
             rpc_subscriptions.clone(),
         );
 
-        let [rpc_completed_slots_receiver, cluster_completed_slots_receiver] =
-            completed_slots_receivers;
         let rpc_completed_slots_hdl =
             RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions);
         let cluster_slots_service = ClusterSlotsService::new(
@@ -627,7 +625,7 @@ impl RetransmitStage {
             cluster_slots.clone(),
             bank_forks.clone(),
             cluster_info.clone(),
-            cluster_completed_slots_receiver,
+            cluster_slots_update_receiver,
             exit.clone(),
         );
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 8f1bd381fd..90a5105ce1 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -111,7 +111,7 @@ impl Tvu {
         tower: Tower,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         exit: &Arc<AtomicBool>,
-        completed_slots_receivers: [CompletedSlotsReceiver; 2],
+        completed_slots_receiver: CompletedSlotsReceiver,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         cfg: Option<Arc<AtomicBool>>,
         transaction_status_sender: Option<TransactionStatusSender>,
@@ -165,6 +165,7 @@ impl Tvu {
         let compaction_interval = tvu_config.rocksdb_compaction_interval;
         let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter;
         let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
+        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
             leader_schedule_cache,
@@ -174,7 +175,8 @@ impl Tvu {
             repair_socket,
             verified_receiver,
             &exit,
-            completed_slots_receivers,
+            completed_slots_receiver,
+            cluster_slots_update_receiver,
             *bank_forks.read().unwrap().working_bank().epoch_schedule(),
             cfg,
             tvu_config.shred_version,
@@ -288,6 +290,7 @@ impl Tvu {
             replay_vote_sender,
             gossip_confirmed_slots_receiver,
             gossip_verified_vote_hash_receiver,
+            cluster_slots_update_sender,
         );
 
         let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@@ -373,7 +376,7 @@ pub mod tests {
         let BlockstoreSignals {
             blockstore,
             ledger_signal_receiver,
-            completed_slots_receivers,
+            completed_slots_receiver,
             ..
         } = Blockstore::open_with_signal(&blockstore_path, None, true)
            .expect("Expected to successfully open ledger");
@@ -417,7 +420,7 @@ pub mod tests {
             tower,
             &leader_schedule_cache,
             &exit,
-            completed_slots_receivers,
+            completed_slots_receiver,
             block_commitment_cache,
             None,
             None,
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 7b729fa949..c5bfb0a4fd 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -387,7 +387,7 @@ impl Validator {
             bank_forks,
             blockstore,
             ledger_signal_receiver,
-            completed_slots_receivers,
+            completed_slots_receiver,
             leader_schedule_cache,
             snapshot_hash,
             TransactionHistoryServices {
@@ -719,7 +719,7 @@ impl Validator {
             tower,
             &leader_schedule_cache,
             &exit,
-            completed_slots_receivers,
+            completed_slots_receiver,
             block_commitment_cache,
             config.enable_partition.clone(),
             transaction_status_sender.clone(),
@@ -1042,7 +1042,7 @@ fn new_banks_from_ledger(
     BankForks,
     Arc<Blockstore>,
     Receiver<bool>,
-    [CompletedSlotsReceiver; 2],
+    CompletedSlotsReceiver,
     LeaderScheduleCache,
     Option<(Slot, Hash)>,
     TransactionHistoryServices,
@@ -1073,7 +1073,7 @@ fn new_banks_from_ledger(
     let BlockstoreSignals {
         mut blockstore,
         ledger_signal_receiver,
-        completed_slots_receivers,
+        completed_slots_receiver,
         ..
     } = Blockstore::open_with_signal(
         ledger_path,
@@ -1225,7 +1225,7 @@ fn new_banks_from_ledger(
         bank_forks,
         blockstore,
         ledger_signal_receiver,
-        completed_slots_receivers,
+        completed_slots_receiver,
         leader_schedule_cache,
         snapshot_hash,
         transaction_history_services,
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index d64d7e81d3..4cee093610 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -85,7 +85,8 @@ pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
 // (32K shreds per slot * 4 TX per shred * 2.5 slots per sec)
 pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;
 
-pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;
+pub type CompletedSlotsSender = SyncSender<Vec<Slot>>;
+pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;
 type CompletedRanges = Vec<(u32, u32)>;
 
 #[derive(Clone, Copy)]
@@ -118,7 +119,7 @@ pub struct CompletedDataSetInfo {
 pub struct BlockstoreSignals {
     pub blockstore: Blockstore,
     pub ledger_signal_receiver: Receiver<bool>,
-    pub completed_slots_receivers: [CompletedSlotsReceiver; 2],
+    pub completed_slots_receiver: CompletedSlotsReceiver,
 }
 
 // ledger window
@@ -144,7 +145,7 @@ pub struct Blockstore {
     last_root: Arc<RwLock<Slot>>,
     insert_shreds_lock: Arc<Mutex<()>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
-    pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
+    pub completed_slots_senders: Vec<CompletedSlotsSender>,
     pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
     no_compaction: bool,
 }
@@ -385,18 +386,16 @@ impl Blockstore {
             enforce_ulimit_nofile,
         )?;
         let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1);
-        let (completed_slots_sender1, completed_slots_receiver1) =
-            sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
-        let (completed_slots_sender2, completed_slots_receiver2) =
+        let (completed_slots_sender, completed_slots_receiver) =
             sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
 
         blockstore.new_shreds_signals = vec![ledger_signal_sender];
-        blockstore.completed_slots_senders = vec![completed_slots_sender1, completed_slots_sender2];
+        blockstore.completed_slots_senders = vec![completed_slots_sender];
 
         Ok(BlockstoreSignals {
             blockstore,
             ledger_signal_receiver,
-            completed_slots_receivers: [completed_slots_receiver1, completed_slots_receiver2],
+            completed_slots_receiver,
         })
     }
@@ -4568,7 +4567,7 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path!();
         let BlockstoreSignals {
             blockstore: ledger,
-            completed_slots_receivers: [recvr, _],
+            completed_slots_receiver: recvr,
             ..
         } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
         let ledger = Arc::new(ledger);
@@ -4594,7 +4593,7 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path!();
         let BlockstoreSignals {
             blockstore: ledger,
-            completed_slots_receivers: [recvr, _],
+            completed_slots_receiver: recvr,
             ..
         } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
         let ledger = Arc::new(ledger);
@@ -4638,7 +4637,7 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path!();
        let BlockstoreSignals {
             blockstore: ledger,
-            completed_slots_receivers: [recvr, _],
+            completed_slots_receiver: recvr,
             ..
         } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
         let ledger = Arc::new(ledger);
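For review context, here is a minimal, self-contained sketch (not code from this patch) of the channel pattern the change wires up: replay sends each newly frozen slot over the new crossbeam channel, and the cluster-slots loop batches, sorts, and dedups before it would push epoch slots to gossip. The `process_updates` helper, the local `Slot` alias, and the slot values are illustrative stand-ins for the real `ClusterSlotsService` internals.

```rust
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use std::{thread, time::Duration};

type Slot = u64;
type ClusterSlotsUpdateSender = Sender<Vec<Slot>>;
type ClusterSlotsUpdateReceiver = Receiver<Vec<Slot>>;

// Mirrors process_cluster_slots_updates: drain whatever else is already
// queued, then sort and dedup before the slots would be pushed to gossip.
fn process_updates(mut slots: Vec<Slot>, receiver: &ClusterSlotsUpdateReceiver) -> Vec<Slot> {
    while let Ok(mut more) = receiver.try_recv() {
        slots.append(&mut more);
    }
    slots.sort_unstable();
    slots.dedup();
    slots
}

fn main() {
    let (sender, receiver): (ClusterSlotsUpdateSender, ClusterSlotsUpdateReceiver) = unbounded();

    let service = thread::spawn(move || loop {
        // The 200ms timeout lets the real loop also refresh lowest_slot and
        // check the exit flag even when no bank has frozen recently.
        match receiver.recv_timeout(Duration::from_millis(200)) {
            Ok(slots) => {
                let batch = process_updates(slots, &receiver);
                println!("would push_epoch_slots({:?})", batch);
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break, // all senders dropped
        }
    });

    // Replay side: one send per frozen bank; the result is ignored so a
    // frozen bank never panics replay if the service has already shut down.
    for slot in [3u64, 1, 2, 2] {
        let _ = sender.send(vec![slot]);
    }
    drop(sender); // disconnect; the service loop drains the queue and exits
    service.join().unwrap();
}
```

The same reasoning explains the `retransmit_slots_sender` change above: replacing `.unwrap()` with `let _ = ...` turns a send to an already-exited service into a no-op instead of a panic during shutdown.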