RPC: add caching to getLargestAccounts (#15154)

* introduce get largest accounts cache

* remove cache size and change hash key

* remove eq and hash derivation from commitment config

* add slot to the cache
This commit is contained in:
Josh
2021-02-11 11:32:46 -08:00
committed by GitHub
parent ab0f4c69aa
commit 4013f91dbe
5 changed files with 138 additions and 16 deletions

View File

@ -8,6 +8,7 @@ pub mod mock_sender;
pub mod nonce_utils; pub mod nonce_utils;
pub mod perf_utils; pub mod perf_utils;
pub mod pubsub_client; pub mod pubsub_client;
pub mod rpc_cache;
pub mod rpc_client; pub mod rpc_client;
pub mod rpc_config; pub mod rpc_config;
pub mod rpc_custom_error; pub mod rpc_custom_error;

75
client/src/rpc_cache.rs Normal file
View File

@ -0,0 +1,75 @@
use crate::{rpc_config::RpcLargestAccountsFilter, rpc_response::RpcAccountBalance};
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
#[derive(Debug, Clone)]
pub struct LargestAccountsCache {
duration: u64,
cache: HashMap<Option<RpcLargestAccountsFilter>, LargestAccountsCacheValue>,
}
#[derive(Debug, Clone)]
struct LargestAccountsCacheValue {
accounts: Vec<RpcAccountBalance>,
slot: u64,
cached_time: SystemTime,
}
impl LargestAccountsCache {
pub fn new(duration: u64) -> Self {
Self {
duration,
cache: HashMap::new(),
}
}
pub fn get_largest_accounts(
&self,
filter: &Option<RpcLargestAccountsFilter>,
) -> Option<(u64, Vec<RpcAccountBalance>)> {
self.cache.get(&filter).and_then(|value| {
if let Ok(elapsed) = value.cached_time.elapsed() {
if elapsed < Duration::from_secs(self.duration) {
return Some((value.slot, value.accounts.clone()));
}
}
None
})
}
pub fn set_largest_accounts(
&mut self,
filter: &Option<RpcLargestAccountsFilter>,
slot: u64,
accounts: &[RpcAccountBalance],
) {
self.cache.insert(
filter.clone(),
LargestAccountsCacheValue {
accounts: accounts.to_owned(),
slot,
cached_time: SystemTime::now(),
},
);
}
}
#[cfg(test)]
pub mod test {
use super::*;
#[test]
fn test_old_entries_expire() {
let mut cache = LargestAccountsCache::new(1);
let filter = Some(RpcLargestAccountsFilter::Circulating);
let accounts: Vec<RpcAccountBalance> = Vec::new();
cache.set_largest_accounts(&filter, 1000, &accounts);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(cache.get_largest_accounts(&filter), None);
}
}

View File

@ -31,7 +31,7 @@ pub struct RpcSimulateTransactionConfig {
pub encoding: Option<UiTransactionEncoding>, pub encoding: Option<UiTransactionEncoding>,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub enum RpcLargestAccountsFilter { pub enum RpcLargestAccountsFilter {
Circulating, Circulating,

View File

@ -21,6 +21,7 @@ use solana_account_decoder::{
UiAccount, UiAccountData, UiAccountEncoding, UiDataSliceConfig, UiAccount, UiAccountData, UiAccountEncoding, UiDataSliceConfig,
}; };
use solana_client::{ use solana_client::{
rpc_cache::LargestAccountsCache,
rpc_config::*, rpc_config::*,
rpc_custom_error::RpcCustomError, rpc_custom_error::RpcCustomError,
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}, rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
@ -136,6 +137,7 @@ pub struct JsonRpcRequestProcessor {
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>, bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
} }
impl Metadata for JsonRpcRequestProcessor {} impl Metadata for JsonRpcRequestProcessor {}
@ -218,6 +220,7 @@ impl JsonRpcRequestProcessor {
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>, bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
) -> (Self, Receiver<TransactionInfo>) { ) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
@ -235,6 +238,7 @@ impl JsonRpcRequestProcessor {
runtime, runtime,
bigtable_ledger_storage, bigtable_ledger_storage,
optimistically_confirmed_bank, optimistically_confirmed_bank,
largest_accounts_cache,
}, },
receiver, receiver,
) )
@ -274,6 +278,7 @@ impl JsonRpcRequestProcessor {
optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank { optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank {
bank: bank.clone(), bank: bank.clone(),
})), })),
largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))),
} }
} }
@ -505,13 +510,38 @@ impl JsonRpcRequestProcessor {
self.bank(commitment).capitalization() self.bank(commitment).capitalization()
} }
fn get_cached_largest_accounts(
&self,
filter: &Option<RpcLargestAccountsFilter>,
) -> Option<(u64, Vec<RpcAccountBalance>)> {
let largest_accounts_cache = self.largest_accounts_cache.read().unwrap();
largest_accounts_cache.get_largest_accounts(filter)
}
fn set_cached_largest_accounts(
&self,
filter: &Option<RpcLargestAccountsFilter>,
slot: u64,
accounts: &[RpcAccountBalance],
) {
let mut largest_accounts_cache = self.largest_accounts_cache.write().unwrap();
largest_accounts_cache.set_largest_accounts(filter, slot, accounts)
}
fn get_largest_accounts( fn get_largest_accounts(
&self, &self,
config: Option<RpcLargestAccountsConfig>, config: Option<RpcLargestAccountsConfig>,
) -> RpcResponse<Vec<RpcAccountBalance>> { ) -> RpcResponse<Vec<RpcAccountBalance>> {
let config = config.unwrap_or_default(); let config = config.unwrap_or_default();
let bank = self.bank(config.commitment); let bank = self.bank(config.commitment);
let (addresses, address_filter) = if let Some(filter) = config.filter {
if let Some((slot, accounts)) = self.get_cached_largest_accounts(&config.filter) {
Response {
context: RpcResponseContext { slot },
value: accounts,
}
} else {
let (addresses, address_filter) = if let Some(filter) = config.clone().filter {
let non_circulating_supply = calculate_non_circulating_supply(&bank); let non_circulating_supply = calculate_non_circulating_supply(&bank);
let addresses = non_circulating_supply.accounts.into_iter().collect(); let addresses = non_circulating_supply.accounts.into_iter().collect();
let address_filter = match filter { let address_filter = match filter {
@ -522,16 +552,18 @@ impl JsonRpcRequestProcessor {
} else { } else {
(HashSet::new(), AccountAddressFilter::Exclude) (HashSet::new(), AccountAddressFilter::Exclude)
}; };
new_response( let accounts = bank
&bank, .get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter)
bank.get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter)
.into_iter() .into_iter()
.map(|(address, lamports)| RpcAccountBalance { .map(|(address, lamports)| RpcAccountBalance {
address: address.to_string(), address: address.to_string(),
lamports, lamports,
}) })
.collect(), .collect::<Vec<RpcAccountBalance>>();
)
self.set_cached_largest_accounts(&config.filter, bank.slot(), &accounts);
new_response(&bank, accounts)
}
} }
fn get_supply(&self, commitment: Option<CommitmentConfig>) -> RpcResponse<RpcSupply> { fn get_supply(&self, commitment: Option<CommitmentConfig>) -> RpcResponse<RpcSupply> {
@ -3185,6 +3217,7 @@ pub mod tests {
Arc::new(tokio::runtime::Runtime::new().unwrap()), Arc::new(tokio::runtime::Runtime::new().unwrap()),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
@ -4594,6 +4627,7 @@ pub mod tests {
Arc::new(tokio::runtime::Runtime::new().unwrap()), Arc::new(tokio::runtime::Runtime::new().unwrap()),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
@ -4790,6 +4824,7 @@ pub mod tests {
Arc::new(tokio::runtime::Runtime::new().unwrap()), Arc::new(tokio::runtime::Runtime::new().unwrap()),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(request_processor.validator_exit(), false); assert_eq!(request_processor.validator_exit(), false);
@ -4823,6 +4858,7 @@ pub mod tests {
Arc::new(tokio::runtime::Runtime::new().unwrap()), Arc::new(tokio::runtime::Runtime::new().unwrap()),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(request_processor.validator_exit(), true); assert_eq!(request_processor.validator_exit(), true);
@ -4915,6 +4951,7 @@ pub mod tests {
Arc::new(tokio::runtime::Runtime::new().unwrap()), Arc::new(tokio::runtime::Runtime::new().unwrap()),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!( assert_eq!(
@ -6144,6 +6181,7 @@ pub mod tests {
Arc::new(tokio::runtime::Runtime::new().unwrap()), Arc::new(tokio::runtime::Runtime::new().unwrap()),
None, None,
optimistically_confirmed_bank.clone(), optimistically_confirmed_bank.clone(),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
); );
let mut io = MetaIoHandler::default(); let mut io = MetaIoHandler::default();

View File

@ -16,6 +16,7 @@ use jsonrpc_http_server::{
RequestMiddlewareAction, ServerBuilder, RequestMiddlewareAction, ServerBuilder,
}; };
use regex::Regex; use regex::Regex;
use solana_client::rpc_cache::LargestAccountsCache;
use solana_ledger::blockstore::Blockstore; use solana_ledger::blockstore::Blockstore;
use solana_metrics::inc_new_counter_info; use solana_metrics::inc_new_counter_info;
use solana_runtime::{ use solana_runtime::{
@ -35,6 +36,8 @@ use std::{
use tokio::runtime; use tokio::runtime;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
const LARGEST_ACCOUNTS_CACHE_DURATION: u64 = 60 * 60 * 2;
pub struct JsonRpcService { pub struct JsonRpcService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
@ -262,6 +265,10 @@ impl JsonRpcService {
override_health_check, override_health_check,
)); ));
let largest_accounts_cache = Arc::new(RwLock::new(LargestAccountsCache::new(
LARGEST_ACCOUNTS_CACHE_DURATION,
)));
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let runtime = Arc::new( let runtime = Arc::new(
runtime::Builder::new_multi_thread() runtime::Builder::new_multi_thread()
@ -322,6 +329,7 @@ impl JsonRpcService {
runtime, runtime,
bigtable_ledger_storage, bigtable_ledger_storage,
optimistically_confirmed_bank, optimistically_confirmed_bank,
largest_accounts_cache,
); );
let leader_info = let leader_info =