Switch EpochSlots to be frozen slots, not completed slots (#17168)

Commit: 96ba2edfeb (parent: 9388aaca15)
Author: carllin
Date: 2021-06-02 17:20:00 -07:00
Committed by: GitHub

7 changed files with 78 additions and 79 deletions
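
In the old pipeline, ClusterSlotsService built its EpochSlots gossip messages from the blockstore's completed-slots channel; after this change it listens on a dedicated crossbeam channel that ReplayStage feeds with each newly frozen bank. The sketch below is a minimal, self-contained illustration of that pattern, not the validator code itself: the two type aliases are copied from the diff, everything else (the slot values, the printlns) is invented for demonstration, and only the crossbeam-channel crate is assumed.

use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

type Slot = u64;
pub type ClusterSlotsUpdateSender = Sender<Vec<Slot>>;
pub type ClusterSlotsUpdateReceiver = Receiver<Vec<Slot>>;

fn main() {
    let (sender, receiver): (ClusterSlotsUpdateSender, ClusterSlotsUpdateReceiver) = unbounded();

    // Producer side (ReplayStage): each newly frozen bank sends its slot.
    // Errors are ignored so a shut-down consumer cannot panic the sender.
    let _ = sender.send(vec![42]);
    let _ = sender.send(vec![41, 43]);

    // Consumer side (ClusterSlotsService): block briefly for the first batch,
    // then drain whatever else is pending.
    match receiver.recv_timeout(Duration::from_millis(200)) {
        Ok(mut slots) => {
            while let Ok(mut more) = receiver.try_recv() {
                slots.append(&mut more);
            }
            slots.sort_unstable();
            slots.dedup();
            println!("EpochSlots to advertise: {:?}", slots); // [41, 42, 43]
        }
        Err(RecvTimeoutError::Timeout) => println!("no freezes this interval"),
        Err(RecvTimeoutError::Disconnected) => println!("replay stage exited"),
    }
}

Batching with try_recv after the first recv_timeout mirrors the new process_cluster_slots_updates below: one gossip push can cover every slot frozen since the last tick.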

core/src/cluster_slots_service.rs
@@ -1,29 +1,32 @@
 use crate::cluster_slots::ClusterSlots;
+use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
 use solana_gossip::cluster_info::ClusterInfo;
-use solana_ledger::blockstore::{Blockstore, CompletedSlotsReceiver};
+use solana_ledger::blockstore::Blockstore;
 use solana_measure::measure::Measure;
 use solana_runtime::bank_forks::BankForks;
 use solana_sdk::{clock::Slot, pubkey::Pubkey};
 use std::{
     sync::{
         atomic::{AtomicBool, Ordering},
-        mpsc::RecvTimeoutError,
         {Arc, RwLock},
     },
     thread::{self, Builder, JoinHandle},
     time::{Duration, Instant},
 };
+
+pub type ClusterSlotsUpdateReceiver = Receiver<Vec<Slot>>;
+pub type ClusterSlotsUpdateSender = Sender<Vec<Slot>>;

 #[derive(Default, Debug)]
 struct ClusterSlotsServiceTiming {
     pub lowest_slot_elapsed: u64,
-    pub update_completed_slots_elapsed: u64,
+    pub process_cluster_slots_updates_elapsed: u64,
 }

 impl ClusterSlotsServiceTiming {
-    fn update(&mut self, lowest_slot_elapsed: u64, update_completed_slots_elapsed: u64) {
+    fn update(&mut self, lowest_slot_elapsed: u64, process_cluster_slots_updates_elapsed: u64) {
         self.lowest_slot_elapsed += lowest_slot_elapsed;
-        self.update_completed_slots_elapsed += update_completed_slots_elapsed;
+        self.process_cluster_slots_updates_elapsed += process_cluster_slots_updates_elapsed;
     }
 }
@@ -37,12 +40,12 @@ impl ClusterSlotsService {
         cluster_slots: Arc<ClusterSlots>,
         bank_forks: Arc<RwLock<BankForks>>,
         cluster_info: Arc<ClusterInfo>,
-        completed_slots_receiver: CompletedSlotsReceiver,
+        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         exit: Arc<AtomicBool>,
     ) -> Self {
         let id = cluster_info.id();
         Self::initialize_lowest_slot(id, &blockstore, &cluster_info);
-        Self::initialize_epoch_slots(&blockstore, &cluster_info, &completed_slots_receiver);
+        Self::initialize_epoch_slots(&bank_forks, &cluster_info);
         let t_cluster_slots_service = Builder::new()
             .name("solana-cluster-slots-service".to_string())
             .spawn(move || {
@@ -51,7 +54,7 @@ impl ClusterSlotsService {
                     cluster_slots,
                     bank_forks,
                     cluster_info,
-                    completed_slots_receiver,
+                    cluster_slots_update_receiver,
                     exit,
                 )
             })
@@ -71,7 +74,7 @@ impl ClusterSlotsService {
         cluster_slots: Arc<ClusterSlots>,
         bank_forks: Arc<RwLock<BankForks>>,
         cluster_info: Arc<ClusterInfo>,
-        completed_slots_receiver: CompletedSlotsReceiver,
+        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         exit: Arc<AtomicBool>,
     ) {
         let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
@@ -80,7 +83,8 @@ impl ClusterSlotsService {
             if exit.load(Ordering::Relaxed) {
                 break;
             }
-            let slots = match completed_slots_receiver.recv_timeout(Duration::from_millis(200)) {
+            let slots = match cluster_slots_update_receiver.recv_timeout(Duration::from_millis(200))
+            {
                 Ok(slots) => Some(slots),
                 Err(RecvTimeoutError::Timeout) => None,
                 Err(RecvTimeoutError::Disconnected) => {
@@ -94,17 +98,21 @@ impl ClusterSlotsService {
             let lowest_slot = blockstore.lowest_slot();
             Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
             lowest_slot_elapsed.stop();
-            let mut update_completed_slots_elapsed =
-                Measure::start("update_completed_slots_elapsed");
+            let mut process_cluster_slots_updates_elapsed =
+                Measure::start("process_cluster_slots_updates_elapsed");
             if let Some(slots) = slots {
-                Self::update_completed_slots(slots, &completed_slots_receiver, &cluster_info);
+                Self::process_cluster_slots_updates(
+                    slots,
+                    &cluster_slots_update_receiver,
+                    &cluster_info,
+                );
             }
             cluster_slots.update(new_root, &cluster_info, &bank_forks);
-            update_completed_slots_elapsed.stop();
+            process_cluster_slots_updates_elapsed.stop();
             cluster_slots_service_timing.update(
                 lowest_slot_elapsed.as_us(),
-                update_completed_slots_elapsed.as_us(),
+                process_cluster_slots_updates_elapsed.as_us(),
             );
             if last_stats.elapsed().as_secs() > 2 {
@@ -116,8 +124,8 @@ impl ClusterSlotsService {
                         i64
                     ),
                     (
-                        "update_completed_slots_elapsed",
-                        cluster_slots_service_timing.update_completed_slots_elapsed,
+                        "process_cluster_slots_updates_elapsed",
+                        cluster_slots_service_timing.process_cluster_slots_updates_elapsed,
                         i64
                     ),
                 );
@@ -127,12 +135,12 @@ impl ClusterSlotsService {
         }
     }

-    fn update_completed_slots(
+    fn process_cluster_slots_updates(
         mut slots: Vec<Slot>,
-        completed_slots_receiver: &CompletedSlotsReceiver,
+        cluster_slots_update_receiver: &ClusterSlotsUpdateReceiver,
         cluster_info: &ClusterInfo,
     ) {
-        while let Ok(mut more) = completed_slots_receiver.try_recv() {
+        while let Ok(mut more) = cluster_slots_update_receiver.try_recv() {
             slots.append(&mut more);
         }
         #[allow(clippy::stable_sort_primitive)]
@@ -155,30 +163,16 @@ impl ClusterSlotsService {
         cluster_info.push_lowest_slot(*id, lowest_slot);
     }

-    fn initialize_epoch_slots(
-        blockstore: &Blockstore,
-        cluster_info: &ClusterInfo,
-        completed_slots_receiver: &CompletedSlotsReceiver,
-    ) {
-        let root = blockstore.last_root();
-        let mut slots: Vec<_> = blockstore
-            .live_slots_iterator(root)
-            .filter_map(|(slot, slot_meta)| {
-                if slot_meta.is_full() {
-                    Some(slot)
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        while let Ok(mut more) = completed_slots_receiver.try_recv() {
-            slots.append(&mut more);
-        }
-        slots.sort_unstable();
-        slots.dedup();
-        if !slots.is_empty() {
-            cluster_info.push_epoch_slots(&slots);
+    fn initialize_epoch_slots(bank_forks: &RwLock<BankForks>, cluster_info: &ClusterInfo) {
+        // TODO: Should probably incorporate slots that were replayed on startup,
+        // and maybe some that were frozen < snapshot root in case validators restart
+        // from newer snapshots and lose history.
+        let frozen_banks = bank_forks.read().unwrap().frozen_banks();
+        let mut frozen_bank_slots: Vec<Slot> = frozen_banks.keys().cloned().collect();
+        frozen_bank_slots.sort_unstable();
+        if !frozen_bank_slots.is_empty() {
+            cluster_info.push_epoch_slots(&frozen_bank_slots);
         }
     }
 }
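
initialize_epoch_slots no longer scans the blockstore for full slots; it simply advertises the slots BankForks already holds frozen. Below is a minimal sketch of that extraction step, assuming a plain HashMap in place of the Slot-to-bank map returned by frozen_banks() (the value type is irrelevant to the logic and is invented here):

use std::collections::HashMap;

type Slot = u64;

// Collect and sort the frozen slots; HashMap iteration order is arbitrary,
// hence the sort before pushing to gossip.
fn frozen_slots_to_advertise<V>(frozen_banks: &HashMap<Slot, V>) -> Vec<Slot> {
    let mut slots: Vec<Slot> = frozen_banks.keys().cloned().collect();
    slots.sort_unstable();
    slots
}

fn main() {
    let frozen_banks: HashMap<Slot, ()> = [(7, ()), (3, ()), (5, ())].iter().cloned().collect();
    assert_eq!(frozen_slots_to_advertise(&frozen_banks), vec![3, 5, 7]);
}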

core/src/repair_service.rs
@@ -5,6 +5,7 @@ use crate::{
     cluster_slots::ClusterSlots,
     outstanding_requests::OutstandingRequests,
     repair_weight::RepairWeight,
+    replay_stage::DUPLICATE_THRESHOLD,
     result::Result,
     serve_repair::{RepairType, ServeRepair},
 };
@@ -15,9 +16,7 @@ use solana_ledger::{
     shred::Nonce,
 };
 use solana_measure::measure::Measure;
-use solana_runtime::{
-    bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE, contains::Contains,
-};
+use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains};
 use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
 use std::{
     collections::{HashMap, HashSet},
@@ -33,6 +32,8 @@ use std::{
 pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
+pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
+pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;

 pub type OutstandingRepairs = OutstandingRequests<RepairType>;
@@ -569,7 +570,7 @@ impl RepairService {
     ) {
         for slot in new_duplicate_slots {
             warn!(
-                "Cluster completed slot: {}, dumping our current version and repairing",
+                "Cluster confirmed slot: {}, dumping our current version and repairing",
                 slot
             );
             // Clear the slot signatures from status cache for this slot
@@ -641,7 +642,7 @@ impl RepairService {
                 })
                 .sum();
             if total_completed_slot_stake as f64 / total_stake as f64
-                > VOTE_THRESHOLD_SIZE
+                > DUPLICATE_THRESHOLD
             {
                 Some(dead_slot)
             } else {
@@ -1059,7 +1060,7 @@ mod test {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let valid_repair_peer = Node::new_localhost().info;

-        // Signal that this peer has completed the dead slot, and is thus
+        // Signal that this peer has confirmed the dead slot, and is thus
         // a valid target for repair
         let dead_slot = 9;
         let cluster_slots = ClusterSlots::default();
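
The threshold swap above means a dead slot is dumped and re-repaired only once the stake observed to have the slot crosses DUPLICATE_THRESHOLD (now imported from replay_stage) instead of VOTE_THRESHOLD_SIZE. A sketch of the gating predicate follows; the threshold value here is a stand-in for illustration, not the real constant defined in replay_stage.rs:

// Hypothetical value, for illustration only; see DUPLICATE_THRESHOLD in
// replay_stage.rs for the real definition.
const DUPLICATE_THRESHOLD: f64 = 0.52;

fn stake_justifies_repair(total_completed_slot_stake: u64, total_stake: u64) -> bool {
    total_completed_slot_stake as f64 / total_stake as f64 > DUPLICATE_THRESHOLD
}

fn main() {
    assert!(stake_justifies_repair(60, 100));
    assert!(!stake_justifies_repair(40, 100));
}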

core/src/replay_stage.rs
@@ -8,6 +8,7 @@ use crate::{
     },
     cluster_slot_state_verifier::*,
     cluster_slots::ClusterSlots,
+    cluster_slots_service::ClusterSlotsUpdateSender,
     commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
     consensus::{
         ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD,
@@ -292,6 +293,7 @@ impl ReplayStage {
         replay_vote_sender: ReplayVoteSender,
         gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
         gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
+        cluster_slots_update_sender: ClusterSlotsUpdateSender,
     ) -> Self {
         let ReplayStageConfig {
             my_pubkey,
@@ -387,6 +389,7 @@ impl ReplayStage {
                     &descendants,
                     &mut unfrozen_gossip_verified_vote_hashes,
                     &mut latest_validator_votes_for_frozen_banks,
+                    &cluster_slots_update_sender,
                 );
                 replay_active_banks_time.stop();
@@ -1156,9 +1159,8 @@ impl ReplayStage {
             // Signal retransmit
             if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
                 datapoint_info!("replay_stage-retransmit", ("slot", bank.slot(), i64),);
-                retransmit_slots_sender
-                    .send(vec![(bank.slot(), bank.clone())].into_iter().collect())
-                    .unwrap();
+                let _ = retransmit_slots_sender
+                    .send(vec![(bank.slot(), bank.clone())].into_iter().collect());
             }
             return;
         }
@@ -1638,6 +1640,7 @@ impl ReplayStage {
         descendants: &HashMap<Slot, HashSet<Slot>>,
         unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
         latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
+        cluster_slots_update_sender: &ClusterSlotsUpdateSender,
     ) -> bool {
         let mut did_complete_bank = false;
         let mut tx_count = 0;
@@ -1725,6 +1728,7 @@ impl ReplayStage {
                 );
                 did_complete_bank = true;
                 info!("bank frozen: {}", bank.slot());
+                let _ = cluster_slots_update_sender.send(vec![*bank_slot]);
                 if let Some(transaction_status_sender) = transaction_status_sender {
                     transaction_status_sender.send_transaction_status_freeze_message(&bank);
                 }
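
Two details worth noting in this file: every "bank frozen" event now also notifies ClusterSlotsService, and both that send and the retransmit send discard their Result rather than unwrapping. A small sketch (assuming only the crossbeam-channel crate; the slot value is invented) of why the `let _ =` style matters during shutdown:

use crossbeam_channel::unbounded;

fn main() {
    let (cluster_slots_update_sender, receiver) = unbounded::<Vec<u64>>();
    drop(receiver); // simulate ClusterSlotsService having already shut down

    // `.send(...).unwrap()` would panic here with a SendError; discarding the
    // result lets the replay loop keep running while services wind down.
    let _ = cluster_slots_update_sender.send(vec![42]);
}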

core/src/retransmit_stage.rs
@@ -4,10 +4,9 @@
 use crate::{
     cluster_info_vote_listener::VerifiedVoteReceiver,
     cluster_slots::ClusterSlots,
-    cluster_slots_service::ClusterSlotsService,
+    cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
     completed_data_sets_service::CompletedDataSetsSender,
-    repair_service::DuplicateSlotsResetSender,
-    repair_service::RepairInfo,
+    repair_service::{DuplicateSlotsResetSender, RepairInfo},
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
 };
@@ -592,7 +591,8 @@ impl RetransmitStage {
         repair_socket: Arc<UdpSocket>,
         verified_receiver: Receiver<Vec<Packets>>,
         exit: &Arc<AtomicBool>,
-        completed_slots_receivers: [CompletedSlotsReceiver; 2],
+        rpc_completed_slots_receiver: CompletedSlotsReceiver,
+        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         epoch_schedule: EpochSchedule,
         cfg: Option<Arc<AtomicBool>>,
         shred_version: u16,
@@ -618,8 +618,6 @@ impl RetransmitStage {
             rpc_subscriptions.clone(),
         );

-        let [rpc_completed_slots_receiver, cluster_completed_slots_receiver] =
-            completed_slots_receivers;
         let rpc_completed_slots_hdl =
             RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions);
         let cluster_slots_service = ClusterSlotsService::new(
@@ -627,7 +625,7 @@ impl RetransmitStage {
             cluster_slots.clone(),
             bank_forks.clone(),
             cluster_info.clone(),
-            cluster_completed_slots_receiver,
+            cluster_slots_update_receiver,
             exit.clone(),
         );
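
The old signature took `[CompletedSlotsReceiver; 2]` and relied on the caller ordering two identically typed receivers correctly; splitting them into two parameters with distinct types turns a potential swap into a compile error. A sketch of the signature shape, with type aliases mirroring the diff and the body elided:

use crossbeam_channel::Receiver as CrossbeamReceiver;
use std::sync::mpsc::Receiver;

type Slot = u64;
type CompletedSlotsReceiver = Receiver<Vec<Slot>>; // blockstore -> RPC subscribers
type ClusterSlotsUpdateReceiver = CrossbeamReceiver<Vec<Slot>>; // replay -> ClusterSlotsService

// Swapping the two arguments no longer type-checks.
fn new_retransmit_stage(
    rpc_completed_slots_receiver: CompletedSlotsReceiver,
    cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
) {
    let _ = (rpc_completed_slots_receiver, cluster_slots_update_receiver);
}

fn main() {
    let (_tx1, rx1) = std::sync::mpsc::sync_channel::<Vec<Slot>>(1);
    let (_tx2, rx2) = crossbeam_channel::unbounded::<Vec<Slot>>();
    new_retransmit_stage(rx1, rx2);
}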

core/src/tvu.rs
@@ -111,7 +111,7 @@ impl Tvu {
         tower: Tower,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         exit: &Arc<AtomicBool>,
-        completed_slots_receivers: [CompletedSlotsReceiver; 2],
+        completed_slots_receiver: CompletedSlotsReceiver,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         cfg: Option<Arc<AtomicBool>>,
         transaction_status_sender: Option<TransactionStatusSender>,
@@ -165,6 +165,7 @@ impl Tvu {
         let compaction_interval = tvu_config.rocksdb_compaction_interval;
         let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter;
         let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
+        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
             leader_schedule_cache,
@@ -174,7 +175,8 @@ impl Tvu {
             repair_socket,
             verified_receiver,
             &exit,
-            completed_slots_receivers,
+            completed_slots_receiver,
+            cluster_slots_update_receiver,
             *bank_forks.read().unwrap().working_bank().epoch_schedule(),
             cfg,
             tvu_config.shred_version,
@@ -288,6 +290,7 @@ impl Tvu {
             replay_vote_sender,
             gossip_confirmed_slots_receiver,
             gossip_verified_vote_hash_receiver,
+            cluster_slots_update_sender,
         );

         let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@@ -373,7 +376,7 @@ pub mod tests {
         let BlockstoreSignals {
             blockstore,
             ledger_signal_receiver,
-            completed_slots_receivers,
+            completed_slots_receiver,
             ..
         } = Blockstore::open_with_signal(&blockstore_path, None, true)
         .expect("Expected to successfully open ledger");
@@ -417,7 +420,7 @@ pub mod tests {
             tower,
             &leader_schedule_cache,
             &exit,
-            completed_slots_receivers,
+            completed_slots_receiver,
             block_commitment_cache,
             None,
             None,
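
Note that Tvu::new builds the new channel with crossbeam's unbounded(), unlike the bounded sync_channel backing the blockstore signal, presumably so a bank freeze in ReplayStage can never block on a slow ClusterSlotsService. A small sketch contrasting the two behaviors (the capacity of 1 is chosen for illustration only):

use crossbeam_channel::unbounded;
use std::sync::mpsc::sync_channel;

fn main() {
    // Bounded: once full, try_send fails (and send would block) until the
    // receiver drains an item.
    let (bounded_tx, _bounded_rx) = sync_channel::<Vec<u64>>(1);
    bounded_tx.send(vec![1]).unwrap();
    assert!(bounded_tx.try_send(vec![2]).is_err()); // channel full

    // Unbounded: sends always succeed while a receiver exists, so the
    // producer never stalls on the consumer.
    let (unbounded_tx, _unbounded_rx) = unbounded::<Vec<u64>>();
    unbounded_tx.send(vec![1]).unwrap();
    unbounded_tx.send(vec![2]).unwrap();
}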

core/src/validator.rs
@@ -387,7 +387,7 @@ impl Validator {
             bank_forks,
             blockstore,
             ledger_signal_receiver,
-            completed_slots_receivers,
+            completed_slots_receiver,
             leader_schedule_cache,
             snapshot_hash,
             TransactionHistoryServices {
@@ -719,7 +719,7 @@ impl Validator {
             tower,
             &leader_schedule_cache,
             &exit,
-            completed_slots_receivers,
+            completed_slots_receiver,
             block_commitment_cache,
             config.enable_partition.clone(),
             transaction_status_sender.clone(),
@@ -1042,7 +1042,7 @@ fn new_banks_from_ledger(
     BankForks,
     Arc<Blockstore>,
     Receiver<bool>,
-    [CompletedSlotsReceiver; 2],
+    CompletedSlotsReceiver,
     LeaderScheduleCache,
     Option<(Slot, Hash)>,
     TransactionHistoryServices,
@@ -1073,7 +1073,7 @@ fn new_banks_from_ledger(
     let BlockstoreSignals {
         mut blockstore,
         ledger_signal_receiver,
-        completed_slots_receivers,
+        completed_slots_receiver,
         ..
     } = Blockstore::open_with_signal(
         ledger_path,
@@ -1225,7 +1225,7 @@ fn new_banks_from_ledger(
         bank_forks,
         blockstore,
         ledger_signal_receiver,
-        completed_slots_receivers,
+        completed_slots_receiver,
         leader_schedule_cache,
         snapshot_hash,
         transaction_history_services,

ledger/src/blockstore.rs
@@ -85,7 +85,8 @@ pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_P
 // (32K shreds per slot * 4 TX per shred * 2.5 slots per sec)
 pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;

-pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
+pub type CompletedSlotsSender = SyncSender<Vec<Slot>>;
+pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;
 type CompletedRanges = Vec<(u32, u32)>;

 #[derive(Clone, Copy)]
@@ -118,7 +119,7 @@ pub struct CompletedDataSetInfo {
 pub struct BlockstoreSignals {
     pub blockstore: Blockstore,
     pub ledger_signal_receiver: Receiver<bool>,
-    pub completed_slots_receivers: [CompletedSlotsReceiver; 2],
+    pub completed_slots_receiver: CompletedSlotsReceiver,
 }

 // ledger window
@@ -144,7 +145,7 @@ pub struct Blockstore {
     last_root: Arc<RwLock<Slot>>,
     insert_shreds_lock: Arc<Mutex<()>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
-    pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
+    pub completed_slots_senders: Vec<CompletedSlotsSender>,
     pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
     no_compaction: bool,
 }
@@ -385,18 +386,16 @@ impl Blockstore {
             enforce_ulimit_nofile,
         )?;
         let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1);
-        let (completed_slots_sender1, completed_slots_receiver1) =
-            sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
-        let (completed_slots_sender2, completed_slots_receiver2) =
+        let (completed_slots_sender, completed_slots_receiver) =
             sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
         blockstore.new_shreds_signals = vec![ledger_signal_sender];
-        blockstore.completed_slots_senders = vec![completed_slots_sender1, completed_slots_sender2];
+        blockstore.completed_slots_senders = vec![completed_slots_sender];

         Ok(BlockstoreSignals {
             blockstore,
             ledger_signal_receiver,
-            completed_slots_receivers: [completed_slots_receiver1, completed_slots_receiver2],
+            completed_slots_receiver,
         })
     }
@@ -4568,7 +4567,7 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path!();
         let BlockstoreSignals {
             blockstore: ledger,
-            completed_slots_receivers: [recvr, _],
+            completed_slots_receiver: recvr,
             ..
         } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
         let ledger = Arc::new(ledger);
@@ -4594,7 +4593,7 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path!();
         let BlockstoreSignals {
             blockstore: ledger,
-            completed_slots_receivers: [recvr, _],
+            completed_slots_receiver: recvr,
             ..
         } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
         let ledger = Arc::new(ledger);
@@ -4638,7 +4637,7 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path!();
         let BlockstoreSignals {
             blockstore: ledger,
-            completed_slots_receivers: [recvr, _],
+            completed_slots_receiver: recvr,
             ..
         } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
         let ledger = Arc::new(ledger);
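
With ClusterSlotsService now fed from replay, the RPC completed-slots service is the blockstore's only remaining subscriber, so open_with_signal creates one channel instead of two. A sketch of the surviving signal setup; the channel capacity is illustrative, standing in for the real MAX_COMPLETED_SLOTS_IN_CHANNEL constant defined in blockstore.rs:

use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

type Slot = u64;
pub type CompletedSlotsSender = SyncSender<Vec<Slot>>;
pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;

const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; // illustrative capacity

fn make_completed_slots_signal() -> (CompletedSlotsSender, CompletedSlotsReceiver) {
    sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL)
}

fn main() {
    let (sender, receiver) = make_completed_slots_signal();
    // The blockstore keeps the sender; the RPC completed-slots service takes
    // the single receiver.
    sender.send(vec![9]).unwrap();
    assert_eq!(receiver.recv().unwrap(), vec![9]);
}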