automerge
This commit is contained in:
@ -10,29 +10,36 @@ use std::sync::{
|
|||||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct AccountsCleanupService {
|
pub struct AccountsBackgroundService {
|
||||||
t_cleanup: JoinHandle<()>,
|
t_background: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AccountsCleanupService {
|
const INTERVAL_MS: u64 = 100;
|
||||||
|
|
||||||
|
impl AccountsBackgroundService {
|
||||||
pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self {
|
pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self {
|
||||||
info!("AccountsCleanupService active");
|
info!("AccountsBackgroundService active");
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
let t_cleanup = Builder::new()
|
let t_background = Builder::new()
|
||||||
.name("solana-accounts-cleanup".to_string())
|
.name("solana-accounts-background".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let bank = bank_forks.read().unwrap().working_bank();
|
let bank = bank_forks.read().unwrap().working_bank();
|
||||||
bank.clean_dead_slots();
|
|
||||||
sleep(Duration::from_millis(100));
|
bank.process_dead_slots();
|
||||||
|
|
||||||
|
// Currently, given INTERVAL_MS, we process 1 slot/100 ms
|
||||||
|
bank.process_stale_slot();
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(INTERVAL_MS));
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Self { t_cleanup }
|
Self { t_background }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
self.t_cleanup.join()
|
self.t_background.join()
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -5,7 +5,7 @@
|
|||||||
//! command-line tools to spin up validators and a Rust library
|
//! command-line tools to spin up validators and a Rust library
|
||||||
//!
|
//!
|
||||||
|
|
||||||
pub mod accounts_cleanup_service;
|
pub mod accounts_background_service;
|
||||||
pub mod accounts_hash_verifier;
|
pub mod accounts_hash_verifier;
|
||||||
pub mod banking_stage;
|
pub mod banking_stage;
|
||||||
pub mod broadcast_stage;
|
pub mod broadcast_stage;
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
//! validation pipeline in software.
|
//! validation pipeline in software.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
accounts_cleanup_service::AccountsCleanupService,
|
accounts_background_service::AccountsBackgroundService,
|
||||||
accounts_hash_verifier::AccountsHashVerifier,
|
accounts_hash_verifier::AccountsHashVerifier,
|
||||||
broadcast_stage::RetransmitSlotsSender,
|
broadcast_stage::RetransmitSlotsSender,
|
||||||
cluster_info::ClusterInfo,
|
cluster_info::ClusterInfo,
|
||||||
@ -49,7 +49,7 @@ pub struct Tvu {
|
|||||||
retransmit_stage: RetransmitStage,
|
retransmit_stage: RetransmitStage,
|
||||||
replay_stage: ReplayStage,
|
replay_stage: ReplayStage,
|
||||||
ledger_cleanup_service: Option<LedgerCleanupService>,
|
ledger_cleanup_service: Option<LedgerCleanupService>,
|
||||||
accounts_cleanup_service: AccountsCleanupService,
|
accounts_background_service: AccountsBackgroundService,
|
||||||
storage_stage: StorageStage,
|
storage_stage: StorageStage,
|
||||||
accounts_hash_verifier: AccountsHashVerifier,
|
accounts_hash_verifier: AccountsHashVerifier,
|
||||||
}
|
}
|
||||||
@ -211,7 +211,7 @@ impl Tvu {
|
|||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let accounts_cleanup_service = AccountsCleanupService::new(bank_forks.clone(), &exit);
|
let accounts_background_service = AccountsBackgroundService::new(bank_forks.clone(), &exit);
|
||||||
|
|
||||||
let storage_stage = StorageStage::new(
|
let storage_stage = StorageStage::new(
|
||||||
storage_state,
|
storage_state,
|
||||||
@ -231,7 +231,7 @@ impl Tvu {
|
|||||||
retransmit_stage,
|
retransmit_stage,
|
||||||
replay_stage,
|
replay_stage,
|
||||||
ledger_cleanup_service,
|
ledger_cleanup_service,
|
||||||
accounts_cleanup_service,
|
accounts_background_service,
|
||||||
storage_stage,
|
storage_stage,
|
||||||
accounts_hash_verifier,
|
accounts_hash_verifier,
|
||||||
}
|
}
|
||||||
@ -245,7 +245,7 @@ impl Tvu {
|
|||||||
if self.ledger_cleanup_service.is_some() {
|
if self.ledger_cleanup_service.is_some() {
|
||||||
self.ledger_cleanup_service.unwrap().join()?;
|
self.ledger_cleanup_service.unwrap().join()?;
|
||||||
}
|
}
|
||||||
self.accounts_cleanup_service.join()?;
|
self.accounts_background_service.join()?;
|
||||||
self.replay_stage.join()?;
|
self.replay_stage.join()?;
|
||||||
self.accounts_hash_verifier.join()?;
|
self.accounts_hash_verifier.join()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -48,12 +48,13 @@ use std::{
|
|||||||
fmt,
|
fmt,
|
||||||
io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult},
|
io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult},
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
|
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, Mutex, RwLock},
|
||||||
};
|
};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
|
const PAGE_SIZE: u64 = 4 * 1024;
|
||||||
|
pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024;
|
||||||
pub const DEFAULT_NUM_THREADS: u32 = 8;
|
pub const DEFAULT_NUM_THREADS: u32 = 8;
|
||||||
pub const DEFAULT_NUM_DIRS: u32 = 4;
|
pub const DEFAULT_NUM_DIRS: u32 = 4;
|
||||||
|
|
||||||
@ -371,7 +372,7 @@ impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> {
|
|||||||
{
|
{
|
||||||
use serde::ser::Error;
|
use serde::ser::Error;
|
||||||
let mut wr = Cursor::new(vec![]);
|
let mut wr = Cursor::new(vec![]);
|
||||||
let version: u64 = self.accounts_db.write_version.load(Ordering::Relaxed) as u64;
|
let version = self.accounts_db.write_version.load(Ordering::Relaxed);
|
||||||
let account_storage_serialize = AccountStorageSerialize {
|
let account_storage_serialize = AccountStorageSerialize {
|
||||||
account_storage_entries: self.account_storage_entries,
|
account_storage_entries: self.account_storage_entries,
|
||||||
};
|
};
|
||||||
@ -450,8 +451,9 @@ pub struct AccountsDB {
|
|||||||
|
|
||||||
/// distribute the accounts across storage lists
|
/// distribute the accounts across storage lists
|
||||||
pub next_id: AtomicUsize,
|
pub next_id: AtomicUsize,
|
||||||
|
pub shrink_candidate_slots: Mutex<Vec<Slot>>,
|
||||||
|
|
||||||
write_version: AtomicUsize,
|
write_version: AtomicU64,
|
||||||
|
|
||||||
/// Set of storage paths to pick from
|
/// Set of storage paths to pick from
|
||||||
paths: Vec<PathBuf>,
|
paths: Vec<PathBuf>,
|
||||||
@ -499,7 +501,8 @@ impl Default for AccountsDB {
|
|||||||
accounts_index: RwLock::new(AccountsIndex::default()),
|
accounts_index: RwLock::new(AccountsIndex::default()),
|
||||||
storage: RwLock::new(AccountStorage(HashMap::new())),
|
storage: RwLock::new(AccountStorage(HashMap::new())),
|
||||||
next_id: AtomicUsize::new(0),
|
next_id: AtomicUsize::new(0),
|
||||||
write_version: AtomicUsize::new(0),
|
shrink_candidate_slots: Mutex::new(Vec::new()),
|
||||||
|
write_version: AtomicU64::new(0),
|
||||||
paths: vec![],
|
paths: vec![],
|
||||||
temp_paths: None,
|
temp_paths: None,
|
||||||
file_size: DEFAULT_FILE_SIZE,
|
file_size: DEFAULT_FILE_SIZE,
|
||||||
@ -649,8 +652,7 @@ impl AccountsDB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.next_id.store(max_id + 1, Ordering::Relaxed);
|
self.next_id.store(max_id + 1, Ordering::Relaxed);
|
||||||
self.write_version
|
self.write_version.fetch_add(version, Ordering::Relaxed);
|
||||||
.fetch_add(version as usize, Ordering::Relaxed);
|
|
||||||
self.generate_index();
|
self.generate_index();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -879,6 +881,150 @@ impl AccountsDB {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reads all accounts in given slot's AppendVecs and filter only to alive,
|
||||||
|
// then create a minimum AppendVed filled with the alive.
|
||||||
|
fn shrink_stale_slot(&self, slot: Slot) {
|
||||||
|
trace!("shrink_stale_slot: slot: {}", slot);
|
||||||
|
|
||||||
|
let mut stored_accounts = vec![];
|
||||||
|
{
|
||||||
|
let storage = self.storage.read().unwrap();
|
||||||
|
if let Some(stores) = storage.0.get(&slot) {
|
||||||
|
let mut alive_count = 0;
|
||||||
|
for store in stores.values() {
|
||||||
|
alive_count += store.count();
|
||||||
|
let mut start = 0;
|
||||||
|
while let Some((account, next)) = store.accounts.get_account(start) {
|
||||||
|
stored_accounts.push((
|
||||||
|
account.meta.pubkey,
|
||||||
|
account.clone_account(),
|
||||||
|
next - start,
|
||||||
|
(store.id, account.offset),
|
||||||
|
account.meta.write_version,
|
||||||
|
));
|
||||||
|
start = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (alive_count as f32 / stored_accounts.len() as f32) >= 0.80 {
|
||||||
|
trace!(
|
||||||
|
"shrink_stale_slot: not enough space to shrink: {} / {}",
|
||||||
|
alive_count,
|
||||||
|
stored_accounts.len()
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let alive_accounts: Vec<_> = {
|
||||||
|
let no_ancestors = HashMap::new();
|
||||||
|
let accounts_index = self.accounts_index.read().unwrap();
|
||||||
|
stored_accounts
|
||||||
|
.iter()
|
||||||
|
.filter(
|
||||||
|
|(pubkey, _account, _storage_size, (store_id, offset), _write_version)| {
|
||||||
|
if let Some((list, _)) = accounts_index.get(pubkey, &no_ancestors) {
|
||||||
|
list.iter()
|
||||||
|
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
let alive_total: u64 = alive_accounts
|
||||||
|
.iter()
|
||||||
|
.map(|(_pubkey, _account, account_size, _location, _write_verion)| *account_size as u64)
|
||||||
|
.sum();
|
||||||
|
let aligned_total: u64 = (alive_total + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1);
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
|
||||||
|
slot,
|
||||||
|
stored_accounts.len(),
|
||||||
|
alive_accounts.len(),
|
||||||
|
alive_total,
|
||||||
|
aligned_total
|
||||||
|
);
|
||||||
|
|
||||||
|
if aligned_total > 0 {
|
||||||
|
let mut accounts = Vec::with_capacity(alive_accounts.len());
|
||||||
|
let mut hashes = Vec::with_capacity(alive_accounts.len());
|
||||||
|
let mut write_versions = Vec::with_capacity(alive_accounts.len());
|
||||||
|
|
||||||
|
for (pubkey, account, _size, _location, write_version) in alive_accounts {
|
||||||
|
accounts.push((pubkey, account));
|
||||||
|
hashes.push(account.hash);
|
||||||
|
write_versions.push(*write_version);
|
||||||
|
}
|
||||||
|
|
||||||
|
let shrunken_store = self.create_and_insert_store(slot, aligned_total);
|
||||||
|
|
||||||
|
// here, we're writing back alive_accounts. That should be an atomic operation
|
||||||
|
// without use of rather wide locks in this whole function, because we're
|
||||||
|
// mutating rooted slots; There should be no writers to them.
|
||||||
|
let infos = self.store_accounts_to(
|
||||||
|
slot,
|
||||||
|
&accounts,
|
||||||
|
&hashes,
|
||||||
|
|_| shrunken_store.clone(),
|
||||||
|
write_versions.into_iter(),
|
||||||
|
);
|
||||||
|
let reclaims = self.update_index(slot, infos, &accounts);
|
||||||
|
|
||||||
|
self.handle_reclaims(&reclaims);
|
||||||
|
|
||||||
|
let mut storage = self.storage.write().unwrap();
|
||||||
|
if let Some(slot_storage) = storage.0.get_mut(&slot) {
|
||||||
|
slot_storage.retain(|_key, store| store.count() > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Infinitely returns rooted roots in cyclic order
|
||||||
|
fn next_shrink_slot(&self) -> Option<Slot> {
|
||||||
|
let next = {
|
||||||
|
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
|
||||||
|
candidates.pop()
|
||||||
|
};
|
||||||
|
|
||||||
|
if next.is_some() {
|
||||||
|
next
|
||||||
|
} else {
|
||||||
|
let mut new_all_slots = self.all_root_slots_in_index();
|
||||||
|
let next = new_all_slots.pop();
|
||||||
|
|
||||||
|
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
|
||||||
|
*candidates = new_all_slots;
|
||||||
|
|
||||||
|
next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_root_slots_in_index(&self) -> Vec<Slot> {
|
||||||
|
let index = self.accounts_index.read().unwrap();
|
||||||
|
index.roots.iter().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_slots_in_storage(&self) -> Vec<Slot> {
|
||||||
|
let storage = self.storage.read().unwrap();
|
||||||
|
storage.0.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_stale_slot(&self) {
|
||||||
|
if let Some(slot) = self.next_shrink_slot() {
|
||||||
|
self.shrink_stale_slot(slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shrink_all_stale_slots(&self) {
|
||||||
|
for slot in self.all_slots_in_storage() {
|
||||||
|
self.shrink_stale_slot(slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
|
pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
|
||||||
where
|
where
|
||||||
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>) -> (),
|
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>) -> (),
|
||||||
@ -1140,18 +1286,46 @@ impl AccountsDB {
|
|||||||
hasher.result()
|
hasher.result()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn bulk_assign_write_version(&self, count: usize) -> u64 {
|
||||||
|
self.write_version
|
||||||
|
.fetch_add(count as u64, Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
fn store_accounts(
|
fn store_accounts(
|
||||||
&self,
|
&self,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
accounts: &[(&Pubkey, &Account)],
|
accounts: &[(&Pubkey, &Account)],
|
||||||
hashes: &[Hash],
|
hashes: &[Hash],
|
||||||
) -> Vec<AccountInfo> {
|
) -> Vec<AccountInfo> {
|
||||||
let default_account = Account::default();
|
let mut current_version = self.bulk_assign_write_version(accounts.len());
|
||||||
|
let write_version_producer = std::iter::from_fn(move || {
|
||||||
|
let ret = current_version;
|
||||||
|
current_version += 1;
|
||||||
|
Some(ret)
|
||||||
|
});
|
||||||
|
|
||||||
|
let storage_finder = |slot| self.find_storage_candidate(slot);
|
||||||
|
self.store_accounts_to(
|
||||||
|
slot,
|
||||||
|
accounts,
|
||||||
|
hashes,
|
||||||
|
storage_finder,
|
||||||
|
write_version_producer,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn store_accounts_to<F: FnMut(Slot) -> Arc<AccountStorageEntry>, P: Iterator<Item = u64>>(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
accounts: &[(&Pubkey, &Account)],
|
||||||
|
hashes: &[Hash],
|
||||||
|
mut storage_finder: F,
|
||||||
|
mut write_version_producer: P,
|
||||||
|
) -> Vec<AccountInfo> {
|
||||||
|
let default_account = Account::default();
|
||||||
let with_meta: Vec<(StoredMeta, &Account)> = accounts
|
let with_meta: Vec<(StoredMeta, &Account)> = accounts
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(pubkey, account)| {
|
.map(|(pubkey, account)| {
|
||||||
let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64;
|
|
||||||
let account = if account.lamports == 0 {
|
let account = if account.lamports == 0 {
|
||||||
&default_account
|
&default_account
|
||||||
} else {
|
} else {
|
||||||
@ -1160,7 +1334,7 @@ impl AccountsDB {
|
|||||||
let data_len = account.data.len() as u64;
|
let data_len = account.data.len() as u64;
|
||||||
|
|
||||||
let meta = StoredMeta {
|
let meta = StoredMeta {
|
||||||
write_version,
|
write_version: write_version_producer.next().unwrap(),
|
||||||
pubkey: **pubkey,
|
pubkey: **pubkey,
|
||||||
data_len,
|
data_len,
|
||||||
};
|
};
|
||||||
@ -1169,7 +1343,7 @@ impl AccountsDB {
|
|||||||
.collect();
|
.collect();
|
||||||
let mut infos: Vec<AccountInfo> = Vec::with_capacity(with_meta.len());
|
let mut infos: Vec<AccountInfo> = Vec::with_capacity(with_meta.len());
|
||||||
while infos.len() < with_meta.len() {
|
while infos.len() < with_meta.len() {
|
||||||
let storage = self.find_storage_candidate(slot);
|
let storage = storage_finder(slot);
|
||||||
let rvs = storage
|
let rvs = storage
|
||||||
.accounts
|
.accounts
|
||||||
.append_accounts(&with_meta[infos.len()..], &hashes[infos.len()..]);
|
.append_accounts(&with_meta[infos.len()..], &hashes[infos.len()..]);
|
||||||
@ -2217,12 +2391,26 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AccountsDB {
|
impl AccountsDB {
|
||||||
fn store_count_for_slot(&self, slot: Slot) -> usize {
|
fn alive_account_count_in_store(&self, slot: Slot) -> usize {
|
||||||
let storage = self.storage.read().unwrap();
|
let storage = self.storage.read().unwrap();
|
||||||
|
|
||||||
let slot_storage = storage.0.get(&slot);
|
let slot_storage = storage.0.get(&slot);
|
||||||
if let Some(slot_storage) = slot_storage {
|
if let Some(slot_storage) = slot_storage {
|
||||||
slot_storage.values().nth(0).unwrap().count()
|
slot_storage.values().map(|store| store.count()).sum()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_account_count_in_append_vec(&self, slot: Slot) -> usize {
|
||||||
|
let storage = self.storage.read().unwrap();
|
||||||
|
|
||||||
|
let slot_storage = storage.0.get(&slot);
|
||||||
|
if let Some(slot_storage) = slot_storage {
|
||||||
|
slot_storage
|
||||||
|
.values()
|
||||||
|
.map(|store| store.accounts.accounts(0).len())
|
||||||
|
.sum()
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
@ -2256,14 +2444,14 @@ pub mod tests {
|
|||||||
accounts.add_root(1);
|
accounts.add_root(1);
|
||||||
|
|
||||||
//even if rooted, old state isn't cleaned up
|
//even if rooted, old state isn't cleaned up
|
||||||
assert_eq!(accounts.store_count_for_slot(0), 1);
|
assert_eq!(accounts.alive_account_count_in_store(0), 1);
|
||||||
assert_eq!(accounts.store_count_for_slot(1), 1);
|
assert_eq!(accounts.alive_account_count_in_store(1), 1);
|
||||||
|
|
||||||
accounts.clean_accounts();
|
accounts.clean_accounts();
|
||||||
|
|
||||||
//now old state is cleaned up
|
//now old state is cleaned up
|
||||||
assert_eq!(accounts.store_count_for_slot(0), 0);
|
assert_eq!(accounts.alive_account_count_in_store(0), 0);
|
||||||
assert_eq!(accounts.store_count_for_slot(1), 1);
|
assert_eq!(accounts.alive_account_count_in_store(1), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -2286,14 +2474,14 @@ pub mod tests {
|
|||||||
accounts.add_root(1);
|
accounts.add_root(1);
|
||||||
|
|
||||||
//even if rooted, old state isn't cleaned up
|
//even if rooted, old state isn't cleaned up
|
||||||
assert_eq!(accounts.store_count_for_slot(0), 2);
|
assert_eq!(accounts.alive_account_count_in_store(0), 2);
|
||||||
assert_eq!(accounts.store_count_for_slot(1), 2);
|
assert_eq!(accounts.alive_account_count_in_store(1), 2);
|
||||||
|
|
||||||
accounts.clean_accounts();
|
accounts.clean_accounts();
|
||||||
|
|
||||||
//still old state behind zero-lamport account isn't cleaned up
|
//still old state behind zero-lamport account isn't cleaned up
|
||||||
assert_eq!(accounts.store_count_for_slot(0), 1);
|
assert_eq!(accounts.alive_account_count_in_store(0), 1);
|
||||||
assert_eq!(accounts.store_count_for_slot(1), 2);
|
assert_eq!(accounts.alive_account_count_in_store(1), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -2317,16 +2505,16 @@ pub mod tests {
|
|||||||
accounts.add_root(2);
|
accounts.add_root(2);
|
||||||
|
|
||||||
//even if rooted, old state isn't cleaned up
|
//even if rooted, old state isn't cleaned up
|
||||||
assert_eq!(accounts.store_count_for_slot(0), 2);
|
assert_eq!(accounts.alive_account_count_in_store(0), 2);
|
||||||
assert_eq!(accounts.store_count_for_slot(1), 1);
|
assert_eq!(accounts.alive_account_count_in_store(1), 1);
|
||||||
assert_eq!(accounts.store_count_for_slot(2), 1);
|
assert_eq!(accounts.alive_account_count_in_store(2), 1);
|
||||||
|
|
||||||
accounts.clean_accounts();
|
accounts.clean_accounts();
|
||||||
|
|
||||||
//both zero lamport and normal accounts are cleaned up
|
//both zero lamport and normal accounts are cleaned up
|
||||||
assert_eq!(accounts.store_count_for_slot(0), 0);
|
assert_eq!(accounts.alive_account_count_in_store(0), 0);
|
||||||
assert_eq!(accounts.store_count_for_slot(1), 0);
|
assert_eq!(accounts.alive_account_count_in_store(1), 0);
|
||||||
assert_eq!(accounts.store_count_for_slot(2), 1);
|
assert_eq!(accounts.alive_account_count_in_store(2), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -3429,11 +3617,11 @@ pub mod tests {
|
|||||||
|
|
||||||
// B: Test multiple updates to pubkey1 in a single slot/storage
|
// B: Test multiple updates to pubkey1 in a single slot/storage
|
||||||
current_slot += 1;
|
current_slot += 1;
|
||||||
assert_eq!(0, accounts.store_count_for_slot(current_slot));
|
assert_eq!(0, accounts.alive_account_count_in_store(current_slot));
|
||||||
assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1));
|
assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1));
|
||||||
accounts.store(current_slot, &[(&pubkey1, &account2)]);
|
accounts.store(current_slot, &[(&pubkey1, &account2)]);
|
||||||
accounts.store(current_slot, &[(&pubkey1, &account2)]);
|
accounts.store(current_slot, &[(&pubkey1, &account2)]);
|
||||||
assert_eq!(1, accounts.store_count_for_slot(current_slot));
|
assert_eq!(1, accounts.alive_account_count_in_store(current_slot));
|
||||||
assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1));
|
assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1));
|
||||||
accounts.add_root(current_slot);
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
@ -3491,10 +3679,167 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn clean_dead_slots_empty() {
|
fn test_clean_dead_slots_empty() {
|
||||||
let accounts = AccountsDB::new_single();
|
let accounts = AccountsDB::new_single();
|
||||||
let mut dead_slots = HashSet::new();
|
let mut dead_slots = HashSet::new();
|
||||||
dead_slots.insert(10);
|
dead_slots.insert(10);
|
||||||
accounts.clean_dead_slots(&dead_slots);
|
accounts.clean_dead_slots(&dead_slots);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shrink_stale_slots_none() {
|
||||||
|
let accounts = AccountsDB::new_single();
|
||||||
|
|
||||||
|
for _ in 0..10 {
|
||||||
|
accounts.process_stale_slot();
|
||||||
|
}
|
||||||
|
|
||||||
|
accounts.shrink_all_stale_slots();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shrink_next_slots() {
|
||||||
|
let accounts = AccountsDB::new_single();
|
||||||
|
|
||||||
|
let mut current_slot = 7;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![None, None, None],
|
||||||
|
(0..3)
|
||||||
|
.map({ |_| accounts.next_shrink_slot() })
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![Some(7), Some(7), Some(7)],
|
||||||
|
(0..3)
|
||||||
|
.map({ |_| accounts.next_shrink_slot() })
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
|
||||||
|
current_slot += 1;
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
|
let slots = (0..6)
|
||||||
|
.map({ |_| accounts.next_shrink_slot() })
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// Because the origin of this data is HashMap (not BTreeMap), key order is arbitrary per cycle.
|
||||||
|
assert!(
|
||||||
|
vec![Some(7), Some(8), Some(7), Some(8), Some(7), Some(8)] == slots
|
||||||
|
|| vec![Some(8), Some(7), Some(8), Some(7), Some(8), Some(7)] == slots
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shrink_stale_slots_processed() {
|
||||||
|
solana_logger::setup();
|
||||||
|
|
||||||
|
let accounts = AccountsDB::new_single();
|
||||||
|
|
||||||
|
let pubkey_count = 100;
|
||||||
|
let pubkeys: Vec<_> = (0..pubkey_count).map(|_| Pubkey::new_rand()).collect();
|
||||||
|
|
||||||
|
let some_lamport = 223;
|
||||||
|
let no_data = 0;
|
||||||
|
let owner = Account::default().owner;
|
||||||
|
|
||||||
|
let account = Account::new(some_lamport, no_data, &owner);
|
||||||
|
|
||||||
|
let mut current_slot = 0;
|
||||||
|
|
||||||
|
current_slot += 1;
|
||||||
|
for pubkey in &pubkeys {
|
||||||
|
accounts.store(current_slot, &[(&pubkey, &account)]);
|
||||||
|
}
|
||||||
|
let shrink_slot = current_slot;
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
|
current_slot += 1;
|
||||||
|
let pubkey_count_after_shrink = 10;
|
||||||
|
let updated_pubkeys = &pubkeys[0..pubkey_count - pubkey_count_after_shrink];
|
||||||
|
|
||||||
|
for pubkey in updated_pubkeys {
|
||||||
|
accounts.store(current_slot, &[(&pubkey, &account)]);
|
||||||
|
}
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
|
accounts.clean_accounts();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
pubkey_count,
|
||||||
|
accounts.all_account_count_in_append_vec(shrink_slot)
|
||||||
|
);
|
||||||
|
accounts.shrink_all_stale_slots();
|
||||||
|
assert_eq!(
|
||||||
|
pubkey_count_after_shrink,
|
||||||
|
accounts.all_account_count_in_append_vec(shrink_slot)
|
||||||
|
);
|
||||||
|
|
||||||
|
let no_ancestors = HashMap::default();
|
||||||
|
accounts.update_accounts_hash(current_slot, &no_ancestors);
|
||||||
|
accounts
|
||||||
|
.verify_bank_hash(current_slot, &no_ancestors)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
|
||||||
|
accounts
|
||||||
|
.verify_bank_hash(current_slot, &no_ancestors)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// repeating should be no-op
|
||||||
|
accounts.shrink_all_stale_slots();
|
||||||
|
assert_eq!(
|
||||||
|
pubkey_count_after_shrink,
|
||||||
|
accounts.all_account_count_in_append_vec(shrink_slot)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shrink_stale_slots_skipped() {
|
||||||
|
solana_logger::setup();
|
||||||
|
|
||||||
|
let accounts = AccountsDB::new_single();
|
||||||
|
|
||||||
|
let pubkey_count = 100;
|
||||||
|
let pubkeys: Vec<_> = (0..pubkey_count).map(|_| Pubkey::new_rand()).collect();
|
||||||
|
|
||||||
|
let some_lamport = 223;
|
||||||
|
let no_data = 0;
|
||||||
|
let owner = Account::default().owner;
|
||||||
|
|
||||||
|
let account = Account::new(some_lamport, no_data, &owner);
|
||||||
|
|
||||||
|
let mut current_slot = 0;
|
||||||
|
|
||||||
|
current_slot += 1;
|
||||||
|
for pubkey in &pubkeys {
|
||||||
|
accounts.store(current_slot, &[(&pubkey, &account)]);
|
||||||
|
}
|
||||||
|
let shrink_slot = current_slot;
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
|
current_slot += 1;
|
||||||
|
let pubkey_count_after_shrink = 90;
|
||||||
|
let updated_pubkeys = &pubkeys[0..pubkey_count - pubkey_count_after_shrink];
|
||||||
|
|
||||||
|
for pubkey in updated_pubkeys {
|
||||||
|
accounts.store(current_slot, &[(&pubkey, &account)]);
|
||||||
|
}
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
|
||||||
|
accounts.clean_accounts();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
pubkey_count,
|
||||||
|
accounts.all_account_count_in_append_vec(shrink_slot)
|
||||||
|
);
|
||||||
|
accounts.shrink_all_stale_slots();
|
||||||
|
assert_eq!(
|
||||||
|
pubkey_count,
|
||||||
|
accounts.all_account_count_in_append_vec(shrink_slot)
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,9 @@ impl AppendVec {
|
|||||||
data.seek(SeekFrom::Start(0)).unwrap();
|
data.seek(SeekFrom::Start(0)).unwrap();
|
||||||
data.flush().unwrap();
|
data.flush().unwrap();
|
||||||
//UNSAFE: Required to create a Mmap
|
//UNSAFE: Required to create a Mmap
|
||||||
let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
|
let map = unsafe { MmapMut::map_mut(&data) };
|
||||||
|
let map =
|
||||||
|
map.unwrap_or_else(|e| panic!("failed to map the data file (size: {}): {}", size, e));
|
||||||
|
|
||||||
AppendVec {
|
AppendVec {
|
||||||
path: file.to_path_buf(),
|
path: file.to_path_buf(),
|
||||||
|
@ -1959,6 +1959,7 @@ impl Bank {
|
|||||||
/// calculation and could shield other real accounts.
|
/// calculation and could shield other real accounts.
|
||||||
pub fn verify_snapshot_bank(&self) -> bool {
|
pub fn verify_snapshot_bank(&self) -> bool {
|
||||||
self.clean_accounts();
|
self.clean_accounts();
|
||||||
|
self.shrink_all_stale_slots();
|
||||||
// Order and short-circuiting is significant; verify_hash requires a valid bank hash
|
// Order and short-circuiting is significant; verify_hash requires a valid bank hash
|
||||||
self.verify_bank_hash() && self.verify_hash()
|
self.verify_bank_hash() && self.verify_hash()
|
||||||
}
|
}
|
||||||
@ -2200,9 +2201,17 @@ impl Bank {
|
|||||||
self.rc.accounts.accounts_db.clean_accounts();
|
self.rc.accounts.accounts_db.clean_accounts();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clean_dead_slots(&self) {
|
pub fn process_dead_slots(&self) {
|
||||||
self.rc.accounts.accounts_db.process_dead_slots();
|
self.rc.accounts.accounts_db.process_dead_slots();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn process_stale_slot(&self) {
|
||||||
|
self.rc.accounts.accounts_db.process_stale_slot();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shrink_all_stale_slots(&self) {
|
||||||
|
self.rc.accounts.accounts_db.shrink_all_stale_slots();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Bank {
|
impl Drop for Bank {
|
||||||
|
Reference in New Issue
Block a user