2021-12-03 09:00:31 -08:00
|
|
|
use {
|
|
|
|
solana_gossip::cluster_info::{
|
|
|
|
ClusterInfo, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_SNAPSHOT_HASHES,
|
2021-10-08 15:14:56 -05:00
|
|
|
},
|
2021-12-03 09:00:31 -08:00
|
|
|
solana_perf::thread::renice_this_thread,
|
|
|
|
solana_runtime::{
|
|
|
|
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
|
|
|
snapshot_config::SnapshotConfig,
|
|
|
|
snapshot_hash::{
|
|
|
|
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
|
|
|
|
IncrementalSnapshotHashes, StartingSnapshotHashes,
|
|
|
|
},
|
|
|
|
snapshot_package::{PendingSnapshotPackage, SnapshotType},
|
|
|
|
snapshot_utils,
|
|
|
|
},
|
|
|
|
solana_sdk::{clock::Slot, hash::Hash},
|
|
|
|
std::{
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
|
Arc,
|
|
|
|
},
|
|
|
|
thread::{self, Builder, JoinHandle},
|
|
|
|
time::Duration,
|
2020-01-23 10:20:34 -07:00
|
|
|
},
|
|
|
|
};
|
2019-10-18 15:58:16 -06:00
|
|
|
|
|
|
|
/// Background service that packages pending snapshots into snapshot archives and,
/// when gossip push is enabled, publishes the resulting snapshot hashes to the cluster.
pub struct SnapshotPackagerService {
    // Handle to the "snapshot-packager" thread spawned in `new()`; joined via `join()`.
    t_snapshot_packager: JoinHandle<()>,
}
|
|
|
|
|
|
|
|
impl SnapshotPackagerService {
|
2020-02-20 11:46:13 -08:00
|
|
|
pub fn new(
|
2021-01-11 10:21:15 -08:00
|
|
|
pending_snapshot_package: PendingSnapshotPackage,
|
2021-10-08 15:14:56 -05:00
|
|
|
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
|
2020-02-20 11:46:13 -08:00
|
|
|
exit: &Arc<AtomicBool>,
|
2020-04-21 12:54:45 -07:00
|
|
|
cluster_info: &Arc<ClusterInfo>,
|
2021-09-03 16:42:32 -05:00
|
|
|
snapshot_config: SnapshotConfig,
|
2021-10-20 13:07:29 -05:00
|
|
|
enable_gossip_push: bool,
|
2020-02-20 11:46:13 -08:00
|
|
|
) -> Self {
|
2019-10-18 15:58:16 -06:00
|
|
|
let exit = exit.clone();
|
2020-02-20 11:46:13 -08:00
|
|
|
let cluster_info = cluster_info.clone();
|
2021-10-08 15:14:56 -05:00
|
|
|
let max_full_snapshot_hashes = std::cmp::min(
|
2021-10-01 15:59:45 -05:00
|
|
|
MAX_SNAPSHOT_HASHES,
|
|
|
|
snapshot_config.maximum_full_snapshot_archives_to_retain,
|
|
|
|
);
|
2021-10-08 15:14:56 -05:00
|
|
|
let max_incremental_snapshot_hashes = std::cmp::min(
|
|
|
|
MAX_INCREMENTAL_SNAPSHOT_HASHES,
|
|
|
|
snapshot_config.maximum_incremental_snapshot_archives_to_retain,
|
|
|
|
);
|
2020-03-05 23:52:31 -07:00
|
|
|
|
2019-10-18 15:58:16 -06:00
|
|
|
let t_snapshot_packager = Builder::new()
|
2021-01-11 10:21:15 -08:00
|
|
|
.name("snapshot-packager".to_string())
|
2020-02-21 18:42:24 -08:00
|
|
|
.spawn(move || {
|
2021-10-27 06:56:16 +05:00
|
|
|
renice_this_thread(snapshot_config.packager_thread_niceness_adj).unwrap();
|
2021-10-20 13:07:29 -05:00
|
|
|
let mut snapshot_gossip_manager = if enable_gossip_push {
|
|
|
|
Some(SnapshotGossipManager {
|
|
|
|
cluster_info,
|
|
|
|
max_full_snapshot_hashes,
|
|
|
|
max_incremental_snapshot_hashes,
|
|
|
|
full_snapshot_hashes: FullSnapshotHashes::default(),
|
|
|
|
incremental_snapshot_hashes: IncrementalSnapshotHashes::default(),
|
|
|
|
})
|
|
|
|
} else {
|
|
|
|
None
|
2021-10-08 15:14:56 -05:00
|
|
|
};
|
2021-10-20 13:07:29 -05:00
|
|
|
if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
|
|
|
|
snapshot_gossip_manager.push_starting_snapshot_hashes(starting_snapshot_hashes);
|
|
|
|
}
|
2021-10-08 15:14:56 -05:00
|
|
|
|
2020-02-21 18:42:24 -08:00
|
|
|
loop {
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
break;
|
|
|
|
}
|
2020-01-23 11:20:37 -07:00
|
|
|
|
2021-01-11 10:21:15 -08:00
|
|
|
let snapshot_package = pending_snapshot_package.lock().unwrap().take();
|
2021-08-31 18:33:27 -05:00
|
|
|
if snapshot_package.is_none() {
|
2021-01-11 10:21:15 -08:00
|
|
|
std::thread::sleep(Duration::from_millis(100));
|
2021-08-31 18:33:27 -05:00
|
|
|
continue;
|
2019-10-18 15:58:16 -06:00
|
|
|
}
|
2021-08-31 18:33:27 -05:00
|
|
|
let snapshot_package = snapshot_package.unwrap();
|
|
|
|
|
|
|
|
// Archiving the snapshot package is not allowed to fail.
|
|
|
|
// AccountsBackgroundService calls `clean_accounts()` with a value for
|
|
|
|
// last_full_snapshot_slot that requires this archive call to succeed.
|
|
|
|
snapshot_utils::archive_snapshot_package(
|
|
|
|
&snapshot_package,
|
2021-09-03 16:42:32 -05:00
|
|
|
snapshot_config.maximum_full_snapshot_archives_to_retain,
|
2021-09-06 18:01:56 -05:00
|
|
|
snapshot_config.maximum_incremental_snapshot_archives_to_retain,
|
2021-08-31 18:33:27 -05:00
|
|
|
)
|
|
|
|
.expect("failed to archive snapshot package");
|
|
|
|
|
2021-10-20 13:07:29 -05:00
|
|
|
if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
|
|
|
|
snapshot_gossip_manager.push_snapshot_hash(
|
|
|
|
snapshot_package.snapshot_type,
|
|
|
|
(snapshot_package.slot(), *snapshot_package.hash()),
|
|
|
|
);
|
|
|
|
}
|
2019-10-18 15:58:16 -06:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.unwrap();
|
2021-01-11 10:21:15 -08:00
|
|
|
|
2019-10-18 15:58:16 -06:00
|
|
|
Self {
|
|
|
|
t_snapshot_packager,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-13 11:12:09 -07:00
|
|
|
pub fn join(self) -> thread::Result<()> {
|
2019-10-18 15:58:16 -06:00
|
|
|
self.t_snapshot_packager.join()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-08 15:14:56 -05:00
|
|
|
/// Bundles the state needed to publish snapshot hashes to the cluster via CRDS,
/// retaining bounded histories of the full and incremental hashes pushed so far.
struct SnapshotGossipManager {
    // Handle used to push snapshot hashes out over gossip
    cluster_info: Arc<ClusterInfo>,
    // Upper bound on retained full snapshot hashes (min of CRDS limit and archive retention)
    max_full_snapshot_hashes: usize,
    // Upper bound on retained incremental snapshot hashes (min of CRDS limit and archive retention)
    max_incremental_snapshot_hashes: usize,
    // History of full snapshot hashes already pushed; the last entry is the latest
    full_snapshot_hashes: FullSnapshotHashes,
    // History of incremental snapshot hashes (all sharing one base full snapshot)
    incremental_snapshot_hashes: IncrementalSnapshotHashes,
}
|
|
|
|
|
|
|
|
impl SnapshotGossipManager {
|
|
|
|
/// If there were starting snapshot hashes, add those to their respective vectors, then push
|
|
|
|
/// those vectors to the cluster via CRDS.
|
|
|
|
fn push_starting_snapshot_hashes(
|
|
|
|
&mut self,
|
|
|
|
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
|
|
|
|
) {
|
|
|
|
if let Some(starting_snapshot_hashes) = starting_snapshot_hashes {
|
|
|
|
let starting_full_snapshot_hash = starting_snapshot_hashes.full;
|
|
|
|
self.push_full_snapshot_hash(starting_full_snapshot_hash);
|
|
|
|
|
|
|
|
if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
|
|
|
|
self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash);
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
|
|
|
|
/// cluster via CRDS.
|
|
|
|
fn push_snapshot_hash(&mut self, snapshot_type: SnapshotType, snapshot_hash: (Slot, Hash)) {
|
|
|
|
match snapshot_type {
|
|
|
|
SnapshotType::FullSnapshot => {
|
|
|
|
self.push_full_snapshot_hash(FullSnapshotHash {
|
|
|
|
hash: snapshot_hash,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
SnapshotType::IncrementalSnapshot(base_slot) => {
|
|
|
|
let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap();
|
|
|
|
assert_eq!(
|
|
|
|
base_slot, latest_full_snapshot_hash.0,
|
|
|
|
"the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})",
|
|
|
|
base_slot, latest_full_snapshot_hash.0,
|
|
|
|
);
|
|
|
|
self.push_incremental_snapshot_hash(IncrementalSnapshotHash {
|
|
|
|
base: latest_full_snapshot_hash,
|
|
|
|
hash: snapshot_hash,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
|
|
|
|
/// the cluster via CRDS.
|
|
|
|
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
|
|
|
|
self.full_snapshot_hashes
|
|
|
|
.hashes
|
|
|
|
.push(full_snapshot_hash.hash);
|
|
|
|
while self.full_snapshot_hashes.hashes.len() > self.max_full_snapshot_hashes {
|
|
|
|
self.full_snapshot_hashes.hashes.remove(0);
|
|
|
|
}
|
|
|
|
self.cluster_info
|
|
|
|
.push_snapshot_hashes(self.full_snapshot_hashes.hashes.clone());
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
|
|
|
|
/// that vector to the cluster via CRDS.
|
|
|
|
fn push_incremental_snapshot_hash(
|
|
|
|
&mut self,
|
|
|
|
incremental_snapshot_hash: IncrementalSnapshotHash,
|
|
|
|
) {
|
|
|
|
// If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then
|
|
|
|
// that means the old incremental snapshot hashes are no longer valid, so clear them all
|
|
|
|
// out.
|
|
|
|
if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base {
|
|
|
|
self.incremental_snapshot_hashes.hashes.clear();
|
|
|
|
self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base;
|
|
|
|
}
|
|
|
|
|
|
|
|
self.incremental_snapshot_hashes
|
|
|
|
.hashes
|
|
|
|
.push(incremental_snapshot_hash.hash);
|
|
|
|
while self.incremental_snapshot_hashes.hashes.len() > self.max_incremental_snapshot_hashes {
|
|
|
|
self.incremental_snapshot_hashes.hashes.remove(0);
|
|
|
|
}
|
|
|
|
// Pushing incremental snapshot hashes to the cluster should never fail. The only error
|
|
|
|
// case is when the length of the hashes is too big, but we account for that with
|
|
|
|
// `max_incremental_snapshot_hashes`. If this call ever does error, it's a programmer bug!
|
|
|
|
// Check to see what changed in `push_incremental_snapshot_hashes()` and handle the new
|
|
|
|
// error condition here.
|
|
|
|
self.cluster_info
|
|
|
|
.push_incremental_snapshot_hashes(
|
|
|
|
self.incremental_snapshot_hashes.base,
|
|
|
|
self.incremental_snapshot_hashes.hashes.clone(),
|
|
|
|
)
|
|
|
|
.expect(
|
|
|
|
"Bug! The programmer contract has changed for push_incremental_snapshot_hashes() \
|
|
|
|
and a new error case has been added, which has not been handled here.",
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-18 15:58:16 -06:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
2021-12-03 09:00:31 -08:00
|
|
|
use {
|
|
|
|
super::*,
|
|
|
|
bincode::serialize_into,
|
|
|
|
solana_runtime::{
|
|
|
|
accounts_db::AccountStorageEntry,
|
|
|
|
bank::BankSlotDelta,
|
|
|
|
snapshot_archive_info::SnapshotArchiveInfo,
|
|
|
|
snapshot_package::{SnapshotPackage, SnapshotType},
|
|
|
|
snapshot_utils::{
|
|
|
|
self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILE_NAME,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
solana_sdk::hash::Hash,
|
|
|
|
std::{
|
|
|
|
fs::{self, remove_dir_all, OpenOptions},
|
|
|
|
io::Write,
|
|
|
|
path::{Path, PathBuf},
|
|
|
|
},
|
|
|
|
tempfile::TempDir,
|
2019-12-05 14:58:02 -05:00
|
|
|
};
|
2019-10-18 15:58:16 -06:00
|
|
|
|
2019-12-05 14:58:02 -05:00
|
|
|
// Create a temporary placeholder directory for all test files, clearing out any
// leftover directory or file from a previous run.
fn make_tmp_dir_path() -> PathBuf {
    let out_dir = match std::env::var("FARF_DIR") {
        Ok(dir) => dir,
        Err(_) => "farf".to_string(),
    };
    let path = PathBuf::from(format!("{}/tmp/test_package_snapshots", out_dir));

    // Best-effort cleanup of any prior collision at this path; "not found" errors
    // are deliberately ignored.
    let _ = std::fs::remove_dir_all(&path);
    let _ = std::fs::remove_file(&path);

    path
}
|
|
|
|
|
|
|
|
#[test]
fn test_package_snapshots_relative_ledger_path() {
    // Run the full package-and-verify flow rooted at a relative (FARF-based)
    // ledger path, then clean up the placeholder directory.
    let temp_dir = make_tmp_dir_path();
    create_and_verify_snapshot(&temp_dir);
    remove_dir_all(&temp_dir).expect("should remove tmp dir");
}
|
|
|
|
|
2019-10-18 15:58:16 -06:00
|
|
|
#[test]
fn test_package_snapshots() {
    // Same flow as the relative-path test, but rooted in an OS temp dir that is
    // removed automatically when `temp_dir` drops at the end of this test.
    let temp_dir = TempDir::new().unwrap();
    create_and_verify_snapshot(temp_dir.path())
}
|
|
|
|
|
|
|
|
/// End-to-end fixture: builds fake account storages and snapshot files under `temp_dir`,
/// packages them into a full snapshot archive, then verifies the archive's contents
/// against the source directories.
fn create_and_verify_snapshot(temp_dir: &Path) {
    // Layout: accounts/, snapshots/, and snapshots_output/ all live under `temp_dir`.
    let accounts_dir = temp_dir.join("accounts");
    let snapshots_dir = temp_dir.join("snapshots");
    let snapshot_archives_dir = temp_dir.join("snapshots_output");
    fs::create_dir_all(&snapshot_archives_dir).unwrap();

    fs::create_dir_all(&accounts_dir).unwrap();
    // Create some storage entries
    let storage_entries: Vec<_> = (0..5)
        .map(|i| Arc::new(AccountStorageEntry::new(&accounts_dir, 0, i, 10)))
        .collect();

    // Create some fake snapshot
    // Each fake snapshot is a file "i" inside directory "snapshots/i" holding dummy bytes.
    let snapshots_paths: Vec<_> = (0..5)
        .map(|i| {
            let snapshot_file_name = format!("{}", i);
            // Shadows the outer `snapshots_dir` with the per-snapshot subdirectory.
            let snapshots_dir = snapshots_dir.join(&snapshot_file_name);
            fs::create_dir_all(&snapshots_dir).unwrap();
            let fake_snapshot_path = snapshots_dir.join(&snapshot_file_name);
            let mut fake_snapshot_file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&fake_snapshot_path)
                .unwrap();

            fake_snapshot_file.write_all(b"Hello, world!").unwrap();
            fake_snapshot_path
        })
        .collect();

    // Create directory of hard links for snapshots
    // (mirrors what the real packaging flow does: archives are built from hard links,
    // not the live snapshot files)
    let link_snapshots_dir = tempfile::tempdir_in(&temp_dir).unwrap();
    for snapshots_path in snapshots_paths {
        let snapshot_file_name = snapshots_path.file_name().unwrap();
        // Per-snapshot subdirectory inside the hard-link dir, shadowing the TempDir handle.
        let link_snapshots_dir = link_snapshots_dir.path().join(snapshot_file_name);
        fs::create_dir_all(&link_snapshots_dir).unwrap();
        let link_path = link_snapshots_dir.join(snapshot_file_name);
        fs::hard_link(&snapshots_path, &link_path).unwrap();
    }

    // Create a packageable snapshot
    let slot = 42;
    let hash = Hash::default();
    let archive_format = ArchiveFormat::TarBzip2;
    let output_tar_path = snapshot_utils::build_full_snapshot_archive_path(
        snapshot_archives_dir,
        slot,
        &hash,
        archive_format,
    );
    let snapshot_package = SnapshotPackage {
        snapshot_archive_info: SnapshotArchiveInfo {
            path: output_tar_path.clone(),
            slot,
            hash,
            archive_format,
        },
        block_height: slot,
        slot_deltas: vec![],
        snapshot_links: link_snapshots_dir,
        snapshot_storages: vec![storage_entries],
        snapshot_version: SnapshotVersion::default(),
        snapshot_type: SnapshotType::FullSnapshot,
    };

    // Make tarball from packageable snapshot
    snapshot_utils::archive_snapshot_package(
        &snapshot_package,
        snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
        snapshot_utils::DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
    )
    .unwrap();

    // before we compare, stick an empty status_cache in this dir so that the package comparison works
    // This is needed since the status_cache is added by the packager and is not collected from
    // the source dir for snapshots
    let dummy_slot_deltas: Vec<BankSlotDelta> = vec![];
    snapshot_utils::serialize_snapshot_data_file(
        &snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
        |stream| {
            serialize_into(stream, &dummy_slot_deltas)?;
            Ok(())
        },
    )
    .unwrap();

    // Check archive is correct
    snapshot_utils::verify_snapshot_archive(
        output_tar_path,
        snapshots_dir,
        accounts_dir,
        archive_format,
    );
}
|
|
|
|
}
|