Make startup aware of Incremental Snapshots (#19600)
This commit is contained in:
@ -497,8 +497,17 @@ mod tests {
|
||||
full_leader_cache: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, cached_leader_schedule) =
|
||||
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
|
||||
let (accounts_package_sender, _) = channel();
|
||||
let (bank_forks, cached_leader_schedule, _) = process_blockstore(
|
||||
&genesis_config,
|
||||
&blockstore,
|
||||
Vec::new(),
|
||||
opts,
|
||||
None,
|
||||
None,
|
||||
accounts_package_sender,
|
||||
)
|
||||
.unwrap();
|
||||
let leader_schedule_cache = Arc::new(cached_leader_schedule);
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
|
||||
|
@ -44,10 +44,10 @@ use solana_runtime::{
|
||||
bank_forks::BankForks,
|
||||
commitment::BlockCommitmentCache,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_package::PendingSnapshotPackage,
|
||||
snapshot_package::{AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage},
|
||||
vote_sender_types::ReplayVoteSender,
|
||||
};
|
||||
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
|
||||
use std::{
|
||||
boxed::Box,
|
||||
collections::HashSet,
|
||||
@ -135,6 +135,8 @@ impl Tvu {
|
||||
tvu_config: TvuConfig,
|
||||
max_slots: &Arc<MaxSlots>,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
|
||||
last_full_snapshot_slot: Option<Slot>,
|
||||
) -> Self {
|
||||
let Sockets {
|
||||
repair: repair_socket,
|
||||
@ -212,9 +214,9 @@ impl Tvu {
|
||||
(Some(snapshot_config), Some(pending_snapshot_package))
|
||||
})
|
||||
.unwrap_or((None, None));
|
||||
let (accounts_hash_sender, accounts_hash_receiver) = channel();
|
||||
let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel;
|
||||
let accounts_hash_verifier = AccountsHashVerifier::new(
|
||||
accounts_hash_receiver,
|
||||
accounts_package_receiver,
|
||||
pending_snapshot_package,
|
||||
exit,
|
||||
cluster_info,
|
||||
@ -224,20 +226,19 @@ impl Tvu {
|
||||
snapshot_config.clone(),
|
||||
);
|
||||
|
||||
let (snapshot_request_sender, snapshot_request_handler) = {
|
||||
snapshot_config
|
||||
.map(|snapshot_config| {
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
|
||||
(
|
||||
Some(snapshot_request_sender),
|
||||
Some(SnapshotRequestHandler {
|
||||
snapshot_config,
|
||||
snapshot_request_receiver,
|
||||
accounts_package_sender: accounts_hash_sender,
|
||||
}),
|
||||
)
|
||||
})
|
||||
.unwrap_or((None, None))
|
||||
let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
|
||||
None => (None, None),
|
||||
Some(snapshot_config) => {
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
|
||||
(
|
||||
Some(snapshot_request_sender),
|
||||
Some(SnapshotRequestHandler {
|
||||
snapshot_config,
|
||||
snapshot_request_receiver,
|
||||
accounts_package_sender,
|
||||
}),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
|
||||
@ -340,7 +341,7 @@ impl Tvu {
|
||||
tvu_config.accounts_db_caching_enabled,
|
||||
tvu_config.test_hash_calculation,
|
||||
tvu_config.use_index_hash_calculation,
|
||||
None,
|
||||
last_full_snapshot_slot,
|
||||
);
|
||||
|
||||
Tvu {
|
||||
@ -434,6 +435,7 @@ pub mod tests {
|
||||
let (_, gossip_confirmed_slots_receiver) = unbounded();
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
let tower = Tower::default();
|
||||
let accounts_package_channel = channel();
|
||||
let tvu = Tvu::new(
|
||||
&vote_keypair.pubkey(),
|
||||
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
|
||||
@ -477,6 +479,8 @@ pub mod tests {
|
||||
TvuConfig::default(),
|
||||
&Arc::new(MaxSlots::default()),
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
accounts_package_channel,
|
||||
None,
|
||||
);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
tvu.join().unwrap();
|
||||
|
@ -69,7 +69,7 @@ use {
|
||||
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
|
||||
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_package::PendingSnapshotPackage,
|
||||
snapshot_package::{AccountsPackageSender, PendingSnapshotPackage},
|
||||
snapshot_utils,
|
||||
},
|
||||
solana_sdk::{
|
||||
@ -92,7 +92,7 @@ use {
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
mpsc::Receiver,
|
||||
mpsc::{channel, Receiver},
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
thread::{sleep, Builder, JoinHandle},
|
||||
@ -379,7 +379,7 @@ impl Validator {
|
||||
.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
|
||||
}
|
||||
|
||||
let (replay_vote_sender, replay_vote_receiver) = unbounded();
|
||||
let accounts_package_channel = channel();
|
||||
let (
|
||||
genesis_config,
|
||||
bank_forks,
|
||||
@ -387,6 +387,7 @@ impl Validator {
|
||||
ledger_signal_receiver,
|
||||
completed_slots_receiver,
|
||||
leader_schedule_cache,
|
||||
last_full_snapshot_slot,
|
||||
snapshot_hash,
|
||||
TransactionHistoryServices {
|
||||
transaction_status_sender,
|
||||
@ -408,6 +409,7 @@ impl Validator {
|
||||
config.enforce_ulimit_nofile,
|
||||
&start_progress,
|
||||
config.no_poh_speed_test,
|
||||
accounts_package_channel.0.clone(),
|
||||
);
|
||||
|
||||
*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
|
||||
@ -707,6 +709,7 @@ impl Validator {
|
||||
let rpc_completed_slots_service =
|
||||
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
|
||||
|
||||
let (replay_vote_sender, replay_vote_receiver) = unbounded();
|
||||
let tvu = Tvu::new(
|
||||
vote_account,
|
||||
authorized_voter_keypairs,
|
||||
@ -777,6 +780,8 @@ impl Validator {
|
||||
},
|
||||
&max_slots,
|
||||
&cost_model,
|
||||
accounts_package_channel,
|
||||
last_full_snapshot_slot,
|
||||
);
|
||||
|
||||
let tpu = Tpu::new(
|
||||
@ -1069,7 +1074,7 @@ fn post_process_restored_tower(
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
|
||||
fn new_banks_from_ledger(
|
||||
validator_identity: &Pubkey,
|
||||
vote_account: &Pubkey,
|
||||
@ -1080,6 +1085,7 @@ fn new_banks_from_ledger(
|
||||
enforce_ulimit_nofile: bool,
|
||||
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
||||
no_poh_speed_test: bool,
|
||||
accounts_package_sender: AccountsPackageSender,
|
||||
) -> (
|
||||
GenesisConfig,
|
||||
BankForks,
|
||||
@ -1087,6 +1093,7 @@ fn new_banks_from_ledger(
|
||||
Receiver<bool>,
|
||||
CompletedSlotsReceiver,
|
||||
LeaderScheduleCache,
|
||||
Option<Slot>,
|
||||
Option<(Slot, Hash)>,
|
||||
TransactionHistoryServices,
|
||||
Tower,
|
||||
@ -1182,24 +1189,26 @@ fn new_banks_from_ledger(
|
||||
TransactionHistoryServices::default()
|
||||
};
|
||||
|
||||
let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
|
||||
&genesis_config,
|
||||
&blockstore,
|
||||
config.account_paths.clone(),
|
||||
config.account_shrink_paths.clone(),
|
||||
config.snapshot_config.as_ref(),
|
||||
process_options,
|
||||
transaction_history_services
|
||||
.transaction_status_sender
|
||||
.as_ref(),
|
||||
transaction_history_services
|
||||
.cache_block_meta_sender
|
||||
.as_ref(),
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to load ledger: {:?}", err);
|
||||
abort()
|
||||
});
|
||||
let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) =
|
||||
bank_forks_utils::load(
|
||||
&genesis_config,
|
||||
&blockstore,
|
||||
config.account_paths.clone(),
|
||||
config.account_shrink_paths.clone(),
|
||||
config.snapshot_config.as_ref(),
|
||||
process_options,
|
||||
transaction_history_services
|
||||
.transaction_status_sender
|
||||
.as_ref(),
|
||||
transaction_history_services
|
||||
.cache_block_meta_sender
|
||||
.as_ref(),
|
||||
accounts_package_sender,
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to load ledger: {:?}", err);
|
||||
abort()
|
||||
});
|
||||
|
||||
if let Some(warp_slot) = config.warp_slot {
|
||||
let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| {
|
||||
@ -1278,6 +1287,7 @@ fn new_banks_from_ledger(
|
||||
ledger_signal_receiver,
|
||||
completed_slots_receiver,
|
||||
leader_schedule_cache,
|
||||
last_full_snapshot_slot,
|
||||
snapshot_hash,
|
||||
transaction_history_services,
|
||||
tower,
|
||||
|
Reference in New Issue
Block a user