refactor generate_index_for_slot (#17984) (#18116)

(cherry picked from commit 2087f5da94)

Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2021-06-21 18:25:56 -05:00
committed by GitHub
parent 363b75619f
commit 0f87e598f6

View File

@ -5794,38 +5794,7 @@ impl AccountsDb {
accounts_map
}
#[allow(clippy::needless_collect)]
pub fn generate_index(&self, limit_load_slot_count_from_snapshot: Option<usize>) {
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
if let Some(limit) = limit_load_slot_count_from_snapshot {
slots.truncate(limit); // get rid of the newer slots and keep just the older
}
let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
let mut index_time = Measure::start("index");
let scan_time: u64 = slots
.par_chunks(chunk_size)
.map(|slots| {
let mut log_status = MultiThreadProgress::new(
&total_processed_slots_across_all_threads,
2,
outer_slots_len as u64,
);
let mut scan_time_sum = 0;
for (index, slot) in slots.iter().enumerate() {
let mut scan_time = Measure::start("scan");
log_status.report(index as u64);
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let accounts_map = Self::process_storage_slot(&storage_maps);
scan_time.stop();
scan_time_sum += scan_time.as_us();
fn generate_index_for_slot<'a>(&self, accounts_map: GenerateIndexAccountsMap<'a>, slot: &Slot) {
if !accounts_map.is_empty() {
let len = accounts_map.len();
@ -5876,6 +5845,41 @@ impl AccountsDb {
}
}
}
#[allow(clippy::needless_collect)]
pub fn generate_index(&self, limit_load_slot_count_from_snapshot: Option<usize>) {
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
if let Some(limit) = limit_load_slot_count_from_snapshot {
slots.truncate(limit); // get rid of the newer slots and keep just the older
}
let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
let mut index_time = Measure::start("index");
let scan_time: u64 = slots
.par_chunks(chunk_size)
.map(|slots| {
let mut log_status = MultiThreadProgress::new(
&total_processed_slots_across_all_threads,
2,
outer_slots_len as u64,
);
let mut scan_time_sum = 0;
for (index, slot) in slots.iter().enumerate() {
let mut scan_time = Measure::start("scan");
log_status.report(index as u64);
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let accounts_map = Self::process_storage_slot(&storage_maps);
scan_time.stop();
scan_time_sum += scan_time.as_us();
self.generate_index_for_slot(accounts_map, slot);
}
scan_time_sum
})
.sum();