Cleanup AccountStorage apis

Remove duplicate code
This commit is contained in:
Stephen Akridge
2019-03-06 15:12:50 -08:00
committed by sakridge
parent a4a3995a84
commit 61fbea3ee4

View File

@ -113,7 +113,7 @@ struct AccountIndex {
} }
/// Persistent storage structure holding the accounts /// Persistent storage structure holding the accounts
struct AccountStorage { struct AccountStorageEntry {
/// storage holding the accounts /// storage holding the accounts
accounts: Arc<RwLock<AppendVec<Account>>>, accounts: Arc<RwLock<AppendVec<Account>>>,
@ -129,7 +129,27 @@ struct AccountStorage {
path: String, path: String,
} }
impl AccountStorage { impl AccountStorageEntry {
pub fn new(path: String, id: usize) -> Self {
let p = format!("{}/{}", path, id);
let path = Path::new(&p);
let _ignored = remove_dir_all(path);
create_dir_all(path).expect("Create directory failed");
let accounts = Arc::new(RwLock::new(AppendVec::<Account>::new(
&path.join(ACCOUNT_DATA_FILE),
true,
ACCOUNT_DATA_FILE_SIZE,
0,
)));
AccountStorageEntry {
accounts,
count: AtomicUsize::new(0),
status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
path: p,
}
}
pub fn set_status(&self, status: AccountStorageStatus) { pub fn set_status(&self, status: AccountStorageStatus) {
self.status.store(status as usize, Ordering::Relaxed); self.status.store(status as usize, Ordering::Relaxed);
} }
@ -137,8 +157,21 @@ impl AccountStorage {
pub fn get_status(&self) -> AccountStorageStatus { pub fn get_status(&self) -> AccountStorageStatus {
self.status.load(Ordering::Relaxed).into() self.status.load(Ordering::Relaxed).into()
} }
fn add_account(&self) {
self.count.fetch_add(1, Ordering::Relaxed);
}
fn remove_account(&self) {
if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
self.accounts.write().unwrap().reset();
self.set_status(AccountStorageStatus::StorageAvailable);
}
}
} }
type AccountStorage = Vec<AccountStorageEntry>;
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct ForkInfo { struct ForkInfo {
/// The number of transactions processed without error /// The number of transactions processed without error
@ -154,7 +187,7 @@ pub struct AccountsDB {
account_index: AccountIndex, account_index: AccountIndex,
/// Account storage /// Account storage
storage: RwLock<Vec<AccountStorage>>, storage: RwLock<AccountStorage>,
/// distribute the accounts across storage lists /// distribute the accounts across storage lists
next_id: AtomicUsize, next_id: AtomicUsize,
@ -231,34 +264,18 @@ impl AccountsDB {
} }
} }
fn add_storage(&self, paths: &str) { fn new_storage_entry(&self, path: String) -> AccountStorageEntry {
let paths = get_paths_vec(&paths); AccountStorageEntry::new(path, self.next_id.fetch_add(1, Ordering::Relaxed))
let mut stores: Vec<AccountStorage> = vec![];
paths.iter().for_each(|p| {
let storage = AccountStorage {
accounts: self.new_account_storage(&p),
status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
count: AtomicUsize::new(0),
path: p.to_string(),
};
stores.push(storage);
});
let mut storage = self.storage.write().unwrap();
storage.append(&mut stores);
} }
fn new_account_storage(&self, p: &str) -> Arc<RwLock<AppendVec<Account>>> { fn add_storage(&self, paths: &str) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed); let paths = get_paths_vec(&paths);
let p = format!("{}/{}", p, id); let mut stores = paths
let path = Path::new(&p); .iter()
let _ignored = remove_dir_all(path); .map(|p| self.new_storage_entry(p.to_string()))
create_dir_all(path).expect("Create directory failed"); .collect();
Arc::new(RwLock::new(AppendVec::<Account>::new( let mut storage = self.storage.write().unwrap();
&path.join(ACCOUNT_DATA_FILE), storage.append(&mut stores);
true,
ACCOUNT_DATA_FILE_SIZE,
0,
)))
} }
fn get_vote_accounts(&self, fork: Fork) -> HashMap<Pubkey, Account> { fn get_vote_accounts(&self, fork: Fork) -> HashMap<Pubkey, Account> {
@ -378,12 +395,7 @@ impl AccountsDB {
let mut stores = self.storage.write().unwrap(); let mut stores = self.storage.write().unwrap();
// check if new store was already created // check if new store was already created
if stores.len() == len { if stores.len() == len {
let storage = AccountStorage { let storage = self.new_storage_entry(stores[id].path.clone());
accounts: self.new_account_storage(&stores[id].path),
count: AtomicUsize::new(0),
status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
path: stores[id].path.clone(),
};
stores.push(storage); stores.push(storage);
} }
id = stores.len() - 1; id = stores.len() - 1;
@ -425,10 +437,7 @@ impl AccountsDB {
for fork in entries.iter() { for fork in entries.iter() {
if let Some((id, _)) = forks.remove(&fork) { if let Some((id, _)) = forks.remove(&fork) {
let stores = self.storage.read().unwrap(); let stores = self.storage.read().unwrap();
if stores[id].count.fetch_sub(1, Ordering::Relaxed) == 1 { stores[id].remove_account();
stores[id].accounts.write().unwrap().reset();
stores[id].set_status(AccountStorageStatus::StorageAvailable);
}
} }
} }
forks.is_empty() forks.is_empty()
@ -469,12 +478,9 @@ impl AccountsDB {
fn insert_account_entry(&self, fork: Fork, id: AppendVecId, offset: u64, map: &AccountMap) { fn insert_account_entry(&self, fork: Fork, id: AppendVecId, offset: u64, map: &AccountMap) {
let mut forks = map.write().unwrap(); let mut forks = map.write().unwrap();
let stores = self.storage.read().unwrap(); let stores = self.storage.read().unwrap();
stores[id].count.fetch_add(1, Ordering::Relaxed); stores[id].add_account();
if let Some((old_id, _)) = forks.insert(fork, (id, offset)) { if let Some((old_id, _)) = forks.insert(fork, (id, offset)) {
if stores[old_id].count.fetch_sub(1, Ordering::Relaxed) == 1 { stores[old_id].remove_account();
stores[old_id].accounts.write().unwrap().reset();
stores[old_id].set_status(AccountStorageStatus::StorageAvailable);
}
} }
} }