diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index d0c45c357a..5c006539a4 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -189,6 +189,52 @@ impl ZeroLamport for AccountInfo { } } +struct MultiThreadProgress<'a> { + last_update: Instant, + my_last_report_count: u64, + total_count: &'a AtomicU64, + report_delay_secs: u64, + first_caller: bool, + ultimate_count: u64, +} + +impl<'a> MultiThreadProgress<'a> { + fn new(total_count: &'a AtomicU64, report_delay_secs: u64, ultimate_count: u64) -> Self { + Self { + last_update: Instant::now(), + my_last_report_count: 0, + total_count, + report_delay_secs, + first_caller: false, + ultimate_count, + } + } + fn report(&mut self, my_current_count: u64) { + let now = Instant::now(); + if now.duration_since(self.last_update).as_secs() >= self.report_delay_secs { + let my_total_newly_processed_slots_since_last_report = + my_current_count - self.my_last_report_count; + + self.my_last_report_count = my_current_count; + let previous_total_processed_slots_across_all_threads = self.total_count.fetch_add( + my_total_newly_processed_slots_since_last_report, + Ordering::Relaxed, + ); + self.first_caller = + self.first_caller || 0 == previous_total_processed_slots_across_all_threads; + if self.first_caller { + info!( + "generating index: {}/{} slots...", + previous_total_processed_slots_across_all_threads + + my_total_newly_processed_slots_since_last_report, + self.ultimate_count + ); + } + self.last_update = now; + } + } +} + /// An offset into the AccountsDb::storage vector pub type AppendVecId = usize; pub type SnapshotStorage = Vec>; @@ -5770,34 +5816,15 @@ impl AccountsDb { let scan_time: u64 = slots .par_chunks(chunk_size) .map(|slots| { - let mut last_log_update = Instant::now(); - let mut my_last_reported_number_of_processed_slots = 0; - let mut was_first = false; + let mut log_status = MultiThreadProgress::new( + &total_processed_slots_across_all_threads, + 2, + outer_slots_len as u64, + ); let mut scan_time_sum = 0; for (index, slot) in slots.iter().enumerate() { let mut scan_time = Measure::start("scan"); - let now = Instant::now(); - if now.duration_since(last_log_update).as_secs() >= 2 { - let my_total_newly_processed_slots_since_last_report = - (index as u64) - my_last_reported_number_of_processed_slots; - my_last_reported_number_of_processed_slots = index as u64; - let previous_total_processed_slots_across_all_threads = - total_processed_slots_across_all_threads.fetch_add( - my_total_newly_processed_slots_since_last_report, - Ordering::Relaxed, - ); - was_first = - was_first || 0 == previous_total_processed_slots_across_all_threads; - if was_first { - info!( - "generating index: {}/{} slots...", - previous_total_processed_slots_across_all_threads - + my_total_newly_processed_slots_since_last_report, - outer_slots_len - ); - } - last_log_update = now; - } + log_status.report(index as u64); let storage_maps: Vec> = self .storage .get_slot_storage_entries(*slot)