Co-authored-by: Carl Lin <carl@solana.com>
(cherry picked from commit 2745b79b74
)
Co-authored-by: carllin <wumu727@gmail.com>
This commit is contained in:
144
Cargo.lock
generated
144
Cargo.lock
generated
@ -25,15 +25,6 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
|
||||
dependencies = [
|
||||
"const-random",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.4.6"
|
||||
@ -612,24 +603,10 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-random"
|
||||
version = "0.1.8"
|
||||
name = "const_fn"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
|
||||
dependencies = [
|
||||
"const-random-macro",
|
||||
"proc-macro-hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-random-macro"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
|
||||
dependencies = [
|
||||
"getrandom 0.1.14",
|
||||
"proc-macro-hack",
|
||||
]
|
||||
checksum = "28b9d6de7f49e22cf97ad17fc4036ece69300032f45f78f30b4a4482cdc3f4a6"
|
||||
|
||||
[[package]]
|
||||
name = "constant_time_eq"
|
||||
@ -693,21 +670,42 @@ version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"maybe-uninit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"crossbeam-utils 0.8.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-epoch 0.8.2",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"maybe-uninit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"crossbeam-epoch 0.9.1",
|
||||
"crossbeam-utils 0.8.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.8.2"
|
||||
@ -716,10 +714,24 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"lazy_static",
|
||||
"maybe-uninit",
|
||||
"memoffset",
|
||||
"memoffset 0.5.4",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1aaa739f95311c2c7887a76863f500026092fb1dce0161dab577e559ef3569d"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"const_fn",
|
||||
"crossbeam-utils 0.8.1",
|
||||
"lazy_static",
|
||||
"memoffset 0.6.1",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
@ -730,7 +742,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
|
||||
dependencies = [
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"maybe-uninit",
|
||||
]
|
||||
|
||||
@ -745,6 +757,17 @@ dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "02d96d1e189ef58269ebe5b97953da3274d83a93af647c2ddd6f9dab28cedb8d"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
"cfg-if 1.0.0",
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crunchy"
|
||||
version = "0.2.2"
|
||||
@ -831,13 +854,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "3.11.10"
|
||||
version = "4.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
|
||||
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
|
||||
dependencies = [
|
||||
"ahash 0.3.8",
|
||||
"cfg-if 0.1.10",
|
||||
"cfg-if 1.0.0",
|
||||
"num_cpus",
|
||||
"rayon",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2150,6 +2173,15 @@ dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mime"
|
||||
version = "0.2.6"
|
||||
@ -3056,25 +3088,25 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.4.1"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcf6960dc9a5b4ee8d3e4c5787b4a112a8818e0290a42ff664ad60692fdf2032"
|
||||
checksum = "8b0d8e0819fadc20c74ea8373106ead0600e3a67ef1fe8da56e39b9ae7275674"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-deque 0.8.0",
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.8.1"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8c4fec834fb6e6d2dd5eece3c7b432a52f0ba887cf40e595190c4107edc08bf"
|
||||
checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-channel 0.5.0",
|
||||
"crossbeam-deque 0.8.0",
|
||||
"crossbeam-utils 0.8.1",
|
||||
"lazy_static",
|
||||
"num_cpus",
|
||||
]
|
||||
@ -3236,7 +3268,7 @@ dependencies = [
|
||||
"base64 0.11.0",
|
||||
"blake2b_simd",
|
||||
"constant_time_eq",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3655,7 +3687,7 @@ name = "solana-accounts-bench"
|
||||
version = "1.5.5"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-channel 0.4.4",
|
||||
"log 0.4.11",
|
||||
"rand 0.7.3",
|
||||
"rayon",
|
||||
@ -3671,7 +3703,7 @@ name = "solana-banking-bench"
|
||||
version = "1.5.5"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-channel 0.4.4",
|
||||
"log 0.4.11",
|
||||
"rand 0.7.3",
|
||||
"rayon",
|
||||
@ -4000,7 +4032,7 @@ dependencies = [
|
||||
"bytes 0.4.12",
|
||||
"chrono",
|
||||
"core_affinity",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-channel 0.4.4",
|
||||
"ed25519-dalek",
|
||||
"flate2",
|
||||
"fs_extra",
|
||||
@ -4329,7 +4361,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"chrono",
|
||||
"chrono-humanize",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-channel 0.4.4",
|
||||
"dlopen",
|
||||
"dlopen_derive",
|
||||
"ed25519-dalek",
|
||||
@ -4415,7 +4447,7 @@ name = "solana-local-cluster"
|
||||
version = "1.5.5"
|
||||
dependencies = [
|
||||
"assert_matches",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-channel 0.4.4",
|
||||
"fs_extra",
|
||||
"gag",
|
||||
"itertools 0.9.0",
|
||||
@ -4745,7 +4777,7 @@ dependencies = [
|
||||
"bv",
|
||||
"byteorder",
|
||||
"bzip2",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-channel 0.4.4",
|
||||
"dashmap",
|
||||
"dir-diff",
|
||||
"flate2",
|
||||
@ -5729,7 +5761,7 @@ version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"futures 0.1.29",
|
||||
]
|
||||
|
||||
@ -5783,7 +5815,7 @@ version = "0.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"futures 0.1.29",
|
||||
"lazy_static",
|
||||
"log 0.4.11",
|
||||
@ -5852,9 +5884,9 @@ version = "0.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df720b6581784c118f0eb4310796b12b1d242a7eb95f716a8367855325c25f89"
|
||||
dependencies = [
|
||||
"crossbeam-deque",
|
||||
"crossbeam-deque 0.7.3",
|
||||
"crossbeam-queue",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"futures 0.1.29",
|
||||
"lazy_static",
|
||||
"log 0.4.11",
|
||||
@ -5869,7 +5901,7 @@ version = "0.2.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"futures 0.1.29",
|
||||
"slab",
|
||||
"tokio-executor",
|
||||
|
@ -14,7 +14,7 @@ blake3 = "0.3.6"
|
||||
bv = { version = "0.11.1", features = ["serde"] }
|
||||
byteorder = "1.3.4"
|
||||
bzip2 = "0.3.3"
|
||||
dashmap = "3.11.10"
|
||||
dashmap = { version = "4.0.2", features = ["rayon"] }
|
||||
crossbeam-channel = "0.4"
|
||||
dir-diff = "0.3.2"
|
||||
flate2 = "1.0.14"
|
||||
|
@ -4,6 +4,7 @@ extern crate test;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use rand::Rng;
|
||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||
use solana_runtime::{
|
||||
accounts::{create_test_accounts, Accounts},
|
||||
bank::*,
|
||||
@ -11,6 +12,7 @@ use solana_runtime::{
|
||||
use solana_sdk::{
|
||||
account::Account,
|
||||
genesis_config::{create_genesis_config, ClusterType},
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
};
|
||||
use std::{
|
||||
@ -297,3 +299,59 @@ fn bench_rwlock_hashmap_single_reader_with_n_writers(bencher: &mut Bencher) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn setup_bench_dashmap_iter() -> (Arc<Accounts>, DashMap<Pubkey, (Account, Hash)>) {
|
||||
let accounts = Arc::new(Accounts::new_with_config(
|
||||
vec![
|
||||
PathBuf::from(std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()))
|
||||
.join("bench_dashmap_par_iter"),
|
||||
],
|
||||
&ClusterType::Development,
|
||||
HashSet::new(),
|
||||
false,
|
||||
));
|
||||
|
||||
let dashmap = DashMap::new();
|
||||
let num_keys = std::env::var("NUM_BENCH_KEYS")
|
||||
.map(|num_keys| num_keys.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_| 10000);
|
||||
for _ in 0..num_keys {
|
||||
dashmap.insert(
|
||||
Pubkey::new_unique(),
|
||||
(
|
||||
Account::new(1, 0, &Account::default().owner),
|
||||
Hash::new_unique(),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
(accounts, dashmap)
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_dashmap_par_iter(bencher: &mut Bencher) {
|
||||
let (accounts, dashmap) = setup_bench_dashmap_iter();
|
||||
|
||||
bencher.iter(|| {
|
||||
test::black_box(accounts.accounts_db.thread_pool.install(|| {
|
||||
dashmap
|
||||
.par_iter()
|
||||
.map(|cached_account| (*cached_account.key(), cached_account.value().1))
|
||||
.collect::<Vec<(Pubkey, Hash)>>()
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_dashmap_iter(bencher: &mut Bencher) {
|
||||
let (_accounts, dashmap) = setup_bench_dashmap_iter();
|
||||
|
||||
bencher.iter(|| {
|
||||
test::black_box(
|
||||
dashmap
|
||||
.iter()
|
||||
.map(|cached_account| (*cached_account.key(), cached_account.value().1))
|
||||
.collect::<Vec<(Pubkey, Hash)>>(),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
accounts_db::{AccountsDB, AppendVecId, BankHashInfo, ErrorCounters, LoadedAccount},
|
||||
accounts_db::{AccountsDB, BankHashInfo, ErrorCounters, LoadedAccount, ScanStorageResult},
|
||||
accounts_index::{AccountIndex, Ancestors, IndexKey},
|
||||
bank::{
|
||||
NonceRollbackFull, NonceRollbackInfo, TransactionCheckResult, TransactionExecutionResult,
|
||||
@ -9,9 +9,12 @@ use crate::{
|
||||
system_instruction_processor::{get_system_account_kind, SystemAccountKind},
|
||||
transaction_utils::OrderedIterator,
|
||||
};
|
||||
use dashmap::{
|
||||
mapref::entry::Entry::{Occupied, Vacant},
|
||||
DashMap,
|
||||
};
|
||||
use log::*;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rayon::slice::ParallelSliceMut;
|
||||
use solana_sdk::{
|
||||
account::Account,
|
||||
account_utils::StateMut,
|
||||
@ -453,28 +456,48 @@ impl Accounts {
|
||||
pub fn scan_slot<F, B>(&self, slot: Slot, func: F) -> Vec<B>
|
||||
where
|
||||
F: Fn(LoadedAccount) -> Option<B> + Send + Sync,
|
||||
B: Send + Default,
|
||||
B: Sync + Send + Default + std::cmp::Eq,
|
||||
{
|
||||
let accumulator: Vec<Vec<(Pubkey, u64, B)>> = self.accounts_db.scan_account_storage(
|
||||
let scan_result = self.accounts_db.scan_account_storage(
|
||||
slot,
|
||||
|loaded_account: LoadedAccount, _id: AppendVecId, accum: &mut Vec<(Pubkey, u64, B)>| {
|
||||
let pubkey = *loaded_account.pubkey();
|
||||
let write_version = loaded_account.write_version();
|
||||
if let Some(val) = func(loaded_account) {
|
||||
accum.push((pubkey, std::u64::MAX - write_version, val));
|
||||
|loaded_account: LoadedAccount| {
|
||||
// Cache only has one version per key, don't need to worry about versioning
|
||||
func(loaded_account)
|
||||
},
|
||||
|accum: &DashMap<Pubkey, (u64, B)>, loaded_account: LoadedAccount| {
|
||||
let loaded_account_pubkey = *loaded_account.pubkey();
|
||||
let loaded_write_version = loaded_account.write_version();
|
||||
let should_insert = accum
|
||||
.get(&loaded_account_pubkey)
|
||||
.map(|existing_entry| loaded_write_version > existing_entry.value().0)
|
||||
.unwrap_or(true);
|
||||
if should_insert {
|
||||
if let Some(val) = func(loaded_account) {
|
||||
// Detected insertion is necessary, grabs the write lock to commit the write,
|
||||
match accum.entry(loaded_account_pubkey) {
|
||||
// Double check in case another thread interleaved a write between the read + write.
|
||||
Occupied(mut occupied_entry) => {
|
||||
if loaded_write_version > occupied_entry.get().0 {
|
||||
occupied_entry.insert((loaded_write_version, val));
|
||||
}
|
||||
}
|
||||
|
||||
Vacant(vacant_entry) => {
|
||||
vacant_entry.insert((loaded_write_version, val));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
let mut versions: Vec<(Pubkey, u64, B)> = accumulator.into_iter().flatten().collect();
|
||||
self.accounts_db.thread_pool.install(|| {
|
||||
versions.par_sort_by_key(|s| (s.0, s.1));
|
||||
});
|
||||
versions.dedup_by_key(|s| s.0);
|
||||
versions
|
||||
.into_iter()
|
||||
.map(|(_pubkey, _version, val)| val)
|
||||
.collect()
|
||||
match scan_result {
|
||||
ScanStorageResult::Cached(cached_result) => cached_result,
|
||||
ScanStorageResult::Stored(stored_result) => stored_result
|
||||
.into_iter()
|
||||
.map(|(_pubkey, (_latest_write_version, val))| val)
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load_by_program_slot(
|
||||
|
@ -27,7 +27,10 @@ use crate::{
|
||||
contains::Contains,
|
||||
};
|
||||
use blake3::traits::digest::Digest;
|
||||
use dashmap::DashMap;
|
||||
use dashmap::{
|
||||
mapref::entry::Entry::{Occupied, Vacant},
|
||||
DashMap, DashSet,
|
||||
};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use rand::{prelude::SliceRandom, thread_rng, Rng};
|
||||
@ -62,6 +65,7 @@ const MAX_RECYCLE_STORES: usize = 1000;
|
||||
const STORE_META_OVERHEAD: usize = 256;
|
||||
const MAX_CACHE_SLOTS: usize = 200;
|
||||
const FLUSH_CACHE_RANDOM_THRESHOLD: usize = MAX_LOCKOUT_HISTORY;
|
||||
const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000;
|
||||
|
||||
pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024;
|
||||
pub const DEFAULT_NUM_THREADS: u32 = 8;
|
||||
@ -87,12 +91,19 @@ const CACHE_VIRTUAL_WRITE_VERSION: u64 = 0;
|
||||
const CACHE_VIRTUAL_OFFSET: usize = 0;
|
||||
const CACHE_VIRTUAL_STORED_SIZE: usize = 0;
|
||||
|
||||
type DashMapVersionHash = DashMap<Pubkey, (u64, Hash)>;
|
||||
|
||||
lazy_static! {
|
||||
// FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDB panic has occurred,
|
||||
// as |cargo test| cannot observe panics in other threads
|
||||
pub static ref FROZEN_ACCOUNT_PANIC: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
|
||||
}
|
||||
|
||||
pub enum ScanStorageResult<R, B> {
|
||||
Cached(Vec<R>),
|
||||
Stored(B),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ErrorCounters {
|
||||
pub total: usize,
|
||||
@ -623,7 +634,6 @@ pub struct AccountsDB {
|
||||
struct AccountsStats {
|
||||
delta_hash_scan_time_total_us: AtomicU64,
|
||||
delta_hash_accumulate_time_total_us: AtomicU64,
|
||||
delta_hash_merge_time_total_us: AtomicU64,
|
||||
delta_hash_num: AtomicU64,
|
||||
|
||||
last_store_report: AtomicU64,
|
||||
@ -1894,35 +1904,46 @@ impl AccountsDB {
|
||||
}
|
||||
|
||||
/// Scan a specific slot through all the account storage in parallel
|
||||
pub fn scan_account_storage<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
|
||||
pub fn scan_account_storage<R, B>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
cache_map_func: impl Fn(LoadedAccount) -> Option<R> + Sync,
|
||||
storage_scan_func: impl Fn(&B, LoadedAccount) + Sync,
|
||||
) -> ScanStorageResult<R, B>
|
||||
where
|
||||
F: Fn(LoadedAccount, AppendVecId, &mut B) + Send + Sync,
|
||||
B: Send + Default,
|
||||
{
|
||||
self.scan_account_storage_inner(slot, scan_func)
|
||||
}
|
||||
|
||||
fn scan_account_storage_inner<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
|
||||
where
|
||||
F: Fn(LoadedAccount, AppendVecId, &mut B) + Send + Sync,
|
||||
B: Send + Default,
|
||||
R: Send,
|
||||
B: Send + Default + Sync,
|
||||
{
|
||||
if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
|
||||
// If we see the slot in the cache, then all the account information
|
||||
// is in this cached slot
|
||||
let mut retval = B::default();
|
||||
for cached_account in slot_cache.iter() {
|
||||
scan_func(
|
||||
LoadedAccount::Cached((
|
||||
*cached_account.key(),
|
||||
Cow::Borrowed(cached_account.value()),
|
||||
)),
|
||||
CACHE_VIRTUAL_STORAGE_ID,
|
||||
&mut retval,
|
||||
);
|
||||
if slot_cache.len() > SCAN_SLOT_PAR_ITER_THRESHOLD {
|
||||
ScanStorageResult::Cached(self.thread_pool.install(|| {
|
||||
slot_cache
|
||||
.par_iter()
|
||||
.filter_map(|cached_account| {
|
||||
cache_map_func(LoadedAccount::Cached((
|
||||
*cached_account.key(),
|
||||
Cow::Borrowed(cached_account.value()),
|
||||
)))
|
||||
})
|
||||
.collect()
|
||||
}))
|
||||
} else {
|
||||
ScanStorageResult::Cached(
|
||||
slot_cache
|
||||
.iter()
|
||||
.filter_map(|cached_account| {
|
||||
cache_map_func(LoadedAccount::Cached((
|
||||
*cached_account.key(),
|
||||
Cow::Borrowed(cached_account.value()),
|
||||
)))
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
vec![retval]
|
||||
} else {
|
||||
let retval = B::default();
|
||||
// If the slot is not in the cache, then all the account information must have
|
||||
// been flushed. This is guaranteed because we only remove the rooted slot from
|
||||
// the cache *after* we've finished flushing in `flush_slot_cache`.
|
||||
@ -1933,21 +1954,12 @@ impl AccountsDB {
|
||||
.unwrap_or_default();
|
||||
self.thread_pool.install(|| {
|
||||
storage_maps
|
||||
.into_par_iter()
|
||||
.map(|storage| {
|
||||
let accounts = storage.accounts.accounts(0);
|
||||
let mut retval = B::default();
|
||||
accounts.into_iter().for_each(|stored_account| {
|
||||
scan_func(
|
||||
LoadedAccount::Stored(stored_account),
|
||||
storage.append_vec_id(),
|
||||
&mut retval,
|
||||
)
|
||||
});
|
||||
retval
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.par_iter()
|
||||
.flat_map(|storage| storage.accounts.accounts(0))
|
||||
.for_each(|account| storage_scan_func(&retval, LoadedAccount::Stored(account)));
|
||||
});
|
||||
|
||||
ScanStorageResult::Stored(retval)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2518,9 +2530,10 @@ impl AccountsDB {
|
||||
// Reads will then always read the latest version of a slot. Scans will also know
|
||||
// which version their parents because banks will also be augmented with this version,
|
||||
// which handles cases where a deletion of one version happens in the middle of the scan.
|
||||
let pubkey_sets: Vec<HashSet<Pubkey>> = self.scan_account_storage(
|
||||
let scan_result: ScanStorageResult<Pubkey, DashSet<Pubkey>> = self.scan_account_storage(
|
||||
remove_slot,
|
||||
|loaded_account: LoadedAccount, _, accum: &mut HashSet<Pubkey>| {
|
||||
|loaded_account: LoadedAccount| Some(*loaded_account.pubkey()),
|
||||
|accum: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
|
||||
accum.insert(*loaded_account.pubkey());
|
||||
},
|
||||
);
|
||||
@ -2528,15 +2541,26 @@ impl AccountsDB {
|
||||
// Purge this slot from the accounts index
|
||||
let purge_slot: HashSet<Slot> = vec![remove_slot].into_iter().collect();
|
||||
let mut reclaims = vec![];
|
||||
{
|
||||
let pubkeys = pubkey_sets.iter().flatten();
|
||||
for pubkey in pubkeys {
|
||||
self.accounts_index.purge_exact(
|
||||
pubkey,
|
||||
&purge_slot,
|
||||
&mut reclaims,
|
||||
&self.account_indexes,
|
||||
);
|
||||
match scan_result {
|
||||
ScanStorageResult::Cached(cached_keys) => {
|
||||
for pubkey in cached_keys.iter() {
|
||||
self.accounts_index.purge_exact(
|
||||
pubkey,
|
||||
&purge_slot,
|
||||
&mut reclaims,
|
||||
&self.account_indexes,
|
||||
);
|
||||
}
|
||||
}
|
||||
ScanStorageResult::Stored(stored_keys) => {
|
||||
for set_ref in stored_keys.iter() {
|
||||
self.accounts_index.purge_exact(
|
||||
set_ref.key(),
|
||||
&purge_slot,
|
||||
&mut reclaims,
|
||||
&self.account_indexes,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3061,13 +3085,6 @@ impl AccountsDB {
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"delta_hash_merge_us",
|
||||
self.stats
|
||||
.delta_hash_merge_time_total_us
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"delta_hash_accumulate_us",
|
||||
self.stats
|
||||
@ -3345,41 +3362,59 @@ impl AccountsDB {
|
||||
|
||||
pub fn get_accounts_delta_hash(&self, slot: Slot) -> Hash {
|
||||
let mut scan = Measure::start("scan");
|
||||
let mut accumulator: Vec<HashMap<Pubkey, (u64, Hash)>> = self.scan_account_storage(
|
||||
slot,
|
||||
|loaded_account: LoadedAccount,
|
||||
_store_id: AppendVecId,
|
||||
accum: &mut HashMap<Pubkey, (u64, Hash)>| {
|
||||
accum.insert(
|
||||
*loaded_account.pubkey(),
|
||||
(
|
||||
loaded_account.write_version(),
|
||||
|
||||
let scan_result: ScanStorageResult<(Pubkey, Hash, u64), DashMapVersionHash> = self
|
||||
.scan_account_storage(
|
||||
slot,
|
||||
|loaded_account: LoadedAccount| {
|
||||
// Cache only has one version per key, don't need to worry about versioning
|
||||
Some((
|
||||
*loaded_account.pubkey(),
|
||||
*loaded_account.loaded_hash(),
|
||||
),
|
||||
);
|
||||
},
|
||||
);
|
||||
CACHE_VIRTUAL_WRITE_VERSION,
|
||||
))
|
||||
},
|
||||
|accum: &DashMap<Pubkey, (u64, Hash)>, loaded_account: LoadedAccount| {
|
||||
let loaded_write_version = loaded_account.write_version();
|
||||
let loaded_hash = *loaded_account.loaded_hash();
|
||||
let should_insert =
|
||||
if let Some(existing_entry) = accum.get(loaded_account.pubkey()) {
|
||||
loaded_write_version > existing_entry.value().version()
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if should_insert {
|
||||
// Detected insertion is necessary, grabs the write lock to commit the write,
|
||||
match accum.entry(*loaded_account.pubkey()) {
|
||||
// Double check in case another thread interleaved a write between the read + write.
|
||||
Occupied(mut occupied_entry) => {
|
||||
if loaded_write_version > occupied_entry.get().version() {
|
||||
occupied_entry.insert((loaded_write_version, loaded_hash));
|
||||
}
|
||||
}
|
||||
|
||||
Vacant(vacant_entry) => {
|
||||
vacant_entry.insert((loaded_write_version, loaded_hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
scan.stop();
|
||||
let mut merge = Measure::start("merge");
|
||||
let mut account_maps = HashMap::new();
|
||||
while let Some(maps) = accumulator.pop() {
|
||||
AccountsDB::merge(&mut account_maps, &maps);
|
||||
}
|
||||
merge.stop();
|
||||
|
||||
let mut accumulate = Measure::start("accumulate");
|
||||
let hashes: Vec<_> = account_maps
|
||||
.into_iter()
|
||||
.map(|(pubkey, (_, hash))| (pubkey, hash, 0))
|
||||
.collect();
|
||||
let hashes: Vec<_> = match scan_result {
|
||||
ScanStorageResult::Cached(cached_result) => cached_result,
|
||||
ScanStorageResult::Stored(stored_result) => stored_result
|
||||
.into_iter()
|
||||
.map(|(pubkey, (_latest_write_version, hash))| (pubkey, hash, 0))
|
||||
.collect(),
|
||||
};
|
||||
let ret = Self::accumulate_account_hashes(hashes, slot, false);
|
||||
accumulate.stop();
|
||||
self.stats
|
||||
.delta_hash_scan_time_total_us
|
||||
.fetch_add(scan.as_us(), Ordering::Relaxed);
|
||||
self.stats
|
||||
.delta_hash_merge_time_total_us
|
||||
.fetch_add(merge.as_us(), Ordering::Relaxed);
|
||||
self.stats
|
||||
.delta_hash_accumulate_time_total_us
|
||||
.fetch_add(accumulate.as_us(), Ordering::Relaxed);
|
||||
@ -3880,20 +3915,6 @@ impl AccountsDB {
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn merge<X>(dest: &mut HashMap<Pubkey, X>, source: &HashMap<Pubkey, X>)
|
||||
where
|
||||
X: Versioned + Clone,
|
||||
{
|
||||
for (key, source_item) in source.iter() {
|
||||
if let Some(dest_item) = dest.get(key) {
|
||||
if dest_item.version() > source_item.version() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
dest.insert(*key, source_item.clone());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_index(&self) {
|
||||
type AccountsMap<'a> =
|
||||
DashMap<Pubkey, Mutex<BTreeMap<u64, (AppendVecId, StoredAccountMeta<'a>)>>>;
|
||||
|
Reference in New Issue
Block a user