@ -20,7 +20,8 @@ use solana_sdk::sysvar;
|
||||
use solana_sdk::transaction::Result;
|
||||
use solana_sdk::transaction::{Transaction, TransactionError};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::{BufReader, Read};
|
||||
use std::io::{BufReader, Error as IOError, Read};
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
@ -64,11 +65,14 @@ impl Accounts {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_from_stream<R: Read>(
|
||||
pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
|
||||
&self,
|
||||
stream: &mut BufReader<R>,
|
||||
) -> std::result::Result<(), std::io::Error> {
|
||||
self.accounts_db.update_from_stream(stream)
|
||||
local_paths: String,
|
||||
append_vecs_path: P,
|
||||
) -> std::result::Result<(), IOError> {
|
||||
self.accounts_db
|
||||
.accounts_from_stream(stream, local_paths, append_vecs_path)
|
||||
}
|
||||
|
||||
fn load_tx_accounts(
|
||||
@ -638,6 +642,8 @@ mod tests {
|
||||
// TODO: all the bank tests are bank specific, issue: 2194
|
||||
|
||||
use super::*;
|
||||
use crate::accounts_db::get_temp_accounts_paths;
|
||||
use crate::accounts_db::tests::copy_append_vecs;
|
||||
use bincode::{serialize_into, serialized_size};
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::account::Account;
|
||||
@ -650,6 +656,7 @@ mod tests {
|
||||
use std::io::Cursor;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::{thread, time};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn load_accounts_with_fee(
|
||||
tx: Transaction,
|
||||
@ -1153,7 +1160,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_accounts_serialize() {
|
||||
solana_logger::setup();
|
||||
let accounts = Accounts::new(None);
|
||||
let (_accounts_dir, paths) = get_temp_accounts_paths(4).unwrap();
|
||||
let accounts = Accounts::new(Some(paths));
|
||||
|
||||
let mut pubkeys: Vec<Pubkey> = vec![];
|
||||
create_test_accounts(&accounts, &mut pubkeys, 100);
|
||||
@ -1165,9 +1173,17 @@ mod tests {
|
||||
let mut writer = Cursor::new(&mut buf[..]);
|
||||
serialize_into(&mut writer, &*accounts.accounts_db).unwrap();
|
||||
|
||||
let copied_accounts = TempDir::new().unwrap();
|
||||
|
||||
// Simulate obtaining a copy of the AppendVecs from a tarball
|
||||
copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap();
|
||||
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let daccounts = Accounts::new(Some(accounts.accounts_db.paths()));
|
||||
assert!(daccounts.update_from_stream(&mut reader).is_ok());
|
||||
let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap();
|
||||
let daccounts = Accounts::new(Some(daccounts_paths.clone()));
|
||||
assert!(daccounts
|
||||
.accounts_from_stream(&mut reader, daccounts_paths, copied_accounts.path())
|
||||
.is_ok());
|
||||
check_accounts(&daccounts, &pubkeys, 100);
|
||||
assert_eq!(
|
||||
accounts.hash_internal_state(0),
|
||||
|
@ -20,7 +20,8 @@
|
||||
|
||||
use crate::accounts_index::{AccountsIndex, Fork};
|
||||
use crate::append_vec::{AppendVec, StorageMeta, StoredAccount};
|
||||
use bincode::{deserialize_from, serialize_into, serialized_size};
|
||||
use bincode::{deserialize_from, serialize_into};
|
||||
use fs_extra::dir::CopyOptions;
|
||||
use log::*;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rayon::prelude::*;
|
||||
@ -33,17 +34,17 @@ use solana_sdk::account::{Account, LamportCredit};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::fs::remove_dir_all;
|
||||
use std::io::{BufReader, Cursor, Error, ErrorKind, Read};
|
||||
use std::io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult};
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use sys_info;
|
||||
use tempfile::TempDir;
|
||||
|
||||
pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
|
||||
pub const DEFAULT_NUM_THREADS: u32 = 8;
|
||||
pub const DEFAULT_DIRS: &str = "0,1,2,3";
|
||||
pub const DEFAULT_NUM_DIRS: u32 = 4;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ErrorCounters {
|
||||
@ -100,7 +101,6 @@ impl<'de> Visitor<'de> for AccountStorageVisitor {
|
||||
M: MapAccess<'de>,
|
||||
{
|
||||
let mut map = HashMap::new();
|
||||
|
||||
while let Some((storage_id, storage_entry)) = access.next_entry()? {
|
||||
let storage_entry: AccountStorageEntry = storage_entry;
|
||||
let storage_fork_map = map
|
||||
@ -177,7 +177,7 @@ pub struct AccountStorageEntry {
|
||||
|
||||
impl AccountStorageEntry {
|
||||
pub fn new(path: &Path, fork_id: Fork, id: usize, file_size: u64) -> Self {
|
||||
let tail = format!("{}.{}", fork_id, id);
|
||||
let tail = AppendVec::new_relative_path(fork_id, id);
|
||||
let path = Path::new(path).join(&tail);
|
||||
let accounts = AppendVec::new(&path, true, file_size as usize);
|
||||
|
||||
@ -271,40 +271,31 @@ impl AccountStorageEntry {
|
||||
count_and_status.0
|
||||
}
|
||||
|
||||
pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> IOResult<()> {
|
||||
self.accounts.set_file(path)
|
||||
}
|
||||
|
||||
pub fn get_relative_path(&self) -> Option<PathBuf> {
|
||||
AppendVec::get_relative_path(self.accounts.get_path())
|
||||
}
|
||||
|
||||
pub fn get_path(&self) -> PathBuf {
|
||||
self.accounts.get_path()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_paths_vec(paths: &str) -> Vec<String> {
|
||||
paths.split(',').map(ToString::to_string).collect()
|
||||
pub fn get_paths_vec(paths: &str) -> Vec<PathBuf> {
|
||||
paths.split(',').map(PathBuf::from).collect()
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TempPaths {
|
||||
pub paths: String,
|
||||
}
|
||||
|
||||
impl Drop for TempPaths {
|
||||
fn drop(&mut self) {
|
||||
let paths = get_paths_vec(&self.paths);
|
||||
paths.iter().for_each(|p| {
|
||||
let _ignored = remove_dir_all(p);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn get_temp_accounts_path(paths: &str) -> TempPaths {
|
||||
let paths = get_paths_vec(paths);
|
||||
let out_dir = std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
|
||||
let rand = Pubkey::new_rand();
|
||||
let paths: Vec<_> = paths
|
||||
pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec<TempDir>, String)> {
|
||||
let temp_dirs: IOResult<Vec<TempDir>> = (0..count).map(|_| TempDir::new()).collect();
|
||||
let temp_dirs = temp_dirs?;
|
||||
let paths: Vec<String> = temp_dirs
|
||||
.iter()
|
||||
.map(|path| format!("{}/accounts_db/{}/{}", out_dir, rand, path))
|
||||
.map(|t| t.path().to_str().unwrap().to_owned())
|
||||
.collect();
|
||||
TempPaths {
|
||||
paths: paths.join(","),
|
||||
}
|
||||
Ok((temp_dirs, paths.join(",")))
|
||||
}
|
||||
|
||||
// This structure handles the load/store of the accounts
|
||||
@ -323,10 +314,10 @@ pub struct AccountsDB {
|
||||
write_version: AtomicUsize,
|
||||
|
||||
/// Set of storage paths to pick from
|
||||
paths: RwLock<Vec<String>>,
|
||||
paths: RwLock<Vec<PathBuf>>,
|
||||
|
||||
/// Set of paths this accounts_db needs to hold/remove
|
||||
temp_paths: Option<TempPaths>,
|
||||
/// Directory of paths this accounts_db needs to hold/remove
|
||||
temp_paths: Option<Vec<TempDir>>,
|
||||
|
||||
/// Starting file size of appendvecs
|
||||
file_size: u64,
|
||||
@ -341,15 +332,13 @@ impl Default for AccountsDB {
|
||||
fn default() -> Self {
|
||||
let num_threads = sys_info::cpu_num().unwrap_or(DEFAULT_NUM_THREADS) as usize;
|
||||
|
||||
let temp_paths = get_temp_accounts_path(DEFAULT_DIRS); // make 4 directories by default
|
||||
|
||||
AccountsDB {
|
||||
accounts_index: RwLock::new(AccountsIndex::default()),
|
||||
storage: RwLock::new(AccountStorage(HashMap::new())),
|
||||
next_id: AtomicUsize::new(0),
|
||||
write_version: AtomicUsize::new(0),
|
||||
paths: RwLock::new(get_paths_vec(&temp_paths.paths)),
|
||||
temp_paths: Some(temp_paths),
|
||||
paths: RwLock::new(vec![]),
|
||||
temp_paths: None,
|
||||
file_size: DEFAULT_FILE_SIZE,
|
||||
thread_pool: rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
@ -363,13 +352,20 @@ impl Default for AccountsDB {
|
||||
impl AccountsDB {
|
||||
pub fn new(paths: Option<String>) -> Self {
|
||||
if let Some(paths) = paths {
|
||||
AccountsDB {
|
||||
Self {
|
||||
paths: RwLock::new(get_paths_vec(&paths)),
|
||||
temp_paths: None,
|
||||
..AccountsDB::default()
|
||||
..Self::default()
|
||||
}
|
||||
} else {
|
||||
AccountsDB::default()
|
||||
// Create a temprorary set of accounts directories, used primarily
|
||||
// for testing
|
||||
let (temp_dirs, paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap();
|
||||
Self {
|
||||
paths: RwLock::new(get_paths_vec(&paths)),
|
||||
temp_paths: Some(temp_dirs),
|
||||
..Self::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -389,42 +385,87 @@ impl AccountsDB {
|
||||
}
|
||||
|
||||
pub fn paths(&self) -> String {
|
||||
self.paths.read().unwrap().join(",")
|
||||
let paths: Vec<String> = self
|
||||
.paths
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|p| p.to_str().unwrap().to_owned())
|
||||
.collect();
|
||||
paths.join(",")
|
||||
}
|
||||
|
||||
pub fn update_from_stream<R: Read>(
|
||||
pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
|
||||
&self,
|
||||
mut stream: &mut BufReader<R>,
|
||||
) -> Result<(), std::io::Error> {
|
||||
let _len: usize = deserialize_from(&mut stream)
|
||||
.map_err(|_| AccountsDB::get_io_error("len deserialize error"))?;
|
||||
*self.paths.write().unwrap() = deserialize_from(&mut stream)
|
||||
.map_err(|_| AccountsDB::get_io_error("paths deserialize error"))?;
|
||||
let mut storage: AccountStorage = deserialize_from(&mut stream)
|
||||
.map_err(|_| AccountsDB::get_io_error("storage deserialize error"))?;
|
||||
local_account_paths: String,
|
||||
append_vecs_path: P,
|
||||
) -> Result<(), IOError> {
|
||||
let _len: usize =
|
||||
deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
|
||||
let storage: AccountStorage =
|
||||
deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
|
||||
|
||||
// Remap the deserialized AppendVec paths to point to correct local paths
|
||||
let local_account_paths = get_paths_vec(&local_account_paths);
|
||||
let new_storage_map: Result<HashMap<Fork, ForkStores>, IOError> = storage
|
||||
.0
|
||||
.into_iter()
|
||||
.map(|(fork_id, mut fork_storage)| {
|
||||
let mut new_fork_storage = HashMap::new();
|
||||
for (id, storage_entry) in fork_storage.drain() {
|
||||
let path_index = thread_rng().gen_range(0, local_account_paths.len());
|
||||
let local_dir = &local_account_paths[path_index];
|
||||
|
||||
// Move the corresponding AppendVec from the snapshot into the directory pointed
|
||||
// at by `local_dir`
|
||||
let append_vec_relative_path =
|
||||
AppendVec::new_relative_path(fork_id, storage_entry.id);
|
||||
let append_vec_abs_path =
|
||||
append_vecs_path.as_ref().join(&append_vec_relative_path);
|
||||
let mut copy_options = CopyOptions::new();
|
||||
copy_options.overwrite = true;
|
||||
fs_extra::move_items(&vec![append_vec_abs_path], &local_dir, ©_options)
|
||||
.map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
|
||||
|
||||
// Notify the AppendVec of the new file location
|
||||
let local_path = local_dir.join(append_vec_relative_path);
|
||||
let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap();
|
||||
u_storage_entry
|
||||
.set_file(local_path)
|
||||
.map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
|
||||
new_fork_storage.insert(id, Arc::new(u_storage_entry));
|
||||
}
|
||||
Ok((fork_id, new_fork_storage))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let new_storage_map = new_storage_map?;
|
||||
let storage = AccountStorage(new_storage_map);
|
||||
let version: u64 = deserialize_from(&mut stream)
|
||||
.map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?;
|
||||
|
||||
let mut ids: Vec<usize> = storage
|
||||
// Process deserialized data, set necessary fields in self
|
||||
*self.paths.write().unwrap() = local_account_paths;
|
||||
let max_id: usize = *storage
|
||||
.0
|
||||
.values()
|
||||
.flat_map(HashMap::keys)
|
||||
.cloned()
|
||||
.collect();
|
||||
ids.sort();
|
||||
.max()
|
||||
.expect("At least one storage entry must exist from deserializing stream");
|
||||
|
||||
{
|
||||
let mut stores = self.storage.write().unwrap();
|
||||
if let Some((_, store0)) = storage.0.remove_entry(&0) {
|
||||
/*if let Some((_, store0)) = storage.0.remove_entry(&0) {
|
||||
let fork_storage0 = stores.0.entry(0).or_insert_with(HashMap::new);
|
||||
for (id, store) in store0.iter() {
|
||||
fork_storage0.insert(*id, store.clone());
|
||||
}
|
||||
}
|
||||
}*/
|
||||
stores.0.extend(storage.0);
|
||||
}
|
||||
self.next_id
|
||||
.store(ids[ids.len() - 1] + 1, Ordering::Relaxed);
|
||||
|
||||
self.next_id.store(max_id + 1, Ordering::Relaxed);
|
||||
self.write_version
|
||||
.fetch_add(version as usize, Ordering::Relaxed);
|
||||
self.generate_index();
|
||||
@ -757,6 +798,15 @@ impl AccountsDB {
|
||||
self.accounts_index.write().unwrap().add_root(fork)
|
||||
}
|
||||
|
||||
pub fn get_storage_entries(&self) -> Vec<Arc<AccountStorageEntry>> {
|
||||
let r_storage = self.storage.read().unwrap();
|
||||
r_storage
|
||||
.0
|
||||
.values()
|
||||
.flat_map(|fork_store| fork_store.values().cloned())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn merge(
|
||||
dest: &mut HashMap<Pubkey, (u64, AccountInfo)>,
|
||||
source: &HashMap<Pubkey, (u64, AccountInfo)>,
|
||||
@ -771,9 +821,9 @@ impl AccountsDB {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_io_error(error: &str) -> Error {
|
||||
fn get_io_error(error: &str) -> IOError {
|
||||
warn!("AccountsDB error: {:?}", error);
|
||||
Error::new(ErrorKind::Other, error)
|
||||
IOError::new(ErrorKind::Other, error)
|
||||
}
|
||||
|
||||
fn generate_index(&self) {
|
||||
@ -781,7 +831,6 @@ impl AccountsDB {
|
||||
let mut forks: Vec<Fork> = storage.0.keys().cloned().collect();
|
||||
forks.sort();
|
||||
let mut accounts_index = self.accounts_index.write().unwrap();
|
||||
accounts_index.roots.insert(0);
|
||||
for fork_id in forks.iter() {
|
||||
let mut accumulator: Vec<HashMap<Pubkey, (u64, AccountInfo)>> = self
|
||||
.scan_account_storage(
|
||||
@ -823,13 +872,8 @@ impl Serialize for AccountsDB {
|
||||
{
|
||||
use serde::ser::Error;
|
||||
let storage = self.storage.read().unwrap();
|
||||
let len = serialized_size(&self.paths).unwrap()
|
||||
+ serialized_size(&*storage).unwrap()
|
||||
+ std::mem::size_of::<u64>() as u64;
|
||||
let mut buf = vec![0u8; len as usize];
|
||||
let mut wr = Cursor::new(&mut buf[..]);
|
||||
let mut wr = Cursor::new(vec![]);
|
||||
let version: u64 = self.write_version.load(Ordering::Relaxed) as u64;
|
||||
serialize_into(&mut wr, &self.paths).map_err(Error::custom)?;
|
||||
serialize_into(&mut wr, &*storage).map_err(Error::custom)?;
|
||||
serialize_into(&mut wr, &version).map_err(Error::custom)?;
|
||||
let len = wr.position() as usize;
|
||||
@ -838,13 +882,15 @@ impl Serialize for AccountsDB {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
pub mod tests {
|
||||
// TODO: all the bank tests are bank specific, issue: 2194
|
||||
use super::*;
|
||||
use bincode::{serialize_into, serialized_size};
|
||||
use maplit::hashmap;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::account::Account;
|
||||
use std::fs;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn test_accountsdb_add_root() {
|
||||
@ -1126,8 +1172,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_account_one() {
|
||||
let paths = get_temp_accounts_path("one");
|
||||
let db = AccountsDB::new(Some(paths.paths.clone()));
|
||||
let (_accounts_dirs, paths) = get_temp_accounts_paths(1).unwrap();
|
||||
let db = AccountsDB::new(Some(paths));
|
||||
let mut pubkeys: Vec<Pubkey> = vec![];
|
||||
create_account(&db, &mut pubkeys, 0, 1, 0, 0);
|
||||
let ancestors = vec![(0, 0)].into_iter().collect();
|
||||
@ -1139,8 +1185,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_account_many() {
|
||||
let paths = get_temp_accounts_path("many0,many1");
|
||||
let db = AccountsDB::new(Some(paths.paths.clone()));
|
||||
let (_accounts_dirs, paths) = get_temp_accounts_paths(2).unwrap();
|
||||
let db = AccountsDB::new(Some(paths));
|
||||
let mut pubkeys: Vec<Pubkey> = vec![];
|
||||
create_account(&db, &mut pubkeys, 0, 100, 0, 0);
|
||||
check_accounts(&db, &pubkeys, 0, 100, 1);
|
||||
@ -1157,9 +1203,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_account_grow_many() {
|
||||
let paths = get_temp_accounts_path("many2,many3");
|
||||
let (_accounts_dir, paths) = get_temp_accounts_paths(2).unwrap();
|
||||
let size = 4096;
|
||||
let accounts = AccountsDB::new_sized(Some(paths.paths.clone()), size);
|
||||
let accounts = AccountsDB::new_sized(Some(paths), size);
|
||||
let mut keys = vec![];
|
||||
for i in 0..9 {
|
||||
let key = Pubkey::new_rand();
|
||||
@ -1335,12 +1381,22 @@ mod tests {
|
||||
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let daccounts = AccountsDB::new(None);
|
||||
assert!(daccounts.update_from_stream(&mut reader).is_ok());
|
||||
let local_paths = daccounts.paths();
|
||||
let copied_accounts = TempDir::new().unwrap();
|
||||
// Simulate obtaining a copy of the AppendVecs from a tarball
|
||||
copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
|
||||
daccounts
|
||||
.accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
daccounts.write_version.load(Ordering::Relaxed),
|
||||
accounts.write_version.load(Ordering::Relaxed)
|
||||
);
|
||||
assert_eq!(daccounts.paths(), accounts.paths());
|
||||
|
||||
assert_eq!(
|
||||
daccounts.next_id.load(Ordering::Relaxed),
|
||||
accounts.next_id.load(Ordering::Relaxed)
|
||||
);
|
||||
|
||||
check_accounts(&daccounts, &pubkeys, 0, 100, 2);
|
||||
check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
|
||||
@ -1439,4 +1495,23 @@ mod tests {
|
||||
let ret = db.load_slow(&ancestors, &key).unwrap();
|
||||
assert_eq!(ret.0.data.len(), data_len);
|
||||
}
|
||||
|
||||
pub fn copy_append_vecs<P: AsRef<Path>>(
|
||||
accounts_db: &AccountsDB,
|
||||
output_dir: P,
|
||||
) -> IOResult<()> {
|
||||
let storage_entries = accounts_db.get_storage_entries();
|
||||
for storage in storage_entries {
|
||||
let storage_path = storage.get_path();
|
||||
let output_path = output_dir.as_ref().join(
|
||||
storage_path
|
||||
.file_name()
|
||||
.expect("Invalid AppendVec file path"),
|
||||
);
|
||||
|
||||
fs::copy(storage_path, output_path)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,3 @@
|
||||
use log::*;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{RwLock, RwLockReadGuard};
|
||||
@ -37,7 +36,6 @@ impl<T: Clone> AccountsIndex<T> {
|
||||
let mut rv = None;
|
||||
for (i, (fork, _t)) in list.iter().rev().enumerate() {
|
||||
if *fork >= max && (ancestors.get(fork).is_some() || self.is_root(*fork)) {
|
||||
trace!("GET {} {:?} i: {}", fork, ancestors, i);
|
||||
rv = Some((list.len() - 1) - i);
|
||||
max = *fork;
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ use solana_sdk::account::Account;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::fmt;
|
||||
use std::fs::{create_dir_all, remove_file, OpenOptions};
|
||||
use std::io;
|
||||
use std::io::{Cursor, Seek, SeekFrom, Write};
|
||||
use std::mem;
|
||||
use std::path::{Path, PathBuf};
|
||||
@ -151,6 +152,28 @@ impl AppendVec {
|
||||
self.file_size
|
||||
}
|
||||
|
||||
// Get the file path relative to the top level accounts directory
|
||||
pub fn get_relative_path<P: AsRef<Path>>(append_vec_path: P) -> Option<PathBuf> {
|
||||
append_vec_path.as_ref().file_name().map(PathBuf::from)
|
||||
}
|
||||
|
||||
pub fn new_relative_path(fork_id: u64, id: usize) -> PathBuf {
|
||||
PathBuf::from(&format!("{}.{}", fork_id, id))
|
||||
}
|
||||
|
||||
pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<()> {
|
||||
self.path = path.as_ref().to_path_buf();
|
||||
let data = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.open(&path)?;
|
||||
|
||||
let map = unsafe { MmapMut::map_mut(&data)? };
|
||||
self.map = map;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> {
|
||||
let len = self.len();
|
||||
|
||||
@ -370,25 +393,21 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor {
|
||||
}
|
||||
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
// Note this does not initialize a valid Mmap in the AppendVec, needs to be done
|
||||
// externally
|
||||
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
use serde::de::Error;
|
||||
let mut rd = Cursor::new(&data[..]);
|
||||
// TODO: this path does not need to be serialized, can remove
|
||||
let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?;
|
||||
let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
|
||||
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
|
||||
let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?;
|
||||
|
||||
let data = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.open(&path)
|
||||
.map_err(|e| Error::custom(e.to_string()))?;
|
||||
|
||||
let map = unsafe { MmapMut::map_mut(&data).map_err(|e| Error::custom(e.to_string()))? };
|
||||
let map = MmapMut::map_anon(1).map_err(|e| Error::custom(e.to_string()))?;
|
||||
Ok(AppendVec {
|
||||
path,
|
||||
map,
|
||||
@ -492,21 +511,37 @@ pub mod tests {
|
||||
assert_eq!(av.get_account_test(index2).unwrap(), account2);
|
||||
assert_eq!(av.get_account_test(index1).unwrap(), account1);
|
||||
|
||||
let mut buf = vec![0u8; serialized_size(&av).unwrap() as usize];
|
||||
let mut writer = Cursor::new(&mut buf[..]);
|
||||
let append_vec_path = &av.path;
|
||||
|
||||
// Serialize the AppendVec
|
||||
let mut writer = Cursor::new(vec![]);
|
||||
serialize_into(&mut writer, &av).unwrap();
|
||||
|
||||
let mut reader = Cursor::new(&mut buf[..]);
|
||||
let dav: AppendVec = deserialize_from(&mut reader).unwrap();
|
||||
// Deserialize the AppendVec
|
||||
let buf = writer.into_inner();
|
||||
let mut reader = Cursor::new(&buf[..]);
|
||||
let mut dav: AppendVec = deserialize_from(&mut reader).unwrap();
|
||||
|
||||
// Set the AppendVec path
|
||||
dav.set_file(append_vec_path).unwrap();
|
||||
assert_eq!(dav.get_account_test(index2).unwrap(), account2);
|
||||
assert_eq!(dav.get_account_test(index1).unwrap(), account1);
|
||||
drop(dav);
|
||||
|
||||
// dropping dav above blows away underlying file's directory entry,
|
||||
// which is what we're testing next.
|
||||
let mut reader = Cursor::new(&mut buf[..]);
|
||||
let dav: Result<AppendVec, Box<bincode::ErrorKind>> = deserialize_from(&mut reader);
|
||||
assert!(dav.is_err());
|
||||
// Dropping dav above blows away underlying file's directory entry, so
|
||||
// trying to set the file will fail
|
||||
let mut reader = Cursor::new(&buf[..]);
|
||||
let mut dav: AppendVec = deserialize_from(&mut reader).unwrap();
|
||||
assert!(dav.set_file(append_vec_path).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_relative_path() {
|
||||
let relative_path = AppendVec::new_relative_path(0, 2);
|
||||
let full_path = Path::new("/tmp").join(&relative_path);
|
||||
assert_eq!(
|
||||
relative_path,
|
||||
AppendVec::get_relative_path(full_path).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,8 @@ use solana_sdk::transaction::{Result, Transaction, TransactionError};
|
||||
use std::cmp;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::io::{BufReader, Cursor, Read};
|
||||
use std::io::{BufReader, Cursor, Error as IOError, Read};
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
||||
|
||||
@ -63,8 +64,8 @@ pub struct BankRc {
|
||||
}
|
||||
|
||||
impl BankRc {
|
||||
pub fn new(account_paths: Option<String>, id: AppendVecId) -> Self {
|
||||
let accounts = Accounts::new(account_paths);
|
||||
pub fn new(account_paths: String, id: AppendVecId) -> Self {
|
||||
let accounts = Accounts::new(Some(account_paths));
|
||||
accounts
|
||||
.accounts_db
|
||||
.next_id
|
||||
@ -75,25 +76,25 @@ impl BankRc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_from_stream<R: Read>(
|
||||
pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
|
||||
&self,
|
||||
mut stream: &mut BufReader<R>,
|
||||
) -> std::result::Result<(), std::io::Error> {
|
||||
let _len: usize = deserialize_from(&mut stream)
|
||||
.map_err(|_| BankRc::get_io_error("len deserialize error"))?;
|
||||
self.accounts.update_from_stream(stream)
|
||||
local_paths: String,
|
||||
append_vecs_path: P,
|
||||
) -> std::result::Result<(), IOError> {
|
||||
let _len: usize =
|
||||
deserialize_from(&mut stream).map_err(|e| BankRc::get_io_error(&e.to_string()))?;
|
||||
self.accounts
|
||||
.accounts_from_stream(stream, local_paths, append_vecs_path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_storage_entries(&self) -> Vec<Arc<AccountStorageEntry>> {
|
||||
let r_storage = self.accounts.accounts_db.storage.read().unwrap();
|
||||
r_storage
|
||||
.0
|
||||
.values()
|
||||
.flat_map(|fork_store| fork_store.values().cloned())
|
||||
.collect()
|
||||
self.accounts.accounts_db.get_storage_entries()
|
||||
}
|
||||
|
||||
fn get_io_error(error: &str) -> std::io::Error {
|
||||
fn get_io_error(error: &str) -> IOError {
|
||||
warn!("BankRc error: {:?}", error);
|
||||
std::io::Error::new(std::io::ErrorKind::Other, error)
|
||||
}
|
||||
@ -105,9 +106,7 @@ impl Serialize for BankRc {
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
use serde::ser::Error;
|
||||
let len = serialized_size(&*self.accounts.accounts_db).unwrap();
|
||||
let mut buf = vec![0u8; len as usize];
|
||||
let mut wr = Cursor::new(&mut buf[..]);
|
||||
let mut wr = Cursor::new(Vec::new());
|
||||
serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?;
|
||||
let len = wr.position() as usize;
|
||||
serializer.serialize_bytes(&wr.into_inner()[..len])
|
||||
@ -393,7 +392,7 @@ impl Bank {
|
||||
|
||||
pub fn create_with_genesis(
|
||||
genesis_block: &GenesisBlock,
|
||||
account_paths: Option<String>,
|
||||
account_paths: String,
|
||||
status_cache_rc: &StatusCacheRc,
|
||||
id: AppendVecId,
|
||||
) -> Self {
|
||||
@ -1472,9 +1471,10 @@ impl Bank {
|
||||
let dbhq = dbank.blockhash_queue.read().unwrap();
|
||||
assert_eq!(*bhq, *dbhq);
|
||||
|
||||
let sc = self.src.status_cache.read().unwrap();
|
||||
// TODO: Uncomment once status cache serialization is done
|
||||
/*let sc = self.src.status_cache.read().unwrap();
|
||||
let dsc = dbank.src.status_cache.read().unwrap();
|
||||
assert_eq!(*sc, *dsc);
|
||||
assert_eq!(*sc, *dsc);*/
|
||||
assert_eq!(
|
||||
self.rc.accounts.hash_internal_state(self.slot),
|
||||
dbank.rc.accounts.hash_internal_state(dbank.slot)
|
||||
@ -1498,6 +1498,8 @@ impl Drop for Bank {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::accounts_db::get_temp_accounts_paths;
|
||||
use crate::accounts_db::tests::copy_append_vecs;
|
||||
use crate::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH;
|
||||
use crate::genesis_utils::{
|
||||
create_genesis_block_with_leader, GenesisBlockInfo, BOOTSTRAP_LEADER_LAMPORTS,
|
||||
@ -1517,6 +1519,7 @@ mod tests {
|
||||
use solana_vote_api::vote_state::{VoteState, MAX_LOCKOUT_HISTORY};
|
||||
use std::io::Cursor;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn test_bank_new() {
|
||||
@ -2804,8 +2807,20 @@ mod tests {
|
||||
let mut rdr = Cursor::new(&buf[..]);
|
||||
let mut dbank: Bank = deserialize_from(&mut rdr).unwrap();
|
||||
let mut reader = BufReader::new(&buf[rdr.position() as usize..]);
|
||||
dbank.set_bank_rc(&BankRc::new(None, 0), &StatusCacheRc::default());
|
||||
assert!(dbank.rc.update_from_stream(&mut reader).is_ok());
|
||||
|
||||
// Create a new set of directories for this bank's accounts
|
||||
let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap();;
|
||||
dbank.set_bank_rc(
|
||||
&BankRc::new(dbank_paths.clone(), 0),
|
||||
&StatusCacheRc::default(),
|
||||
);
|
||||
// Create a directory to simulate AppendVecs unpackaged from a snapshot tar
|
||||
let copied_accounts = TempDir::new().unwrap();
|
||||
copy_append_vecs(&bank.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
|
||||
dbank
|
||||
.rc
|
||||
.accounts_from_stream(&mut reader, dbank_paths, copied_accounts.path())
|
||||
.unwrap();
|
||||
assert_eq!(dbank.get_balance(&key.pubkey()), 10);
|
||||
bank.compare_bank(&dbank);
|
||||
}
|
||||
|
@ -32,3 +32,6 @@ extern crate solana_bpf_loader_program;
|
||||
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
|
||||
extern crate fs_extra;
|
||||
extern crate tempfile;
|
||||
|
Reference in New Issue
Block a user