(cherry picked from commit f2a2581259)
Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
@@ -165,6 +165,7 @@ pub struct ErrorCounters {
 struct GenerateIndexTimings {
     pub index_time: u64,
     pub scan_time: u64,
+    pub insertion_time_us: u64,
 }
 
 impl GenerateIndexTimings {
@@ -174,6 +175,7 @@ impl GenerateIndexTimings {
             // we cannot accurately measure index insertion time because of many threads and lock contention
             ("total_us", self.index_time, i64),
             ("scan_stores_us", self.scan_time, i64),
+            ("insertion_time_us", self.insertion_time_us, i64),
         );
     }
 }
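For context on the pattern these first two hunks extend: GenerateIndexTimings is a plain struct that is filled in once at the end of index generation and then reported. A minimal stand-alone sketch of that shape, with made-up names (IndexTimings, and a println! standing in for the real metrics datapoint macro):

    use std::time::Instant;

    #[derive(Default)]
    struct IndexTimings {
        index_time_us: u64,
        scan_time_us: u64,
        insertion_time_us: u64, // analogous to the field added above
    }

    impl IndexTimings {
        fn report(&self) {
            // the real code emits a metrics datapoint; a log line stands in here
            println!(
                "generate_index total_us={} scan_us={} insert_us={}",
                self.index_time_us, self.scan_time_us, self.insertion_time_us
            );
        }
    }

    fn main() {
        let start = Instant::now();
        // ... scan storages and build the index ...
        let timings = IndexTimings {
            index_time_us: start.elapsed().as_micros() as u64,
            ..Default::default()
        };
        timings.report();
    }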
@@ -5870,45 +5872,52 @@ impl AccountsDb {
         accounts_map
     }
 
-    fn generate_index_for_slot<'a>(&self, accounts_map: GenerateIndexAccountsMap<'a>, slot: &Slot) {
-        if !accounts_map.is_empty() {
-            let items = accounts_map
-                .iter()
-                .map(|(pubkey, (_, store_id, stored_account))| {
-                    (
-                        pubkey,
-                        AccountInfo {
-                            store_id: *store_id,
-                            offset: stored_account.offset,
-                            stored_size: stored_account.stored_size,
-                            lamports: stored_account.account_meta.lamports,
-                        },
-                    )
-                })
-                .collect::<Vec<_>>();
-
-            let dirty_pubkeys = self
-                .accounts_index
-                .insert_new_if_missing_into_primary_index(*slot, items);
-
-            // dirty_pubkeys will contain a pubkey if an item has multiple rooted entries for
-            // a given pubkey. If there is just a single item, there is no cleaning to
-            // be done on that pubkey. Use only those pubkeys with multiple updates.
-            if !dirty_pubkeys.is_empty() {
-                self.uncleaned_pubkeys.insert(*slot, dirty_pubkeys);
-            }
-
-            if !self.account_indexes.is_empty() {
-                for (pubkey, (_, _store_id, stored_account)) in accounts_map.iter() {
-                    self.accounts_index.update_secondary_indexes(
-                        pubkey,
-                        &stored_account.account_meta.owner,
-                        stored_account.data,
-                        &self.account_indexes,
-                    );
-                }
-            }
-        }
-    }
+    fn generate_index_for_slot<'a>(
+        &self,
+        accounts_map: GenerateIndexAccountsMap<'a>,
+        slot: &Slot,
+    ) -> u64 {
+        if accounts_map.is_empty() {
+            return 0;
+        }
+
+        let items = accounts_map
+            .iter()
+            .map(|(pubkey, (_, store_id, stored_account))| {
+                (
+                    pubkey,
+                    AccountInfo {
+                        store_id: *store_id,
+                        offset: stored_account.offset,
+                        stored_size: stored_account.stored_size,
+                        lamports: stored_account.account_meta.lamports,
+                    },
+                )
+            })
+            .collect::<Vec<_>>();
+
+        let (dirty_pubkeys, insert_us) = self
+            .accounts_index
+            .insert_new_if_missing_into_primary_index(*slot, items);
+
+        // dirty_pubkeys will contain a pubkey if an item has multiple rooted entries for
+        // a given pubkey. If there is just a single item, there is no cleaning to
+        // be done on that pubkey. Use only those pubkeys with multiple updates.
+        if !dirty_pubkeys.is_empty() {
+            self.uncleaned_pubkeys.insert(*slot, dirty_pubkeys);
+        }
+
+        if !self.account_indexes.is_empty() {
+            for (pubkey, (_, _store_id, stored_account)) in accounts_map.iter() {
+                self.accounts_index.update_secondary_indexes(
+                    pubkey,
+                    &stored_account.account_meta.owner,
+                    stored_account.data,
+                    &self.account_indexes,
+                );
+            }
+        }
+
+        insert_us
+    }
 
     #[allow(clippy::needless_collect)]
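The hunk above reshapes the function without changing what it indexes: the outer if !accounts_map.is_empty() becomes an early return 0, and the insertion time measured inside the accounts index is handed back to the caller instead of being dropped. A minimal sketch of that guard-clause-plus-returned-duration shape, with a hypothetical index_one_slot and a summation standing in for the real insertion work:

    use std::time::Instant;

    fn index_one_slot(items: &[u64]) -> u64 {
        if items.is_empty() {
            return 0; // nothing indexed, nothing to time
        }
        let start = Instant::now();
        let total: u64 = items.iter().sum(); // stand-in for the real insertion work
        println!("indexed {} items (sum {})", items.len(), total);
        start.elapsed().as_micros() as u64
    }

    fn main() {
        let insert_us = index_one_slot(&[1, 2, 3]);
        println!("insert_us: {}", insert_us);
    }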
@@ -5923,6 +5932,7 @@ impl AccountsDb {
         let outer_slots_len = slots.len();
         let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
         let mut index_time = Measure::start("index");
+        let insertion_time_us = AtomicU64::new(0);
         let scan_time: u64 = slots
             .par_chunks(chunk_size)
             .map(|slots| {
@@ -5943,7 +5953,8 @@ impl AccountsDb {
                     scan_time.stop();
                     scan_time_sum += scan_time.as_us();
 
-                    self.generate_index_for_slot(accounts_map, slot);
+                    let insert_us = self.generate_index_for_slot(accounts_map, slot);
+                    insertion_time_us.fetch_add(insert_us, Ordering::Relaxed);
                 }
                 scan_time_sum
             })
@@ -5952,6 +5963,7 @@ impl AccountsDb {
         let timings = GenerateIndexTimings {
             scan_time,
             index_time: index_time.as_us(),
+            insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
         };
         timings.report();
 
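Taken together, the last three hunks thread the measurement across threads: an AtomicU64 is created before the parallel scan, each rayon worker adds its slots' insertion time with fetch_add, and the total is loaded once when the timings struct is built. A minimal sketch of that aggregation pattern, assuming the rayon crate (which the real code already uses for par_chunks) and a toy index_slot in place of the per-slot work:

    use rayon::prelude::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    fn index_slot(items: &[u64]) -> u64 {
        // pretend each item costs one microsecond to insert
        items.len() as u64
    }

    fn main() {
        let slots: Vec<Vec<u64>> = vec![vec![1, 2], vec![3], vec![4, 5, 6]];
        let insertion_time_us = AtomicU64::new(0);
        let chunk_size = (slots.len() / 7) + 1;

        slots.par_chunks(chunk_size).for_each(|chunk| {
            for slot_items in chunk {
                let insert_us = index_slot(slot_items);
                // each worker thread adds its share; Relaxed suffices for a counter
                insertion_time_us.fetch_add(insert_us, Ordering::Relaxed);
            }
        });

        println!(
            "insertion_time_us: {}",
            insertion_time_us.load(Ordering::Relaxed)
        );
    }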
@@ -1359,7 +1359,8 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
         &self,
         slot: Slot,
         items: Vec<(&Pubkey, T)>,
-    ) -> Vec<Pubkey> {
+    ) -> (Vec<Pubkey>, u64) {
+        // returns (duplicate pubkey mask, insertion time us)
         let item_len = items.len();
         let potentially_new_items = items
             .into_iter()
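This is the signature change that the AccountsDb hunks above depend on. insert_new_if_missing_into_primary_index now hands back the insertion time alongside the duplicate keys, so generate_index_for_slot can destructure the result as let (dirty_pubkeys, insert_us) = ... and forward insert_us to the aggregate counter, while the duplicate-key handling stays the same.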
@@ -1375,6 +1376,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
         let mut _reclaims = SlotList::new();
         let mut duplicate_keys = Vec::with_capacity(item_len / 100); // just an estimate
         let mut w_account_maps = self.get_account_maps_write_lock();
+        let mut insert_time = Measure::start("insert_into_primary_index");
         potentially_new_items
             .into_iter()
             .for_each(|(pubkey, new_item)| {
@@ -1389,7 +1391,8 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
                 }
             });
 
-        duplicate_keys
+        insert_time.stop();
+        (duplicate_keys, insert_time.as_us())
     }
 
     // Updates the given pubkey at the given slot with the new account information.
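These last two hunks start the Measure only after the account-maps write lock has been taken, so the reported number covers the insertions themselves rather than lock acquisition, and the elapsed microseconds travel back in a tuple next to the duplicate keys. A minimal sketch of the same idea, using std::time::Instant and an RwLock<HashMap> as stand-ins for Measure and the real index map:

    use std::collections::HashMap;
    use std::sync::RwLock;
    use std::time::Instant;

    fn insert_batch(
        map: &RwLock<HashMap<u64, u64>>,
        items: Vec<(u64, u64)>,
    ) -> (Vec<u64>, u64) {
        // returns (keys that were already present, insertion time in us)
        let mut duplicates = Vec::new();
        let mut w_map = map.write().unwrap();
        let start = Instant::now(); // start timing after the lock is held, as above
        for (key, value) in items {
            if w_map.insert(key, value).is_some() {
                duplicates.push(key);
            }
        }
        (duplicates, start.elapsed().as_micros() as u64)
    }

    fn main() {
        let map = RwLock::new(HashMap::new());
        let (dups, us) = insert_batch(&map, vec![(1, 10), (2, 20), (1, 11)]);
        println!("duplicates={:?} insert_us={}", dups, us);
    }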