make accounts_db own the directory paths (#5230)

* change paths to something accounts_db (the singleton) owns, fixes SIGILL

* fail deserialize if paths don't work
serialize paths, too

* test that paths are populated from a bank snapshot
This commit is contained in:
Rob Walker
2019-07-23 13:47:48 -07:00
committed by GitHub
parent b41e8333b1
commit 8a12ed029c
4 changed files with 145 additions and 240 deletions

View File

@ -33,7 +33,7 @@ use solana_sdk::account::{Account, LamportCredit};
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fs::{create_dir_all, remove_dir_all};
use std::fs::remove_dir_all;
use std::io::{BufReader, Cursor, Error, ErrorKind, Read};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
@ -41,7 +41,6 @@ use std::sync::{Arc, RwLock};
use sys_info;
const ACCOUNT_DATA_FILE_SIZE: u64 = 4 * 1024 * 1024;
const ACCOUNT_DATA_FILE: &str = "data";
pub const NUM_THREADS: u32 = 10;
#[derive(Debug, Default)]
@ -167,11 +166,9 @@ pub struct AccountStorageEntry {
impl AccountStorageEntry {
pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self {
let p = format!("{}/{}.{}", path, fork_id, id);
let path = Path::new(&p);
let _ignored = remove_dir_all(path);
create_dir_all(path).expect("Create directory failed");
let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize);
let tail = format!("{}.{}", fork_id, id);
let path = Path::new(path).join(&tail);
let accounts = AppendVec::new(&path, true, file_size as usize);
AccountStorageEntry {
id,
@ -256,6 +253,37 @@ impl AccountStorageEntry {
}
}
pub fn get_paths_vec(paths: &str) -> Vec<String> {
paths.split(',').map(ToString::to_string).collect()
}
#[derive(Debug)]
struct TempPaths {
pub paths: String,
}
impl Drop for TempPaths {
fn drop(&mut self) {
let paths = get_paths_vec(&self.paths);
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
});
}
}
fn get_temp_accounts_path(paths: &str) -> TempPaths {
let paths = get_paths_vec(paths);
let out_dir = std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
let rand = Pubkey::new_rand();
let paths: Vec<_> = paths
.iter()
.map(|path| format!("{}/accounts_db/{}/{}", out_dir, rand, path))
.collect();
TempPaths {
paths: paths.join(","),
}
}
// This structure handles the load/store of the accounts
#[derive(Debug)]
pub struct AccountsDB {
@ -272,7 +300,10 @@ pub struct AccountsDB {
write_version: AtomicUsize,
/// Set of storage paths to pick from
paths: Vec<String>,
paths: RwLock<Vec<String>>,
/// Set of paths this accounts_db needs to hold/remove
temp_paths: Option<TempPaths>,
/// Starting file size of appendvecs
file_size: u64,
@ -283,10 +314,6 @@ pub struct AccountsDB {
min_num_stores: usize,
}
pub fn get_paths_vec(paths: &str) -> Vec<String> {
paths.split(',').map(ToString::to_string).collect()
}
impl Default for AccountsDB {
fn default() -> Self {
AccountsDB {
@ -294,7 +321,8 @@ impl Default for AccountsDB {
storage: RwLock::new(AccountStorage(HashMap::new())),
next_id: AtomicUsize::new(0),
write_version: AtomicUsize::new(0),
paths: Vec::default(),
paths: RwLock::new(Vec::default()),
temp_paths: None,
file_size: u64::default(),
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(2)
@ -306,20 +334,22 @@ impl Default for AccountsDB {
}
impl AccountsDB {
pub fn new_with_num_stores(paths: &str, min_num_stores: usize) -> Self {
let mut new = Self::new(paths);
new.min_num_stores = min_num_stores;
new
}
pub fn new_with_file_size(paths: Option<String>, file_size: u64) -> Self {
let (paths, temp_paths) = match paths {
Some(paths) => (get_paths_vec(&paths), None),
None => {
let temp_paths = get_temp_accounts_path("0,1,2,3"); // make 4 directories by default
(get_paths_vec(&temp_paths.paths), Some(temp_paths))
}
};
pub fn new_with_file_size(paths: &str, file_size: u64) -> Self {
let paths = get_paths_vec(&paths);
AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
storage: RwLock::new(AccountStorage(HashMap::new())),
next_id: AtomicUsize::new(0),
write_version: AtomicUsize::new(0),
paths,
paths: RwLock::new(paths),
temp_paths,
file_size,
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
@ -329,18 +359,28 @@ impl AccountsDB {
}
}
pub fn new(paths: &str) -> Self {
pub fn new_with_num_stores(paths: Option<String>, min_num_stores: usize) -> Self {
let mut new = Self::new(paths);
new.min_num_stores = min_num_stores;
new
}
pub fn new(paths: Option<String>) -> Self {
Self::new_with_file_size(paths, ACCOUNT_DATA_FILE_SIZE)
}
pub fn paths(&self) -> String {
self.paths.read().unwrap().join(",")
}
pub fn update_from_stream<R: Read>(
&self,
mut stream: &mut BufReader<R>,
) -> Result<(), std::io::Error> {
AppendVec::set_account_paths(&self.paths);
let _len: usize = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("len deserialize error"))?;
*self.paths.write().unwrap() = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("paths deserialize error"))?;
let mut storage: AccountStorage = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("storage deserialize error"))?;
let version: u64 = deserialize_from(&mut stream)
@ -529,8 +569,9 @@ impl AccountsDB {
fork_storage: &mut ForkStores,
size: u64,
) -> Arc<AccountStorageEntry> {
let path_index = thread_rng().gen_range(0, self.paths.len());
let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index], size));
let paths = self.paths.read().unwrap();
let path_index = thread_rng().gen_range(0, paths.len());
let store = Arc::new(self.new_storage_entry(fork_id, &paths[path_index], size));
fork_storage.insert(store.id, store.clone());
store
}
@ -763,10 +804,13 @@ impl Serialize for AccountsDB {
{
use serde::ser::Error;
let storage = self.storage.read().unwrap();
let len = serialized_size(&*storage).unwrap() + std::mem::size_of::<u64>() as u64;
let len = serialized_size(&self.paths).unwrap()
+ serialized_size(&*storage).unwrap()
+ std::mem::size_of::<u64>() as u64;
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
let version: u64 = self.write_version.load(Ordering::Relaxed) as u64;
serialize_into(&mut wr, &self.paths).map_err(Error::custom)?;
serialize_into(&mut wr, &*storage).map_err(Error::custom)?;
serialize_into(&mut wr, &version).map_err(Error::custom)?;
let len = wr.position() as usize;
@ -783,54 +827,10 @@ mod tests {
use rand::{thread_rng, Rng};
use solana_sdk::account::Account;
fn cleanup_paths(paths: &str) {
let paths = get_paths_vec(&paths);
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
});
}
struct TempPaths {
pub paths: String,
}
impl Drop for TempPaths {
fn drop(&mut self) {
cleanup_paths(&self.paths);
}
}
fn get_tmp_accounts_path(paths: &str) -> TempPaths {
let vpaths = get_paths_vec(paths);
let out_dir = std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
let vpaths: Vec<_> = vpaths
.iter()
.map(|path| format!("{}/{}", out_dir, path))
.collect();
TempPaths {
paths: vpaths.join(","),
}
}
#[macro_export]
macro_rules! tmp_accounts_name {
() => {
&format!("{}-{}", file!(), line!())
};
}
#[macro_export]
macro_rules! get_tmp_accounts_path {
() => {
get_tmp_accounts_path(tmp_accounts_name!())
};
}
#[test]
fn test_accountsdb_add_root() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let key = Pubkey::default();
let account0 = Account::new(1, 0, &key);
@ -843,8 +843,7 @@ mod tests {
#[test]
fn test_accountsdb_latest_ancestor() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let key = Pubkey::default();
let account0 = Account::new(1, 0, &key);
@ -871,8 +870,7 @@ mod tests {
#[test]
fn test_accountsdb_latest_ancestor_with_root() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let key = Pubkey::default();
let account0 = Account::new(1, 0, &key);
@ -892,8 +890,8 @@ mod tests {
#[test]
fn test_accountsdb_root_one_fork() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let key = Pubkey::default();
let account0 = Account::new(1, 0, &key);
@ -933,8 +931,7 @@ mod tests {
#[test]
fn test_accountsdb_add_root_many() {
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&db, &mut pubkeys, 0, 100, 0, 0);
@ -966,8 +963,7 @@ mod tests {
#[test]
fn test_accountsdb_count_stores() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(
@ -1010,8 +1006,7 @@ mod tests {
let key = Pubkey::default();
// 1 token in the "root", i.e. db zero
let paths = get_tmp_accounts_path!();
let db0 = AccountsDB::new(&paths.paths);
let db0 = AccountsDB::new(None);
let account0 = Account::new(1, 0, &key);
db0.store(0, &hashmap!(&key => &account0));
@ -1035,11 +1030,11 @@ mod tests {
space: usize,
num_vote: usize,
) {
let ancestors = vec![(fork, 0)].into_iter().collect();
for t in 0..num {
let pubkey = Pubkey::new_rand();
let account = Account::new((t + 1) as u64, space, &Account::default().owner);
pubkeys.push(pubkey.clone());
let ancestors = vec![(fork, 0)].into_iter().collect();
assert!(accounts.load_slow(&ancestors, &pubkey).is_none());
accounts.store(fork, &hashmap!(&pubkey => &account));
}
@ -1092,12 +1087,15 @@ mod tests {
num: usize,
count: usize,
) {
for _ in 1..num {
let idx = thread_rng().gen_range(0, num - 1);
let ancestors = vec![(fork, 0)].into_iter().collect();
let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let account1 = Account::new((idx + count) as u64, 0, &Account::default().owner);
assert_eq!(account, (account1, fork));
let ancestors = vec![(fork, 0)].into_iter().collect();
for _ in 0..num {
let idx = thread_rng().gen_range(0, num);
let account = accounts.load_slow(&ancestors, &pubkeys[idx]);
let account1 = Some((
Account::new((idx + count) as u64, 0, &Account::default().owner),
fork,
));
assert_eq!(account, account1);
}
}
@ -1116,12 +1114,12 @@ mod tests {
#[test]
fn test_account_one() {
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let paths = get_temp_accounts_path("one");
let db = AccountsDB::new(Some(paths.paths.clone()));
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 1, 0, 0);
create_account(&db, &mut pubkeys, 0, 1, 0, 0);
let ancestors = vec![(0, 0)].into_iter().collect();
let account = accounts.load_slow(&ancestors, &pubkeys[0]).unwrap();
let account = db.load_slow(&ancestors, &pubkeys[0]).unwrap();
let mut default_account = Account::default();
default_account.lamports = 1;
assert_eq!((default_account, 0), account);
@ -1129,17 +1127,16 @@ mod tests {
#[test]
fn test_account_many() {
let paths = get_tmp_accounts_path("many0,many1");
let accounts = AccountsDB::new(&paths.paths);
let paths = get_temp_accounts_path("many0,many1");
let db = AccountsDB::new(Some(paths.paths.clone()));
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
check_accounts(&accounts, &pubkeys, 0, 100, 1);
create_account(&db, &mut pubkeys, 0, 100, 0, 0);
check_accounts(&db, &pubkeys, 0, 100, 1);
}
#[test]
fn test_account_update() {
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let accounts = AccountsDB::new(None);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
update_accounts(&accounts, &pubkeys, 0, 99);
@ -1148,9 +1145,9 @@ mod tests {
#[test]
fn test_account_grow_many() {
let paths = get_tmp_accounts_path("many2,many3");
let paths = get_temp_accounts_path("many2,many3");
let size = 4096;
let accounts = AccountsDB::new_with_file_size(&paths.paths, size);
let accounts = AccountsDB::new_with_file_size(Some(paths.paths.clone()), size);
let mut keys = vec![];
for i in 0..9 {
let key = Pubkey::new_rand();
@ -1184,8 +1181,7 @@ mod tests {
#[test]
fn test_account_grow() {
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let accounts = AccountsDB::new(None);
let count = [0, 1];
let status = [AccountStorageStatus::Available, AccountStorageStatus::Full];
let pubkey1 = Pubkey::new_rand();
@ -1249,8 +1245,7 @@ mod tests {
#[test]
fn test_purge_fork_not_root() {
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let accounts = AccountsDB::new(None);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 1, 0, 0);
let ancestors = vec![(0, 0)].into_iter().collect();
@ -1261,8 +1256,7 @@ mod tests {
#[test]
fn test_purge_fork_after_root() {
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let accounts = AccountsDB::new(None);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 1, 0, 0);
let ancestors = vec![(0, 0)].into_iter().collect();
@ -1276,8 +1270,7 @@ mod tests {
//This test is pedantic
//A fork is purged when a non root bank is cleaned up. If a fork is behind root but it is
//not root, it means we are retaining dead banks.
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let accounts = AccountsDB::new(None);
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
@ -1309,8 +1302,7 @@ mod tests {
#[test]
fn test_accounts_db_serialize() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let accounts = AccountsDB::new(None);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
assert_eq!(check_storage(&accounts, 0, 100), true);
@ -1329,12 +1321,14 @@ mod tests {
assert!(check_storage(&accounts, 1, 10));
let mut reader = BufReader::new(&buf[..]);
let daccounts = AccountsDB::new(&paths.paths);
let daccounts = AccountsDB::new(None);
assert!(daccounts.update_from_stream(&mut reader).is_ok());
assert_eq!(
daccounts.write_version.load(Ordering::Relaxed),
accounts.write_version.load(Ordering::Relaxed)
);
assert_eq!(daccounts.paths(), accounts.paths());
check_accounts(&daccounts, &pubkeys, 0, 100, 2);
check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
assert!(check_storage(&daccounts, 0, 100));
@ -1346,15 +1340,11 @@ mod tests {
fn test_store_account_stress() {
let fork_id = 42;
let num_threads = 2;
let paths = get_tmp_accounts_path!();
let min_file_bytes = std::mem::size_of::<StorageMeta>()
+ std::mem::size_of::<crate::append_vec::AccountBalance>();
let db = Arc::new(AccountsDB::new_with_file_size(
&paths.paths,
min_file_bytes as u64,
));
let db = Arc::new(AccountsDB::new_with_file_size(None, min_file_bytes as u64));
db.add_root(fork_id);
let thread_hdls: Vec<_> = (0..num_threads)
@ -1391,8 +1381,7 @@ mod tests {
#[test]
fn test_accountsdb_scan_accounts() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let key = Pubkey::default();
let key0 = Pubkey::new_rand();
let account0 = Account::new(1, 0, &key);
@ -1425,8 +1414,7 @@ mod tests {
#[test]
fn test_store_large_account() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let db = AccountsDB::new(None);
let key = Pubkey::default();
let data_len = ACCOUNT_DATA_FILE_SIZE as usize + 7;