diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index ae1edc70a8..5b69513a7b 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,7 +1,6 @@ use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; -use solana_ledger::{ - snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package, -}; +use solana_ledger::{snapshot_package::SnapshotPackageReceiver, snapshot_utils}; +use solana_sdk::{clock::Slot, hash::Hash}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -19,15 +18,24 @@ pub struct SnapshotPackagerService { impl SnapshotPackagerService { pub fn new( snapshot_package_receiver: SnapshotPackageReceiver, + starting_snapshot_hash: Option<(Slot, Hash)>, exit: &Arc, cluster_info: &Arc>, ) -> Self { let exit = exit.clone(); let cluster_info = cluster_info.clone(); + let t_snapshot_packager = Builder::new() .name("solana-snapshot-packager".to_string()) .spawn(move || { let mut hashes = vec![]; + if let Some(starting_snapshot_hash) = starting_snapshot_hash { + hashes.push(starting_snapshot_hash); + } + cluster_info + .write() + .unwrap() + .push_snapshot_hashes(hashes.clone()); loop { if exit.load(Ordering::Relaxed) { break; @@ -41,7 +49,9 @@ impl SnapshotPackagerService { { snapshot_package = new_snapshot_package; } - if let Err(err) = archive_snapshot_package(&snapshot_package) { + if let Err(err) = + snapshot_utils::archive_snapshot_package(&snapshot_package) + { warn!("Failed to create snapshot archive: {}", err); } else { hashes.push((snapshot_package.root, snapshot_package.hash)); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d4a1ec17b5..913eb0be7a 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -14,7 +14,6 @@ use crate::{ shred_fetch_stage::ShredFetchStage, sigverify_shreds::ShredSigVerifier, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, - snapshot_packager_service::SnapshotPackagerService, storage_stage::{StorageStage, StorageState}, }; use crossbeam_channel::unbounded; @@ -23,6 +22,7 @@ use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver}, blockstore_processor::TransactionStatusSender, + snapshot_package::SnapshotPackageSender, }; use solana_sdk::{ pubkey::Pubkey, @@ -47,7 +47,6 @@ pub struct Tvu { blockstream_service: Option, ledger_cleanup_service: Option, storage_stage: StorageStage, - snapshot_packager_service: Option, } pub struct Sockets { @@ -88,6 +87,7 @@ impl Tvu { shred_version: u16, transaction_status_sender: Option, rewards_recorder_sender: Option, + snapshot_package_sender: Option, ) -> Self { let keypair: Arc = cluster_info .read() @@ -148,18 +148,6 @@ impl Tvu { let (blockstream_slot_sender, blockstream_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); - let (snapshot_packager_service, snapshot_package_sender) = { - let snapshot_config = { bank_forks.read().unwrap().snapshot_config().clone() }; - if snapshot_config.is_some() { - // Start a snapshot packaging service - let (sender, receiver) = channel(); - let snapshot_packager_service = - SnapshotPackagerService::new(receiver, exit, &cluster_info.clone()); - (Some(snapshot_packager_service), Some(sender)) - } else { - (None, None) - } - }; let replay_stage_config = ReplayStageConfig { my_pubkey: keypair.pubkey(), @@ -225,7 +213,6 @@ impl Tvu { blockstream_service, ledger_cleanup_service, storage_stage, - snapshot_packager_service, } } @@ -241,9 +228,6 @@ impl Tvu { self.ledger_cleanup_service.unwrap().join()?; } self.replay_stage.join()?; - if let Some(s) = self.snapshot_packager_service { - s.join()?; - } Ok(()) } } @@ -317,6 +301,7 @@ pub mod tests { 0, None, None, + None, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index ead163d24b..e64a657c77 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -16,6 +16,7 @@ use crate::{ serve_repair::ServeRepair, serve_repair_service::ServeRepairService, sigverify, + snapshot_packager_service::SnapshotPackagerService, storage_stage::StorageState, tpu::Tpu, transaction_status_service::TransactionStatusService, @@ -50,7 +51,7 @@ use std::{ process, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::Receiver, - sync::{Arc, Mutex, RwLock}, + sync::{mpsc::channel, Arc, Mutex, RwLock}, thread::{sleep, Result}, time::Duration, }; @@ -127,6 +128,7 @@ pub struct Validator { rewards_recorder_service: Option, gossip_service: GossipService, serve_repair_service: ServeRepairService, + snapshot_packager_service: Option, poh_recorder: Arc>, poh_service: PohService, tpu: Tpu, @@ -355,7 +357,7 @@ impl Validator { .set_entrypoint(entrypoint_info.clone()); } - if let Some(snapshot_hash) = snapshot_hash { + if let Some(ref snapshot_hash) = snapshot_hash { if let Some(ref trusted_validators) = config.trusted_validators { let mut trusted = false; for _ in 0..10 { @@ -383,6 +385,17 @@ impl Validator { } } + let (snapshot_packager_service, snapshot_package_sender) = + if config.snapshot_config.is_some() { + // Start a snapshot packaging service + let (sender, receiver) = channel(); + let snapshot_packager_service = + SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info); + (Some(snapshot_packager_service), Some(sender)) + } else { + (None, None) + }; + wait_for_supermajority(config, &bank, &cluster_info); let voting_keypair = if config.voting_disabled { @@ -445,6 +458,7 @@ impl Validator { node.info.shred_version, transaction_status_sender.clone(), rewards_recorder_sender, + snapshot_package_sender, ); if config.dev_sigverify_disabled { @@ -474,6 +488,7 @@ impl Validator { rpc_service, transaction_status_service, rewards_recorder_service, + snapshot_packager_service, tpu, tvu, poh_service, @@ -535,6 +550,10 @@ impl Validator { rewards_recorder_service.join()?; } + if let Some(s) = self.snapshot_packager_service { + s.join()?; + } + self.gossip_service.join()?; self.serve_repair_service.join()?; self.tpu.join()?; diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index e9862a5c2e..f2af82adbe 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -317,7 +317,7 @@ mod tests { ))); let snapshot_packager_service = - SnapshotPackagerService::new(receiver, &exit, &cluster_info); + SnapshotPackagerService::new(receiver, None, &exit, &cluster_info); // Close the channel so that the package service will exit after reading all the // packages off the channel