Add blockstore column to store performance sampling data (#12251)
* Add blockstore column to store performance sampling data * introduce timer and write performance metrics to blockstore * introduce getRecentPerformanceSamples rpc * only run on rpc nodes enabled with transaction history * add unit tests for get_recent_performance_samples * remove RpcResponse from rpc call * refactor to use Instant::now and elapsed for timer * switch to root bank and ensure not negative subtraction * Add PerfSamples to purge/compaction * refactor to use Instant::now and elapsed for timer * switch to root bank and ensure not negative subtraction * remove duplicate constants Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
@ -16,6 +16,7 @@ pub mod cluster_info_vote_listener;
|
||||
pub mod commitment_service;
|
||||
pub mod completed_data_sets_service;
|
||||
mod deprecated;
|
||||
pub mod sample_performance_service;
|
||||
pub mod shred_fetch_stage;
|
||||
#[macro_use]
|
||||
pub mod contact_info;
|
||||
|
104
core/src/rpc.rs
104
core/src/rpc.rs
@ -33,7 +33,10 @@ use solana_client::{
|
||||
rpc_response::*,
|
||||
};
|
||||
use solana_faucet::faucet::request_airdrop_transaction;
|
||||
use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path};
|
||||
use solana_ledger::{
|
||||
blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::PerfSample,
|
||||
get_tmp_ledger_path,
|
||||
};
|
||||
use solana_perf::packet::PACKET_DATA_SIZE;
|
||||
use solana_runtime::{
|
||||
accounts::AccountAddressFilter,
|
||||
@ -79,6 +82,7 @@ use std::{
|
||||
use tokio::runtime;
|
||||
|
||||
pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
|
||||
/// Largest `limit` a `getRecentPerformanceSamples` request may ask for; also the value used
/// when the request does not supply a limit.
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
|
||||
|
||||
fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
|
||||
let context = RpcResponseContext { slot: bank.slot() };
|
||||
@ -1496,6 +1500,13 @@ pub trait RpcSol {
|
||||
#[rpc(meta, name = "getClusterNodes")]
|
||||
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>;
|
||||
|
||||
// JSON-RPC method `getRecentPerformanceSamples`.
// Takes an optional `limit` and returns a list of `(slot, sample)` pairs.
#[rpc(meta, name = "getRecentPerformanceSamples")]
|
||||
fn get_recent_performance_samples(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
limit: Option<usize>,
|
||||
) -> Result<Vec<(Slot, PerfSample)>>;
|
||||
|
||||
#[rpc(meta, name = "getEpochInfo")]
|
||||
fn get_epoch_info(
|
||||
&self,
|
||||
@ -1885,6 +1896,30 @@ impl RpcSol for RpcSolImpl {
|
||||
Ok(meta.get_balance(&pubkey, commitment))
|
||||
}
|
||||
|
||||
fn get_recent_performance_samples(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
limit: Option<usize>,
|
||||
) -> Result<Vec<(Slot, PerfSample)>> {
|
||||
debug!("get_recent_performance_samples request received");
|
||||
|
||||
let limit = limit.unwrap_or(PERFORMANCE_SAMPLES_LIMIT);
|
||||
|
||||
if limit > PERFORMANCE_SAMPLES_LIMIT {
|
||||
return Err(Error::invalid_params(format!(
|
||||
"Invalid limit; max {}",
|
||||
PERFORMANCE_SAMPLES_LIMIT
|
||||
)));
|
||||
}
|
||||
|
||||
meta.blockstore
|
||||
.get_recent_perf_samples(limit)
|
||||
.map_err(|err| {
|
||||
warn!("get_recent_performance_samples failed: {:?}", err);
|
||||
Error::invalid_request()
|
||||
})
|
||||
}
|
||||
|
||||
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> {
|
||||
debug!("get_cluster_nodes rpc request received");
|
||||
let cluster_info = &meta.cluster_info;
|
||||
@ -2670,6 +2705,16 @@ pub mod tests {
|
||||
&socketaddr!("127.0.0.1:1234"),
|
||||
));
|
||||
|
||||
let sample1 = PerfSample {
|
||||
num_slots: 1,
|
||||
num_transactions: 4,
|
||||
sample_period_secs: 60,
|
||||
};
|
||||
|
||||
blockstore
|
||||
.write_perf_sample(0, &sample1)
|
||||
.expect("write to blockstore");
|
||||
|
||||
let (meta, receiver) = JsonRpcRequestProcessor::new(
|
||||
JsonRpcConfig {
|
||||
enable_rpc_transaction_history: true,
|
||||
@ -2797,6 +2842,63 @@ pub mod tests {
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
fn test_rpc_get_recent_performance_samples() {
    // Without a `limit` parameter the call must succeed and return the one stored sample
    // (slot 0). NOTE(review): presumably that sample is written by the handler fixture; confirm.
    let bob_pubkey = Pubkey::new_rand();
    let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);

    let request = r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples"}"#;

    let raw_response = io
        .handle_request_sync(&request, meta)
        .expect("actual response");
    let actual: Response =
        serde_json::from_str(&raw_response).expect("actual response deserialization");

    let expected_json = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "result": [
            [
                0,
                {
                    "num_slots": 1,
                    "num_transactions": 4,
                    "sample_period_secs": 60
                }
            ]
        ],
    });
    let expected: Response =
        serde_json::from_value(expected_json).expect("expected response deserialization");

    assert_eq!(expected, actual);
}
|
||||
|
||||
#[test]
fn test_rpc_get_recent_performance_samples_invalid_limit() {
    // A limit of 10000 exceeds the maximum of 720 and must produce a JSON-RPC
    // invalid-params error (-32602) rather than a result.
    let bob_pubkey = Pubkey::new_rand();
    let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);

    let request =
        r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples","params":[10000]}"#;

    let raw_response = io
        .handle_request_sync(&request, meta)
        .expect("actual response");
    let actual: Response =
        serde_json::from_str(&raw_response).expect("actual response deserialization");

    let expected_json = json!({
        "jsonrpc": "2.0",
        "error": {
            "code": -32602,
            "message": "Invalid limit; max 720"
        },
        "id": 1
    });
    let expected: Response =
        serde_json::from_value(expected_json).expect("expected response deserialization");

    assert_eq!(expected, actual);
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_get_slot_leader() {
|
||||
let bob_pubkey = Pubkey::new_rand();
|
||||
|
104
core/src/sample_performance_service.rs
Normal file
104
core/src/sample_performance_service.rs
Normal file
@ -0,0 +1,104 @@
|
||||
use solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample};
|
||||
use solana_runtime::bank_forks::BankForks;
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, sleep, Builder, JoinHandle},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
/// Minimum time between two recorded samples, in seconds (compared against `Duration::as_secs`).
const SAMPLE_INTERVAL: u64 = 60;
|
||||
/// Pause between iterations of the sampling loop, in milliseconds (fed to `Duration::from_millis`).
const SLEEP_INTERVAL: u64 = 500;
|
||||
|
||||
/// Counters captured at the moment a sample is taken; the next sample is computed as the
/// difference against these values.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SamplePerformanceSnapshot {
    /// Transaction count reported by the root bank when the snapshot was taken.
    pub num_transactions: u64,
    /// Highest slot known to the bank forks when the snapshot was taken. Despite the name
    /// this is a slot number, not a count; the count is derived by subtraction.
    pub num_slots: u64,
}
|
||||
|
||||
/// Owner of the background thread that periodically records performance samples.
pub struct SamplePerformanceService {
|
||||
// Handle of the spawned `sample-performance` thread; consumed by `join`.
thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl SamplePerformanceService {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let exit = exit.clone();
|
||||
let blockstore = blockstore.clone();
|
||||
let bank_forks = bank_forks.clone();
|
||||
|
||||
info!("Starting SamplePerformance service");
|
||||
let thread_hdl = Builder::new()
|
||||
.name("sample-performance".to_string())
|
||||
.spawn(move || {
|
||||
Self::run(bank_forks, &blockstore, exit);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Self { thread_hdl }
|
||||
}
|
||||
|
||||
pub fn run(
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) {
|
||||
let forks = bank_forks.read().unwrap();
|
||||
let bank = forks.root_bank().clone();
|
||||
let highest_slot = forks.highest_slot();
|
||||
drop(forks);
|
||||
|
||||
let mut sample_snapshot = SamplePerformanceSnapshot {
|
||||
num_transactions: bank.transaction_count(),
|
||||
num_slots: highest_slot,
|
||||
};
|
||||
|
||||
let mut now = Instant::now();
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
let elapsed = now.elapsed();
|
||||
|
||||
if elapsed.as_secs() >= SAMPLE_INTERVAL {
|
||||
now = Instant::now();
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
let bank = bank_forks.root_bank().clone();
|
||||
let highest_slot = bank_forks.highest_slot();
|
||||
drop(bank_forks);
|
||||
|
||||
let perf_sample = PerfSample {
|
||||
num_slots: highest_slot
|
||||
.checked_sub(sample_snapshot.num_slots)
|
||||
.unwrap_or_default(),
|
||||
num_transactions: bank
|
||||
.transaction_count()
|
||||
.checked_sub(sample_snapshot.num_transactions)
|
||||
.unwrap_or_default(),
|
||||
sample_period_secs: elapsed.as_secs() as u16,
|
||||
};
|
||||
|
||||
if let Err(e) = blockstore.write_perf_sample(highest_slot, &perf_sample) {
|
||||
error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
|
||||
}
|
||||
|
||||
sample_snapshot = SamplePerformanceSnapshot {
|
||||
num_transactions: bank.transaction_count(),
|
||||
num_slots: highest_slot,
|
||||
};
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(SLEEP_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.thread_hdl.join()
|
||||
}
|
||||
}
|
@ -16,6 +16,7 @@ use crate::{
|
||||
rpc_pubsub_service::PubSubService,
|
||||
rpc_service::JsonRpcService,
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
sample_performance_service::SamplePerformanceService,
|
||||
serve_repair::ServeRepair,
|
||||
serve_repair_service::ServeRepairService,
|
||||
sigverify,
|
||||
@ -169,6 +170,7 @@ pub struct Validator {
|
||||
transaction_status_service: Option<TransactionStatusService>,
|
||||
rewards_recorder_service: Option<RewardsRecorderService>,
|
||||
cache_block_time_service: Option<CacheBlockTimeService>,
|
||||
sample_performance_service: Option<SamplePerformanceService>,
|
||||
gossip_service: GossipService,
|
||||
serve_repair_service: ServeRepairService,
|
||||
completed_data_sets_service: CompletedDataSetsService,
|
||||
@ -279,6 +281,17 @@ impl Validator {
|
||||
let bank = bank_forks.working_bank();
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
|
||||
let sample_performance_service =
|
||||
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
||||
Some(SamplePerformanceService::new(
|
||||
&bank_forks,
|
||||
&blockstore,
|
||||
&exit,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
info!("Starting validator with working bank slot {}", bank.slot());
|
||||
{
|
||||
let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
|
||||
@ -554,6 +567,7 @@ impl Validator {
|
||||
transaction_status_service,
|
||||
rewards_recorder_service,
|
||||
cache_block_time_service,
|
||||
sample_performance_service,
|
||||
snapshot_packager_service,
|
||||
completed_data_sets_service,
|
||||
tpu,
|
||||
@ -622,6 +636,10 @@ impl Validator {
|
||||
cache_block_time_service.join()?;
|
||||
}
|
||||
|
||||
if let Some(sample_performance_service) = self.sample_performance_service {
|
||||
sample_performance_service.join()?;
|
||||
}
|
||||
|
||||
if let Some(s) = self.snapshot_packager_service {
|
||||
s.join()?;
|
||||
}
|
||||
|
@ -137,6 +137,7 @@ pub struct Blockstore {
|
||||
active_transaction_status_index: RwLock<u64>,
|
||||
rewards_cf: LedgerColumn<cf::Rewards>,
|
||||
blocktime_cf: LedgerColumn<cf::Blocktime>,
|
||||
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
|
||||
last_root: Arc<RwLock<Slot>>,
|
||||
insert_shreds_lock: Arc<Mutex<()>>,
|
||||
pub new_shreds_signals: Vec<SyncSender<bool>>,
|
||||
@ -293,6 +294,7 @@ impl Blockstore {
|
||||
let transaction_status_index_cf = db.column();
|
||||
let rewards_cf = db.column();
|
||||
let blocktime_cf = db.column();
|
||||
let perf_samples_cf = db.column();
|
||||
|
||||
let db = Arc::new(db);
|
||||
|
||||
@ -338,6 +340,7 @@ impl Blockstore {
|
||||
active_transaction_status_index: RwLock::new(active_transaction_status_index),
|
||||
rewards_cf,
|
||||
blocktime_cf,
|
||||
perf_samples_cf,
|
||||
new_shreds_signals: vec![],
|
||||
completed_slots_senders: vec![],
|
||||
insert_shreds_lock: Arc::new(Mutex::new(())),
|
||||
@ -2303,6 +2306,22 @@ impl Blockstore {
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
|
||||
Ok(self
|
||||
.db
|
||||
.iter::<cf::PerfSamples>(IteratorMode::End)?
|
||||
.take(num)
|
||||
.map(|(slot, data)| {
|
||||
let perf_sample = deserialize(&data).unwrap();
|
||||
(slot, perf_sample)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Puts `perf_sample` into the performance-samples column under the key `index`.
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
|
||||
self.perf_samples_cf.put(index, perf_sample)
|
||||
}
|
||||
|
||||
/// Returns the entry vector for the slot starting with `shred_start_index`
|
||||
pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> {
|
||||
self.get_slot_entries_with_shred_info(slot, shred_start_index, false)
|
||||
@ -6930,6 +6949,38 @@ pub mod tests {
|
||||
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
fn test_write_get_perf_samples() {
    let blockstore_path = get_tmp_ledger_path!();
    {
        let blockstore = Blockstore::open(&blockstore_path).unwrap();
        let num_entries: usize = 10;

        // Ten samples keyed by slots 50, 100, ..., 500, built in ascending slot order.
        let perf_samples: Vec<(Slot, PerfSample)> = (1..=num_entries)
            .map(|x| {
                let sample = PerfSample {
                    num_transactions: 1000 + x as u64,
                    num_slots: 50,
                    sample_period_secs: 20,
                };
                (x as u64 * 50, sample)
            })
            .collect();

        for (slot, sample) in &perf_samples {
            blockstore.write_perf_sample(*slot, sample).unwrap();
        }

        // Requesting the `count` most recent samples must return the tail of what was
        // written, highest slot first.
        for count in 1..=num_entries {
            let expected_samples: Vec<(Slot, PerfSample)> = perf_samples[num_entries - count..]
                .iter()
                .rev()
                .cloned()
                .collect();
            assert_eq!(
                blockstore.get_recent_perf_samples(count).unwrap(),
                expected_samples
            );
        }
    }
    Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
|
||||
|
||||
#[test]
|
||||
fn test_lowest_slot() {
|
||||
let blockstore_path = get_tmp_ledger_path!();
|
||||
|
@ -137,6 +137,10 @@ impl Blockstore {
|
||||
& self
|
||||
.db
|
||||
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
|
||||
.is_ok()
|
||||
& self
|
||||
.db
|
||||
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
|
||||
.is_ok();
|
||||
let mut w_active_transaction_status_index =
|
||||
self.active_transaction_status_index.write().unwrap();
|
||||
@ -231,6 +235,10 @@ impl Blockstore {
|
||||
&& self
|
||||
.blocktime_cf
|
||||
.compact_range(from_slot, to_slot)
|
||||
.unwrap_or(false)
|
||||
&& self
|
||||
.perf_samples_cf
|
||||
.compact_range(from_slot, to_slot)
|
||||
.unwrap_or(false);
|
||||
compact_timer.stop();
|
||||
if !result {
|
||||
|
@ -52,6 +52,8 @@ const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
|
||||
const REWARDS_CF: &str = "rewards";
|
||||
/// Column family for Blocktime
|
||||
const BLOCKTIME_CF: &str = "blocktime";
|
||||
/// Column family for Performance Samples
|
||||
const PERF_SAMPLES_CF: &str = "perf_samples";
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum BlockstoreError {
|
||||
@ -140,6 +142,10 @@ pub mod columns {
|
||||
#[derive(Debug)]
|
||||
/// The blocktime column
|
||||
pub struct Blocktime;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// The performance samples column
|
||||
pub struct PerfSamples;
|
||||
}
|
||||
|
||||
pub enum AccessType {
|
||||
@ -200,7 +206,7 @@ impl Rocks {
|
||||
) -> Result<Rocks> {
|
||||
use columns::{
|
||||
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
|
||||
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
|
||||
PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
|
||||
TransactionStatusIndex,
|
||||
};
|
||||
|
||||
@ -236,6 +242,8 @@ impl Rocks {
|
||||
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
|
||||
let blocktime_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
|
||||
let perf_samples_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
|
||||
|
||||
let cfs = vec![
|
||||
(SlotMeta::NAME, meta_cf_descriptor),
|
||||
@ -255,6 +263,7 @@ impl Rocks {
|
||||
),
|
||||
(Rewards::NAME, rewards_cf_descriptor),
|
||||
(Blocktime::NAME, blocktime_cf_descriptor),
|
||||
(PerfSamples::NAME, perf_samples_cf_descriptor),
|
||||
];
|
||||
|
||||
// Open the database
|
||||
@ -293,7 +302,7 @@ impl Rocks {
|
||||
fn columns(&self) -> Vec<&'static str> {
|
||||
use columns::{
|
||||
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
|
||||
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
|
||||
PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
|
||||
TransactionStatusIndex,
|
||||
};
|
||||
|
||||
@ -312,6 +321,7 @@ impl Rocks {
|
||||
TransactionStatusIndex::NAME,
|
||||
Rewards::NAME,
|
||||
Blocktime::NAME,
|
||||
PerfSamples::NAME,
|
||||
]
|
||||
}
|
||||
|
||||
@ -545,6 +555,14 @@ impl TypedColumn for columns::Blocktime {
|
||||
type Type = UnixTimestamp;
|
||||
}
|
||||
|
||||
// Column wiring for performance samples.
// NOTE(review): `SlotColumn` presumably supplies the slot-keyed `Column` behaviour; confirm against its definition.
impl SlotColumn for columns::PerfSamples {}
|
||||
// Column family name used for this column in the database.
impl ColumnName for columns::PerfSamples {
|
||||
const NAME: &'static str = PERF_SAMPLES_CF;
|
||||
}
|
||||
// Values stored in this column are `PerfSample` records.
impl TypedColumn for columns::PerfSamples {
|
||||
type Type = blockstore_meta::PerfSample;
|
||||
}
|
||||
|
||||
impl Column for columns::ShredCode {
|
||||
type Index = (u64, u64);
|
||||
|
||||
|
@ -243,6 +243,13 @@ pub struct AddressSignatureMeta {
|
||||
pub writeable: bool,
|
||||
}
|
||||
|
||||
/// One performance sample as stored in the blockstore and returned over RPC.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
|
||||
pub struct PerfSample {
|
||||
/// Transactions counted during the sample period.
pub num_transactions: u64,
|
||||
/// Slots counted during the sample period.
pub num_slots: u64,
|
||||
/// Length of the sample period in whole seconds.
pub sample_period_secs: u16,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
Reference in New Issue
Block a user