* Add shrink paths (#14238)
(cherry picked from commit baa9602411
)
* Ignore long/hanging test (#14261)
Co-authored-by: sakridge <sakridge@gmail.com>
Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
@@ -442,6 +442,8 @@ pub struct AccountsDB {
|
||||
/// Set of storage paths to pick from
|
||||
pub(crate) paths: Vec<PathBuf>,
|
||||
|
||||
pub shrink_paths: RwLock<Option<Vec<PathBuf>>>,
|
||||
|
||||
/// Directory of paths this accounts_db needs to hold/remove
|
||||
temp_paths: Option<Vec<TempDir>>,
|
||||
|
||||
@@ -529,6 +531,7 @@ impl Default for AccountsDB {
|
||||
shrink_candidate_slots: Mutex::new(Vec::new()),
|
||||
write_version: AtomicU64::new(0),
|
||||
paths: vec![],
|
||||
shrink_paths: RwLock::new(None),
|
||||
temp_paths: None,
|
||||
file_size: DEFAULT_FILE_SIZE,
|
||||
thread_pool: rayon::ThreadPoolBuilder::new()
|
||||
@@ -574,6 +577,15 @@ impl AccountsDB {
|
||||
new
|
||||
}
|
||||
|
||||
pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {
|
||||
assert!(!paths.is_empty());
|
||||
let mut shrink_paths = self.shrink_paths.write().unwrap();
|
||||
for path in &paths {
|
||||
std::fs::create_dir_all(path).expect("Create directory failed.");
|
||||
}
|
||||
*shrink_paths = Some(paths);
|
||||
}
|
||||
|
||||
pub fn file_size(&self) -> u64 {
|
||||
self.file_size
|
||||
}
|
||||
@@ -1004,11 +1016,28 @@ impl AccountsDB {
|
||||
}
|
||||
|
||||
fn shrink_stale_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
|
||||
if let Some(slot) = self.do_next_shrink_slot(candidates) {
|
||||
self.do_shrink_stale_slot(slot)
|
||||
} else {
|
||||
0
|
||||
let mut shrunken_account_total = 0;
|
||||
let mut shrunk_slot_count = 0;
|
||||
let start = Instant::now();
|
||||
let num_roots = self.accounts_index.num_roots();
|
||||
loop {
|
||||
if let Some(slot) = self.do_next_shrink_slot(candidates) {
|
||||
shrunken_account_total += self.do_shrink_stale_slot(slot);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
|
||||
debug!(
|
||||
"do_shrink_stale_slot: {} {} {}us",
|
||||
shrunk_slot_count,
|
||||
candidates.len(),
|
||||
start.elapsed().as_micros()
|
||||
);
|
||||
break;
|
||||
}
|
||||
shrunk_slot_count += 1;
|
||||
}
|
||||
shrunken_account_total
|
||||
}
|
||||
|
||||
// Reads all accounts in given slot's AppendVecs and filter only to alive,
|
||||
@@ -1043,14 +1072,15 @@ impl AccountsDB {
|
||||
} else if !forced {
|
||||
let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
|
||||
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
|
||||
let skip_shrink = !sparse_by_count && !sparse_by_bytes;
|
||||
info!(
|
||||
"shrink_stale_slot ({}): skip_shrink: {} count: {}/{} byte: {}/{}",
|
||||
slot, skip_shrink, alive_count, stored_count, written_bytes, total_bytes,
|
||||
);
|
||||
if skip_shrink {
|
||||
let not_sparse = !sparse_by_count && !sparse_by_bytes;
|
||||
let too_small_to_shrink = total_bytes <= PAGE_SIZE;
|
||||
if not_sparse || too_small_to_shrink {
|
||||
return 0;
|
||||
}
|
||||
info!(
|
||||
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
|
||||
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
|
||||
);
|
||||
}
|
||||
for store in stores.values() {
|
||||
let mut start = 0;
|
||||
@@ -1144,7 +1174,17 @@ impl AccountsDB {
|
||||
{
|
||||
new_store
|
||||
} else {
|
||||
self.create_and_insert_store(slot, aligned_total, "shrink")
|
||||
let maybe_shrink_paths = self.shrink_paths.read().unwrap();
|
||||
if let Some(ref shrink_paths) = *maybe_shrink_paths {
|
||||
self.create_and_insert_store_with_paths(
|
||||
slot,
|
||||
aligned_total,
|
||||
"shrink-w-path",
|
||||
shrink_paths,
|
||||
)
|
||||
} else {
|
||||
self.create_and_insert_store(slot, aligned_total, "shrink")
|
||||
}
|
||||
};
|
||||
start.stop();
|
||||
create_and_insert_store_elapsed = start.as_us();
|
||||
@@ -1591,7 +1631,7 @@ impl AccountsDB {
|
||||
self.stats
|
||||
.create_store_count
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
self.create_store(slot, self.file_size, "store")
|
||||
self.create_store(slot, self.file_size, "store", &self.paths)
|
||||
};
|
||||
|
||||
// try_available is like taking a lock on the store,
|
||||
@@ -1620,11 +1660,17 @@ impl AccountsDB {
|
||||
false
|
||||
}
|
||||
|
||||
fn create_store(&self, slot: Slot, size: u64, from: &str) -> Arc<AccountStorageEntry> {
|
||||
let path_index = thread_rng().gen_range(0, self.paths.len());
|
||||
fn create_store(
|
||||
&self,
|
||||
slot: Slot,
|
||||
size: u64,
|
||||
from: &str,
|
||||
paths: &[PathBuf],
|
||||
) -> Arc<AccountStorageEntry> {
|
||||
let path_index = thread_rng().gen_range(0, paths.len());
|
||||
let store = Arc::new(self.new_storage_entry(
|
||||
slot,
|
||||
&Path::new(&self.paths[path_index]),
|
||||
&Path::new(&paths[path_index]),
|
||||
self.page_align(size),
|
||||
));
|
||||
|
||||
@@ -1647,7 +1693,17 @@ impl AccountsDB {
|
||||
size: u64,
|
||||
from: &str,
|
||||
) -> Arc<AccountStorageEntry> {
|
||||
let store = self.create_store(slot, size, from);
|
||||
self.create_and_insert_store_with_paths(slot, size, from, &self.paths)
|
||||
}
|
||||
|
||||
fn create_and_insert_store_with_paths(
|
||||
&self,
|
||||
slot: Slot,
|
||||
size: u64,
|
||||
from: &str,
|
||||
paths: &[PathBuf],
|
||||
) -> Arc<AccountStorageEntry> {
|
||||
let store = self.create_store(slot, size, from, paths);
|
||||
let store_for_index = store.clone();
|
||||
|
||||
self.insert_store(slot, store_for_index);
|
||||
@@ -5457,6 +5513,7 @@ pub mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_shrink_and_clean() {
|
||||
solana_logger::setup();
|
||||
|
||||
|
Reference in New Issue
Block a user