Add blockstore column to store performance sampling data (#12251)

* Add blockstore column to store performance sampling data

* introduce timer and write performance metrics to blockstore

* introduce getRecentPerformanceSamples rpc

* only run on rpc nodes enabled with transaction history

* add unit tests for get_recent_performance_samples

* remove RpcResponse from rpc call

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure not negative subraction

* Add PerfSamples to purge/compaction

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure not negative subraction

* remove duplicate constants

Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
Josh
2020-09-22 12:26:32 -07:00
committed by GitHub
parent afd9bfc45f
commit 65a6bfad09
8 changed files with 312 additions and 3 deletions

View File

@ -16,6 +16,7 @@ pub mod cluster_info_vote_listener;
pub mod commitment_service; pub mod commitment_service;
pub mod completed_data_sets_service; pub mod completed_data_sets_service;
mod deprecated; mod deprecated;
pub mod sample_performance_service;
pub mod shred_fetch_stage; pub mod shred_fetch_stage;
#[macro_use] #[macro_use]
pub mod contact_info; pub mod contact_info;

View File

@ -33,7 +33,10 @@ use solana_client::{
rpc_response::*, rpc_response::*,
}; };
use solana_faucet::faucet::request_airdrop_transaction; use solana_faucet::faucet::request_airdrop_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path}; use solana_ledger::{
blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::PerfSample,
get_tmp_ledger_path,
};
use solana_perf::packet::PACKET_DATA_SIZE; use solana_perf::packet::PACKET_DATA_SIZE;
use solana_runtime::{ use solana_runtime::{
accounts::AccountAddressFilter, accounts::AccountAddressFilter,
@ -79,6 +82,7 @@ use std::{
use tokio::runtime; use tokio::runtime;
pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> { fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
let context = RpcResponseContext { slot: bank.slot() }; let context = RpcResponseContext { slot: bank.slot() };
@ -1496,6 +1500,13 @@ pub trait RpcSol {
#[rpc(meta, name = "getClusterNodes")] #[rpc(meta, name = "getClusterNodes")]
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>; fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>;
#[rpc(meta, name = "getRecentPerformanceSamples")]
fn get_recent_performance_samples(
&self,
meta: Self::Metadata,
limit: Option<usize>,
) -> Result<Vec<(Slot, PerfSample)>>;
#[rpc(meta, name = "getEpochInfo")] #[rpc(meta, name = "getEpochInfo")]
fn get_epoch_info( fn get_epoch_info(
&self, &self,
@ -1885,6 +1896,30 @@ impl RpcSol for RpcSolImpl {
Ok(meta.get_balance(&pubkey, commitment)) Ok(meta.get_balance(&pubkey, commitment))
} }
fn get_recent_performance_samples(
&self,
meta: Self::Metadata,
limit: Option<usize>,
) -> Result<Vec<(Slot, PerfSample)>> {
debug!("get_recent_performance_samples request received");
let limit = limit.unwrap_or(PERFORMANCE_SAMPLES_LIMIT);
if limit > PERFORMANCE_SAMPLES_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
PERFORMANCE_SAMPLES_LIMIT
)));
}
meta.blockstore
.get_recent_perf_samples(limit)
.map_err(|err| {
warn!("get_recent_performance_samples failed: {:?}", err);
Error::invalid_request()
})
}
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> { fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> {
debug!("get_cluster_nodes rpc request received"); debug!("get_cluster_nodes rpc request received");
let cluster_info = &meta.cluster_info; let cluster_info = &meta.cluster_info;
@ -2670,6 +2705,16 @@ pub mod tests {
&socketaddr!("127.0.0.1:1234"), &socketaddr!("127.0.0.1:1234"),
)); ));
let sample1 = PerfSample {
num_slots: 1,
num_transactions: 4,
sample_period_secs: 60,
};
blockstore
.write_perf_sample(0, &sample1)
.expect("write to blockstore");
let (meta, receiver) = JsonRpcRequestProcessor::new( let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig { JsonRpcConfig {
enable_rpc_transaction_history: true, enable_rpc_transaction_history: true,
@ -2797,6 +2842,63 @@ pub mod tests {
assert_eq!(expected, result); assert_eq!(expected, result);
} }
#[test]
fn test_rpc_get_recent_performance_samples() {
let bob_pubkey = Pubkey::new_rand();
let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples"}"#;
let res = io.handle_request_sync(&req, meta);
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let expected = json!({
"jsonrpc": "2.0",
"id": 1,
"result": [
[
0,
{
"num_slots": 1,
"num_transactions": 4,
"sample_period_secs": 60
}
]
],
});
let expected: Response =
serde_json::from_value(expected).expect("expected response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_recent_performance_samples_invalid_limit() {
let bob_pubkey = Pubkey::new_rand();
let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples","params":[10000]}"#;
let res = io.handle_request_sync(&req, meta);
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let expected = json!({
"jsonrpc": "2.0",
"error": {
"code": -32602,
"message": "Invalid limit; max 720"
},
"id": 1
});
let expected: Response =
serde_json::from_value(expected).expect("expected response deserialization");
assert_eq!(expected, result);
}
#[test] #[test]
fn test_rpc_get_slot_leader() { fn test_rpc_get_slot_leader() {
let bob_pubkey = Pubkey::new_rand(); let bob_pubkey = Pubkey::new_rand();

View File

@ -0,0 +1,104 @@
use solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample};
use solana_runtime::bank_forks::BankForks;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
const SAMPLE_INTERVAL: u64 = 60;
const SLEEP_INTERVAL: u64 = 500;
pub struct SamplePerformanceSnapshot {
pub num_transactions: u64,
pub num_slots: u64,
}
pub struct SamplePerformanceService {
thread_hdl: JoinHandle<()>,
}
impl SamplePerformanceService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank_forks: &Arc<RwLock<BankForks>>,
blockstore: &Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let blockstore = blockstore.clone();
let bank_forks = bank_forks.clone();
info!("Starting SamplePerformance service");
let thread_hdl = Builder::new()
.name("sample-performance".to_string())
.spawn(move || {
Self::run(bank_forks, &blockstore, exit);
})
.unwrap();
Self { thread_hdl }
}
pub fn run(
bank_forks: Arc<RwLock<BankForks>>,
blockstore: &Arc<Blockstore>,
exit: Arc<AtomicBool>,
) {
let forks = bank_forks.read().unwrap();
let bank = forks.root_bank().clone();
let highest_slot = forks.highest_slot();
drop(forks);
let mut sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
};
let mut now = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let elapsed = now.elapsed();
if elapsed.as_secs() >= SAMPLE_INTERVAL {
now = Instant::now();
let bank_forks = bank_forks.read().unwrap();
let bank = bank_forks.root_bank().clone();
let highest_slot = bank_forks.highest_slot();
drop(bank_forks);
let perf_sample = PerfSample {
num_slots: highest_slot
.checked_sub(sample_snapshot.num_slots)
.unwrap_or_default(),
num_transactions: bank
.transaction_count()
.checked_sub(sample_snapshot.num_transactions)
.unwrap_or_default(),
sample_period_secs: elapsed.as_secs() as u16,
};
if let Err(e) = blockstore.write_perf_sample(highest_slot, &perf_sample) {
error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
}
sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
};
}
sleep(Duration::from_millis(SLEEP_INTERVAL));
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -16,6 +16,7 @@ use crate::{
rpc_pubsub_service::PubSubService, rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService, rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
sample_performance_service::SamplePerformanceService,
serve_repair::ServeRepair, serve_repair::ServeRepair,
serve_repair_service::ServeRepairService, serve_repair_service::ServeRepairService,
sigverify, sigverify,
@ -169,6 +170,7 @@ pub struct Validator {
transaction_status_service: Option<TransactionStatusService>, transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>, rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_service: Option<CacheBlockTimeService>, cache_block_time_service: Option<CacheBlockTimeService>,
sample_performance_service: Option<SamplePerformanceService>,
gossip_service: GossipService, gossip_service: GossipService,
serve_repair_service: ServeRepairService, serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService, completed_data_sets_service: CompletedDataSetsService,
@ -279,6 +281,17 @@ impl Validator {
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let sample_performance_service =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
Some(SamplePerformanceService::new(
&bank_forks,
&blockstore,
&exit,
))
} else {
None
};
info!("Starting validator with working bank slot {}", bank.slot()); info!("Starting validator with working bank slot {}", bank.slot());
{ {
let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect(); let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@ -554,6 +567,7 @@ impl Validator {
transaction_status_service, transaction_status_service,
rewards_recorder_service, rewards_recorder_service,
cache_block_time_service, cache_block_time_service,
sample_performance_service,
snapshot_packager_service, snapshot_packager_service,
completed_data_sets_service, completed_data_sets_service,
tpu, tpu,
@ -622,6 +636,10 @@ impl Validator {
cache_block_time_service.join()?; cache_block_time_service.join()?;
} }
if let Some(sample_performance_service) = self.sample_performance_service {
sample_performance_service.join()?;
}
if let Some(s) = self.snapshot_packager_service { if let Some(s) = self.snapshot_packager_service {
s.join()?; s.join()?;
} }

View File

@ -137,6 +137,7 @@ pub struct Blockstore {
active_transaction_status_index: RwLock<u64>, active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>, rewards_cf: LedgerColumn<cf::Rewards>,
blocktime_cf: LedgerColumn<cf::Blocktime>, blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
last_root: Arc<RwLock<Slot>>, last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>, insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>, pub new_shreds_signals: Vec<SyncSender<bool>>,
@ -293,6 +294,7 @@ impl Blockstore {
let transaction_status_index_cf = db.column(); let transaction_status_index_cf = db.column();
let rewards_cf = db.column(); let rewards_cf = db.column();
let blocktime_cf = db.column(); let blocktime_cf = db.column();
let perf_samples_cf = db.column();
let db = Arc::new(db); let db = Arc::new(db);
@ -338,6 +340,7 @@ impl Blockstore {
active_transaction_status_index: RwLock::new(active_transaction_status_index), active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf, rewards_cf,
blocktime_cf, blocktime_cf,
perf_samples_cf,
new_shreds_signals: vec![], new_shreds_signals: vec![],
completed_slots_senders: vec![], completed_slots_senders: vec![],
insert_shreds_lock: Arc::new(Mutex::new(())), insert_shreds_lock: Arc::new(Mutex::new(())),
@ -2303,6 +2306,22 @@ impl Blockstore {
.collect()) .collect())
} }
pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
Ok(self
.db
.iter::<cf::PerfSamples>(IteratorMode::End)?
.take(num)
.map(|(slot, data)| {
let perf_sample = deserialize(&data).unwrap();
(slot, perf_sample)
})
.collect())
}
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
self.perf_samples_cf.put(index, perf_sample)
}
/// Returns the entry vector for the slot starting with `shred_start_index` /// Returns the entry vector for the slot starting with `shred_start_index`
pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> { pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> {
self.get_slot_entries_with_shred_info(slot, shred_start_index, false) self.get_slot_entries_with_shred_info(slot, shred_start_index, false)
@ -6930,6 +6949,38 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
fn test_write_get_perf_samples() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries: usize = 10;
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 {
perf_samples.push((
x as u64 * 50,
PerfSample {
num_transactions: 1000 + x as u64,
num_slots: 50,
sample_period_secs: 20,
},
));
}
for (slot, sample) in perf_samples.iter() {
blockstore.write_perf_sample(*slot, sample).unwrap();
}
for x in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(x + 1).unwrap(),
expected_samples
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test] #[test]
fn test_lowest_slot() { fn test_lowest_slot() {
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();

View File

@ -137,6 +137,10 @@ impl Blockstore {
& self & self
.db .db
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot) .delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
.is_ok(); .is_ok();
let mut w_active_transaction_status_index = let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap(); self.active_transaction_status_index.write().unwrap();
@ -231,6 +235,10 @@ impl Blockstore {
&& self && self
.blocktime_cf .blocktime_cf
.compact_range(from_slot, to_slot) .compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.perf_samples_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false); .unwrap_or(false);
compact_timer.stop(); compact_timer.stop();
if !result { if !result {

View File

@ -52,6 +52,8 @@ const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
const REWARDS_CF: &str = "rewards"; const REWARDS_CF: &str = "rewards";
/// Column family for Blocktime /// Column family for Blocktime
const BLOCKTIME_CF: &str = "blocktime"; const BLOCKTIME_CF: &str = "blocktime";
/// Column family for Performance Samples
const PERF_SAMPLES_CF: &str = "perf_samples";
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum BlockstoreError { pub enum BlockstoreError {
@ -140,6 +142,10 @@ pub mod columns {
#[derive(Debug)] #[derive(Debug)]
/// The blocktime column /// The blocktime column
pub struct Blocktime; pub struct Blocktime;
#[derive(Debug)]
/// The performance samples column
pub struct PerfSamples;
} }
pub enum AccessType { pub enum AccessType {
@ -200,7 +206,7 @@ impl Rocks {
) -> Result<Rocks> { ) -> Result<Rocks> {
use columns::{ use columns::{
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
TransactionStatusIndex, TransactionStatusIndex,
}; };
@ -236,6 +242,8 @@ impl Rocks {
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
let blocktime_cf_descriptor = let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options()); ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
let cfs = vec![ let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor), (SlotMeta::NAME, meta_cf_descriptor),
@ -255,6 +263,7 @@ impl Rocks {
), ),
(Rewards::NAME, rewards_cf_descriptor), (Rewards::NAME, rewards_cf_descriptor),
(Blocktime::NAME, blocktime_cf_descriptor), (Blocktime::NAME, blocktime_cf_descriptor),
(PerfSamples::NAME, perf_samples_cf_descriptor),
]; ];
// Open the database // Open the database
@ -293,7 +302,7 @@ impl Rocks {
fn columns(&self) -> Vec<&'static str> { fn columns(&self) -> Vec<&'static str> {
use columns::{ use columns::{
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
TransactionStatusIndex, TransactionStatusIndex,
}; };
@ -312,6 +321,7 @@ impl Rocks {
TransactionStatusIndex::NAME, TransactionStatusIndex::NAME,
Rewards::NAME, Rewards::NAME,
Blocktime::NAME, Blocktime::NAME,
PerfSamples::NAME,
] ]
} }
@ -545,6 +555,14 @@ impl TypedColumn for columns::Blocktime {
type Type = UnixTimestamp; type Type = UnixTimestamp;
} }
impl SlotColumn for columns::PerfSamples {}
impl ColumnName for columns::PerfSamples {
const NAME: &'static str = PERF_SAMPLES_CF;
}
impl TypedColumn for columns::PerfSamples {
type Type = blockstore_meta::PerfSample;
}
impl Column for columns::ShredCode { impl Column for columns::ShredCode {
type Index = (u64, u64); type Index = (u64, u64);

View File

@ -243,6 +243,13 @@ pub struct AddressSignatureMeta {
pub writeable: bool, pub writeable: bool,
} }
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct PerfSample {
pub num_transactions: u64,
pub num_slots: u64,
pub sample_period_secs: u16,
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;