From 7d0494fcaa401a385e14508e7e4aaa9a408ec079 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 7 Oct 2021 14:15:05 -0700 Subject: [PATCH] Merge AccountsDb plugin framework to v1.8 (#20518) Merge AccountsDb plugin framework to v1.8 (#20518) Summary of Changes Create a plugin mechanism in the accounts update path so that accounts data can be streamed out to external data stores (be it Kafka or Postgres). The plugin mechanism allows Data stores of connection strings/credentials to be configured, Accounts with patterns to be streamed PostgreSQL implementation of the streaming for different destination stores to be plugged in. The code comprises 4 major parts: accountsdb-plugin-intf: defines the plugin interface which concrete plugin should implement. accountsdb-plugin-manager: manages the load/unload of plugins and provide interfaces which the validator can notify of accounts update to plugins. accountsdb-plugin-postgres: the concrete plugin implementation for PostgreSQL The validator integrations: updated streamed right after snapshot restore and after account update from transaction processing or other real updates. The plugin is optionally loaded on demand by new validator CLI argument -- there is no impact if the plugin is not loaded. --- Cargo.lock | 184 ++++++++- Cargo.toml | 3 + accounts-bench/src/main.rs | 1 + accountsdb-plugin-interface/Cargo.toml | 17 + accountsdb-plugin-interface/README.md | 20 + .../src/accountsdb_plugin_interface.rs | 90 +++++ accountsdb-plugin-interface/src/lib.rs | 1 + accountsdb-plugin-manager/Cargo.toml | 29 ++ .../src/accounts_update_notifier.rs | 129 +++++++ .../src/accountsdb_plugin_manager.rs | 55 +++ .../src/accountsdb_plugin_service.rs | 150 ++++++++ accountsdb-plugin-manager/src/lib.rs | 4 + .../src/slot_status_observer.rs | 80 ++++ accountsdb-plugin-postgres/Cargo.toml | 29 ++ .../scripts/create_schema.sql | 52 +++ .../scripts/drop_schema.sql | 7 + .../src/accounts_selector.rs | 69 ++++ .../src/accountsdb_plugin_postgres.rs | 349 ++++++++++++++++++ accountsdb-plugin-postgres/src/lib.rs | 2 + core/Cargo.toml | 1 + core/src/retransmit_stage.rs | 2 +- core/src/validator.rs | 45 +++ core/tests/snapshots.rs | 2 + .../src/developing/backwards-compatibility.md | 1 + ledger-tool/src/main.rs | 1 + ledger/src/bank_forks_utils.rs | 9 + ledger/src/blockstore_processor.rs | 60 ++- local-cluster/src/validator_configs.rs | 1 + replica-node/src/replica_node.rs | 1 + .../optimistically_confirmed_bank_tracker.rs | 50 +++ rpc/src/rpc.rs | 4 + rpc/src/rpc_subscriptions.rs | 8 + runtime/benches/accounts.rs | 9 + runtime/src/accounts.rs | 17 + runtime/src/accounts_db.rs | 46 +++ .../src/accounts_update_notifier_interface.rs | 25 ++ runtime/src/bank.rs | 8 + runtime/src/lib.rs | 1 + runtime/src/serde_snapshot.rs | 17 + runtime/src/serde_snapshot/tests.rs | 3 + runtime/src/snapshot_utils.rs | 5 + upload-perf/src/upload-perf.rs | 19 +- validator/src/main.rs | 13 + 43 files changed, 1578 insertions(+), 41 deletions(-) create mode 100644 accountsdb-plugin-interface/Cargo.toml create mode 100644 accountsdb-plugin-interface/README.md create mode 100644 accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs create mode 100644 accountsdb-plugin-interface/src/lib.rs create mode 100644 accountsdb-plugin-manager/Cargo.toml create mode 100644 accountsdb-plugin-manager/src/accounts_update_notifier.rs create mode 100644 accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs create mode 100644 accountsdb-plugin-manager/src/accountsdb_plugin_service.rs create mode 100644 accountsdb-plugin-manager/src/lib.rs create mode 100644 accountsdb-plugin-manager/src/slot_status_observer.rs create mode 100644 accountsdb-plugin-postgres/Cargo.toml create mode 100644 accountsdb-plugin-postgres/scripts/create_schema.sql create mode 100644 accountsdb-plugin-postgres/scripts/drop_schema.sql create mode 100644 accountsdb-plugin-postgres/src/accounts_selector.rs create mode 100644 accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs create mode 100644 accountsdb-plugin-postgres/src/lib.rs create mode 100644 runtime/src/accounts_update_notifier_interface.rs diff --git a/Cargo.lock b/Cargo.lock index 9465c68183..22809b87f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1263,6 +1263,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fast-math" version = "0.1.1" @@ -2321,6 +2327,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" +[[package]] +name = "md-5" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +dependencies = [ + "block-buffer 0.9.0", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + [[package]] name = "memchr" version = "1.0.2" @@ -2857,6 +2874,24 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +dependencies = [ + "siphasher", +] + [[package]] name = "pickledb" version = "0.4.1" @@ -2934,6 +2969,50 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "postgres" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7871ee579860d8183f542e387b176a25f2656b9fb5211e045397f745a68d1c2" +dependencies = [ + "bytes 1.0.1", + "fallible-iterator", + "futures 0.3.8", + "log 0.4.14", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "fallible-iterator", + "hmac 0.10.1", + "md-5", + "memchr 2.4.0", + "rand 0.8.3", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +dependencies = [ + "bytes 1.0.1", + "chrono", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "ppv-lite86" version = "0.2.8" @@ -3750,9 +3829,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.126" +version = "1.0.130" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" dependencies = [ "serde_derive", ] @@ -3778,9 +3857,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.126" +version = "1.0.130" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.9", @@ -3789,9 +3868,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.62" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea1c6153794552ea7cf7cf63b1231a25de00ec90db326ba6264440fa08e31486" +checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ "itoa", "ryu", @@ -3938,6 +4017,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" +[[package]] +name = "siphasher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b" + [[package]] name = "slab" version = "0.4.2" @@ -4075,6 +4160,51 @@ dependencies = [ "spl-token", ] +[[package]] +name = "solana-accountsdb-plugin-interface" +version = "1.8.1" +dependencies = [ + "log 0.4.14", + "thiserror", +] + +[[package]] +name = "solana-accountsdb-plugin-manager" +version = "1.8.1" +dependencies = [ + "bs58 0.4.0", + "crossbeam-channel 0.4.4", + "libloading 0.7.0", + "log 0.4.14", + "serde", + "serde_derive", + "serde_json", + "solana-accountsdb-plugin-interface", + "solana-logger 1.8.1", + "solana-metrics", + "solana-rpc", + "solana-runtime", + "solana-sdk", + "thiserror", +] + +[[package]] +name = "solana-accountsdb-plugin-postgres" +version = "1.8.1" +dependencies = [ + "bs58 0.4.0", + "chrono", + "libloading 0.7.0", + "log 0.4.14", + "postgres", + "serde", + "serde_derive", + "serde_json", + "solana-accountsdb-plugin-interface", + "solana-logger 1.8.1", + "thiserror", +] + [[package]] name = "solana-banking-bench" version = "1.8.1" @@ -4459,6 +4589,7 @@ dependencies = [ "serde_json", "serial_test", "solana-account-decoder", + "solana-accountsdb-plugin-manager", "solana-banks-server", "solana-clap-utils", "solana-client", @@ -5849,6 +5980,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.8.0" @@ -6025,18 +6166,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.23" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146" +checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.23" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1" +checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.9", @@ -6235,6 +6376,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b" +dependencies = [ + "async-trait", + "byteorder", + "bytes 1.0.1", + "fallible-iterator", + "futures 0.3.8", + "log 0.4.14", + "parking_lot 0.11.2", + "percent-encoding 2.1.0", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2 0.4.1", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-reactor" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index e92130987c..b6f27eab77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,8 @@ [workspace] members = [ + "accountsdb-plugin-interface", + "accountsdb-plugin-manager", + "accountsdb-plugin-postgres", "accounts-cluster-bench", "bench-exchange", "bench-streamer", diff --git a/accounts-bench/src/main.rs b/accounts-bench/src/main.rs index abe4a98e24..aa2dbcf571 100644 --- a/accounts-bench/src/main.rs +++ b/accounts-bench/src/main.rs @@ -66,6 +66,7 @@ fn main() { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); println!("Creating {} accounts", num_accounts); let mut create_time = Measure::start("create accounts"); diff --git a/accountsdb-plugin-interface/Cargo.toml b/accountsdb-plugin-interface/Cargo.toml new file mode 100644 index 0000000000..5280dedf12 --- /dev/null +++ b/accountsdb-plugin-interface/Cargo.toml @@ -0,0 +1,17 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-interface" +description = "The Solana AccountsDb plugin interface." +version = "1.8.1" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +log = "0.4.11" +thiserror = "1.0.29" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-interface/README.md b/accountsdb-plugin-interface/README.md new file mode 100644 index 0000000000..6646b9908f --- /dev/null +++ b/accountsdb-plugin-interface/README.md @@ -0,0 +1,20 @@ +

+ + Solana + +

+ +# Solana AccountsDb Plugin Interface + +This crate enables an AccountsDb plugin to be plugged into the Solana Validator runtime to take actions +at the time of each account update; for example, saving the account state to an external database. The plugin must implement the `AccountsDbPlugin` trait. Please see the detail of the `accountsdb_plugin_interface.rs` for the interface definition. + +The plugin should produce a `cdylib` dynamic library, which must expose a `C` function `_create_plugin()` that +instantiates the implementation of the interface. + +The `solana-accountsdb-plugin-postgres` crate provides an example of how to create a plugin which saves the accounts data into an +external PostgreSQL databases. + +More information about Solana is available in the [Solana documentation](https://docs.solana.com/). + +Still have questions? Ask us on [Discord](https://discordapp.com/invite/pquxPsq) diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs new file mode 100644 index 0000000000..b75914bf34 --- /dev/null +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -0,0 +1,90 @@ +/// The interface for AccountsDb plugins. A plugin must implement +/// the AccountsDbPlugin trait to work with the runtime. +/// In addition, the dynamic library must export a "C" function _create_plugin which +/// creates the implementation of the plugin. +use { + std::{any::Any, error, io}, + thiserror::Error, +}; + +impl Eq for ReplicaAccountInfo<'_> {} + +#[derive(Clone, PartialEq, Debug)] +pub struct ReplicaAccountInfo<'a> { + pub pubkey: &'a [u8], + pub lamports: u64, + pub owner: &'a [u8], + pub executable: bool, + pub rent_epoch: u64, + pub data: &'a [u8], +} + +pub enum ReplicaAccountInfoVersions<'a> { + V0_0_1(&'a ReplicaAccountInfo<'a>), +} + +#[derive(Error, Debug)] +pub enum AccountsDbPluginError { + #[error("Error opening config file.")] + ConfigFileOpenError(#[from] io::Error), + + #[error("Error reading config file.")] + ConfigFileReadError { msg: String }, + + #[error("Error updating account.")] + AccountsUpdateError { msg: String }, + + #[error("Error updating slot status.")] + SlotStatusUpdateError { msg: String }, + + #[error("Plugin-defined custom error.")] + Custom(Box), +} + +#[derive(Debug, Clone)] +pub enum SlotStatus { + Processed, + Rooted, + Confirmed, +} + +impl SlotStatus { + pub fn as_str(&self) -> &'static str { + match self { + SlotStatus::Confirmed => "confirmed", + SlotStatus::Processed => "processed", + SlotStatus::Rooted => "rooted", + } + } +} + +pub type Result = std::result::Result; + +pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { + fn name(&self) -> &'static str; + + /// The callback called when a plugin is loaded by the system, + /// used for doing whatever initialization is required by the plugin. + /// The _config_file contains the name of the + /// of the config file. The config must be in JSON format and + /// include a field "libpath" indicating the full path + /// name of the shared library implementing this interface. + fn on_load(&mut self, _config_file: &str) -> Result<()> { + Ok(()) + } + + /// The callback called right before a plugin is unloaded by the system + /// Used for doing cleanup before unload. + fn on_unload(&mut self) {} + + /// Called when an account is updated at a slot. + fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()>; + + /// Called when a slot status is updated + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<()>; +} diff --git a/accountsdb-plugin-interface/src/lib.rs b/accountsdb-plugin-interface/src/lib.rs new file mode 100644 index 0000000000..0c3e43c20b --- /dev/null +++ b/accountsdb-plugin-interface/src/lib.rs @@ -0,0 +1 @@ +pub mod accountsdb_plugin_interface; diff --git a/accountsdb-plugin-manager/Cargo.toml b/accountsdb-plugin-manager/Cargo.toml new file mode 100644 index 0000000000..9508c983c5 --- /dev/null +++ b/accountsdb-plugin-manager/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-manager" +description = "The Solana AccountsDb plugin manager." +version = "1.8.1" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +bs58 = "0.4.0" +crossbeam-channel = "0.4" +libloading = "0.7.0" +log = "0.4.11" +serde = "1.0.130" +serde_derive = "1.0.103" +serde_json = "1.0.67" +solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.8.1" } +solana-logger = { path = "../logger", version = "=1.8.1" } +solana-metrics = { path = "../metrics", version = "=1.8.1" } +solana-rpc = { path = "../rpc", version = "=1.8.1" } +solana-runtime = { path = "../runtime", version = "=1.8.1" } +solana-sdk = { path = "../sdk", version = "=1.8.1" } +thiserror = "1.0.21" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs new file mode 100644 index 0000000000..1896dca013 --- /dev/null +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -0,0 +1,129 @@ +/// Module responsible for notifying plugins of account updates +use { + crate::accountsdb_plugin_manager::AccountsDbPluginManager, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus, + }, + solana_runtime::{ + accounts_update_notifier_interface::AccountsUpdateNotifierInterface, + append_vec::StoredAccountMeta, + }, + solana_sdk::{ + account::{AccountSharedData, ReadableAccount}, + clock::Slot, + pubkey::Pubkey, + }, + std::sync::{Arc, RwLock}, +}; +#[derive(Debug)] +pub(crate) struct AccountsUpdateNotifierImpl { + plugin_manager: Arc>, +} + +impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { + if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) { + self.notify_plugins_of_account_update(account_info, slot); + } + } + + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { + if let Some(account_info) = self.accountinfo_from_stored_account_meta(account) { + self.notify_plugins_of_account_update(account_info, slot); + } + } + + fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Confirmed); + } + + fn notify_slot_processed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Processed); + } + + fn notify_slot_rooted(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Rooted); + } +} + +impl AccountsUpdateNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + AccountsUpdateNotifierImpl { plugin_manager } + } + + fn accountinfo_from_shared_account_data<'a>( + &self, + pubkey: &'a Pubkey, + account: &'a AccountSharedData, + ) -> Option> { + Some(ReplicaAccountInfo { + pubkey: pubkey.as_ref(), + lamports: account.lamports(), + owner: account.owner().as_ref(), + executable: account.executable(), + rent_epoch: account.rent_epoch(), + data: account.data(), + }) + } + + fn accountinfo_from_stored_account_meta<'a>( + &self, + stored_account_meta: &'a StoredAccountMeta, + ) -> Option> { + Some(ReplicaAccountInfo { + pubkey: stored_account_meta.meta.pubkey.as_ref(), + lamports: stored_account_meta.account_meta.lamports, + owner: stored_account_meta.account_meta.owner.as_ref(), + executable: stored_account_meta.account_meta.executable, + rent_epoch: stored_account_meta.account_meta.rent_epoch, + data: stored_account_meta.data, + }) + } + + fn notify_plugins_of_account_update(&self, account: ReplicaAccountInfo, slot: Slot) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + for plugin in plugin_manager.plugins.iter_mut() { + match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) { + Err(err) => { + error!( + "Failed to update account {:?} at slot {:?}, error: {:?}", + account.pubkey, slot, err + ) + } + Ok(_) => { + trace!( + "Successfully updated account {:?} at slot {:?}", + account.pubkey, + slot + ); + } + } + } + } + + pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + match plugin.update_slot_status(slot, parent, slot_status.clone()) { + Err(err) => { + error!( + "Failed to update slot status at slot {:?}, error: {:?}", + slot, err + ) + } + Ok(_) => { + trace!("Successfully updated slot status at slot {:?}", slot); + } + } + } + } +} diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs new file mode 100644 index 0000000000..a6074362f8 --- /dev/null +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs @@ -0,0 +1,55 @@ +/// Managing the AccountsDb plugins +use { + libloading::{Library, Symbol}, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::AccountsDbPlugin, + std::error::Error, +}; + +#[derive(Default, Debug)] +pub struct AccountsDbPluginManager { + pub plugins: Vec>, + libs: Vec, +} + +impl AccountsDbPluginManager { + pub fn new() -> Self { + AccountsDbPluginManager { + plugins: Vec::default(), + libs: Vec::default(), + } + } + + /// # Safety + /// + /// This function loads the dynamically linked library specified in the path. The library + /// must do necessary initializations. + pub unsafe fn load_plugin( + &mut self, + libpath: &str, + config_file: &str, + ) -> Result<(), Box> { + type PluginConstructor = unsafe fn() -> *mut dyn AccountsDbPlugin; + let lib = Library::new(libpath)?; + let constructor: Symbol = lib.get(b"_create_plugin")?; + let plugin_raw = constructor(); + let mut plugin = Box::from_raw(plugin_raw); + plugin.on_load(config_file)?; + self.plugins.push(plugin); + self.libs.push(lib); + Ok(()) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + pub fn unload(&mut self) { + for mut plugin in self.plugins.drain(..) { + info!("Unloading plugin for {:?}", plugin.name()); + plugin.on_unload(); + } + + for lib in self.libs.drain(..) { + drop(lib); + } + } +} diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs new file mode 100644 index 0000000000..ed10433097 --- /dev/null +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -0,0 +1,150 @@ +use { + crate::{ + accounts_update_notifier::AccountsUpdateNotifierImpl, + accountsdb_plugin_manager::AccountsDbPluginManager, + slot_status_observer::SlotStatusObserver, + }, + crossbeam_channel::Receiver, + log::*, + serde_json, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, + std::{ + fs::File, + io::Read, + path::Path, + sync::{Arc, RwLock}, + thread, + }, + thiserror::Error, +}; + +#[derive(Error, Debug)] +pub enum AccountsdbPluginServiceError { + #[error("Cannot open the the plugin config file")] + CannotOpenConfigFile(String), + + #[error("Cannot read the the plugin config file")] + CannotReadConfigFile(String), + + #[error("The config file is not in a valid Json format")] + InvalidConfigFileFormat(String), + + #[error("Plugin library path is not specified in the config file")] + LibPathNotSet, + + #[error("Invalid plugin path")] + InvalidPluginPath, + + #[error("Cannot load plugin shared library")] + PluginLoadError(String), +} + +/// The service managing the AccountsDb plugin workflow. +pub struct AccountsDbPluginService { + slot_status_observer: SlotStatusObserver, + plugin_manager: Arc>, + accounts_update_notifier: AccountsUpdateNotifier, +} + +impl AccountsDbPluginService { + /// Creates and returns the AccountsDbPluginService. + /// # Arguments + /// * `confirmed_bank_receiver` - The receiver for confirmed bank notification + /// * `accountsdb_plugin_config_file` - The config file path for the plugin. The + /// config file controls the plugin responsible + /// for transporting the data to external data stores. It is defined in JSON format. + /// The `libpath` field should be pointed to the full path of the dynamic shared library + /// (.so file) to be loaded. The shared library must implement the `AccountsDbPlugin` + /// trait. And the shared library shall export a `C` function `_create_plugin` which + /// shall create the implementation of `AccountsDbPlugin` and returns to the caller. + /// The rest of the JSON fields' definition is up to to the concrete plugin implementation + /// It is usually used to configure the connection information for the external data store. + + pub fn new( + confirmed_bank_receiver: Receiver, + accountsdb_plugin_config_file: &Path, + ) -> Result { + info!( + "Starting AccountsDbPluginService from config file: {:?}", + accountsdb_plugin_config_file + ); + let plugin_manager = AccountsDbPluginManager::new(); + let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + + let mut file = match File::open(accountsdb_plugin_config_file) { + Ok(file) => file, + Err(err) => { + return Err(AccountsdbPluginServiceError::CannotOpenConfigFile(format!( + "Failed to open the plugin config file {:?}, error: {:?}", + accountsdb_plugin_config_file, err + ))); + } + }; + + let mut contents = String::new(); + if let Err(err) = file.read_to_string(&mut contents) { + return Err(AccountsdbPluginServiceError::CannotReadConfigFile(format!( + "Failed to read the plugin config file {:?}, error: {:?}", + accountsdb_plugin_config_file, err + ))); + } + + let result: serde_json::Value = match serde_json::from_str(&contents) { + Ok(value) => value, + Err(err) => { + return Err(AccountsdbPluginServiceError::InvalidConfigFileFormat( + format!( + "The config file {:?} is not in a valid Json format, error: {:?}", + accountsdb_plugin_config_file, err + ), + )); + } + }; + + let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( + plugin_manager.clone(), + ))); + let slot_status_observer = + SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); + + let libpath = result["libpath"] + .as_str() + .ok_or(AccountsdbPluginServiceError::LibPathNotSet)?; + let config_file = accountsdb_plugin_config_file + .as_os_str() + .to_str() + .ok_or(AccountsdbPluginServiceError::InvalidPluginPath)?; + + unsafe { + let result = plugin_manager + .write() + .unwrap() + .load_plugin(libpath, config_file); + if let Err(err) = result { + let msg = format!( + "Failed to load the plugin library: {:?}, error: {:?}", + libpath, err + ); + return Err(AccountsdbPluginServiceError::PluginLoadError(msg)); + } + } + + info!("Started AccountsDbPluginService"); + Ok(AccountsDbPluginService { + slot_status_observer, + plugin_manager, + accounts_update_notifier, + }) + } + + pub fn get_accounts_update_notifier(&self) -> AccountsUpdateNotifier { + self.accounts_update_notifier.clone() + } + + pub fn join(mut self) -> thread::Result<()> { + self.slot_status_observer.join()?; + self.plugin_manager.write().unwrap().unload(); + Ok(()) + } +} diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs new file mode 100644 index 0000000000..d2b38b57b3 --- /dev/null +++ b/accountsdb-plugin-manager/src/lib.rs @@ -0,0 +1,4 @@ +pub mod accounts_update_notifier; +pub mod accountsdb_plugin_manager; +pub mod accountsdb_plugin_service; +pub mod slot_status_observer; diff --git a/accountsdb-plugin-manager/src/slot_status_observer.rs b/accountsdb-plugin-manager/src/slot_status_observer.rs new file mode 100644 index 0000000000..9d3b36879f --- /dev/null +++ b/accountsdb-plugin-manager/src/slot_status_observer.rs @@ -0,0 +1,80 @@ +use { + crossbeam_channel::Receiver, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + }, +}; + +#[derive(Debug)] +pub(crate) struct SlotStatusObserver { + bank_notification_receiver_service: Option>, + exit_updated_slot_server: Arc, +} + +impl SlotStatusObserver { + pub fn new( + bank_notification_receiver: Receiver, + accounts_update_notifier: AccountsUpdateNotifier, + ) -> Self { + let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); + + Self { + bank_notification_receiver_service: Some(Self::run_bank_notification_receiver( + bank_notification_receiver, + exit_updated_slot_server.clone(), + accounts_update_notifier, + )), + exit_updated_slot_server, + } + } + + pub fn join(&mut self) -> thread::Result<()> { + self.exit_updated_slot_server.store(true, Ordering::Relaxed); + self.bank_notification_receiver_service + .take() + .map(JoinHandle::join) + .unwrap() + } + + fn run_bank_notification_receiver( + bank_notification_receiver: Receiver, + exit: Arc, + accounts_update_notifier: AccountsUpdateNotifier, + ) -> JoinHandle<()> { + Builder::new() + .name("bank_notification_receiver".to_string()) + .spawn(move || { + while !exit.load(Ordering::Relaxed) { + if let Ok(slot) = bank_notification_receiver.recv() { + match slot { + BankNotification::OptimisticallyConfirmed(slot) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_confirmed(slot, None); + } + BankNotification::Frozen(bank) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_processed(bank.slot(), Some(bank.parent_slot())); + } + BankNotification::Root(bank) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_rooted(bank.slot(), Some(bank.parent_slot())); + } + } + } + } + }) + .unwrap() + } +} diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml new file mode 100644 index 0000000000..59438c884c --- /dev/null +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-postgres" +description = "The Solana AccountsDb plugin for PostgreSQL database." +version = "1.8.1" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +bs58 = "0.4.0" +chrono = { version = "0.4.11", features = ["serde"] } +libloading = "0.7.0" +log = "0.4.14" +postgres = { version = "0.19.1", features = ["with-chrono-0_4"] } +serde = "1.0.130" +serde_derive = "1.0.103" +serde_json = "1.0.67" +solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.8.1" } +solana-logger = { path = "../logger", version = "=1.8.1" } +thiserror = "1.0.21" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql new file mode 100644 index 0000000000..41be52f5b3 --- /dev/null +++ b/accountsdb-plugin-postgres/scripts/create_schema.sql @@ -0,0 +1,52 @@ +/** + * This plugin implementation for PostgreSQL requires the following tables + */ +-- The table storing accounts + + +CREATE TABLE account ( + pubkey BYTEA PRIMARY KEY, + owner BYTEA, + lamports BIGINT NOT NULL, + slot BIGINT NOT NULL, + executable BOOL NOT NULL, + rent_epoch BIGINT NOT NULL, + data BYTEA, + updated_on TIMESTAMP NOT NULL +); + +-- The table storing slot information +CREATE TABLE slot ( + slot BIGINT PRIMARY KEY, + parent BIGINT, + status varchar(16) NOT NULL, + updated_on TIMESTAMP NOT NULL +); + +/** + * The following is for keeping historical data for accounts and is not required for plugin to work. + */ +-- The table storing historical data for accounts +CREATE TABLE account_audit ( + pubkey BYTEA, + owner BYTEA, + lamports BIGINT NOT NULL, + slot BIGINT NOT NULL, + executable BOOL NOT NULL, + rent_epoch BIGINT NOT NULL, + data BYTEA, + updated_on TIMESTAMP NOT NULL +); + +CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ + BEGIN + INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, updated_on) + VALUES (OLD.pubkey, OLD.owner, OLD.lamports, OLD.slot, + OLD.executable, OLD.rent_epoch, OLD.data, OLD.updated_on); + RETURN NEW; + END; + +$audit_account_update$ LANGUAGE plpgsql; + +CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account + FOR EACH ROW EXECUTE PROCEDURE audit_account_update(); diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql new file mode 100644 index 0000000000..e76a0dc165 --- /dev/null +++ b/accountsdb-plugin-postgres/scripts/drop_schema.sql @@ -0,0 +1,7 @@ +/** + * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin. + */ + +DROP FUNCTION audit_account_update; +DROP TABLE account_audit; +DROP TABLE account; \ No newline at end of file diff --git a/accountsdb-plugin-postgres/src/accounts_selector.rs b/accountsdb-plugin-postgres/src/accounts_selector.rs new file mode 100644 index 0000000000..91c669f70a --- /dev/null +++ b/accountsdb-plugin-postgres/src/accounts_selector.rs @@ -0,0 +1,69 @@ +use {log::*, std::collections::HashSet}; + +#[derive(Debug)] +pub(crate) struct AccountsSelector { + pub accounts: HashSet>, + pub owners: HashSet>, + pub select_all_accounts: bool, +} + +impl AccountsSelector { + pub fn default() -> Self { + AccountsSelector { + accounts: HashSet::default(), + owners: HashSet::default(), + select_all_accounts: true, + } + } + + pub fn new(accounts: &[String], owners: &[String]) -> Self { + info!( + "Creating AccountsSelector from accounts: {:?}, owners: {:?}", + accounts, owners + ); + + let select_all_accounts = accounts.iter().any(|key| key == "*"); + if select_all_accounts { + return AccountsSelector { + accounts: HashSet::default(), + owners: HashSet::default(), + select_all_accounts, + }; + } + let accounts = accounts + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + let owners = owners + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + AccountsSelector { + accounts, + owners, + select_all_accounts, + } + } + + pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { + self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + + #[test] + fn test_create_accounts_selector() { + AccountsSelector::new( + &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], + &[], + ); + + AccountsSelector::new( + &[], + &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], + ); + } +} diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs new file mode 100644 index 0000000000..1663a0e2d4 --- /dev/null +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -0,0 +1,349 @@ +/// Main entry for the PostgreSQL plugin +use { + crate::accounts_selector::AccountsSelector, + bs58, + chrono::Utc, + log::*, + postgres::{Client, NoTls, Statement}, + serde_derive::{Deserialize, Serialize}, + serde_json, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, + }, + std::{fs::File, io::Read, sync::Mutex}, + thiserror::Error, +}; + +struct PostgresSqlClientWrapper { + client: Client, + update_account_stmt: Statement, +} + +#[derive(Default)] +pub struct AccountsDbPluginPostgres { + client: Option>, + accounts_selector: Option, +} + +impl std::fmt::Debug for AccountsDbPluginPostgres { + fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +struct AccountsDbPluginPostgresConfig { + host: String, + user: String, +} + +#[derive(Error, Debug)] +enum AccountsDbPluginPostgresError { + #[error("Error connecting to the backend data store.")] + DataStoreConnectionError { msg: String }, + + #[error("Error preparing data store schema.")] + DataSchemaError { msg: String }, +} + +impl AccountsDbPlugin for AccountsDbPluginPostgres { + fn name(&self) -> &'static str { + "AccountsDbPluginPostgres" + } + + /// Do initialization for the PostgreSQL plugin. + /// # Arguments + /// + /// Format of the config file: + /// The `accounts_selector` section allows the user to controls accounts selections. + /// "accounts_selector" : { + /// "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\], + /// } + /// or: + /// "accounts_selector" = { + /// "owners" : \["pubkey-1', 'pubkey-2", ..., "pubkey-m"\] + /// } + /// Accounts either satisyfing the accounts condition or owners condition will be selected. + /// When only owners is specified, + /// all accounts belonging to the owners will be streamed. + /// The accounts field support wildcard to select all accounts: + /// "accounts_selector" : { + /// "accounts" : \["*"\], + /// } + /// "host" specifies the PostgreSQL server. + /// "user" specifies the PostgreSQL user. + /// # Examples + /// { + /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", + /// "host": "host_foo", + /// "user": "solana", + /// "accounts_selector" : { + /// "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"] + /// } + + fn on_load(&mut self, config_file: &str) -> Result<()> { + solana_logger::setup_with_default("info"); + info!( + "Loading plugin {:?} from config_file {:?}", + self.name(), + config_file + ); + let mut file = File::open(config_file)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let result: serde_json::Value = serde_json::from_str(&contents).unwrap(); + self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result)); + + let result: serde_json::Result = + serde_json::from_str(&contents); + match result { + Err(err) => { + return Err(AccountsDbPluginError::ConfigFileReadError { + msg: format!( + "The config file is not in the JSON format expected: {:?}", + err + ), + }) + } + Ok(config) => { + let connection_str = format!("host={} user={}", config.host, config.user); + match Client::connect(&connection_str, NoTls) { + Err(err) => { + return Err(AccountsDbPluginError::Custom( + Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: format!( + "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str), + }))); + } + Ok(mut client) => { + let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ + data=$7, updated_on=$8"); + + match result { + Err(err) => { + return Err(AccountsDbPluginError::Custom( + Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str + ), + }))); + } + Ok(update_account_stmt) => { + self.client = Some(Mutex::new(PostgresSqlClientWrapper { + client, + update_account_stmt, + })); + } + } + } + } + } + } + + Ok(()) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + fn on_unload(&mut self) { + info!("Unloading plugin: {:?}", self.name()); + } + + fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> { + match account { + ReplicaAccountInfoVersions::V0_0_1(account) => { + if let Some(accounts_selector) = &self.accounts_selector { + if !accounts_selector.is_account_selected(account.pubkey, account.owner) { + return Ok(()); + } + } else { + return Ok(()); + } + + debug!( + "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}", + bs58::encode(account.pubkey).into_string(), + bs58::encode(account.owner).into_string(), + slot, + self.accounts_selector.as_ref().unwrap() + ); + + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database." + .to_string(), + }, + ))); + } + Some(client) => { + let slot = slot as i64; // postgres only supports i64 + let lamports = account.lamports as i64; + let rent_epoch = account.rent_epoch as i64; + let updated_on = Utc::now().naive_utc(); + let client = client.get_mut().unwrap(); + let result = client.client.query( + &client.update_account_stmt, + &[ + &account.pubkey, + &slot, + &account.owner, + &lamports, + &account.executable, + &rent_epoch, + &account.data, + &updated_on, + ], + ); + + if let Err(err) = result { + return Err(AccountsDbPluginError::AccountsUpdateError { + msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err) + }); + } + } + } + } + } + Ok(()) + } + + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<()> { + info!("Updating slot {:?} at with status {:?}", slot, status); + + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => { + let slot = slot as i64; // postgres only supports i64 + let parent = parent.map(|parent| parent as i64); + let updated_on = Utc::now().naive_utc(); + let status_str = status.as_str(); + + let result = match parent { + Some(parent) => { + client.get_mut().unwrap().client.execute( + "INSERT INTO slot (slot, parent, status, updated_on) \ + VALUES ($1, $2, $3, $4) \ + ON CONFLICT (slot) DO UPDATE SET parent=$2, status=$3, updated_on=$4", + &[ + &slot, + &parent, + &status_str, + &updated_on, + ], + ) + } + None => { + client.get_mut().unwrap().client.execute( + "INSERT INTO slot (slot, status, updated_on) \ + VALUES ($1, $2, $3) \ + ON CONFLICT (slot) DO UPDATE SET status=$2, updated_on=$3", + &[ + &slot, + &status_str, + &updated_on, + ], + ) + } + }; + + match result { + Err(err) => { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) + }); + } + Ok(rows) => { + assert_eq!(1, rows, "Expected one rows to be updated a time"); + } + } + } + } + + Ok(()) + } +} + +impl AccountsDbPluginPostgres { + fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector { + let accounts_selector = &config["accounts_selector"]; + + if accounts_selector.is_null() { + AccountsSelector::default() + } else { + let accounts = &accounts_selector["accounts"]; + let accounts: Vec = if accounts.is_array() { + accounts + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + let owners = &accounts_selector["owners"]; + let owners: Vec = if owners.is_array() { + owners + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + AccountsSelector::new(&accounts, &owners) + } + } + + pub fn new() -> Self { + AccountsDbPluginPostgres { + client: None, + accounts_selector: None, + } + } +} + +#[no_mangle] +#[allow(improper_ctypes_definitions)] +/// # Safety +/// +/// This function returns the AccountsDbPluginPostgres pointer as trait AccountsDbPlugin. +pub unsafe extern "C" fn _create_plugin() -> *mut dyn AccountsDbPlugin { + let plugin = AccountsDbPluginPostgres::new(); + let plugin: Box = Box::new(plugin); + Box::into_raw(plugin) +} + +#[cfg(test)] +pub(crate) mod tests { + use {super::*, serde_json}; + + #[test] + fn test_accounts_selector_from_config() { + let config = "{\"accounts_selector\" : { \ + \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \ + }}"; + + let config: serde_json::Value = serde_json::from_str(config).unwrap(); + AccountsDbPluginPostgres::create_accounts_selector_from_config(&config); + } +} diff --git a/accountsdb-plugin-postgres/src/lib.rs b/accountsdb-plugin-postgres/src/lib.rs new file mode 100644 index 0000000000..ccc4f17902 --- /dev/null +++ b/accountsdb-plugin-postgres/src/lib.rs @@ -0,0 +1,2 @@ +pub mod accounts_selector; +pub mod accountsdb_plugin_postgres; diff --git a/core/Cargo.toml b/core/Cargo.toml index 6df2a82224..461f1dceb2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,6 +45,7 @@ serde = "1.0.122" serde_bytes = "0.11" serde_derive = "1.0.103" solana-account-decoder = { path = "../account-decoder", version = "=1.8.1" } +solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.8.1" } solana-banks-server = { path = "../banks-server", version = "=1.8.1" } solana-clap-utils = { path = "../clap-utils", version = "=1.8.1" } solana-client = { path = "../client", version = "=1.8.1" } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 8e3ce33840..6ce17523d7 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -495,7 +495,7 @@ mod tests { ..ProcessOptions::default() }; let (bank_forks, cached_leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); let leader_schedule_cache = Arc::new(cached_leader_schedule); let bank_forks = Arc::new(RwLock::new(bank_forks)); diff --git a/core/src/validator.rs b/core/src/validator.rs index ccc74a9d7f..5f818f0a87 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -19,6 +19,7 @@ use { }, crossbeam_channel::{bounded, unbounded}, rand::{thread_rng, Rng}, + solana_accountsdb_plugin_manager::accountsdb_plugin_service::AccountsDbPluginService, solana_gossip::{ cluster_info::{ ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, @@ -58,6 +59,7 @@ use { solana_runtime::{ accounts_db::AccountShrinkThreshold, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::Bank, bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, @@ -104,6 +106,7 @@ pub struct ValidatorConfig { pub account_paths: Vec, pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, + pub accountsdb_plugin_config_file: Option, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: Option, @@ -163,6 +166,7 @@ impl Default for ValidatorConfig { account_paths: Vec::new(), account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), + accountsdb_plugin_config_file: None, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: None, @@ -267,6 +271,7 @@ pub struct Validator { tpu: Tpu, tvu: Tvu, ip_echo_server: Option, + accountsdb_plugin_service: Option, } // in the distant future, get rid of ::new()/exit() and use Result properly... @@ -303,6 +308,27 @@ impl Validator { warn!("identity: {}", id); warn!("vote account: {}", vote_account); + let mut bank_notification_senders = Vec::new(); + + let accountsdb_plugin_service = + if let Some(accountsdb_plugin_config_file) = &config.accountsdb_plugin_config_file { + let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); + bank_notification_senders.push(confirmed_bank_sender); + let result = AccountsDbPluginService::new( + confirmed_bank_receiver, + accountsdb_plugin_config_file, + ); + match result { + Ok(accountsdb_plugin_service) => Some(accountsdb_plugin_service), + Err(err) => { + error!("Failed to load the AccountsDb plugin: {:?}", err); + abort(); + } + } + } else { + None + }; + if config.voting_disabled { warn!("voting disabled"); authorized_voter_keypairs.write().unwrap().clear(); @@ -396,6 +422,9 @@ impl Validator { config.enforce_ulimit_nofile, &start_progress, config.no_poh_speed_test, + accountsdb_plugin_service + .as_ref() + .map(|plugin_service| plugin_service.get_accounts_update_notifier()), ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; @@ -533,6 +562,11 @@ impl Validator { )); } let (bank_notification_sender, bank_notification_receiver) = unbounded(); + let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() { + Some(Arc::new(RwLock::new(bank_notification_senders))) + } else { + None + }; ( Some(JsonRpcService::new( rpc_addr, @@ -577,6 +611,7 @@ impl Validator { bank_forks.clone(), optimistically_confirmed_bank, rpc_subscriptions.clone(), + confirmed_bank_subscribers, )), Some(bank_notification_sender), ) @@ -814,6 +849,7 @@ impl Validator { poh_recorder, ip_echo_server, validator_exit: config.validator_exit.clone(), + accountsdb_plugin_service, } } @@ -916,6 +952,12 @@ impl Validator { if let Some(ip_echo_server) = self.ip_echo_server { ip_echo_server.shutdown_background(); } + + if let Some(accountsdb_plugin_service) = self.accountsdb_plugin_service { + accountsdb_plugin_service + .join() + .expect("accountsdb_plugin_service"); + } } } @@ -1049,6 +1091,7 @@ fn post_process_restored_tower( } #[allow(clippy::type_complexity)] +#[allow(clippy::too_many_arguments)] fn new_banks_from_ledger( validator_identity: &Pubkey, vote_account: &Pubkey, @@ -1059,6 +1102,7 @@ fn new_banks_from_ledger( enforce_ulimit_nofile: bool, start_progress: &Arc>, no_poh_speed_test: bool, + accounts_update_notifier: Option, ) -> ( GenesisConfig, BankForks, @@ -1175,6 +1219,7 @@ fn new_banks_from_ledger( transaction_history_services .cache_block_meta_sender .as_ref(), + accounts_update_notifier, ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index bc0a52bb6b..0ad024199f 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -109,6 +109,7 @@ mod tests { false, accounts_db::AccountShrinkThreshold::default(), false, + None, ); bank0.freeze(); let mut bank_forks = BankForks::new(bank0); @@ -172,6 +173,7 @@ mod tests { accounts_db::AccountShrinkThreshold::default(), check_hash_calculation, false, + None, ) .unwrap(); diff --git a/docs/src/developing/backwards-compatibility.md b/docs/src/developing/backwards-compatibility.md index 9f26ad03f8..a6db1f0946 100644 --- a/docs/src/developing/backwards-compatibility.md +++ b/docs/src/developing/backwards-compatibility.md @@ -76,6 +76,7 @@ Major releases: - [`solana-program`](https://docs.rs/solana-program/) - Rust SDK for writing programs - [`solana-client`](https://docs.rs/solana-client/) - Rust client for connecting to RPC API - [`solana-cli-config`](https://docs.rs/solana-cli-config/) - Rust client for managing Solana CLI config files +- [`solana-accountsdb-plugin-interface`](https://docs.rs/solana-accountsdb-plugin-interface/) - Rust interface for developing Solana AccountsDb plugins. Patch releases: diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index a269afcc89..b16f3bf78c 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -727,6 +727,7 @@ fn load_bank_forks( process_options, None, None, + None, ) } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index d654f499b7..39cd2b0a6d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -9,6 +9,7 @@ use crate::{ }; use log::*; use solana_runtime::{ + accounts_update_notifier_interface::AccountsUpdateNotifier, bank_forks::{ArchiveFormat, BankForks, SnapshotConfig}, snapshot_utils, }; @@ -33,6 +34,7 @@ fn to_loadresult( /// /// If a snapshot config is given, and a snapshot is found, it will be loaded. Otherwise, load /// from genesis. +#[allow(clippy::too_many_arguments)] pub fn load( genesis_config: &GenesisConfig, blockstore: &Blockstore, @@ -42,6 +44,7 @@ pub fn load( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_update_notifier: Option, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config.as_ref() { info!( @@ -70,6 +73,7 @@ pub fn load( archive_slot, archive_hash, archive_format, + accounts_update_notifier, ); } else { info!("No snapshot package available; will load from genesis"); @@ -84,6 +88,7 @@ pub fn load( account_paths, process_options, cache_block_meta_sender, + accounts_update_notifier, ) } @@ -93,6 +98,7 @@ fn load_from_genesis( account_paths: Vec, process_options: ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_update_notifier: Option, ) -> LoadResult { info!("Processing ledger from genesis"); to_loadresult( @@ -102,6 +108,7 @@ fn load_from_genesis( account_paths, process_options, cache_block_meta_sender, + accounts_update_notifier, ), None, ) @@ -121,6 +128,7 @@ fn load_from_snapshot( archive_slot: Slot, archive_hash: Hash, archive_format: ArchiveFormat, + accounts_update_notifier: Option, ) -> LoadResult { info!("Loading snapshot package: {:?}", archive_filename); @@ -145,6 +153,7 @@ fn load_from_snapshot( process_options.shrink_ratio, process_options.accounts_db_test_hash_calculation, process_options.accounts_db_skip_shrink, + accounts_update_notifier, ) .expect("Load from snapshot failed"); if let Some(shrink_paths) = shrink_paths { diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index daf4d18306..a01a68931d 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -19,6 +19,7 @@ use solana_rayon_threadlimit::get_thread_count; use solana_runtime::{ accounts_db::AccountShrinkThreshold, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::{ Bank, ExecuteTimings, InnerInstructionsList, RentDebits, TransactionBalancesSet, TransactionExecutionResult, TransactionLogMessages, TransactionResults, @@ -464,6 +465,7 @@ pub fn process_blockstore( account_paths: Vec, opts: ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_update_notifier: Option, ) -> BlockstoreProcessorResult { if let Some(num_threads) = opts.override_num_threads { PAR_THREAD_POOL.with(|pool| { @@ -485,6 +487,7 @@ pub fn process_blockstore( opts.accounts_db_caching_enabled, opts.shrink_ratio, false, + accounts_update_notifier, ); let bank0 = Arc::new(bank0); info!("processing ledger for slot 0..."); @@ -1456,6 +1459,7 @@ pub mod tests { ..ProcessOptions::default() }, None, + None, ) .unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); @@ -1501,6 +1505,7 @@ pub mod tests { ..ProcessOptions::default() }, None, + None, ) .unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); @@ -1518,6 +1523,7 @@ pub mod tests { ..ProcessOptions::default() }, None, + None, ) .unwrap(); @@ -1574,7 +1580,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); } @@ -1640,7 +1646,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); // slot 1 isn't "full", we stop at slot zero @@ -1660,7 +1666,7 @@ pub mod tests { fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 0, blockhash); // Slot 0 should not show up in the ending bank_forks_info let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // slot 1 isn't "full", we stop at slot zero assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 3]); @@ -1728,7 +1734,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // One fork, other one is ignored b/c not a descendant of the root assert_eq!(frozen_bank_slots(&bank_forks), vec![4]); @@ -1808,7 +1814,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![1, 2, 3, 4]); assert_eq!(bank_forks.working_bank().slot(), 4); @@ -1869,6 +1875,7 @@ pub mod tests { Vec::new(), ProcessOptions::default(), None, + None, ) .unwrap(); @@ -1919,6 +1926,7 @@ pub mod tests { Vec::new(), ProcessOptions::default(), None, + None, ) .unwrap(); @@ -1972,6 +1980,7 @@ pub mod tests { Vec::new(), ProcessOptions::default(), None, + None, ) .unwrap(); @@ -2025,7 +2034,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // There is one fork, head is last_slot + 1 assert_eq!(frozen_bank_slots(&bank_forks), vec![last_slot + 1]); @@ -2170,7 +2179,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 1]); assert_eq!(bank_forks.root(), 0); @@ -2200,7 +2209,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); let bank = bank_forks[0].clone(); @@ -2218,7 +2227,7 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); PAR_THREAD_POOL.with(|pool| { assert_eq!(pool.borrow().current_num_threads(), 1); }); @@ -2236,7 +2245,7 @@ pub mod tests { ..ProcessOptions::default() }; let (_bank_forks, leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(leader_schedule.max_schedules(), std::usize::MAX); } @@ -2297,7 +2306,7 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(*callback_counter.write().unwrap(), 2); } @@ -2952,7 +2961,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // Should be able to fetch slot 0 because we specified halting at slot 0, even // if there is a greater root at slot 1. @@ -3202,6 +3211,7 @@ pub mod tests { false, AccountShrinkThreshold::default(), false, + None, ); *bank.epoch_schedule() } @@ -3459,9 +3469,15 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts.clone(), None) - .unwrap(); + let (bank_forks, _leader_schedule) = process_blockstore( + &genesis_config, + &blockstore, + Vec::new(), + opts.clone(), + None, + None, + ) + .unwrap(); // prepare to add votes let last_vote_bank_hash = bank_forks.get(last_main_fork_slot - 1).unwrap().hash(); @@ -3492,9 +3508,15 @@ pub mod tests { &leader_keypair, ); - let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts.clone(), None) - .unwrap(); + let (bank_forks, _leader_schedule) = process_blockstore( + &genesis_config, + &blockstore, + Vec::new(), + opts.clone(), + None, + None, + ) + .unwrap(); assert_eq!(bank_forks.root(), expected_root_slot); assert_eq!( @@ -3549,7 +3571,7 @@ pub mod tests { ); let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(bank_forks.root(), really_expected_root_slot); } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 0348e5d960..0fcc985e2a 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -12,6 +12,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_paths: config.account_paths.clone(), account_shrink_paths: config.account_shrink_paths.clone(), rpc_config: config.rpc_config.clone(), + accountsdb_plugin_config_file: config.accountsdb_plugin_config_file.clone(), rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index a7a9d2d1a9..720692f0d7 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -133,6 +133,7 @@ fn initialize_from_snapshot( false, process_options.verify_index, process_options.accounts_db_config, + None, ) .unwrap(); diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index c7eab7dc00..5261e2298d 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -31,6 +31,7 @@ impl OptimisticallyConfirmedBank { } } +#[derive(Clone)] pub enum BankNotification { OptimisticallyConfirmed(Slot), Frozen(Arc), @@ -63,6 +64,7 @@ impl OptimisticallyConfirmedBankTracker { bank_forks: Arc>, optimistically_confirmed_bank: Arc>, subscriptions: Arc, + bank_notification_subscribers: Option>>>, ) -> Self { let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); @@ -83,6 +85,7 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &bank_notification_subscribers, ) { break; } @@ -99,6 +102,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, mut highest_confirmed_slot: &mut Slot, + bank_notification_subscribers: &Option>>>, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -109,16 +113,37 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + bank_notification_subscribers, ); Ok(()) } + fn notify_slot_status( + bank_notification_subscribers: &Option>>>, + notifcation: BankNotification, + ) { + if let Some(bank_notification_subscribers) = bank_notification_subscribers { + for sender in bank_notification_subscribers.read().unwrap().iter() { + match sender.send(notifcation.clone()) { + Ok(_) => {} + Err(err) => { + info!( + "Failed to send notification {:?}, error: {:?}", + notifcation, err + ); + } + } + } + } + } + fn notify_or_defer( subscriptions: &Arc, bank_forks: &Arc>, bank: &Arc, last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, + bank_notification_subscribers: &Option>>>, ) { if bank.is_frozen() { if bank.slot() > *last_notified_confirmed_slot { @@ -128,6 +153,10 @@ impl OptimisticallyConfirmedBankTracker { ); subscriptions.notify_gossip_subscribers(bank.slot()); *last_notified_confirmed_slot = bank.slot(); + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::OptimisticallyConfirmed(bank.slot()), + ); } } else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() { pending_optimistically_confirmed_banks.insert(bank.slot()); @@ -142,6 +171,7 @@ impl OptimisticallyConfirmedBankTracker { slot_threshold: Slot, mut last_notified_confirmed_slot: &mut Slot, mut pending_optimistically_confirmed_banks: &mut HashSet, + bank_notification_subscribers: &Option>>>, ) { for confirmed_bank in bank.clone().parents_inclusive().iter().rev() { if confirmed_bank.slot() > slot_threshold { @@ -155,6 +185,7 @@ impl OptimisticallyConfirmedBankTracker { confirmed_bank, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + bank_notification_subscribers, ); } } @@ -168,6 +199,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, highest_confirmed_slot: &mut Slot, + bank_notification_subscribers: &Option>>>, ) { debug!("received bank notification: {:?}", notification); match notification { @@ -189,6 +221,7 @@ impl OptimisticallyConfirmedBankTracker { *highest_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + bank_notification_subscribers, ); *highest_confirmed_slot = slot; @@ -222,6 +255,11 @@ impl OptimisticallyConfirmedBankTracker { max_transactions_per_entry: bank.transactions_per_entry_max(), }, }); + + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::Frozen(bank.clone()), + ); } if pending_optimistically_confirmed_banks.remove(&bank.slot()) { @@ -237,6 +275,7 @@ impl OptimisticallyConfirmedBankTracker { *last_notified_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + bank_notification_subscribers, ); let mut w_optimistically_confirmed_bank = @@ -248,6 +287,10 @@ impl OptimisticallyConfirmedBankTracker { } } BankNotification::Root(bank) => { + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::Root(bank.clone()), + ); let root_slot = bank.slot(); let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); @@ -320,6 +363,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -333,6 +377,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -346,6 +391,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -364,6 +410,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(highest_confirmed_slot, 3); @@ -381,6 +428,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -399,6 +447,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); @@ -424,6 +473,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 9c0e20d333..da201a43c2 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -7515,6 +7515,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7532,6 +7533,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7549,6 +7551,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7567,6 +7570,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 7adee218b2..cd80af723e 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1290,6 +1290,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // a closure to reduce code duplications in building expected responses: @@ -1339,6 +1340,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver.recv(); @@ -1454,6 +1456,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // The following should panic @@ -1565,6 +1568,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // a closure to reduce code duplications in building expected responses: @@ -1616,6 +1620,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver.recv(); @@ -2031,6 +2036,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // Now, notify the frozen bank and ensure its notifications are processed @@ -2043,6 +2049,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver0.recv(); @@ -2091,6 +2098,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver1.recv(); let expected = json!({ diff --git a/runtime/benches/accounts.rs b/runtime/benches/accounts.rs index 364707ddde..e94b595a5d 100644 --- a/runtime/benches/accounts.rs +++ b/runtime/benches/accounts.rs @@ -62,6 +62,7 @@ fn test_accounts_create(bencher: &mut Bencher) { false, AccountShrinkThreshold::default(), false, + None, ); bencher.iter(|| { let mut pubkeys: Vec = vec![]; @@ -83,6 +84,7 @@ fn test_accounts_squash(bencher: &mut Bencher) { false, AccountShrinkThreshold::default(), false, + None, )); let mut pubkeys: Vec = vec![]; deposit_many(&prev_bank, &mut pubkeys, 250_000).unwrap(); @@ -109,6 +111,7 @@ fn test_accounts_hash_bank_hash(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; let num_accounts = 60_000; @@ -136,6 +139,7 @@ fn test_update_accounts_hash(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; create_test_accounts(&accounts, &mut pubkeys, 50_000, 0); @@ -154,6 +158,7 @@ fn test_accounts_delta_hash(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; create_test_accounts(&accounts, &mut pubkeys, 100_000, 0); @@ -171,6 +176,7 @@ fn bench_delete_dependencies(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut old_pubkey = Pubkey::default(); let zero_account = AccountSharedData::new(0, 0, AccountSharedData::default().owner()); @@ -205,6 +211,7 @@ fn store_accounts_with_possible_contention( AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, )); let num_keys = 1000; let slot = 0; @@ -341,6 +348,7 @@ fn setup_bench_dashmap_iter() -> (Arc, DashMap, ) -> Self { Self { accounts_db: Arc::new(AccountsDb::new_with_config( @@ -145,6 +148,7 @@ impl Accounts { account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, )), account_locks: Mutex::new(AccountLocks::default()), } @@ -1185,6 +1189,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); for ka in ka.iter() { accounts.store_slow_uncached(0, &ka.0, &ka.1); @@ -1723,6 +1728,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); // Load accounts owned by various programs into AccountsDb @@ -1993,6 +1999,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut error_counters = ErrorCounters::default(); let ancestors = vec![(0, 0)].into_iter().collect(); @@ -2017,6 +2024,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.bank_hash_at(1); } @@ -2039,6 +2047,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.store_slow_uncached(0, &keypair0.pubkey(), &account0); accounts.store_slow_uncached(0, &keypair1.pubkey(), &account1); @@ -2151,6 +2160,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.store_slow_uncached(0, &keypair0.pubkey(), &account0); accounts.store_slow_uncached(0, &keypair1.pubkey(), &account1); @@ -2239,6 +2249,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.store_slow_uncached(0, &keypair0.pubkey(), &account0); accounts.store_slow_uncached(0, &keypair1.pubkey(), &account1); @@ -2361,6 +2372,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); { accounts @@ -2417,6 +2429,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut old_pubkey = Pubkey::default(); let zero_account = AccountSharedData::new(0, 0, AccountSharedData::default().owner()); @@ -2465,6 +2478,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let instructions_key = solana_sdk::sysvar::instructions::id(); @@ -2755,6 +2769,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let txs = &[tx]; let collected_accounts = accounts.collect_accounts_to_store( @@ -2874,6 +2889,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let txs = &[tx]; let collected_accounts = accounts.collect_accounts_to_store( @@ -2912,6 +2928,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let pubkey0 = Pubkey::new_unique(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 51f311d2a1..1446db4f91 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -26,6 +26,7 @@ use crate::{ AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexRootsStats, IndexKey, IsCached, ScanResult, SlotList, SlotSlice, ZeroLamport, }, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion}, contains::Contains, @@ -954,6 +955,7 @@ pub struct AccountsDb { /// such that potentially a 0-lamport account update could be present which /// means we can remove the account from the index entirely. dirty_stores: DashMap<(Slot, AppendVecId), Arc>, + accounts_update_notifier: Option, } #[derive(Debug, Default)] @@ -1375,6 +1377,7 @@ impl Default for AccountsDb { remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(), shrink_ratio: AccountShrinkThreshold::default(), dirty_stores: DashMap::default(), + accounts_update_notifier: None, } } } @@ -1390,6 +1393,7 @@ impl AccountsDb { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ) } @@ -1399,6 +1403,7 @@ impl AccountsDb { account_indexes: AccountSecondaryIndexes, caching_enabled: bool, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Self { let mut new = if !paths.is_empty() { Self { @@ -1408,6 +1413,7 @@ impl AccountsDb { account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, ..Self::default() } } else { @@ -5462,6 +5468,16 @@ impl AccountsDb { pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, self.caching_enabled); + + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + + for account in accounts { + let pubkey = account.0; + let account = account.1; + notifier.notify_account_update(slot, pubkey, account); + } + } } /// Store the account update. @@ -6080,6 +6096,24 @@ impl AccountsDb { } } } + + pub fn notify_account_restore_from_snapshot(&self) { + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + let slots = self.storage.all_slots(); + for slot in &slots { + let slot_stores = self.storage.get_slot_stores(*slot).unwrap(); + + let slot_stores = slot_stores.read().unwrap(); + for (_, storage_entry) in slot_stores.iter() { + let accounts = storage_entry.all_accounts(); + for account in &accounts { + notifier.notify_account_restore_from_snapshot(*slot, account); + } + } + } + } + } } #[cfg(test)] @@ -7753,6 +7787,7 @@ pub mod tests { spl_token_mint_index_enabled(), false, AccountShrinkThreshold::default(), + None, ); let pubkey1 = solana_sdk::pubkey::new_rand(); let pubkey2 = solana_sdk::pubkey::new_rand(); @@ -9901,6 +9936,7 @@ pub mod tests { AccountSecondaryIndexes::default(), true, AccountShrinkThreshold::default(), + None, ); let account = AccountSharedData::new(1, 16 * 4096, &Pubkey::default()); @@ -10212,6 +10248,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let account_key = Pubkey::new_unique(); @@ -10260,6 +10297,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let account_key = Pubkey::new_unique(); @@ -10309,6 +10347,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let zero_lamport_account_key = Pubkey::new_unique(); @@ -10444,6 +10483,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let account_key = Pubkey::new_unique(); let account_key2 = Pubkey::new_unique(); @@ -10551,6 +10591,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); let slot: Slot = 0; let num_keys = 10; @@ -10606,6 +10647,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect(); let stall_slot = num_slots as Slot; @@ -11011,6 +11053,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); let account_key1 = Pubkey::new_unique(); let account_key2 = Pubkey::new_unique(); @@ -11268,6 +11311,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); db.load_delay = RACY_SLEEP_MS; let db = Arc::new(db); @@ -11340,6 +11384,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); db.load_delay = RACY_SLEEP_MS; let db = Arc::new(db); @@ -11416,6 +11461,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); let db = Arc::new(db); let num_cached_slots = 100; diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs new file mode 100644 index 0000000000..70730ef01c --- /dev/null +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -0,0 +1,25 @@ +use { + crate::append_vec::StoredAccountMeta, + solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, + std::sync::{Arc, RwLock}, +}; + +pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { + /// Notified when an account is updated at runtime, due to transaction activities + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData); + + /// Notified when the AccountsDb is initialized at start when restored + /// from a snapshot. + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta); + + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, slot: Slot, parent: Option); +} + +pub type AccountsUpdateNotifier = Arc>; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d735570f09..a0884245aa 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -40,6 +40,7 @@ use crate::{ }, accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages}, accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::{Ancestors, AncestorsForSerialization}, blockhash_queue::BlockhashQueue, builtins::{self, ActivationType}, @@ -1069,6 +1070,7 @@ impl Bank { false, AccountShrinkThreshold::default(), false, + None, ) } @@ -1083,6 +1085,7 @@ impl Bank { false, AccountShrinkThreshold::default(), false, + None, ); bank.ns_per_slot = std::u128::MAX; @@ -1106,9 +1109,11 @@ impl Bank { accounts_db_caching_enabled, shrink_ratio, false, + None, ) } + #[allow(clippy::too_many_arguments)] pub fn new_with_paths( genesis_config: &GenesisConfig, paths: Vec, @@ -1119,6 +1124,7 @@ impl Bank { accounts_db_caching_enabled: bool, shrink_ratio: AccountShrinkThreshold, debug_do_not_add_builtins: bool, + accounts_update_notifier: Option, ) -> Self { let mut bank = Self::default(); bank.ancestors = Ancestors::from(vec![bank.slot()]); @@ -1131,6 +1137,7 @@ impl Bank { account_indexes, accounts_db_caching_enabled, shrink_ratio, + accounts_update_notifier, )); bank.process_genesis_config(genesis_config); bank.finish_init( @@ -12542,6 +12549,7 @@ pub(crate) mod tests { false, AccountShrinkThreshold::default(), false, + None, )); // move to next epoch to create now deprecated rewards sysvar intentionally let bank1 = Arc::new(Bank::new_from_parent( diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 0d1570b38f..8a1844467a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -6,6 +6,7 @@ pub mod accounts_cache; pub mod accounts_db; pub mod accounts_hash; pub mod accounts_index; +pub mod accounts_update_notifier_interface; pub mod ancestors; pub mod append_vec; pub mod bank; diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 999bbe3840..7954bddcdd 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -5,6 +5,7 @@ use { AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AppendVecId, BankHashInfo, }, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, append_vec::{AppendVec, StoredMetaWriteVersion}, bank::{Bank, BankFieldsToDeserialize, BankRc, Builtins}, @@ -21,6 +22,7 @@ use { log::*, rayon::prelude::*, serde::{de::DeserializeOwned, Deserialize, Serialize}, + solana_measure::measure::Measure, solana_sdk::{ clock::{Epoch, Slot, UnixTimestamp}, epoch_schedule::EpochSchedule, @@ -140,6 +142,7 @@ pub(crate) fn bank_from_stream( caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> std::result::Result where R: Read, @@ -161,6 +164,7 @@ where caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, )?; Ok(bank) }}; @@ -252,6 +256,7 @@ fn reconstruct_bank_from_fields( caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Result where E: SerializableStorage + std::marker::Sync, @@ -265,6 +270,7 @@ where caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, )?; accounts_db.freeze_accounts( &Ancestors::from(&bank_fields.ancestors), @@ -314,6 +320,7 @@ fn reconstruct_accountsdb_from_fields( caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Result where E: SerializableStorage + std::marker::Sync, @@ -324,6 +331,7 @@ where account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, ); let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields; @@ -388,6 +396,15 @@ where ); } + let mut measure_notify = Measure::start("accounts_notify"); + accounts_db.notify_account_restore_from_snapshot(); + measure_notify.stop(); + + datapoint_info!( + "reconstruct_accountsdb_from_fields()", + ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64), + ); + if max_id > AppendVecId::MAX / 2 { panic!("Storage id {} larger than allowed max", max_id); } diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index c7bf8a9bda..8767701896 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -75,6 +75,7 @@ where false, None, AccountShrinkThreshold::default(), + None, ) } @@ -131,6 +132,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; @@ -231,6 +233,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) { false, None, AccountShrinkThreshold::default(), + None, ) .unwrap(); dbank.src = ref_sc; diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index af468a8b4e..1e54a08e75 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -2,6 +2,7 @@ use { crate::{ accounts_db::{AccountShrinkThreshold, AccountsDb}, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::{Bank, BankSlotDelta, Builtins}, bank_forks::ArchiveFormat, hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, @@ -633,6 +634,7 @@ pub fn bank_from_archive + std::marker::Sync>( shrink_ratio: AccountShrinkThreshold, test_hash_calculation: bool, accounts_db_skip_shrink: bool, + accounts_update_notifier: Option, ) -> Result<(Bank, BankFromArchiveTimings)> { let unpack_dir = tempfile::Builder::new() .prefix(TMP_SNAPSHOT_PREFIX) @@ -673,6 +675,7 @@ pub fn bank_from_archive + std::marker::Sync>( accounts_db_caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, )?; measure.stop(); @@ -900,6 +903,7 @@ fn rebuild_bank_from_snapshots( accounts_db_caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Result { let (snapshot_version_enum, root_paths) = verify_snapshot_version_and_folder(snapshot_version, unpacked_snapshots_dir)?; @@ -922,6 +926,7 @@ fn rebuild_bank_from_snapshots( accounts_db_caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, ), }?) })?; diff --git a/upload-perf/src/upload-perf.rs b/upload-perf/src/upload-perf.rs index d983596884..8018564827 100644 --- a/upload-perf/src/upload-perf.rs +++ b/upload-perf/src/upload-perf.rs @@ -64,17 +64,14 @@ fn main() { let deviation: i64 = v["deviation"].to_string().parse().unwrap(); if upload_metrics { panic!("TODO..."); - /* - solana_metrics::datapoint_info!( - &v["name"].as_str().unwrap().trim_matches('\"'), - ("test", "bench", String), - ("branch", branch.to_string(), String), - ("median", median, i64), - ("deviation", deviation, i64), - ("commit", git_commit_hash.trim().to_string(), String) - ); - */ - + // solana_metrics::datapoint_info!( + // &v["name"].as_str().unwrap().trim_matches('\"'), + // ("test", "bench", String), + // ("branch", branch.to_string(), String), + // ("median", median, i64), + // ("deviation", deviation, i64), + // ("commit", git_commit_hash.trim().to_string(), String) + // ); } let last_median = get_last_metrics(&"median".to_string(), &db, &name, branch).unwrap_or_default(); diff --git a/validator/src/main.rs b/validator/src/main.rs index dbd0eb007e..cba2a05188 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1711,6 +1711,14 @@ pub fn main() { .requires("enable_rpc_transaction_history") .help("Verifies blockstore roots on boot and fixes any gaps"), ) + .arg( + Arg::with_name("accountsdb_plugin_config") + .long("accountsdb-plugin-config") + .value_name("FILE") + .takes_value(true) + .hidden(true) + .help("Specify the configuration file for the AccountsDb plugin."), + ) .arg( Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") .alias("halt-on-trusted-validators-accounts-hash-mismatch") @@ -2269,6 +2277,10 @@ pub fn main() { .ok() .or_else(|| get_cluster_shred_version(&entrypoint_addrs)); + let accountsdb_plugin_config_file = matches + .value_of("accountsdb_plugin_config") + .map(PathBuf::from); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_path: value_t!(matches, "tower", PathBuf).ok(), @@ -2310,6 +2322,7 @@ pub fn main() { account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, + accountsdb_plugin_config_file, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),