Accountsdb plugin metrics (#20606)
Added metrics for accountsdb plugin Handle and log postgres db errors Print account pubkeys nicely in logging
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -4265,6 +4265,7 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"solana-accountsdb-plugin-interface",
|
"solana-accountsdb-plugin-interface",
|
||||||
"solana-logger 1.9.0",
|
"solana-logger 1.9.0",
|
||||||
|
"solana-measure",
|
||||||
"solana-metrics",
|
"solana-metrics",
|
||||||
"solana-rpc",
|
"solana-rpc",
|
||||||
"solana-runtime",
|
"solana-runtime",
|
||||||
@ -4286,6 +4287,8 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"solana-accountsdb-plugin-interface",
|
"solana-accountsdb-plugin-interface",
|
||||||
"solana-logger 1.9.0",
|
"solana-logger 1.9.0",
|
||||||
|
"solana-metrics",
|
||||||
|
"solana-sdk",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ serde_derive = "1.0.103"
|
|||||||
serde_json = "1.0.67"
|
serde_json = "1.0.67"
|
||||||
solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" }
|
solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" }
|
||||||
solana-logger = { path = "../logger", version = "=1.9.0" }
|
solana-logger = { path = "../logger", version = "=1.9.0" }
|
||||||
|
solana-measure = { path = "../measure", version = "=1.9.0" }
|
||||||
solana-metrics = { path = "../metrics", version = "=1.9.0" }
|
solana-metrics = { path = "../metrics", version = "=1.9.0" }
|
||||||
solana-rpc = { path = "../rpc", version = "=1.9.0" }
|
solana-rpc = { path = "../rpc", version = "=1.9.0" }
|
||||||
solana-runtime = { path = "../runtime", version = "=1.9.0" }
|
solana-runtime = { path = "../runtime", version = "=1.9.0" }
|
||||||
|
@ -5,6 +5,8 @@ use {
|
|||||||
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
|
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
|
||||||
ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus,
|
ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus,
|
||||||
},
|
},
|
||||||
|
solana_measure::measure::Measure,
|
||||||
|
solana_metrics::*,
|
||||||
solana_runtime::{
|
solana_runtime::{
|
||||||
accounts_update_notifier_interface::AccountsUpdateNotifierInterface,
|
accounts_update_notifier_interface::AccountsUpdateNotifierInterface,
|
||||||
append_vec::StoredAccountMeta,
|
append_vec::StoredAccountMeta,
|
||||||
@ -88,21 +90,33 @@ impl AccountsUpdateNotifierImpl {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for plugin in plugin_manager.plugins.iter_mut() {
|
for plugin in plugin_manager.plugins.iter_mut() {
|
||||||
|
let mut measure = Measure::start("accountsdb-plugin-update-account");
|
||||||
match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) {
|
match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Failed to update account {:?} at slot {:?}, error: {:?}",
|
"Failed to update account {} at slot {}, error: {} to plugin {}",
|
||||||
account.pubkey, slot, err
|
bs58::encode(account.pubkey).into_string(),
|
||||||
|
slot,
|
||||||
|
err,
|
||||||
|
plugin.name()
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
trace!(
|
trace!(
|
||||||
"Successfully updated account {:?} at slot {:?}",
|
"Successfully updated account {} at slot {} to plugin {}",
|
||||||
account.pubkey,
|
bs58::encode(account.pubkey).into_string(),
|
||||||
slot
|
slot,
|
||||||
|
plugin.name()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
measure.stop();
|
||||||
|
inc_new_counter_info!(
|
||||||
|
"accountsdb-plugin-update-account-ms",
|
||||||
|
measure.as_ms() as usize,
|
||||||
|
100000,
|
||||||
|
100000
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,17 +127,31 @@ impl AccountsUpdateNotifierImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for plugin in plugin_manager.plugins.iter_mut() {
|
for plugin in plugin_manager.plugins.iter_mut() {
|
||||||
|
let mut measure = Measure::start("accountsdb-plugin-update-slot");
|
||||||
match plugin.update_slot_status(slot, parent, slot_status.clone()) {
|
match plugin.update_slot_status(slot, parent, slot_status.clone()) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Failed to update slot status at slot {:?}, error: {:?}",
|
"Failed to update slot status at slot {}, error: {} to plugin {}",
|
||||||
slot, err
|
slot,
|
||||||
|
err,
|
||||||
|
plugin.name()
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
trace!("Successfully updated slot status at slot {:?}", slot);
|
trace!(
|
||||||
|
"Successfully updated slot status at slot {} to plugin {}",
|
||||||
|
slot,
|
||||||
|
plugin.name()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
measure.stop();
|
||||||
|
inc_new_counter_info!(
|
||||||
|
"accountsdb-plugin-update-slot-ms",
|
||||||
|
measure.as_ms() as usize,
|
||||||
|
1000,
|
||||||
|
1000
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@ serde_derive = "1.0.103"
|
|||||||
serde_json = "1.0.67"
|
serde_json = "1.0.67"
|
||||||
solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" }
|
solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" }
|
||||||
solana-logger = { path = "../logger", version = "=1.9.0" }
|
solana-logger = { path = "../logger", version = "=1.9.0" }
|
||||||
|
solana-metrics = { path = "../metrics", version = "=1.9.0" }
|
||||||
|
solana-sdk = { path = "../sdk", version = "=1.9.0" }
|
||||||
thiserror = "1.0.30"
|
thiserror = "1.0.30"
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
|
@ -10,6 +10,8 @@ use {
|
|||||||
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
|
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
|
||||||
AccountsDbPluginError, ReplicaAccountInfo, SlotStatus,
|
AccountsDbPluginError, ReplicaAccountInfo, SlotStatus,
|
||||||
},
|
},
|
||||||
|
solana_metrics::datapoint_info,
|
||||||
|
solana_sdk::timing::AtomicInterval,
|
||||||
std::{
|
std::{
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
@ -188,10 +190,10 @@ impl PostgresClient for SimplePostgresClient {
|
|||||||
account: &T,
|
account: &T,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
) -> Result<(), AccountsDbPluginError> {
|
) -> Result<(), AccountsDbPluginError> {
|
||||||
debug!(
|
trace!(
|
||||||
"Updating account {:?} {:?} at slot {:?}",
|
"Updating account {} with owner {} at slot {}",
|
||||||
account.pubkey(),
|
bs58::encode(account.pubkey()).into_string(),
|
||||||
account.owner(),
|
bs58::encode(account.owner()).into_string(),
|
||||||
slot,
|
slot,
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -215,9 +217,12 @@ impl PostgresClient for SimplePostgresClient {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if let Err(err) = result {
|
if let Err(err) = result {
|
||||||
return Err(AccountsDbPluginError::AccountsUpdateError {
|
let msg = format!(
|
||||||
msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err)
|
"Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
|
||||||
});
|
err
|
||||||
|
);
|
||||||
|
error!("{}", msg);
|
||||||
|
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -266,9 +271,12 @@ impl PostgresClient for SimplePostgresClient {
|
|||||||
|
|
||||||
match result {
|
match result {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
return Err(AccountsDbPluginError::SlotStatusUpdateError{
|
let msg = format!(
|
||||||
msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err)
|
"Failed to persist the update of slot to the PostgreSQL database. Error: {:?}",
|
||||||
});
|
err
|
||||||
|
);
|
||||||
|
error!("{:?}", msg);
|
||||||
|
return Err(AccountsDbPluginError::SlotStatusUpdateError { msg });
|
||||||
}
|
}
|
||||||
Ok(rows) => {
|
Ok(rows) => {
|
||||||
assert_eq!(1, rows, "Expected one rows to be updated a time");
|
assert_eq!(1, rows, "Expected one rows to be updated a time");
|
||||||
@ -340,6 +348,7 @@ pub struct ParallelPostgresClient {
|
|||||||
workers: Vec<JoinHandle<Result<(), AccountsDbPluginError>>>,
|
workers: Vec<JoinHandle<Result<(), AccountsDbPluginError>>>,
|
||||||
exit_worker: Arc<AtomicBool>,
|
exit_worker: Arc<AtomicBool>,
|
||||||
sender: Sender<DbWorkItem>,
|
sender: Sender<DbWorkItem>,
|
||||||
|
last_report: AtomicInterval,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ParallelPostgresClient {
|
impl ParallelPostgresClient {
|
||||||
@ -365,6 +374,7 @@ impl ParallelPostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
last_report: AtomicInterval::default(),
|
||||||
workers,
|
workers,
|
||||||
exit_worker,
|
exit_worker,
|
||||||
sender,
|
sender,
|
||||||
@ -395,6 +405,12 @@ impl PostgresClient for ParallelPostgresClient {
|
|||||||
account: &T,
|
account: &T,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
) -> Result<(), AccountsDbPluginError> {
|
) -> Result<(), AccountsDbPluginError> {
|
||||||
|
if self.last_report.should_update(30000) {
|
||||||
|
datapoint_info!(
|
||||||
|
"postgres-plugin-stats",
|
||||||
|
("message-queue-length", self.sender.len() as i64, i64),
|
||||||
|
);
|
||||||
|
}
|
||||||
if let Err(err) = self
|
if let Err(err) = self
|
||||||
.sender
|
.sender
|
||||||
.send(DbWorkItem::UpdateAccount(UpdateAccountRequest {
|
.send(DbWorkItem::UpdateAccount(UpdateAccountRequest {
|
||||||
@ -405,7 +421,7 @@ impl PostgresClient for ParallelPostgresClient {
|
|||||||
return Err(AccountsDbPluginError::AccountsUpdateError {
|
return Err(AccountsDbPluginError::AccountsUpdateError {
|
||||||
msg: format!(
|
msg: format!(
|
||||||
"Failed to update the account {:?}, error: {:?}",
|
"Failed to update the account {:?}, error: {:?}",
|
||||||
account.pubkey(),
|
bs58::encode(account.pubkey()).into_string(),
|
||||||
err
|
err
|
||||||
),
|
),
|
||||||
});
|
});
|
||||||
|
Reference in New Issue
Block a user