Add blockstore column to store performance sampling data (bp #12251) (#12393)

* Add blockstore column to store performance sampling data (#12251)

* Add blockstore column to store performance sampling data

* introduce timer and write performance metrics to blockstore

* introduce getRecentPerformanceSamples rpc

* only run on rpc nodes enabled with transaction history

* add unit tests for get_recent_performance_samples

* remove RpcResponse from rpc call

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure not negative subraction

* Add PerfSamples to purge/compaction

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure not negative subraction

* remove duplicate constants

Co-authored-by: Tyera Eulberg <tyera@solana.com>
(cherry picked from commit 65a6bfad09)

# Conflicts:
#	core/src/validator.rs
#	ledger/src/blockstore.rs

* merge cherry pick of 65a6bfad0

Co-authored-by: Josh <josh.hundley@gmail.com>
This commit is contained in:
mergify[bot]
2020-09-22 22:00:51 +00:00
committed by GitHub
parent 339e72d8d2
commit 65e4aac306
8 changed files with 312 additions and 3 deletions

View File

@@ -15,6 +15,7 @@ pub mod cluster_info_vote_listener;
pub mod commitment_service;
pub mod completed_data_sets_service;
mod deprecated;
pub mod sample_performance_service;
pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;

View File

@@ -33,7 +33,10 @@ use solana_client::{
rpc_response::*,
};
use solana_faucet::faucet::request_airdrop_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path};
use solana_ledger::{
blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::PerfSample,
get_tmp_ledger_path,
};
use solana_perf::packet::PACKET_DATA_SIZE;
use solana_runtime::{
accounts::AccountAddressFilter,
@@ -80,6 +83,7 @@ use std::{
use tokio::runtime;
pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
let context = RpcResponseContext { slot: bank.slot() };
@@ -1513,6 +1517,13 @@ pub trait RpcSol {
#[rpc(meta, name = "getClusterNodes")]
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>;
#[rpc(meta, name = "getRecentPerformanceSamples")]
fn get_recent_performance_samples(
&self,
meta: Self::Metadata,
limit: Option<usize>,
) -> Result<Vec<(Slot, PerfSample)>>;
#[rpc(meta, name = "getEpochInfo")]
fn get_epoch_info(
&self,
@@ -1902,6 +1913,30 @@ impl RpcSol for RpcSolImpl {
Ok(meta.get_balance(&pubkey, commitment))
}
fn get_recent_performance_samples(
&self,
meta: Self::Metadata,
limit: Option<usize>,
) -> Result<Vec<(Slot, PerfSample)>> {
debug!("get_recent_performance_samples request received");
let limit = limit.unwrap_or(PERFORMANCE_SAMPLES_LIMIT);
if limit > PERFORMANCE_SAMPLES_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
PERFORMANCE_SAMPLES_LIMIT
)));
}
meta.blockstore
.get_recent_perf_samples(limit)
.map_err(|err| {
warn!("get_recent_performance_samples failed: {:?}", err);
Error::invalid_request()
})
}
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> {
debug!("get_cluster_nodes rpc request received");
let cluster_info = &meta.cluster_info;
@@ -2686,6 +2721,16 @@ pub mod tests {
&socketaddr!("127.0.0.1:1234"),
));
let sample1 = PerfSample {
num_slots: 1,
num_transactions: 4,
sample_period_secs: 60,
};
blockstore
.write_perf_sample(0, &sample1)
.expect("write to blockstore");
let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig {
enable_rpc_transaction_history: true,
@@ -2813,6 +2858,63 @@ pub mod tests {
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_recent_performance_samples() {
let bob_pubkey = Pubkey::new_rand();
let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples"}"#;
let res = io.handle_request_sync(&req, meta);
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let expected = json!({
"jsonrpc": "2.0",
"id": 1,
"result": [
[
0,
{
"num_slots": 1,
"num_transactions": 4,
"sample_period_secs": 60
}
]
],
});
let expected: Response =
serde_json::from_value(expected).expect("expected response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_recent_performance_samples_invalid_limit() {
let bob_pubkey = Pubkey::new_rand();
let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples","params":[10000]}"#;
let res = io.handle_request_sync(&req, meta);
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let expected = json!({
"jsonrpc": "2.0",
"error": {
"code": -32602,
"message": "Invalid limit; max 720"
},
"id": 1
});
let expected: Response =
serde_json::from_value(expected).expect("expected response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_slot_leader() {
let bob_pubkey = Pubkey::new_rand();

View File

@@ -0,0 +1,104 @@
use solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample};
use solana_runtime::bank_forks::BankForks;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
const SAMPLE_INTERVAL: u64 = 60;
const SLEEP_INTERVAL: u64 = 500;
pub struct SamplePerformanceSnapshot {
pub num_transactions: u64,
pub num_slots: u64,
}
pub struct SamplePerformanceService {
thread_hdl: JoinHandle<()>,
}
impl SamplePerformanceService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank_forks: &Arc<RwLock<BankForks>>,
blockstore: &Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let blockstore = blockstore.clone();
let bank_forks = bank_forks.clone();
info!("Starting SamplePerformance service");
let thread_hdl = Builder::new()
.name("sample-performance".to_string())
.spawn(move || {
Self::run(bank_forks, &blockstore, exit);
})
.unwrap();
Self { thread_hdl }
}
pub fn run(
bank_forks: Arc<RwLock<BankForks>>,
blockstore: &Arc<Blockstore>,
exit: Arc<AtomicBool>,
) {
let forks = bank_forks.read().unwrap();
let bank = forks.root_bank().clone();
let highest_slot = forks.highest_slot();
drop(forks);
let mut sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
};
let mut now = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let elapsed = now.elapsed();
if elapsed.as_secs() >= SAMPLE_INTERVAL {
now = Instant::now();
let bank_forks = bank_forks.read().unwrap();
let bank = bank_forks.root_bank().clone();
let highest_slot = bank_forks.highest_slot();
drop(bank_forks);
let perf_sample = PerfSample {
num_slots: highest_slot
.checked_sub(sample_snapshot.num_slots)
.unwrap_or_default(),
num_transactions: bank
.transaction_count()
.checked_sub(sample_snapshot.num_transactions)
.unwrap_or_default(),
sample_period_secs: elapsed.as_secs() as u16,
};
if let Err(e) = blockstore.write_perf_sample(highest_slot, &perf_sample) {
error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
}
sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
};
}
sleep(Duration::from_millis(SLEEP_INTERVAL));
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@@ -14,6 +14,7 @@ use crate::{
rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
sample_performance_service::SamplePerformanceService,
serve_repair::ServeRepair,
serve_repair_service::ServeRepairService,
sigverify,
@@ -162,6 +163,7 @@ pub struct Validator {
rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
sample_performance_service: Option<SamplePerformanceService>,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
@@ -262,6 +264,17 @@ impl Validator {
let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let sample_performance_service =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
Some(SamplePerformanceService::new(
&bank_forks,
&blockstore,
&exit,
))
} else {
None
};
info!("Starting validator with working bank slot {}", bank.slot());
{
let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@@ -534,6 +547,7 @@ impl Validator {
rpc_service,
transaction_status_service,
rewards_recorder_service,
sample_performance_service,
snapshot_packager_service,
completed_data_sets_service,
tpu,
@@ -598,6 +612,10 @@ impl Validator {
rewards_recorder_service.join()?;
}
if let Some(sample_performance_service) = self.sample_performance_service {
sample_performance_service.join()?;
}
if let Some(s) = self.snapshot_packager_service {
s.join()?;
}

View File

@@ -137,6 +137,7 @@ pub struct Blockstore {
active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>,
_blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
@@ -295,6 +296,7 @@ impl Blockstore {
// This column is created (but never populated) in order to maintain compatibility with
// newer versions of Blockstore.
let _blocktime_cf = db.column();
let perf_samples_cf = db.column();
let db = Arc::new(db);
@@ -340,6 +342,7 @@ impl Blockstore {
active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf,
_blocktime_cf,
perf_samples_cf,
new_shreds_signals: vec![],
completed_slots_senders: vec![],
insert_shreds_lock: Arc::new(Mutex::new(())),
@@ -2316,6 +2319,22 @@ impl Blockstore {
.collect())
}
pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
Ok(self
.db
.iter::<cf::PerfSamples>(IteratorMode::End)?
.take(num)
.map(|(slot, data)| {
let perf_sample = deserialize(&data).unwrap();
(slot, perf_sample)
})
.collect())
}
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
self.perf_samples_cf.put(index, perf_sample)
}
/// Returns the entry vector for the slot starting with `shred_start_index`
pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> {
self.get_slot_entries_with_shred_info(slot, shred_start_index, false)
@@ -6895,6 +6914,38 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_write_get_perf_samples() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries: usize = 10;
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 {
perf_samples.push((
x as u64 * 50,
PerfSample {
num_transactions: 1000 + x as u64,
num_slots: 50,
sample_period_secs: 20,
},
));
}
for (slot, sample) in perf_samples.iter() {
blockstore.write_perf_sample(*slot, sample).unwrap();
}
for x in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(x + 1).unwrap(),
expected_samples
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_lowest_slot() {
let blockstore_path = get_tmp_ledger_path!();

View File

@@ -137,6 +137,10 @@ impl Blockstore {
& self
.db
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
.is_ok();
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
@@ -231,6 +235,10 @@ impl Blockstore {
&& self
._blocktime_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.perf_samples_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false);
compact_timer.stop();
if !result {

View File

@@ -52,6 +52,8 @@ const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
const REWARDS_CF: &str = "rewards";
/// Column family for Blocktime
const BLOCKTIME_CF: &str = "blocktime";
/// Column family for Performance Samples
const PERF_SAMPLES_CF: &str = "perf_samples";
#[derive(Error, Debug)]
pub enum BlockstoreError {
@@ -138,6 +140,10 @@ pub mod columns {
#[derive(Debug)]
/// The blocktime column
pub struct Blocktime;
#[derive(Debug)]
/// The performance samples column
pub struct PerfSamples;
}
pub enum AccessType {
@@ -198,7 +204,7 @@ impl Rocks {
) -> Result<Rocks> {
use columns::{
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
TransactionStatusIndex,
};
@@ -234,6 +240,8 @@ impl Rocks {
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
@@ -253,6 +261,7 @@ impl Rocks {
),
(Rewards::NAME, rewards_cf_descriptor),
(Blocktime::NAME, blocktime_cf_descriptor),
(PerfSamples::NAME, perf_samples_cf_descriptor),
];
// Open the database
@@ -291,7 +300,7 @@ impl Rocks {
fn columns(&self) -> Vec<&'static str> {
use columns::{
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
TransactionStatusIndex,
};
@@ -310,6 +319,7 @@ impl Rocks {
TransactionStatusIndex::NAME,
Rewards::NAME,
Blocktime::NAME,
PerfSamples::NAME,
]
}
@@ -543,6 +553,14 @@ impl TypedColumn for columns::Blocktime {
type Type = UnixTimestamp;
}
impl SlotColumn for columns::PerfSamples {}
impl ColumnName for columns::PerfSamples {
const NAME: &'static str = PERF_SAMPLES_CF;
}
impl TypedColumn for columns::PerfSamples {
type Type = blockstore_meta::PerfSample;
}
impl Column for columns::ShredCode {
type Index = (u64, u64);

View File

@@ -243,6 +243,13 @@ pub struct AddressSignatureMeta {
pub writeable: bool,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct PerfSample {
pub num_transactions: u64,
pub num_slots: u64,
pub sample_period_secs: u16,
}
#[cfg(test)]
mod test {
use super::*;