diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 2be2932d46..2f5e18f76d 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,13 +1,10 @@ -use bincode::serialize_into; use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver}; -use solana_ledger::snapshot_utils::{self, SnapshotError, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; +use solana_ledger::snapshot_utils::{ + serialize_status_cache, SnapshotError, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR, +}; use solana_measure::measure::Measure; use solana_metrics::datapoint_info; -use solana_runtime::status_cache::SlotDelta; -use solana_sdk::transaction::Result as TransactionResult; use std::fs; -use std::fs::File; -use std::io::BufWriter; use std::process::ExitStatus; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::RecvTimeoutError; @@ -76,7 +73,8 @@ impl SnapshotPackagerService { snapshot_package.root ); - Self::serialize_status_cache( + serialize_status_cache( + snapshot_package.root, &snapshot_package.slot_deltas, &snapshot_package.snapshot_links, )?; @@ -175,30 +173,6 @@ impl SnapshotPackagerService { Ok(()) } - fn serialize_status_cache( - slot_deltas: &[SlotDelta>], - snapshot_links: &TempDir, - ) -> Result<()> { - // the status cache is stored as snapshot_path/status_cache - let snapshot_status_cache_file_path = snapshot_links - .path() - .join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME); - - let status_cache = File::create(&snapshot_status_cache_file_path)?; - // status cache writer - let mut status_cache_stream = BufWriter::new(status_cache); - - let mut status_cache_serialize = Measure::start("status_cache_serialize-ms"); - // write the status cache - serialize_into(&mut status_cache_stream, slot_deltas)?; - status_cache_serialize.stop(); - inc_new_counter_info!( - "serialize-status-cache-ms", - status_cache_serialize.as_ms() as usize - ); - Ok(()) - } - pub fn join(self) -> thread::Result<()> { self.t_snapshot_packager.join() } @@ -207,8 +181,13 @@ impl SnapshotPackagerService { #[cfg(test)] mod tests { use super::*; - use solana_ledger::snapshot_utils; - use solana_runtime::accounts_db::AccountStorageEntry; + use bincode::serialize_into; + use solana_ledger::snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME}; + use solana_runtime::{ + accounts_db::AccountStorageEntry, bank::MAX_SNAPSHOT_DATA_FILE_SIZE, + status_cache::SlotDelta, + }; + use solana_sdk::transaction::Result as TransactionResult; use std::{ fs::{remove_dir_all, OpenOptions}, io::Write, @@ -294,11 +273,16 @@ mod tests { // before we compare, stick an empty status_cache in this dir so that the package comparision works // This is needed since the status_cache is added by the packager and is not collected from // the source dir for snapshots - let slot_deltas: Vec>> = vec![]; - let dummy_status_cache = File::create(snapshots_dir.join("status_cache")).unwrap(); - let mut status_cache_stream = BufWriter::new(dummy_status_cache); - serialize_into(&mut status_cache_stream, &slot_deltas).unwrap(); - status_cache_stream.flush().unwrap(); + let dummy_slot_deltas: Vec>> = vec![]; + snapshot_utils::serialize_snapshot_data_file( + &snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME), + MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + serialize_into(stream, &dummy_slot_deltas)?; + Ok(()) + }, + ) + .unwrap(); // Check tarball is correct snapshot_utils::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index 25c87f7c9e..e96d9ff8ba 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -25,15 +25,7 @@ mod tests { system_transaction, transaction::Result as TransactionResult, }; - use std::{ - fs, - fs::File, - io::{BufWriter, Write}, - path::PathBuf, - sync::atomic::AtomicBool, - sync::mpsc::channel, - sync::Arc, - }; + use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc}; use tempfile::TempDir; struct SnapshotTestConfig { @@ -318,12 +310,18 @@ mod tests { // before we compare, stick an empty status_cache in this dir so that the package comparision works // This is needed since the status_cache is added by the packager and is not collected from // the source dir for snapshots - let slot_deltas: Vec>> = vec![]; - let dummy_status_cache = - File::create(saved_snapshots_dir.path().join("status_cache")).unwrap(); - let mut status_cache_stream = BufWriter::new(dummy_status_cache); - serialize_into(&mut status_cache_stream, &slot_deltas).unwrap(); - status_cache_stream.flush().unwrap(); + let dummy_slot_deltas: Vec>> = vec![]; + snapshot_utils::serialize_snapshot_data_file( + &saved_snapshots_dir + .path() + .join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME), + solana_runtime::bank::MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + serialize_into(stream, &dummy_slot_deltas)?; + Ok(()) + }, + ) + .unwrap(); snapshot_utils::verify_snapshot_tar( saved_tar, diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs index ad1c65dc37..0bdf8c6ccd 100644 --- a/ledger/src/snapshot_utils.rs +++ b/ledger/src/snapshot_utils.rs @@ -1,19 +1,24 @@ use crate::snapshot_package::SnapshotPackage; -use bincode::{deserialize_from, serialize_into}; +use bincode::serialize_into; use bzip2::bufread::BzDecoder; use fs_extra::dir::CopyOptions; use log::*; use solana_measure::measure::Measure; -use solana_runtime::{bank::Bank, status_cache::SlotDelta}; +use solana_runtime::{ + bank::{deserialize_from_snapshot, Bank, MAX_SNAPSHOT_DATA_FILE_SIZE}, + status_cache::SlotDelta, +}; +use solana_sdk::transaction::Result as TransactionResult; use solana_sdk::{clock::Slot, transaction}; use std::{ cmp::Ordering, fs, fs::File, - io::{BufReader, BufWriter, Error as IOError, ErrorKind}, + io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, }; use tar::Archive; +use tempfile::TempDir; use thiserror::Error; pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; @@ -138,6 +143,66 @@ where } } +pub fn serialize_snapshot_data_file( + data_file_path: &Path, + maximum_file_size: u64, + mut serializer: F, +) -> Result +where + F: FnMut(&mut BufWriter) -> Result<()>, +{ + let data_file = File::create(data_file_path)?; + let mut data_file_stream = BufWriter::new(data_file); + serializer(&mut data_file_stream)?; + data_file_stream.flush()?; + + let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?; + if consumed_size > maximum_file_size { + let error_message = format!( + "too large snapshot data file to serialize: {:?} has {} bytes", + data_file_path, consumed_size + ); + return Err(get_io_error(&error_message)); + } + Ok(consumed_size) +} + +pub fn deserialize_snapshot_data_file( + data_file_path: &Path, + maximum_file_size: u64, + mut deserializer: F, +) -> Result +where + F: FnMut(&mut BufReader) -> Result, +{ + let file_size = fs::metadata(&data_file_path)?.len(); + + if file_size > maximum_file_size { + let error_message = format!( + "too large snapshot data file to deserialize: {:?} has {} bytes", + data_file_path, file_size + ); + return Err(get_io_error(&error_message)); + } + + let data_file = File::open(data_file_path)?; + let mut data_file_stream = BufReader::new(data_file); + + let ret = deserializer(&mut data_file_stream)?; + + let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?; + + if file_size != consumed_size { + let error_message = format!( + "invalid snapshot data file: {:?} has {} bytes, however consumed {} bytes to deserialize", + data_file_path, file_size, consumed_size + ); + return Err(get_io_error(&error_message)); + } + + Ok(ret) +} + pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result<()> { bank.purge_zero_lamport_accounts(); let slot = bank.slot(); @@ -146,33 +211,78 @@ pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result<()> fs::create_dir_all(slot_snapshot_dir.clone())?; // the snapshot is stored as snapshot_path/slot/slot - let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); + let snapshot_bank_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); info!( "creating snapshot {}, path: {:?}", bank.slot(), - snapshot_file_path, + snapshot_bank_file_path, ); - let snapshot_file = File::create(&snapshot_file_path)?; - // snapshot writer - let mut snapshot_stream = BufWriter::new(snapshot_file); - // Create the snapshot - serialize_into(&mut snapshot_stream, &*bank)?; - let mut bank_rc_serialize = Measure::start("create snapshot"); - serialize_into(&mut snapshot_stream, &bank.rc)?; - bank_rc_serialize.stop(); - inc_new_counter_info!("bank-rc-serialize-ms", bank_rc_serialize.as_ms() as usize); + let mut bank_serialize = Measure::start("bank-serialize-ms"); + let consumed_size = serialize_snapshot_data_file( + &snapshot_bank_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + serialize_into(stream.by_ref(), &*bank)?; + serialize_into(stream.by_ref(), &bank.rc)?; + Ok(()) + }, + )?; + bank_serialize.stop(); + + // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE + datapoint_info!( + "snapshot-bank-file", + ("slot", bank.slot(), i64), + ("size", consumed_size, i64) + ); + + inc_new_counter_info!("bank-serialize-ms", bank_serialize.as_ms() as usize); info!( "{} for slot {} at {:?}", - bank_rc_serialize, + bank_serialize, bank.slot(), - snapshot_file_path, + snapshot_bank_file_path, ); Ok(()) } +pub fn serialize_status_cache( + slot: Slot, + slot_deltas: &[SlotDelta>], + snapshot_links: &TempDir, +) -> Result<()> { + // the status cache is stored as snapshot_path/status_cache + let snapshot_status_cache_file_path = + snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME); + + let mut status_cache_serialize = Measure::start("status_cache_serialize-ms"); + let consumed_size = serialize_snapshot_data_file( + &snapshot_status_cache_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + serialize_into(stream, slot_deltas)?; + Ok(()) + }, + )?; + status_cache_serialize.stop(); + + // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE + datapoint_info!( + "snapshot-status-cache-file", + ("slot", slot, i64), + ("size", consumed_size, i64) + ); + + inc_new_counter_info!( + "serialize-status-cache-ms", + status_cache_serialize.as_ms() as usize + ); + Ok(()) +} + pub fn remove_snapshot>(slot: Slot, snapshot_path: P) -> Result<()> { let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot); // Remove the snapshot directory for this slot @@ -180,17 +290,29 @@ pub fn remove_snapshot>(slot: Slot, snapshot_path: P) -> Result<( Ok(()) } -pub fn bank_slot_from_archive>(snapshot_tar: P) -> Result { +pub fn bank_slot_from_archive>(snapshot_tar: P) -> Result { let tempdir = tempfile::TempDir::new()?; untar_snapshot_in(&snapshot_tar, &tempdir)?; let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR); + let local_account_paths = vec![tempdir.path().join("account_dummy")]; + let unpacked_accounts_dir = tempdir.path().join(TAR_ACCOUNTS_DIR); let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); let last_root_paths = snapshot_paths .last() .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; - let file = File::open(&last_root_paths.snapshot_file_path)?; - let mut stream = BufReader::new(file); - let bank: Bank = deserialize_from(&mut stream)?; + let bank = deserialize_snapshot_data_file( + &last_root_paths.snapshot_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + let bank: Bank = deserialize_from_snapshot(stream.by_ref())?; + bank.rc.accounts_from_stream( + stream.by_ref(), + &local_account_paths, + &unpacked_accounts_dir, + )?; + Ok(bank) + }, + )?; Ok(bank.slot()) } @@ -269,22 +391,35 @@ where .pop() .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; - // Rebuild the root bank info!("Loading from {:?}", &root_paths.snapshot_file_path); - let file = File::open(&root_paths.snapshot_file_path)?; - let mut stream = BufReader::new(file); - let bank: Bank = deserialize_from(&mut stream)?; + let bank = deserialize_snapshot_data_file( + &root_paths.snapshot_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + // Rebuild the root bank + let bank: Bank = deserialize_from_snapshot(stream.by_ref())?; + // Rebuild accounts + bank.rc.accounts_from_stream( + stream.by_ref(), + local_account_paths, + &append_vecs_path, + )?; + Ok(bank) + }, + )?; - // Rebuild accounts - bank.rc - .accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?; - - // Rebuild status cache let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME); - let status_cache = File::open(status_cache_path)?; - let mut stream = BufReader::new(status_cache); - let slot_deltas: Vec>> = - deserialize_from(&mut stream).unwrap_or_default(); + let slot_deltas = deserialize_snapshot_data_file( + &status_cache_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + |stream| { + // Rebuild status cache + let slot_deltas: Vec>> = + deserialize_from_snapshot(stream).unwrap_or_default(); + + Ok(slot_deltas) + }, + )?; bank.src.append(&slot_deltas); @@ -322,3 +457,116 @@ where let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR); assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); } + +#[cfg(test)] +mod tests { + use super::*; + use bincode::{deserialize_from, serialize_into}; + use matches::assert_matches; + use std::mem::size_of; + + #[test] + fn test_serialize_snapshot_data_file_under_limit() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let expected_consumed_size = size_of::() as u64; + let consumed_size = serialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size, + |stream| { + serialize_into(stream, &2323_u32)?; + Ok(()) + }, + ) + .unwrap(); + assert_eq!(consumed_size, expected_consumed_size); + } + + #[test] + fn test_serialize_snapshot_data_file_over_limit() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let expected_consumed_size = size_of::() as u64; + let result = serialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size - 1, + |stream| { + serialize_into(stream, &2323_u32)?; + Ok(()) + }, + ); + assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize")); + } + + #[test] + fn test_deserialize_snapshot_data_file_under_limit() { + let expected_data = 2323_u32; + let expected_consumed_size = size_of::() as u64; + + let temp_dir = tempfile::TempDir::new().unwrap(); + serialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size, + |stream| { + serialize_into(stream, &expected_data)?; + Ok(()) + }, + ) + .unwrap(); + + let actual_data = deserialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size, + |stream| Ok(deserialize_from::<_, u32>(stream)?), + ) + .unwrap(); + assert_eq!(actual_data, expected_data); + } + + #[test] + fn test_deserialize_snapshot_data_file_over_limit() { + let expected_data = 2323_u32; + let expected_consumed_size = size_of::() as u64; + + let temp_dir = tempfile::TempDir::new().unwrap(); + serialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size, + |stream| { + serialize_into(stream, &expected_data)?; + Ok(()) + }, + ) + .unwrap(); + + let result = deserialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size - 1, + |stream| Ok(deserialize_from::<_, u32>(stream)?), + ); + assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize")); + } + + #[test] + fn test_deserialize_snapshot_data_file_extra_data() { + let expected_data = 2323_u32; + let expected_consumed_size = size_of::() as u64; + + let temp_dir = tempfile::TempDir::new().unwrap(); + serialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size * 2, + |stream| { + serialize_into(stream.by_ref(), &expected_data)?; + serialize_into(stream.by_ref(), &expected_data)?; + Ok(()) + }, + ) + .unwrap(); + + let result = deserialize_snapshot_data_file( + &temp_dir.path().join("data-file"), + expected_consumed_size * 2, + |stream| Ok(deserialize_from::<_, u32>(stream)?), + ); + assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("invalid snapshot data file")); + } +} diff --git a/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json index eac230f3f9..7c62def3fc 100644 --- a/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json +++ b/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json @@ -5803,7 +5803,7 @@ "measurement": "cluster_info-vote-count", "orderByTime": "ASC", "policy": "autogen", - "query": "SELECT mean(\"count\") AS \"serialize_bank_rc\" FROM \"$testnet\".\"autogen\".\"bank-rc-serialize-ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT mean(\"count\") AS \"serialize_bank\" FROM \"$testnet\".\"autogen\".\"bank-serialize-ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "C", "resultFormat": "time_series", @@ -9510,6 +9510,82 @@ ] ], "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT MAX(\"size\") FROM \"$testnet\".\"autogen\".\"snapshot-bank-file\" WHERE $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "D", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT MAX(\"size\") FROM \"$testnet\".\"autogen\".\"snapshot-status-cache-file\" WHERE $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "E", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] } ], "thresholds": [], diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 22dc1bed7c..c4d8568cfe 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -20,6 +20,7 @@ use crate::accounts_index::AccountsIndex; use crate::append_vec::{AppendVec, StoredAccount, StoredMeta}; +use crate::bank::deserialize_from_snapshot; use bincode::{deserialize_from, serialize_into}; use byteorder::{ByteOrder, LittleEndian}; use fs_extra::dir::CopyOptions; @@ -459,8 +460,8 @@ impl AccountsDB { ) -> Result<(), IOError> { let _len: usize = deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; - let storage: AccountStorage = - deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; + let storage: AccountStorage = deserialize_from_snapshot(&mut stream) + .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; // Remap the deserialized AppendVec paths to point to correct local paths let new_storage_map: Result, IOError> = storage diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index bc1922b8f2..825b9909d5 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -59,6 +59,7 @@ use std::{ }; pub const SECONDS_PER_YEAR: f64 = (365.25 * 24.0 * 60.0 * 60.0); +pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5; @@ -1876,6 +1877,19 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } +// This guards against possible memory exhaustions in bincode when restoring +// the full state from snapshot data files by imposing a fixed hard limit with +// ample of headrooms for such a usecase. +pub fn deserialize_from_snapshot(reader: R) -> bincode::Result +where + R: Read, + T: serde::de::DeserializeOwned, +{ + bincode::config() + .limit(MAX_SNAPSHOT_DATA_FILE_SIZE) + .deserialize_from(reader) +} + #[cfg(test)] mod tests { use super::*; @@ -1887,7 +1901,7 @@ mod tests { }, status_cache::MAX_CACHE_ENTRIES, }; - use bincode::{deserialize_from, serialize_into, serialized_size}; + use bincode::{serialize_into, serialized_size}; use solana_sdk::instruction::AccountMeta; use solana_sdk::system_program::solana_system_program; use solana_sdk::{ @@ -4318,7 +4332,7 @@ mod tests { serialize_into(&mut writer, &bank2.rc).unwrap(); let mut rdr = Cursor::new(&buf[..]); - let mut dbank: Bank = deserialize_from(&mut rdr).unwrap(); + let mut dbank: Bank = deserialize_from_snapshot(&mut rdr).unwrap(); let mut reader = BufReader::new(&buf[rdr.position() as usize..]); // Create a new set of directories for this bank's accounts