From 65a6bfad096ed37757464198d9cffa57bba427a2 Mon Sep 17 00:00:00 2001
From: Josh
Date: Tue, 22 Sep 2020 12:26:32 -0700
Subject: [PATCH] Add blockstore column to store performance sampling data
 (#12251)

* Add blockstore column to store performance sampling data

* introduce timer and write performance metrics to blockstore

* introduce getRecentPerformanceSamples rpc

* only run on rpc nodes enabled with transaction history

* add unit tests for get_recent_performance_samples

* remove RpcResponse from rpc call

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure non-negative subtraction

* Add PerfSamples to purge/compaction

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure non-negative subtraction

* remove duplicate constants

Co-authored-by: Tyera Eulberg
---
 core/src/lib.rs                           |   1 +
 core/src/rpc.rs                           | 104 +++++++++++++++++++++-
 core/src/sample_performance_service.rs    | 104 ++++++++++++++++++++++
 core/src/validator.rs                     |  18 ++++
 ledger/src/blockstore.rs                  |  51 +++++++++++
 ledger/src/blockstore/blockstore_purge.rs |   8 ++
 ledger/src/blockstore_db.rs               |  22 ++++-
 ledger/src/blockstore_meta.rs             |   7 ++
 8 files changed, 312 insertions(+), 3 deletions(-)
 create mode 100644 core/src/sample_performance_service.rs

diff --git a/core/src/lib.rs b/core/src/lib.rs
index 492610a59b..97c39a03fa 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -16,6 +16,7 @@ pub mod cluster_info_vote_listener;
 pub mod commitment_service;
 pub mod completed_data_sets_service;
 mod deprecated;
+pub mod sample_performance_service;
 pub mod shred_fetch_stage;
 #[macro_use]
 pub mod contact_info;
diff --git a/core/src/rpc.rs b/core/src/rpc.rs
index 5ed960b628..15225a8b42 100644
--- a/core/src/rpc.rs
+++ b/core/src/rpc.rs
@@ -33,7 +33,10 @@ use solana_client::{
     rpc_response::*,
 };
 use solana_faucet::faucet::request_airdrop_transaction;
-use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path};
+use solana_ledger::{
+    blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::PerfSample,
+    get_tmp_ledger_path,
+};
 use solana_perf::packet::PACKET_DATA_SIZE;
 use solana_runtime::{
     accounts::AccountAddressFilter,
@@ -79,6 +82,7 @@ use std::{
 use tokio::runtime;
 
 pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
+pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
 
 fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
     let context = RpcResponseContext { slot: bank.slot() };
@@ -1496,6 +1500,13 @@ pub trait RpcSol {
     #[rpc(meta, name = "getClusterNodes")]
     fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>;
 
+    #[rpc(meta, name = "getRecentPerformanceSamples")]
+    fn get_recent_performance_samples(
+        &self,
+        meta: Self::Metadata,
+        limit: Option<usize>,
+    ) -> Result<Vec<(Slot, PerfSample)>>;
+
     #[rpc(meta, name = "getEpochInfo")]
     fn get_epoch_info(
         &self,
@@ -1885,6 +1896,30 @@ impl RpcSol for RpcSolImpl {
         Ok(meta.get_balance(&pubkey, commitment))
     }
 
+    fn get_recent_performance_samples(
+        &self,
+        meta: Self::Metadata,
+        limit: Option<usize>,
+    ) -> Result<Vec<(Slot, PerfSample)>> {
+        debug!("get_recent_performance_samples request received");
+
+        let limit = limit.unwrap_or(PERFORMANCE_SAMPLES_LIMIT);
+
+        if limit > PERFORMANCE_SAMPLES_LIMIT {
+            return Err(Error::invalid_params(format!(
+                "Invalid limit; max {}",
+                PERFORMANCE_SAMPLES_LIMIT
+            )));
+        }
+
+        meta.blockstore
+            .get_recent_perf_samples(limit)
+            .map_err(|err| {
+                warn!("get_recent_performance_samples failed: {:?}", err);
+                Error::invalid_request()
+            })
+    }
+
     fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> {
         debug!("get_cluster_nodes rpc request received");
         let cluster_info = &meta.cluster_info;
@@ -2670,6 +2705,16 @@ pub mod tests {
             &socketaddr!("127.0.0.1:1234"),
         ));
 
+        let sample1 = PerfSample {
+            num_slots: 1,
+            num_transactions: 4,
+            sample_period_secs: 60,
+        };
+
+        blockstore
+            .write_perf_sample(0, &sample1)
+            .expect("write to blockstore");
+
         let (meta, receiver) = JsonRpcRequestProcessor::new(
             JsonRpcConfig {
                 enable_rpc_transaction_history: true,
@@ -2797,6 +2842,63 @@ pub mod tests {
         assert_eq!(expected, result);
     }
 
+    #[test]
+    fn test_rpc_get_recent_performance_samples() {
+        let bob_pubkey = Pubkey::new_rand();
+        let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
+
+        let req = r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples"}"#;
+
+        let res = io.handle_request_sync(&req, meta);
+        let result: Response = serde_json::from_str(&res.expect("actual response"))
+            .expect("actual response deserialization");
+
+        let expected = json!({
+            "jsonrpc": "2.0",
+            "id": 1,
+            "result": [
+                [
+                    0,
+                    {
+                        "num_slots": 1,
+                        "num_transactions": 4,
+                        "sample_period_secs": 60
+                    }
+                ]
+            ],
+        });
+
+        let expected: Response =
+            serde_json::from_value(expected).expect("expected response deserialization");
+        assert_eq!(expected, result);
+    }
+
+    #[test]
+    fn test_rpc_get_recent_performance_samples_invalid_limit() {
+        let bob_pubkey = Pubkey::new_rand();
+        let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
+
+        let req =
+            r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples","params":[10000]}"#;
+
+        let res = io.handle_request_sync(&req, meta);
+        let result: Response = serde_json::from_str(&res.expect("actual response"))
+            .expect("actual response deserialization");
+
+        let expected = json!({
+            "jsonrpc": "2.0",
+            "error": {
+                "code": -32602,
+                "message": "Invalid limit; max 720"
+            },
+            "id": 1
+        });
+
+        let expected: Response =
+            serde_json::from_value(expected).expect("expected response deserialization");
+        assert_eq!(expected, result);
+    }
+
     #[test]
     fn test_rpc_get_slot_leader() {
         let bob_pubkey = Pubkey::new_rand();
diff --git a/core/src/sample_performance_service.rs b/core/src/sample_performance_service.rs
new file mode 100644
index 0000000000..8b6372aaa1
--- /dev/null
+++ b/core/src/sample_performance_service.rs
@@ -0,0 +1,104 @@
+use solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample};
+use solana_runtime::bank_forks::BankForks;
+use std::{
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, RwLock,
+    },
+    thread::{self, sleep, Builder, JoinHandle},
+    time::{Duration, Instant},
+};
+
+const SAMPLE_INTERVAL: u64 = 60;
+const SLEEP_INTERVAL: u64 = 500;
+
+pub struct SamplePerformanceSnapshot {
+    pub num_transactions: u64,
+    pub num_slots: u64,
+}
+
+pub struct SamplePerformanceService {
+    thread_hdl: JoinHandle<()>,
+}
+
+impl SamplePerformanceService {
+    #[allow(clippy::new_ret_no_self)]
+    pub fn new(
+        bank_forks: &Arc<RwLock<BankForks>>,
+        blockstore: &Arc<Blockstore>,
+        exit: &Arc<AtomicBool>,
+    ) -> Self {
+        let exit = exit.clone();
+        let blockstore = blockstore.clone();
+        let bank_forks = bank_forks.clone();
+
+        info!("Starting SamplePerformance service");
+        let thread_hdl = Builder::new()
+            .name("sample-performance".to_string())
+            .spawn(move || {
+                Self::run(bank_forks, &blockstore, exit);
+            })
+            .unwrap();
+
+        Self { thread_hdl }
+    }
+
+    pub fn run(
+        bank_forks: Arc<RwLock<BankForks>>,
+        blockstore: &Arc<Blockstore>,
+        exit: Arc<AtomicBool>,
+    ) {
+        let forks = bank_forks.read().unwrap();
+        let bank = forks.root_bank().clone();
+        let highest_slot = forks.highest_slot();
+        drop(forks);
+
+        let mut sample_snapshot = SamplePerformanceSnapshot {
+            num_transactions: bank.transaction_count(),
+            num_slots: highest_slot,
+        };
+
+        let mut now = Instant::now();
+        loop {
+            if exit.load(Ordering::Relaxed) {
+                break;
+            }
+
+            let elapsed = now.elapsed();
+
+            if elapsed.as_secs() >= SAMPLE_INTERVAL {
+                now = Instant::now();
+                let bank_forks = bank_forks.read().unwrap();
+                let bank = bank_forks.root_bank().clone();
+                let highest_slot = bank_forks.highest_slot();
+                drop(bank_forks);
+
+                let perf_sample = PerfSample {
+                    num_slots: highest_slot
+                        .checked_sub(sample_snapshot.num_slots)
+                        .unwrap_or_default(),
+                    num_transactions: bank
+                        .transaction_count()
+                        .checked_sub(sample_snapshot.num_transactions)
+                        .unwrap_or_default(),
+                    sample_period_secs: elapsed.as_secs() as u16,
+                };
+
+                if let Err(e) = blockstore.write_perf_sample(highest_slot, &perf_sample) {
+                    error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
+                }
+
+                sample_snapshot = SamplePerformanceSnapshot {
+                    num_transactions: bank.transaction_count(),
+                    num_slots: highest_slot,
+                };
+            }
+
+            sleep(Duration::from_millis(SLEEP_INTERVAL));
+        }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.thread_hdl.join()
+    }
+}
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 2541969337..e1adc6e08a 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -16,6 +16,7 @@ use crate::{
     rpc_pubsub_service::PubSubService,
     rpc_service::JsonRpcService,
     rpc_subscriptions::RpcSubscriptions,
+    sample_performance_service::SamplePerformanceService,
     serve_repair::ServeRepair,
     serve_repair_service::ServeRepairService,
     sigverify,
@@ -169,6 +170,7 @@ pub struct Validator {
     transaction_status_service: Option<TransactionStatusService>,
     rewards_recorder_service: Option<RewardsRecorderService>,
     cache_block_time_service: Option<CacheBlockTimeService>,
+    sample_performance_service: Option<SamplePerformanceService>,
     gossip_service: GossipService,
     serve_repair_service: ServeRepairService,
     completed_data_sets_service: CompletedDataSetsService,
@@ -279,6 +281,17 @@ impl Validator {
         let bank = bank_forks.working_bank();
         let bank_forks = Arc::new(RwLock::new(bank_forks));
 
+        let sample_performance_service =
+            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
+                Some(SamplePerformanceService::new(
+                    &bank_forks,
+                    &blockstore,
+                    &exit,
+                ))
+            } else {
+                None
+            };
+
         info!("Starting validator with working bank slot {}", bank.slot());
         {
             let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@@ -554,6 +567,7 @@ impl Validator {
             transaction_status_service,
             rewards_recorder_service,
             cache_block_time_service,
+            sample_performance_service,
             snapshot_packager_service,
             completed_data_sets_service,
             tpu,
@@ -622,6 +636,10 @@ impl Validator {
             cache_block_time_service.join()?;
         }
 
+        if let Some(sample_performance_service) = self.sample_performance_service {
+            sample_performance_service.join()?;
+        }
+
         if let Some(s) = self.snapshot_packager_service {
             s.join()?;
         }
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 1a8c3a6fb9..3b6b18caec 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -137,6 +137,7 @@ pub struct Blockstore {
     active_transaction_status_index: RwLock<u64>,
     rewards_cf: LedgerColumn<cf::Rewards>,
     blocktime_cf: LedgerColumn<cf::Blocktime>,
+    perf_samples_cf: LedgerColumn<cf::PerfSamples>,
     last_root: Arc<RwLock<Slot>>,
     insert_shreds_lock: Arc<Mutex<()>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
@@ -293,6 +294,7 @@ impl Blockstore {
         let transaction_status_index_cf = db.column();
         let rewards_cf = db.column();
         let blocktime_cf = db.column();
+        let perf_samples_cf = db.column();
 
         let db = Arc::new(db);
 
@@ -338,6 +340,7 @@ impl Blockstore {
             active_transaction_status_index: RwLock::new(active_transaction_status_index),
             rewards_cf,
             blocktime_cf,
+            perf_samples_cf,
             new_shreds_signals: vec![],
             completed_slots_senders: vec![],
             insert_shreds_lock: Arc::new(Mutex::new(())),
@@ -2303,6 +2306,22 @@ impl Blockstore {
             .collect())
     }
 
+    pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
+        Ok(self
+            .db
+            .iter::<cf::PerfSamples>(IteratorMode::End)?
+            .take(num)
+            .map(|(slot, data)| {
+                let perf_sample = deserialize(&data).unwrap();
+                (slot, perf_sample)
+            })
+            .collect())
+    }
+
+    pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
+        self.perf_samples_cf.put(index, perf_sample)
+    }
+
     /// Returns the entry vector for the slot starting with `shred_start_index`
     pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> {
         self.get_slot_entries_with_shred_info(slot, shred_start_index, false)
@@ -6930,6 +6949,38 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_write_get_perf_samples() {
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            let num_entries: usize = 10;
+            let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
+            for x in 1..num_entries + 1 {
+                perf_samples.push((
+                    x as u64 * 50,
+                    PerfSample {
+                        num_transactions: 1000 + x as u64,
+                        num_slots: 50,
+                        sample_period_secs: 20,
+                    },
+                ));
+            }
+            for (slot, sample) in perf_samples.iter() {
+                blockstore.write_perf_sample(*slot, sample).unwrap();
+            }
+            for x in 0..num_entries {
+                let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
+                expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
+                assert_eq!(
+                    blockstore.get_recent_perf_samples(x + 1).unwrap(),
+                    expected_samples
+                );
+            }
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     fn test_lowest_slot() {
         let blockstore_path = get_tmp_ledger_path!();
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index db08c46cbc..5030858c8c 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -137,6 +137,10 @@ impl Blockstore {
             & self
                 .db
                 .delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
                 .is_ok();
         let mut w_active_transaction_status_index =
             self.active_transaction_status_index.write().unwrap();
@@ -231,6 +235,10 @@ impl Blockstore {
             && self
                 .blocktime_cf
                 .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .perf_samples_cf
+                .compact_range(from_slot, to_slot)
                 .unwrap_or(false);
         compact_timer.stop();
         if !result {
diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs
index 772a5be76e..dfda92d79f 100644
--- a/ledger/src/blockstore_db.rs
+++ b/ledger/src/blockstore_db.rs
@@ -52,6 +52,8 @@ const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
 const REWARDS_CF: &str = "rewards";
 /// Column family for Blocktime
 const BLOCKTIME_CF: &str = "blocktime";
+/// Column family for Performance Samples
+const PERF_SAMPLES_CF: &str = "perf_samples";
 
 #[derive(Error, Debug)]
 pub enum BlockstoreError {
@@ -140,6 +142,10 @@ pub mod columns {
     #[derive(Debug)]
     /// The blocktime column
     pub struct Blocktime;
+
+    #[derive(Debug)]
+    /// The performance samples column
+    pub struct PerfSamples;
 }
 
 pub enum AccessType {
@@ -200,7 +206,7 @@ impl Rocks {
     ) -> Result<Rocks> {
         use columns::{
             AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
-            Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
+            PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
             TransactionStatusIndex,
         };
 
@@ -236,6 +242,8 @@ impl Rocks {
         let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
         let blocktime_cf_descriptor =
             ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
+        let perf_samples_cf_descriptor =
+            ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
 
         let cfs = vec![
             (SlotMeta::NAME, meta_cf_descriptor),
@@ -255,6 +263,7 @@ impl Rocks {
             ),
             (Rewards::NAME, rewards_cf_descriptor),
             (Blocktime::NAME, blocktime_cf_descriptor),
+            (PerfSamples::NAME, perf_samples_cf_descriptor),
         ];
 
         // Open the database
@@ -293,7 +302,7 @@ impl Rocks {
     fn columns(&self) -> Vec<&'static str> {
         use columns::{
             AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
-            Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
+            PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
             TransactionStatusIndex,
         };
 
@@ -312,6 +321,7 @@ impl Rocks {
             TransactionStatusIndex::NAME,
             Rewards::NAME,
             Blocktime::NAME,
+            PerfSamples::NAME,
         ]
     }
 
@@ -545,6 +555,14 @@ impl TypedColumn for columns::Blocktime {
     type Type = UnixTimestamp;
 }
 
+impl SlotColumn for columns::PerfSamples {}
+impl ColumnName for columns::PerfSamples {
+    const NAME: &'static str = PERF_SAMPLES_CF;
+}
+impl TypedColumn for columns::PerfSamples {
+    type Type = blockstore_meta::PerfSample;
+}
+
 impl Column for columns::ShredCode {
     type Index = (u64, u64);
 
diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs
index 911ac375fe..01df93e85a 100644
--- a/ledger/src/blockstore_meta.rs
+++ b/ledger/src/blockstore_meta.rs
@@ -243,6 +243,13 @@ pub struct AddressSignatureMeta {
     pub writeable: bool,
 }
 
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+pub struct PerfSample {
+    pub num_transactions: u64,
+    pub num_slots: u64,
+    pub sample_period_secs: u16,
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
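
Usage sketch (illustrative, not part of the diff): on an RPC node started with --enable-rpc-transaction-history, the new method can be exercised with a plain JSON-RPC POST. The default RPC port 8899 is an assumption here; the response values shown are simply the ones from the unit test above (one sample written at slot 0). A node returns at most PERFORMANCE_SAMPLES_LIMIT (720) entries, highest slot first.

    # request ("params" is an optional limit, capped at 720; assumed local node on port 8899)
    curl http://localhost:8899 -X POST -H "Content-Type: application/json" -d '
      {"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples","params":[4]}'

    # response shape: a list of [slot, PerfSample] pairs (values here mirror the test fixture)
    {"jsonrpc":"2.0","result":[[0,{"num_slots":1,"num_transactions":4,"sample_period_secs":60}]],"id":1}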