break out generate index reporting (#17980)
This commit is contained in:
committed by
GitHub
parent
7ca04d6a86
commit
71796f4951
@ -189,6 +189,52 @@ impl ZeroLamport for AccountInfo {
|
||||
}
|
||||
}
|
||||
|
||||
struct MultiThreadProgress<'a> {
|
||||
last_update: Instant,
|
||||
my_last_report_count: u64,
|
||||
total_count: &'a AtomicU64,
|
||||
report_delay_secs: u64,
|
||||
first_caller: bool,
|
||||
ultimate_count: u64,
|
||||
}
|
||||
|
||||
impl<'a> MultiThreadProgress<'a> {
|
||||
fn new(total_count: &'a AtomicU64, report_delay_secs: u64, ultimate_count: u64) -> Self {
|
||||
Self {
|
||||
last_update: Instant::now(),
|
||||
my_last_report_count: 0,
|
||||
total_count,
|
||||
report_delay_secs,
|
||||
first_caller: false,
|
||||
ultimate_count,
|
||||
}
|
||||
}
|
||||
fn report(&mut self, my_current_count: u64) {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(self.last_update).as_secs() >= self.report_delay_secs {
|
||||
let my_total_newly_processed_slots_since_last_report =
|
||||
my_current_count - self.my_last_report_count;
|
||||
|
||||
self.my_last_report_count = my_current_count;
|
||||
let previous_total_processed_slots_across_all_threads = self.total_count.fetch_add(
|
||||
my_total_newly_processed_slots_since_last_report,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
self.first_caller =
|
||||
self.first_caller || 0 == previous_total_processed_slots_across_all_threads;
|
||||
if self.first_caller {
|
||||
info!(
|
||||
"generating index: {}/{} slots...",
|
||||
previous_total_processed_slots_across_all_threads
|
||||
+ my_total_newly_processed_slots_since_last_report,
|
||||
self.ultimate_count
|
||||
);
|
||||
}
|
||||
self.last_update = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An offset into the AccountsDb::storage vector
|
||||
pub type AppendVecId = usize;
|
||||
pub type SnapshotStorage = Vec<Arc<AccountStorageEntry>>;
|
||||
@ -5770,34 +5816,15 @@ impl AccountsDb {
|
||||
let scan_time: u64 = slots
|
||||
.par_chunks(chunk_size)
|
||||
.map(|slots| {
|
||||
let mut last_log_update = Instant::now();
|
||||
let mut my_last_reported_number_of_processed_slots = 0;
|
||||
let mut was_first = false;
|
||||
let mut log_status = MultiThreadProgress::new(
|
||||
&total_processed_slots_across_all_threads,
|
||||
2,
|
||||
outer_slots_len as u64,
|
||||
);
|
||||
let mut scan_time_sum = 0;
|
||||
for (index, slot) in slots.iter().enumerate() {
|
||||
let mut scan_time = Measure::start("scan");
|
||||
let now = Instant::now();
|
||||
if now.duration_since(last_log_update).as_secs() >= 2 {
|
||||
let my_total_newly_processed_slots_since_last_report =
|
||||
(index as u64) - my_last_reported_number_of_processed_slots;
|
||||
my_last_reported_number_of_processed_slots = index as u64;
|
||||
let previous_total_processed_slots_across_all_threads =
|
||||
total_processed_slots_across_all_threads.fetch_add(
|
||||
my_total_newly_processed_slots_since_last_report,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
was_first =
|
||||
was_first || 0 == previous_total_processed_slots_across_all_threads;
|
||||
if was_first {
|
||||
info!(
|
||||
"generating index: {}/{} slots...",
|
||||
previous_total_processed_slots_across_all_threads
|
||||
+ my_total_newly_processed_slots_since_last_report,
|
||||
outer_slots_len
|
||||
);
|
||||
}
|
||||
last_log_update = now;
|
||||
}
|
||||
log_status.report(index as u64);
|
||||
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
|
||||
.storage
|
||||
.get_slot_storage_entries(*slot)
|
||||
|
Reference in New Issue
Block a user