break out generate index reporting (#17980) (#18015)

(cherry picked from commit 71796f4951)

Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2021-06-16 22:02:19 +00:00
committed by GitHub
parent c28e6ebc4c
commit 2f0f1fd5f5

View File

@@ -165,6 +165,52 @@ impl ZeroLamport for AccountInfo {
}
}
struct MultiThreadProgress<'a> {
last_update: Instant,
my_last_report_count: u64,
total_count: &'a AtomicU64,
report_delay_secs: u64,
first_caller: bool,
ultimate_count: u64,
}
impl<'a> MultiThreadProgress<'a> {
fn new(total_count: &'a AtomicU64, report_delay_secs: u64, ultimate_count: u64) -> Self {
Self {
last_update: Instant::now(),
my_last_report_count: 0,
total_count,
report_delay_secs,
first_caller: false,
ultimate_count,
}
}
fn report(&mut self, my_current_count: u64) {
let now = Instant::now();
if now.duration_since(self.last_update).as_secs() >= self.report_delay_secs {
let my_total_newly_processed_slots_since_last_report =
my_current_count - self.my_last_report_count;
self.my_last_report_count = my_current_count;
let previous_total_processed_slots_across_all_threads = self.total_count.fetch_add(
my_total_newly_processed_slots_since_last_report,
Ordering::Relaxed,
);
self.first_caller =
self.first_caller || 0 == previous_total_processed_slots_across_all_threads;
if self.first_caller {
info!(
"generating index: {}/{} slots...",
previous_total_processed_slots_across_all_threads
+ my_total_newly_processed_slots_since_last_report,
self.ultimate_count
);
}
self.last_update = now;
}
}
}
/// An offset into the AccountsDb::storage vector
pub type AppendVecId = usize;
pub type SnapshotStorage = Vec<Arc<AccountStorageEntry>>;
@@ -5534,34 +5580,15 @@ impl AccountsDb {
let scan_time: u64 = slots
.par_chunks(chunk_size)
.map(|slots| {
let mut last_log_update = Instant::now();
let mut my_last_reported_number_of_processed_slots = 0;
let mut was_first = false;
let mut log_status = MultiThreadProgress::new(
&total_processed_slots_across_all_threads,
2,
outer_slots_len as u64,
);
let mut scan_time_sum = 0;
for (index, slot) in slots.iter().enumerate() {
let mut scan_time = Measure::start("scan");
let now = Instant::now();
if now.duration_since(last_log_update).as_secs() >= 2 {
let my_total_newly_processed_slots_since_last_report =
(index as u64) - my_last_reported_number_of_processed_slots;
my_last_reported_number_of_processed_slots = index as u64;
let previous_total_processed_slots_across_all_threads =
total_processed_slots_across_all_threads.fetch_add(
my_total_newly_processed_slots_since_last_report,
Ordering::Relaxed,
);
was_first =
was_first || 0 == previous_total_processed_slots_across_all_threads;
if was_first {
info!(
"generating index: {}/{} slots...",
previous_total_processed_slots_across_all_threads
+ my_total_newly_processed_slots_since_last_report,
outer_slots_len
);
}
last_log_update = now;
}
log_status.report(index as u64);
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)