Add ability to abort scan (#21314)

This commit is contained in:
Jeff Washington (jwash)
2021-11-17 13:10:29 -06:00
committed by GitHub
parent e540b1cf3c
commit 0f69a14247
8 changed files with 242 additions and 121 deletions

View File

@@ -59,6 +59,34 @@ pub type SlotSlice<'s, T> = &'s [(Slot, T)];
pub type RefCount = u64;
pub type AccountMap<V> = Arc<InMemAccountsIndex<V>>;
#[derive(Debug, Default)]
pub struct ScanConfig {
/// checked by the scan. When true, abort scan.
abort: Option<Arc<AtomicBool>>,
/// true to allow return of all matching items and allow them to be unsorted.
/// This is more efficient.
collect_all_unsorted: bool,
}
impl ScanConfig {
pub fn new(collect_all_unsorted: bool) -> Self {
Self {
collect_all_unsorted,
..ScanConfig::default()
}
}
/// true if scan should abort
pub fn is_aborted(&self) -> bool {
if let Some(abort) = self.abort.as_ref() {
abort.load(Ordering::Relaxed)
} else {
false
}
}
}
pub(crate) type AccountMapEntry<T> = Arc<AccountMapEntryInner<T>>;
pub trait IsCached:
@@ -837,7 +865,7 @@ impl<T: IndexValue> AccountsIndex<T> {
scan_bank_id: BankId,
func: F,
scan_type: ScanTypes<R>,
collect_all_unsorted: bool,
config: ScanConfig,
) -> Result<(), ScanError>
where
F: FnMut(&Pubkey, (&T, Slot)),
@@ -990,14 +1018,7 @@ impl<T: IndexValue> AccountsIndex<T> {
match scan_type {
ScanTypes::Unindexed(range) => {
// Pass "" not to log metrics, so RPC doesn't get spammy
self.do_scan_accounts(
metric_name,
ancestors,
func,
range,
Some(max_root),
collect_all_unsorted,
);
self.do_scan_accounts(metric_name, ancestors, func, range, Some(max_root), config);
}
ScanTypes::Indexed(IndexKey::ProgramId(program_id)) => {
self.do_scan_secondary_index(
@@ -1006,6 +1027,7 @@ impl<T: IndexValue> AccountsIndex<T> {
&self.program_id_index,
&program_id,
Some(max_root),
config,
);
}
ScanTypes::Indexed(IndexKey::SplTokenMint(mint_key)) => {
@@ -1015,6 +1037,7 @@ impl<T: IndexValue> AccountsIndex<T> {
&self.spl_token_mint_index,
&mint_key,
Some(max_root),
config,
);
}
ScanTypes::Indexed(IndexKey::SplTokenOwner(owner_key)) => {
@@ -1024,6 +1047,7 @@ impl<T: IndexValue> AccountsIndex<T> {
&self.spl_token_owner_index,
&owner_key,
Some(max_root),
config,
);
}
}
@@ -1061,19 +1085,12 @@ impl<T: IndexValue> AccountsIndex<T> {
ancestors: &Ancestors,
func: F,
range: Option<R>,
collect_all_unsorted: bool,
config: ScanConfig,
) where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey> + std::fmt::Debug,
{
self.do_scan_accounts(
metric_name,
ancestors,
func,
range,
None,
collect_all_unsorted,
);
self.do_scan_accounts(metric_name, ancestors, func, range, None, config);
}
// Scan accounts and return latest version of each account that is either:
@@ -1086,7 +1103,7 @@ impl<T: IndexValue> AccountsIndex<T> {
mut func: F,
range: Option<R>,
max_root: Option<Slot>,
collect_all_unsorted: bool,
config: ScanConfig,
) where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey> + std::fmt::Debug,
@@ -1100,7 +1117,7 @@ impl<T: IndexValue> AccountsIndex<T> {
let mut read_lock_elapsed = 0;
let mut iterator_elapsed = 0;
let mut iterator_timer = Measure::start("iterator_elapsed");
for pubkey_list in self.iter(range.as_ref(), collect_all_unsorted) {
for pubkey_list in self.iter(range.as_ref(), config.collect_all_unsorted) {
iterator_timer.stop();
iterator_elapsed += iterator_timer.as_us();
for (pubkey, list) in pubkey_list {
@@ -1118,6 +1135,9 @@ impl<T: IndexValue> AccountsIndex<T> {
load_account_timer.stop();
load_account_elapsed += load_account_timer.as_us();
}
if config.is_aborted() {
return;
}
}
iterator_timer = Measure::start("iterator_elapsed");
}
@@ -1146,6 +1166,7 @@ impl<T: IndexValue> AccountsIndex<T> {
index: &SecondaryIndex<SecondaryIndexEntryType>,
index_key: &Pubkey,
max_root: Option<Slot>,
config: ScanConfig,
) where
F: FnMut(&Pubkey, (&T, Slot)),
{
@@ -1160,6 +1181,9 @@ impl<T: IndexValue> AccountsIndex<T> {
(&list_r.slot_list()[index].1, list_r.slot_list()[index].0),
);
}
if config.is_aborted() {
break;
}
}
}
@@ -1212,11 +1236,11 @@ impl<T: IndexValue> AccountsIndex<T> {
ancestors: &Ancestors,
scan_bank_id: BankId,
func: F,
config: ScanConfig,
) -> Result<(), ScanError>
where
F: FnMut(&Pubkey, (&T, Slot)),
{
let collect_all_unsorted = false;
// Pass "" not to log metrics, so RPC doesn't get spammy
self.do_checked_scan_accounts(
"",
@@ -1224,7 +1248,7 @@ impl<T: IndexValue> AccountsIndex<T> {
scan_bank_id,
func,
ScanTypes::Unindexed(None::<Range<Pubkey>>),
collect_all_unsorted,
config,
)
}
@@ -1233,7 +1257,7 @@ impl<T: IndexValue> AccountsIndex<T> {
metric_name: &'static str,
ancestors: &Ancestors,
func: F,
collect_all_unsorted: bool,
config: ScanConfig,
) where
F: FnMut(&Pubkey, (&T, Slot)),
{
@@ -1242,7 +1266,7 @@ impl<T: IndexValue> AccountsIndex<T> {
ancestors,
func,
None::<Range<Pubkey>>,
collect_all_unsorted,
config,
);
}
@@ -1252,20 +1276,14 @@ impl<T: IndexValue> AccountsIndex<T> {
metric_name: &'static str,
ancestors: &Ancestors,
range: R,
collect_all_unsorted: bool,
config: ScanConfig,
func: F,
) where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey> + std::fmt::Debug,
{
// Only the rent logic should be calling this, which doesn't need the safety checks
self.do_unchecked_scan_accounts(
metric_name,
ancestors,
func,
Some(range),
collect_all_unsorted,
);
self.do_unchecked_scan_accounts(metric_name, ancestors, func, Some(range), config);
}
/// call func with every pubkey and index visible from a given set of ancestors
@@ -1275,12 +1293,11 @@ impl<T: IndexValue> AccountsIndex<T> {
scan_bank_id: BankId,
index_key: IndexKey,
func: F,
config: ScanConfig,
) -> Result<(), ScanError>
where
F: FnMut(&Pubkey, (&T, Slot)),
{
let collect_all_unsorted = false;
// Pass "" not to log metrics, so RPC doesn't get spammy
self.do_checked_scan_accounts(
"",
@@ -1288,7 +1305,7 @@ impl<T: IndexValue> AccountsIndex<T> {
scan_bank_id,
func,
ScanTypes::<Range<Pubkey>>::Indexed(index_key),
collect_all_unsorted,
config,
)
}
@@ -2676,7 +2693,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 0);
}
@@ -2754,7 +2771,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 0);
}
@@ -2793,7 +2810,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 0);
ancestors.insert(slot, 0);
@@ -2803,7 +2820,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 1);
@@ -2822,7 +2839,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 0);
ancestors.insert(slot, 0);
@@ -2832,7 +2849,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 1);
}
@@ -3019,7 +3036,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 0);
ancestors.insert(slot, 0);
@@ -3028,7 +3045,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 1);
}
@@ -3058,7 +3075,7 @@ pub mod tests {
"",
&ancestors,
|_pubkey, _index| num += 1,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 0);
}
@@ -3095,7 +3112,7 @@ pub mod tests {
};
num += 1
},
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 1);
assert!(found_key);
@@ -3168,7 +3185,7 @@ pub mod tests {
"",
&ancestors,
pubkey_range,
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
|pubkey, _index| {
scanned_keys.insert(*pubkey);
},
@@ -3247,7 +3264,7 @@ pub mod tests {
|pubkey, _index| {
scanned_keys.insert(*pubkey);
},
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(scanned_keys.len(), num_pubkeys);
}
@@ -3553,7 +3570,7 @@ pub mod tests {
};
num += 1
},
COLLECT_ALL_UNSORTED_FALSE,
ScanConfig::default(),
);
assert_eq!(num, 1);
assert!(found_key);