Cap file size for snapshot data files (#7182)

* save limit deserialize

* save

* Save

* Clean up

* rustfmt

* rustfmt

* Just comment out to please CI

* Fix ci...

* Move code

* Rustfmt

* Clean up control flow

* Add another comment

* Introduce predetermined constant limit on snapshot data files (deserialize side)

* Introduce predetermined constant limit on snapshot data files (serialize side)

* rustfmt

* Tweak message

* Revert dynamic memory limit

* Limit size of snapshot data file (de)serialization

* Fix test breakage

* Clean up

* Fix uses formatting

* Rename: deserialize_{for,from}_snapshot

* Simplify comment

* Use Slot

* Provide slot for status cache

* Align variable name with snapshot_status_cache_file_path

* Define serialize_snapshot_data_file_with_metrics

* Fix build.......

* De-macro serialize_snapshot_data_file_with_metrics

* Revert u64 => Slot
This commit is contained in:
Ryo Onodera
2020-01-10 09:49:36 +09:00
committed by GitHub
parent 73c93cc345
commit 865c42465a
6 changed files with 412 additions and 91 deletions

View File

@ -1,19 +1,24 @@
use crate::snapshot_package::SnapshotPackage;
use bincode::{deserialize_from, serialize_into};
use bincode::serialize_into;
use bzip2::bufread::BzDecoder;
use fs_extra::dir::CopyOptions;
use log::*;
use solana_measure::measure::Measure;
use solana_runtime::{bank::Bank, status_cache::SlotDelta};
use solana_runtime::{
bank::{deserialize_from_snapshot, Bank, MAX_SNAPSHOT_DATA_FILE_SIZE},
status_cache::SlotDelta,
};
use solana_sdk::transaction::Result as TransactionResult;
use solana_sdk::{clock::Slot, transaction};
use std::{
cmp::Ordering,
fs,
fs::File,
io::{BufReader, BufWriter, Error as IOError, ErrorKind},
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use tar::Archive;
use tempfile::TempDir;
use thiserror::Error;
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
@ -138,6 +143,66 @@ where
}
}
/// Creates the file at `data_file_path`, lets `serializer` write into it through a
/// buffered writer, and returns the resulting stream position as the consumed size.
///
/// # Errors
///
/// Propagates any failure from creating, writing, flushing or seeking the file, as well
/// as any error returned by `serializer`. Returns an I/O error when the consumed size is
/// greater than `maximum_file_size`. That check runs only after the data has been
/// written and flushed; this function does not remove the file in that case.
pub fn serialize_snapshot_data_file<F>(
    data_file_path: &Path,
    maximum_file_size: u64,
    mut serializer: F,
) -> Result<u64>
where
    F: FnMut(&mut BufWriter<File>) -> Result<()>,
{
    let mut writer = BufWriter::new(File::create(data_file_path)?);
    serializer(&mut writer)?;
    writer.flush()?;

    // Ask the stream where it currently is; that offset is what gets compared to the cap.
    let bytes_written = writer.seek(SeekFrom::Current(0))?;
    if bytes_written <= maximum_file_size {
        return Ok(bytes_written);
    }

    Err(get_io_error(&format!(
        "too large snapshot data file to serialize: {:?} has {} bytes",
        data_file_path, bytes_written
    )))
}
/// Opens the file at `data_file_path` and hands a buffered reader over it to
/// `deserializer`, returning whatever value the deserializer produced.
///
/// # Errors
///
/// * The file size reported by the filesystem is checked against `maximum_file_size`
///   before the file is opened; a larger file is rejected with an I/O error.
/// * After `deserializer` finishes, the reader's position must equal the file size;
///   any mismatch is reported as an invalid snapshot data file.
/// * Failures from reading metadata, opening or seeking the file, and errors returned by
///   `deserializer`, are propagated.
pub fn deserialize_snapshot_data_file<F, T>(
    data_file_path: &Path,
    maximum_file_size: u64,
    mut deserializer: F,
) -> Result<T>
where
    F: FnMut(&mut BufReader<File>) -> Result<T>,
{
    let on_disk_size = fs::metadata(data_file_path)?.len();
    if on_disk_size > maximum_file_size {
        return Err(get_io_error(&format!(
            "too large snapshot data file to deserialize: {:?} has {} bytes",
            data_file_path, on_disk_size
        )));
    }

    let mut reader = BufReader::new(File::open(data_file_path)?);
    let value = deserializer(&mut reader)?;

    // The deserializer is expected to have consumed the file exactly.
    let bytes_read = reader.seek(SeekFrom::Current(0))?;
    if bytes_read == on_disk_size {
        return Ok(value);
    }

    Err(get_io_error(&format!(
        "invalid snapshot data file: {:?} has {} bytes, however consumed {} bytes to deserialize",
        data_file_path, on_disk_size, bytes_read
    )))
}
pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()> {
bank.purge_zero_lamport_accounts();
let slot = bank.slot();
@ -146,33 +211,78 @@ pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()>
fs::create_dir_all(slot_snapshot_dir.clone())?;
// the snapshot is stored as snapshot_path/slot/slot
let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
let snapshot_bank_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
info!(
"creating snapshot {}, path: {:?}",
bank.slot(),
snapshot_file_path,
snapshot_bank_file_path,
);
let snapshot_file = File::create(&snapshot_file_path)?;
// snapshot writer
let mut snapshot_stream = BufWriter::new(snapshot_file);
// Create the snapshot
serialize_into(&mut snapshot_stream, &*bank)?;
let mut bank_rc_serialize = Measure::start("create snapshot");
serialize_into(&mut snapshot_stream, &bank.rc)?;
bank_rc_serialize.stop();
inc_new_counter_info!("bank-rc-serialize-ms", bank_rc_serialize.as_ms() as usize);
let mut bank_serialize = Measure::start("bank-serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_bank_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream.by_ref(), &*bank)?;
serialize_into(stream.by_ref(), &bank.rc)?;
Ok(())
},
)?;
bank_serialize.stop();
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
datapoint_info!(
"snapshot-bank-file",
("slot", bank.slot(), i64),
("size", consumed_size, i64)
);
inc_new_counter_info!("bank-serialize-ms", bank_serialize.as_ms() as usize);
info!(
"{} for slot {} at {:?}",
bank_rc_serialize,
bank_serialize,
bank.slot(),
snapshot_file_path,
snapshot_bank_file_path,
);
Ok(())
}
/// Serializes `slot_deltas` into the status cache file inside `snapshot_links`, capped
/// at `MAX_SNAPSHOT_DATA_FILE_SIZE`, and reports the file size and elapsed time as
/// metrics tagged with `slot`.
///
/// # Errors
///
/// Propagates any error from `serialize_snapshot_data_file`, including the one raised
/// when the written file exceeds the size cap.
pub fn serialize_status_cache(
    slot: Slot,
    slot_deltas: &[SlotDelta<TransactionResult<()>>],
    snapshot_links: &TempDir,
) -> Result<()> {
    // the status cache is stored as snapshot_path/status_cache
    let status_cache_path = snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);

    let mut timer = Measure::start("status_cache_serialize-ms");
    let written_size = serialize_snapshot_data_file(
        &status_cache_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        |stream| Ok(serialize_into(stream, slot_deltas)?),
    )?;
    timer.stop();

    // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
    datapoint_info!(
        "snapshot-status-cache-file",
        ("slot", slot, i64),
        ("size", written_size, i64)
    );

    inc_new_counter_info!("serialize-status-cache-ms", timer.as_ms() as usize);

    Ok(())
}
pub fn remove_snapshot<P: AsRef<Path>>(slot: Slot, snapshot_path: P) -> Result<()> {
let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot);
// Remove the snapshot directory for this slot
@ -180,17 +290,29 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: Slot, snapshot_path: P) -> Result<(
Ok(())
}
// Unpacks the snapshot archive `snapshot_tar` into a temporary directory and returns the
// slot of the bank deserialized from the last entry yielded by `get_snapshot_paths`.
// Fails with an I/O error when no snapshot is found in the unpacked snapshots directory.
// NOTE(review): this span is a rendered diff with its +/- markers stripped, so the old
// (`Result<u64>`) and new (`Result<Slot>`) signature lines both appear, as do the direct
// `deserialize_from` read and the size-capped read. Confirm which lines are live.
pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<u64> {
pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<Slot> {
let tempdir = tempfile::TempDir::new()?;
untar_snapshot_in(&snapshot_tar, &tempdir)?;
let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR);
// presumably a throwaway accounts location inside the tempdir, since only the slot is
// returned from this function — TODO confirm
let local_account_paths = vec![tempdir.path().join("account_dummy")];
let unpacked_accounts_dir = tempdir.path().join(TAR_ACCOUNTS_DIR);
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
let last_root_paths = snapshot_paths
.last()
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
let file = File::open(&last_root_paths.snapshot_file_path)?;
let mut stream = BufReader::new(file);
let bank: Bank = deserialize_from(&mut stream)?;
// Read the bank and then its accounts from the same stream; the file is bounded by
// MAX_SNAPSHOT_DATA_FILE_SIZE.
let bank = deserialize_snapshot_data_file(
&last_root_paths.snapshot_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
let bank: Bank = deserialize_from_snapshot(stream.by_ref())?;
bank.rc.accounts_from_stream(
stream.by_ref(),
&local_account_paths,
&unpacked_accounts_dir,
)?;
Ok(bank)
},
)?;
Ok(bank.slot())
}
@ -269,22 +391,35 @@ where
.pop()
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
// Rebuild the root bank
info!("Loading from {:?}", &root_paths.snapshot_file_path);
let file = File::open(&root_paths.snapshot_file_path)?;
let mut stream = BufReader::new(file);
let bank: Bank = deserialize_from(&mut stream)?;
let bank = deserialize_snapshot_data_file(
&root_paths.snapshot_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
// Rebuild the root bank
let bank: Bank = deserialize_from_snapshot(stream.by_ref())?;
// Rebuild accounts
bank.rc.accounts_from_stream(
stream.by_ref(),
local_account_paths,
&append_vecs_path,
)?;
Ok(bank)
},
)?;
// Rebuild accounts
bank.rc
.accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?;
// Rebuild status cache
let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let status_cache = File::open(status_cache_path)?;
let mut stream = BufReader::new(status_cache);
let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> =
deserialize_from(&mut stream).unwrap_or_default();
let slot_deltas = deserialize_snapshot_data_file(
&status_cache_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
// Rebuild status cache
let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> =
deserialize_from_snapshot(stream).unwrap_or_default();
Ok(slot_deltas)
},
)?;
bank.src.append(&slot_deltas);
@ -322,3 +457,116 @@ where
let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR);
assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap());
}
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::{deserialize_from, serialize_into};
    use matches::assert_matches;
    use std::mem::size_of;

    /// Value written to and read back from the data files in these tests.
    const SAMPLE: u32 = 2323;
    /// Number of bytes one serialized `SAMPLE` is expected to occupy.
    const SAMPLE_SIZE: u64 = size_of::<u32>() as u64;

    /// Serializes `copies` back-to-back copies of `SAMPLE` into `path` under `limit`.
    fn write_samples(path: &Path, limit: u64, copies: usize) -> Result<u64> {
        serialize_snapshot_data_file(path, limit, |stream| {
            for _ in 0..copies {
                serialize_into(stream.by_ref(), &SAMPLE)?;
            }
            Ok(())
        })
    }

    /// Deserializes a single `u32` from `path` under `limit`.
    fn read_sample(path: &Path, limit: u64) -> Result<u32> {
        deserialize_snapshot_data_file(path, limit, |stream| {
            Ok(deserialize_from::<_, u32>(stream)?)
        })
    }

    #[test]
    fn test_serialize_snapshot_data_file_under_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let data_file = temp_dir.path().join("data-file");

        let consumed_size = write_samples(&data_file, SAMPLE_SIZE, 1).unwrap();

        assert_eq!(consumed_size, SAMPLE_SIZE);
    }

    #[test]
    fn test_serialize_snapshot_data_file_over_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let data_file = temp_dir.path().join("data-file");

        // One byte short of what a single sample needs.
        let result = write_samples(&data_file, SAMPLE_SIZE - 1, 1);

        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
    }

    #[test]
    fn test_deserialize_snapshot_data_file_under_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let data_file = temp_dir.path().join("data-file");
        write_samples(&data_file, SAMPLE_SIZE, 1).unwrap();

        let actual_data = read_sample(&data_file, SAMPLE_SIZE).unwrap();

        assert_eq!(actual_data, SAMPLE);
    }

    #[test]
    fn test_deserialize_snapshot_data_file_over_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let data_file = temp_dir.path().join("data-file");
        write_samples(&data_file, SAMPLE_SIZE, 1).unwrap();

        // The file on disk is one byte larger than the limit given to the reader.
        let result = read_sample(&data_file, SAMPLE_SIZE - 1);

        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
    }

    #[test]
    fn test_deserialize_snapshot_data_file_extra_data() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let data_file = temp_dir.path().join("data-file");
        // Two samples are written but only one is read back, leaving trailing bytes.
        write_samples(&data_file, SAMPLE_SIZE * 2, 2).unwrap();

        let result = read_sample(&data_file, SAMPLE_SIZE * 2);

        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
    }
}