diff --git a/Cargo.lock b/Cargo.lock index 9465c68183..22809b87f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1263,6 +1263,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fast-math" version = "0.1.1" @@ -2321,6 +2327,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" +[[package]] +name = "md-5" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +dependencies = [ + "block-buffer 0.9.0", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + [[package]] name = "memchr" version = "1.0.2" @@ -2857,6 +2874,24 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +dependencies = [ + "siphasher", +] + [[package]] name = "pickledb" version = "0.4.1" @@ -2934,6 +2969,50 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "postgres" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7871ee579860d8183f542e387b176a25f2656b9fb5211e045397f745a68d1c2" +dependencies = [ + "bytes 1.0.1", + "fallible-iterator", + "futures 0.3.8", + "log 0.4.14", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "fallible-iterator", + "hmac 0.10.1", + "md-5", + "memchr 2.4.0", + "rand 0.8.3", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +dependencies = [ + "bytes 1.0.1", + "chrono", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "ppv-lite86" version = "0.2.8" @@ -3750,9 +3829,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.126" +version = "1.0.130" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" dependencies = [ "serde_derive", ] @@ -3778,9 +3857,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.126" +version = "1.0.130" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.9", @@ -3789,9 +3868,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.62" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea1c6153794552ea7cf7cf63b1231a25de00ec90db326ba6264440fa08e31486" +checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ "itoa", "ryu", @@ -3938,6 +4017,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" +[[package]] +name = "siphasher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b" + [[package]] name = "slab" version = "0.4.2" @@ -4075,6 +4160,51 @@ dependencies = [ "spl-token", ] +[[package]] +name = "solana-accountsdb-plugin-interface" +version = "1.8.1" +dependencies = [ + "log 0.4.14", + "thiserror", +] + +[[package]] +name = "solana-accountsdb-plugin-manager" +version = "1.8.1" +dependencies = [ + "bs58 0.4.0", + "crossbeam-channel 0.4.4", + "libloading 0.7.0", + "log 0.4.14", + "serde", + "serde_derive", + "serde_json", + "solana-accountsdb-plugin-interface", + "solana-logger 1.8.1", + "solana-metrics", + "solana-rpc", + "solana-runtime", + "solana-sdk", + "thiserror", +] + +[[package]] +name = "solana-accountsdb-plugin-postgres" +version = "1.8.1" +dependencies = [ + "bs58 0.4.0", + "chrono", + "libloading 0.7.0", + "log 0.4.14", + "postgres", + "serde", + "serde_derive", + "serde_json", + "solana-accountsdb-plugin-interface", + "solana-logger 1.8.1", + "thiserror", +] + [[package]] name = "solana-banking-bench" version = "1.8.1" @@ -4459,6 +4589,7 @@ dependencies = [ "serde_json", "serial_test", "solana-account-decoder", + "solana-accountsdb-plugin-manager", "solana-banks-server", "solana-clap-utils", "solana-client", @@ -5849,6 +5980,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.8.0" @@ -6025,18 +6166,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.23" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146" +checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.23" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1" +checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.9", @@ -6235,6 +6376,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b" +dependencies = [ + "async-trait", + "byteorder", + "bytes 1.0.1", + "fallible-iterator", + "futures 0.3.8", + "log 0.4.14", + "parking_lot 0.11.2", + "percent-encoding 2.1.0", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2 0.4.1", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-reactor" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index e92130987c..b6f27eab77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,8 @@ [workspace] members = [ + "accountsdb-plugin-interface", + "accountsdb-plugin-manager", + "accountsdb-plugin-postgres", "accounts-cluster-bench", "bench-exchange", "bench-streamer", diff --git a/accounts-bench/src/main.rs b/accounts-bench/src/main.rs index abe4a98e24..aa2dbcf571 100644 --- a/accounts-bench/src/main.rs +++ b/accounts-bench/src/main.rs @@ -66,6 +66,7 @@ fn main() { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); println!("Creating {} accounts", num_accounts); let mut create_time = Measure::start("create accounts"); diff --git a/accountsdb-plugin-interface/Cargo.toml b/accountsdb-plugin-interface/Cargo.toml new file mode 100644 index 0000000000..5280dedf12 --- /dev/null +++ b/accountsdb-plugin-interface/Cargo.toml @@ -0,0 +1,17 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-interface" +description = "The Solana AccountsDb plugin interface." +version = "1.8.1" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +log = "0.4.11" +thiserror = "1.0.29" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-interface/README.md b/accountsdb-plugin-interface/README.md new file mode 100644 index 0000000000..6646b9908f --- /dev/null +++ b/accountsdb-plugin-interface/README.md @@ -0,0 +1,20 @@ +

+ + Solana + +

+ +# Solana AccountsDb Plugin Interface + +This crate enables an AccountsDb plugin to be plugged into the Solana Validator runtime to take actions +at the time of each account update; for example, saving the account state to an external database. The plugin must implement the `AccountsDbPlugin` trait. Please see the detail of the `accountsdb_plugin_interface.rs` for the interface definition. + +The plugin should produce a `cdylib` dynamic library, which must expose a `C` function `_create_plugin()` that +instantiates the implementation of the interface. + +The `solana-accountsdb-plugin-postgres` crate provides an example of how to create a plugin which saves the accounts data into an +external PostgreSQL databases. + +More information about Solana is available in the [Solana documentation](https://docs.solana.com/). + +Still have questions? Ask us on [Discord](https://discordapp.com/invite/pquxPsq) diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs new file mode 100644 index 0000000000..b75914bf34 --- /dev/null +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -0,0 +1,90 @@ +/// The interface for AccountsDb plugins. A plugin must implement +/// the AccountsDbPlugin trait to work with the runtime. +/// In addition, the dynamic library must export a "C" function _create_plugin which +/// creates the implementation of the plugin. +use { + std::{any::Any, error, io}, + thiserror::Error, +}; + +impl Eq for ReplicaAccountInfo<'_> {} + +#[derive(Clone, PartialEq, Debug)] +pub struct ReplicaAccountInfo<'a> { + pub pubkey: &'a [u8], + pub lamports: u64, + pub owner: &'a [u8], + pub executable: bool, + pub rent_epoch: u64, + pub data: &'a [u8], +} + +pub enum ReplicaAccountInfoVersions<'a> { + V0_0_1(&'a ReplicaAccountInfo<'a>), +} + +#[derive(Error, Debug)] +pub enum AccountsDbPluginError { + #[error("Error opening config file.")] + ConfigFileOpenError(#[from] io::Error), + + #[error("Error reading config file.")] + ConfigFileReadError { msg: String }, + + #[error("Error updating account.")] + AccountsUpdateError { msg: String }, + + #[error("Error updating slot status.")] + SlotStatusUpdateError { msg: String }, + + #[error("Plugin-defined custom error.")] + Custom(Box), +} + +#[derive(Debug, Clone)] +pub enum SlotStatus { + Processed, + Rooted, + Confirmed, +} + +impl SlotStatus { + pub fn as_str(&self) -> &'static str { + match self { + SlotStatus::Confirmed => "confirmed", + SlotStatus::Processed => "processed", + SlotStatus::Rooted => "rooted", + } + } +} + +pub type Result = std::result::Result; + +pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { + fn name(&self) -> &'static str; + + /// The callback called when a plugin is loaded by the system, + /// used for doing whatever initialization is required by the plugin. + /// The _config_file contains the name of the + /// of the config file. The config must be in JSON format and + /// include a field "libpath" indicating the full path + /// name of the shared library implementing this interface. + fn on_load(&mut self, _config_file: &str) -> Result<()> { + Ok(()) + } + + /// The callback called right before a plugin is unloaded by the system + /// Used for doing cleanup before unload. + fn on_unload(&mut self) {} + + /// Called when an account is updated at a slot. + fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()>; + + /// Called when a slot status is updated + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<()>; +} diff --git a/accountsdb-plugin-interface/src/lib.rs b/accountsdb-plugin-interface/src/lib.rs new file mode 100644 index 0000000000..0c3e43c20b --- /dev/null +++ b/accountsdb-plugin-interface/src/lib.rs @@ -0,0 +1 @@ +pub mod accountsdb_plugin_interface; diff --git a/accountsdb-plugin-manager/Cargo.toml b/accountsdb-plugin-manager/Cargo.toml new file mode 100644 index 0000000000..9508c983c5 --- /dev/null +++ b/accountsdb-plugin-manager/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-manager" +description = "The Solana AccountsDb plugin manager." +version = "1.8.1" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +bs58 = "0.4.0" +crossbeam-channel = "0.4" +libloading = "0.7.0" +log = "0.4.11" +serde = "1.0.130" +serde_derive = "1.0.103" +serde_json = "1.0.67" +solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.8.1" } +solana-logger = { path = "../logger", version = "=1.8.1" } +solana-metrics = { path = "../metrics", version = "=1.8.1" } +solana-rpc = { path = "../rpc", version = "=1.8.1" } +solana-runtime = { path = "../runtime", version = "=1.8.1" } +solana-sdk = { path = "../sdk", version = "=1.8.1" } +thiserror = "1.0.21" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs new file mode 100644 index 0000000000..1896dca013 --- /dev/null +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -0,0 +1,129 @@ +/// Module responsible for notifying plugins of account updates +use { + crate::accountsdb_plugin_manager::AccountsDbPluginManager, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus, + }, + solana_runtime::{ + accounts_update_notifier_interface::AccountsUpdateNotifierInterface, + append_vec::StoredAccountMeta, + }, + solana_sdk::{ + account::{AccountSharedData, ReadableAccount}, + clock::Slot, + pubkey::Pubkey, + }, + std::sync::{Arc, RwLock}, +}; +#[derive(Debug)] +pub(crate) struct AccountsUpdateNotifierImpl { + plugin_manager: Arc>, +} + +impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { + if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) { + self.notify_plugins_of_account_update(account_info, slot); + } + } + + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { + if let Some(account_info) = self.accountinfo_from_stored_account_meta(account) { + self.notify_plugins_of_account_update(account_info, slot); + } + } + + fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Confirmed); + } + + fn notify_slot_processed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Processed); + } + + fn notify_slot_rooted(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Rooted); + } +} + +impl AccountsUpdateNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + AccountsUpdateNotifierImpl { plugin_manager } + } + + fn accountinfo_from_shared_account_data<'a>( + &self, + pubkey: &'a Pubkey, + account: &'a AccountSharedData, + ) -> Option> { + Some(ReplicaAccountInfo { + pubkey: pubkey.as_ref(), + lamports: account.lamports(), + owner: account.owner().as_ref(), + executable: account.executable(), + rent_epoch: account.rent_epoch(), + data: account.data(), + }) + } + + fn accountinfo_from_stored_account_meta<'a>( + &self, + stored_account_meta: &'a StoredAccountMeta, + ) -> Option> { + Some(ReplicaAccountInfo { + pubkey: stored_account_meta.meta.pubkey.as_ref(), + lamports: stored_account_meta.account_meta.lamports, + owner: stored_account_meta.account_meta.owner.as_ref(), + executable: stored_account_meta.account_meta.executable, + rent_epoch: stored_account_meta.account_meta.rent_epoch, + data: stored_account_meta.data, + }) + } + + fn notify_plugins_of_account_update(&self, account: ReplicaAccountInfo, slot: Slot) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + for plugin in plugin_manager.plugins.iter_mut() { + match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) { + Err(err) => { + error!( + "Failed to update account {:?} at slot {:?}, error: {:?}", + account.pubkey, slot, err + ) + } + Ok(_) => { + trace!( + "Successfully updated account {:?} at slot {:?}", + account.pubkey, + slot + ); + } + } + } + } + + pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + match plugin.update_slot_status(slot, parent, slot_status.clone()) { + Err(err) => { + error!( + "Failed to update slot status at slot {:?}, error: {:?}", + slot, err + ) + } + Ok(_) => { + trace!("Successfully updated slot status at slot {:?}", slot); + } + } + } + } +} diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs new file mode 100644 index 0000000000..a6074362f8 --- /dev/null +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs @@ -0,0 +1,55 @@ +/// Managing the AccountsDb plugins +use { + libloading::{Library, Symbol}, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::AccountsDbPlugin, + std::error::Error, +}; + +#[derive(Default, Debug)] +pub struct AccountsDbPluginManager { + pub plugins: Vec>, + libs: Vec, +} + +impl AccountsDbPluginManager { + pub fn new() -> Self { + AccountsDbPluginManager { + plugins: Vec::default(), + libs: Vec::default(), + } + } + + /// # Safety + /// + /// This function loads the dynamically linked library specified in the path. The library + /// must do necessary initializations. + pub unsafe fn load_plugin( + &mut self, + libpath: &str, + config_file: &str, + ) -> Result<(), Box> { + type PluginConstructor = unsafe fn() -> *mut dyn AccountsDbPlugin; + let lib = Library::new(libpath)?; + let constructor: Symbol = lib.get(b"_create_plugin")?; + let plugin_raw = constructor(); + let mut plugin = Box::from_raw(plugin_raw); + plugin.on_load(config_file)?; + self.plugins.push(plugin); + self.libs.push(lib); + Ok(()) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + pub fn unload(&mut self) { + for mut plugin in self.plugins.drain(..) { + info!("Unloading plugin for {:?}", plugin.name()); + plugin.on_unload(); + } + + for lib in self.libs.drain(..) { + drop(lib); + } + } +} diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs new file mode 100644 index 0000000000..ed10433097 --- /dev/null +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -0,0 +1,150 @@ +use { + crate::{ + accounts_update_notifier::AccountsUpdateNotifierImpl, + accountsdb_plugin_manager::AccountsDbPluginManager, + slot_status_observer::SlotStatusObserver, + }, + crossbeam_channel::Receiver, + log::*, + serde_json, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, + std::{ + fs::File, + io::Read, + path::Path, + sync::{Arc, RwLock}, + thread, + }, + thiserror::Error, +}; + +#[derive(Error, Debug)] +pub enum AccountsdbPluginServiceError { + #[error("Cannot open the the plugin config file")] + CannotOpenConfigFile(String), + + #[error("Cannot read the the plugin config file")] + CannotReadConfigFile(String), + + #[error("The config file is not in a valid Json format")] + InvalidConfigFileFormat(String), + + #[error("Plugin library path is not specified in the config file")] + LibPathNotSet, + + #[error("Invalid plugin path")] + InvalidPluginPath, + + #[error("Cannot load plugin shared library")] + PluginLoadError(String), +} + +/// The service managing the AccountsDb plugin workflow. +pub struct AccountsDbPluginService { + slot_status_observer: SlotStatusObserver, + plugin_manager: Arc>, + accounts_update_notifier: AccountsUpdateNotifier, +} + +impl AccountsDbPluginService { + /// Creates and returns the AccountsDbPluginService. + /// # Arguments + /// * `confirmed_bank_receiver` - The receiver for confirmed bank notification + /// * `accountsdb_plugin_config_file` - The config file path for the plugin. The + /// config file controls the plugin responsible + /// for transporting the data to external data stores. It is defined in JSON format. + /// The `libpath` field should be pointed to the full path of the dynamic shared library + /// (.so file) to be loaded. The shared library must implement the `AccountsDbPlugin` + /// trait. And the shared library shall export a `C` function `_create_plugin` which + /// shall create the implementation of `AccountsDbPlugin` and returns to the caller. + /// The rest of the JSON fields' definition is up to to the concrete plugin implementation + /// It is usually used to configure the connection information for the external data store. + + pub fn new( + confirmed_bank_receiver: Receiver, + accountsdb_plugin_config_file: &Path, + ) -> Result { + info!( + "Starting AccountsDbPluginService from config file: {:?}", + accountsdb_plugin_config_file + ); + let plugin_manager = AccountsDbPluginManager::new(); + let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + + let mut file = match File::open(accountsdb_plugin_config_file) { + Ok(file) => file, + Err(err) => { + return Err(AccountsdbPluginServiceError::CannotOpenConfigFile(format!( + "Failed to open the plugin config file {:?}, error: {:?}", + accountsdb_plugin_config_file, err + ))); + } + }; + + let mut contents = String::new(); + if let Err(err) = file.read_to_string(&mut contents) { + return Err(AccountsdbPluginServiceError::CannotReadConfigFile(format!( + "Failed to read the plugin config file {:?}, error: {:?}", + accountsdb_plugin_config_file, err + ))); + } + + let result: serde_json::Value = match serde_json::from_str(&contents) { + Ok(value) => value, + Err(err) => { + return Err(AccountsdbPluginServiceError::InvalidConfigFileFormat( + format!( + "The config file {:?} is not in a valid Json format, error: {:?}", + accountsdb_plugin_config_file, err + ), + )); + } + }; + + let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( + plugin_manager.clone(), + ))); + let slot_status_observer = + SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); + + let libpath = result["libpath"] + .as_str() + .ok_or(AccountsdbPluginServiceError::LibPathNotSet)?; + let config_file = accountsdb_plugin_config_file + .as_os_str() + .to_str() + .ok_or(AccountsdbPluginServiceError::InvalidPluginPath)?; + + unsafe { + let result = plugin_manager + .write() + .unwrap() + .load_plugin(libpath, config_file); + if let Err(err) = result { + let msg = format!( + "Failed to load the plugin library: {:?}, error: {:?}", + libpath, err + ); + return Err(AccountsdbPluginServiceError::PluginLoadError(msg)); + } + } + + info!("Started AccountsDbPluginService"); + Ok(AccountsDbPluginService { + slot_status_observer, + plugin_manager, + accounts_update_notifier, + }) + } + + pub fn get_accounts_update_notifier(&self) -> AccountsUpdateNotifier { + self.accounts_update_notifier.clone() + } + + pub fn join(mut self) -> thread::Result<()> { + self.slot_status_observer.join()?; + self.plugin_manager.write().unwrap().unload(); + Ok(()) + } +} diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs new file mode 100644 index 0000000000..d2b38b57b3 --- /dev/null +++ b/accountsdb-plugin-manager/src/lib.rs @@ -0,0 +1,4 @@ +pub mod accounts_update_notifier; +pub mod accountsdb_plugin_manager; +pub mod accountsdb_plugin_service; +pub mod slot_status_observer; diff --git a/accountsdb-plugin-manager/src/slot_status_observer.rs b/accountsdb-plugin-manager/src/slot_status_observer.rs new file mode 100644 index 0000000000..9d3b36879f --- /dev/null +++ b/accountsdb-plugin-manager/src/slot_status_observer.rs @@ -0,0 +1,80 @@ +use { + crossbeam_channel::Receiver, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + }, +}; + +#[derive(Debug)] +pub(crate) struct SlotStatusObserver { + bank_notification_receiver_service: Option>, + exit_updated_slot_server: Arc, +} + +impl SlotStatusObserver { + pub fn new( + bank_notification_receiver: Receiver, + accounts_update_notifier: AccountsUpdateNotifier, + ) -> Self { + let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); + + Self { + bank_notification_receiver_service: Some(Self::run_bank_notification_receiver( + bank_notification_receiver, + exit_updated_slot_server.clone(), + accounts_update_notifier, + )), + exit_updated_slot_server, + } + } + + pub fn join(&mut self) -> thread::Result<()> { + self.exit_updated_slot_server.store(true, Ordering::Relaxed); + self.bank_notification_receiver_service + .take() + .map(JoinHandle::join) + .unwrap() + } + + fn run_bank_notification_receiver( + bank_notification_receiver: Receiver, + exit: Arc, + accounts_update_notifier: AccountsUpdateNotifier, + ) -> JoinHandle<()> { + Builder::new() + .name("bank_notification_receiver".to_string()) + .spawn(move || { + while !exit.load(Ordering::Relaxed) { + if let Ok(slot) = bank_notification_receiver.recv() { + match slot { + BankNotification::OptimisticallyConfirmed(slot) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_confirmed(slot, None); + } + BankNotification::Frozen(bank) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_processed(bank.slot(), Some(bank.parent_slot())); + } + BankNotification::Root(bank) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_rooted(bank.slot(), Some(bank.parent_slot())); + } + } + } + } + }) + .unwrap() + } +} diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml new file mode 100644 index 0000000000..59438c884c --- /dev/null +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-postgres" +description = "The Solana AccountsDb plugin for PostgreSQL database." +version = "1.8.1" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +bs58 = "0.4.0" +chrono = { version = "0.4.11", features = ["serde"] } +libloading = "0.7.0" +log = "0.4.14" +postgres = { version = "0.19.1", features = ["with-chrono-0_4"] } +serde = "1.0.130" +serde_derive = "1.0.103" +serde_json = "1.0.67" +solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.8.1" } +solana-logger = { path = "../logger", version = "=1.8.1" } +thiserror = "1.0.21" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql new file mode 100644 index 0000000000..41be52f5b3 --- /dev/null +++ b/accountsdb-plugin-postgres/scripts/create_schema.sql @@ -0,0 +1,52 @@ +/** + * This plugin implementation for PostgreSQL requires the following tables + */ +-- The table storing accounts + + +CREATE TABLE account ( + pubkey BYTEA PRIMARY KEY, + owner BYTEA, + lamports BIGINT NOT NULL, + slot BIGINT NOT NULL, + executable BOOL NOT NULL, + rent_epoch BIGINT NOT NULL, + data BYTEA, + updated_on TIMESTAMP NOT NULL +); + +-- The table storing slot information +CREATE TABLE slot ( + slot BIGINT PRIMARY KEY, + parent BIGINT, + status varchar(16) NOT NULL, + updated_on TIMESTAMP NOT NULL +); + +/** + * The following is for keeping historical data for accounts and is not required for plugin to work. + */ +-- The table storing historical data for accounts +CREATE TABLE account_audit ( + pubkey BYTEA, + owner BYTEA, + lamports BIGINT NOT NULL, + slot BIGINT NOT NULL, + executable BOOL NOT NULL, + rent_epoch BIGINT NOT NULL, + data BYTEA, + updated_on TIMESTAMP NOT NULL +); + +CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ + BEGIN + INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, updated_on) + VALUES (OLD.pubkey, OLD.owner, OLD.lamports, OLD.slot, + OLD.executable, OLD.rent_epoch, OLD.data, OLD.updated_on); + RETURN NEW; + END; + +$audit_account_update$ LANGUAGE plpgsql; + +CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account + FOR EACH ROW EXECUTE PROCEDURE audit_account_update(); diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql new file mode 100644 index 0000000000..e76a0dc165 --- /dev/null +++ b/accountsdb-plugin-postgres/scripts/drop_schema.sql @@ -0,0 +1,7 @@ +/** + * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin. + */ + +DROP FUNCTION audit_account_update; +DROP TABLE account_audit; +DROP TABLE account; \ No newline at end of file diff --git a/accountsdb-plugin-postgres/src/accounts_selector.rs b/accountsdb-plugin-postgres/src/accounts_selector.rs new file mode 100644 index 0000000000..91c669f70a --- /dev/null +++ b/accountsdb-plugin-postgres/src/accounts_selector.rs @@ -0,0 +1,69 @@ +use {log::*, std::collections::HashSet}; + +#[derive(Debug)] +pub(crate) struct AccountsSelector { + pub accounts: HashSet>, + pub owners: HashSet>, + pub select_all_accounts: bool, +} + +impl AccountsSelector { + pub fn default() -> Self { + AccountsSelector { + accounts: HashSet::default(), + owners: HashSet::default(), + select_all_accounts: true, + } + } + + pub fn new(accounts: &[String], owners: &[String]) -> Self { + info!( + "Creating AccountsSelector from accounts: {:?}, owners: {:?}", + accounts, owners + ); + + let select_all_accounts = accounts.iter().any(|key| key == "*"); + if select_all_accounts { + return AccountsSelector { + accounts: HashSet::default(), + owners: HashSet::default(), + select_all_accounts, + }; + } + let accounts = accounts + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + let owners = owners + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + AccountsSelector { + accounts, + owners, + select_all_accounts, + } + } + + pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { + self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + + #[test] + fn test_create_accounts_selector() { + AccountsSelector::new( + &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], + &[], + ); + + AccountsSelector::new( + &[], + &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], + ); + } +} diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs new file mode 100644 index 0000000000..1663a0e2d4 --- /dev/null +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -0,0 +1,349 @@ +/// Main entry for the PostgreSQL plugin +use { + crate::accounts_selector::AccountsSelector, + bs58, + chrono::Utc, + log::*, + postgres::{Client, NoTls, Statement}, + serde_derive::{Deserialize, Serialize}, + serde_json, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, + }, + std::{fs::File, io::Read, sync::Mutex}, + thiserror::Error, +}; + +struct PostgresSqlClientWrapper { + client: Client, + update_account_stmt: Statement, +} + +#[derive(Default)] +pub struct AccountsDbPluginPostgres { + client: Option>, + accounts_selector: Option, +} + +impl std::fmt::Debug for AccountsDbPluginPostgres { + fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +struct AccountsDbPluginPostgresConfig { + host: String, + user: String, +} + +#[derive(Error, Debug)] +enum AccountsDbPluginPostgresError { + #[error("Error connecting to the backend data store.")] + DataStoreConnectionError { msg: String }, + + #[error("Error preparing data store schema.")] + DataSchemaError { msg: String }, +} + +impl AccountsDbPlugin for AccountsDbPluginPostgres { + fn name(&self) -> &'static str { + "AccountsDbPluginPostgres" + } + + /// Do initialization for the PostgreSQL plugin. + /// # Arguments + /// + /// Format of the config file: + /// The `accounts_selector` section allows the user to controls accounts selections. + /// "accounts_selector" : { + /// "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\], + /// } + /// or: + /// "accounts_selector" = { + /// "owners" : \["pubkey-1', 'pubkey-2", ..., "pubkey-m"\] + /// } + /// Accounts either satisyfing the accounts condition or owners condition will be selected. + /// When only owners is specified, + /// all accounts belonging to the owners will be streamed. + /// The accounts field support wildcard to select all accounts: + /// "accounts_selector" : { + /// "accounts" : \["*"\], + /// } + /// "host" specifies the PostgreSQL server. + /// "user" specifies the PostgreSQL user. + /// # Examples + /// { + /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", + /// "host": "host_foo", + /// "user": "solana", + /// "accounts_selector" : { + /// "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"] + /// } + + fn on_load(&mut self, config_file: &str) -> Result<()> { + solana_logger::setup_with_default("info"); + info!( + "Loading plugin {:?} from config_file {:?}", + self.name(), + config_file + ); + let mut file = File::open(config_file)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let result: serde_json::Value = serde_json::from_str(&contents).unwrap(); + self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result)); + + let result: serde_json::Result = + serde_json::from_str(&contents); + match result { + Err(err) => { + return Err(AccountsDbPluginError::ConfigFileReadError { + msg: format!( + "The config file is not in the JSON format expected: {:?}", + err + ), + }) + } + Ok(config) => { + let connection_str = format!("host={} user={}", config.host, config.user); + match Client::connect(&connection_str, NoTls) { + Err(err) => { + return Err(AccountsDbPluginError::Custom( + Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: format!( + "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str), + }))); + } + Ok(mut client) => { + let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ + data=$7, updated_on=$8"); + + match result { + Err(err) => { + return Err(AccountsDbPluginError::Custom( + Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str + ), + }))); + } + Ok(update_account_stmt) => { + self.client = Some(Mutex::new(PostgresSqlClientWrapper { + client, + update_account_stmt, + })); + } + } + } + } + } + } + + Ok(()) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + fn on_unload(&mut self) { + info!("Unloading plugin: {:?}", self.name()); + } + + fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> { + match account { + ReplicaAccountInfoVersions::V0_0_1(account) => { + if let Some(accounts_selector) = &self.accounts_selector { + if !accounts_selector.is_account_selected(account.pubkey, account.owner) { + return Ok(()); + } + } else { + return Ok(()); + } + + debug!( + "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}", + bs58::encode(account.pubkey).into_string(), + bs58::encode(account.owner).into_string(), + slot, + self.accounts_selector.as_ref().unwrap() + ); + + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database." + .to_string(), + }, + ))); + } + Some(client) => { + let slot = slot as i64; // postgres only supports i64 + let lamports = account.lamports as i64; + let rent_epoch = account.rent_epoch as i64; + let updated_on = Utc::now().naive_utc(); + let client = client.get_mut().unwrap(); + let result = client.client.query( + &client.update_account_stmt, + &[ + &account.pubkey, + &slot, + &account.owner, + &lamports, + &account.executable, + &rent_epoch, + &account.data, + &updated_on, + ], + ); + + if let Err(err) = result { + return Err(AccountsDbPluginError::AccountsUpdateError { + msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err) + }); + } + } + } + } + } + Ok(()) + } + + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<()> { + info!("Updating slot {:?} at with status {:?}", slot, status); + + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => { + let slot = slot as i64; // postgres only supports i64 + let parent = parent.map(|parent| parent as i64); + let updated_on = Utc::now().naive_utc(); + let status_str = status.as_str(); + + let result = match parent { + Some(parent) => { + client.get_mut().unwrap().client.execute( + "INSERT INTO slot (slot, parent, status, updated_on) \ + VALUES ($1, $2, $3, $4) \ + ON CONFLICT (slot) DO UPDATE SET parent=$2, status=$3, updated_on=$4", + &[ + &slot, + &parent, + &status_str, + &updated_on, + ], + ) + } + None => { + client.get_mut().unwrap().client.execute( + "INSERT INTO slot (slot, status, updated_on) \ + VALUES ($1, $2, $3) \ + ON CONFLICT (slot) DO UPDATE SET status=$2, updated_on=$3", + &[ + &slot, + &status_str, + &updated_on, + ], + ) + } + }; + + match result { + Err(err) => { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) + }); + } + Ok(rows) => { + assert_eq!(1, rows, "Expected one rows to be updated a time"); + } + } + } + } + + Ok(()) + } +} + +impl AccountsDbPluginPostgres { + fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector { + let accounts_selector = &config["accounts_selector"]; + + if accounts_selector.is_null() { + AccountsSelector::default() + } else { + let accounts = &accounts_selector["accounts"]; + let accounts: Vec = if accounts.is_array() { + accounts + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + let owners = &accounts_selector["owners"]; + let owners: Vec = if owners.is_array() { + owners + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + AccountsSelector::new(&accounts, &owners) + } + } + + pub fn new() -> Self { + AccountsDbPluginPostgres { + client: None, + accounts_selector: None, + } + } +} + +#[no_mangle] +#[allow(improper_ctypes_definitions)] +/// # Safety +/// +/// This function returns the AccountsDbPluginPostgres pointer as trait AccountsDbPlugin. +pub unsafe extern "C" fn _create_plugin() -> *mut dyn AccountsDbPlugin { + let plugin = AccountsDbPluginPostgres::new(); + let plugin: Box = Box::new(plugin); + Box::into_raw(plugin) +} + +#[cfg(test)] +pub(crate) mod tests { + use {super::*, serde_json}; + + #[test] + fn test_accounts_selector_from_config() { + let config = "{\"accounts_selector\" : { \ + \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \ + }}"; + + let config: serde_json::Value = serde_json::from_str(config).unwrap(); + AccountsDbPluginPostgres::create_accounts_selector_from_config(&config); + } +} diff --git a/accountsdb-plugin-postgres/src/lib.rs b/accountsdb-plugin-postgres/src/lib.rs new file mode 100644 index 0000000000..ccc4f17902 --- /dev/null +++ b/accountsdb-plugin-postgres/src/lib.rs @@ -0,0 +1,2 @@ +pub mod accounts_selector; +pub mod accountsdb_plugin_postgres; diff --git a/core/Cargo.toml b/core/Cargo.toml index 6df2a82224..461f1dceb2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,6 +45,7 @@ serde = "1.0.122" serde_bytes = "0.11" serde_derive = "1.0.103" solana-account-decoder = { path = "../account-decoder", version = "=1.8.1" } +solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.8.1" } solana-banks-server = { path = "../banks-server", version = "=1.8.1" } solana-clap-utils = { path = "../clap-utils", version = "=1.8.1" } solana-client = { path = "../client", version = "=1.8.1" } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 8e3ce33840..6ce17523d7 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -495,7 +495,7 @@ mod tests { ..ProcessOptions::default() }; let (bank_forks, cached_leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); let leader_schedule_cache = Arc::new(cached_leader_schedule); let bank_forks = Arc::new(RwLock::new(bank_forks)); diff --git a/core/src/validator.rs b/core/src/validator.rs index ccc74a9d7f..5f818f0a87 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -19,6 +19,7 @@ use { }, crossbeam_channel::{bounded, unbounded}, rand::{thread_rng, Rng}, + solana_accountsdb_plugin_manager::accountsdb_plugin_service::AccountsDbPluginService, solana_gossip::{ cluster_info::{ ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, @@ -58,6 +59,7 @@ use { solana_runtime::{ accounts_db::AccountShrinkThreshold, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::Bank, bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, @@ -104,6 +106,7 @@ pub struct ValidatorConfig { pub account_paths: Vec, pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, + pub accountsdb_plugin_config_file: Option, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: Option, @@ -163,6 +166,7 @@ impl Default for ValidatorConfig { account_paths: Vec::new(), account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), + accountsdb_plugin_config_file: None, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: None, @@ -267,6 +271,7 @@ pub struct Validator { tpu: Tpu, tvu: Tvu, ip_echo_server: Option, + accountsdb_plugin_service: Option, } // in the distant future, get rid of ::new()/exit() and use Result properly... @@ -303,6 +308,27 @@ impl Validator { warn!("identity: {}", id); warn!("vote account: {}", vote_account); + let mut bank_notification_senders = Vec::new(); + + let accountsdb_plugin_service = + if let Some(accountsdb_plugin_config_file) = &config.accountsdb_plugin_config_file { + let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); + bank_notification_senders.push(confirmed_bank_sender); + let result = AccountsDbPluginService::new( + confirmed_bank_receiver, + accountsdb_plugin_config_file, + ); + match result { + Ok(accountsdb_plugin_service) => Some(accountsdb_plugin_service), + Err(err) => { + error!("Failed to load the AccountsDb plugin: {:?}", err); + abort(); + } + } + } else { + None + }; + if config.voting_disabled { warn!("voting disabled"); authorized_voter_keypairs.write().unwrap().clear(); @@ -396,6 +422,9 @@ impl Validator { config.enforce_ulimit_nofile, &start_progress, config.no_poh_speed_test, + accountsdb_plugin_service + .as_ref() + .map(|plugin_service| plugin_service.get_accounts_update_notifier()), ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; @@ -533,6 +562,11 @@ impl Validator { )); } let (bank_notification_sender, bank_notification_receiver) = unbounded(); + let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() { + Some(Arc::new(RwLock::new(bank_notification_senders))) + } else { + None + }; ( Some(JsonRpcService::new( rpc_addr, @@ -577,6 +611,7 @@ impl Validator { bank_forks.clone(), optimistically_confirmed_bank, rpc_subscriptions.clone(), + confirmed_bank_subscribers, )), Some(bank_notification_sender), ) @@ -814,6 +849,7 @@ impl Validator { poh_recorder, ip_echo_server, validator_exit: config.validator_exit.clone(), + accountsdb_plugin_service, } } @@ -916,6 +952,12 @@ impl Validator { if let Some(ip_echo_server) = self.ip_echo_server { ip_echo_server.shutdown_background(); } + + if let Some(accountsdb_plugin_service) = self.accountsdb_plugin_service { + accountsdb_plugin_service + .join() + .expect("accountsdb_plugin_service"); + } } } @@ -1049,6 +1091,7 @@ fn post_process_restored_tower( } #[allow(clippy::type_complexity)] +#[allow(clippy::too_many_arguments)] fn new_banks_from_ledger( validator_identity: &Pubkey, vote_account: &Pubkey, @@ -1059,6 +1102,7 @@ fn new_banks_from_ledger( enforce_ulimit_nofile: bool, start_progress: &Arc>, no_poh_speed_test: bool, + accounts_update_notifier: Option, ) -> ( GenesisConfig, BankForks, @@ -1175,6 +1219,7 @@ fn new_banks_from_ledger( transaction_history_services .cache_block_meta_sender .as_ref(), + accounts_update_notifier, ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index bc0a52bb6b..0ad024199f 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -109,6 +109,7 @@ mod tests { false, accounts_db::AccountShrinkThreshold::default(), false, + None, ); bank0.freeze(); let mut bank_forks = BankForks::new(bank0); @@ -172,6 +173,7 @@ mod tests { accounts_db::AccountShrinkThreshold::default(), check_hash_calculation, false, + None, ) .unwrap(); diff --git a/docs/src/developing/backwards-compatibility.md b/docs/src/developing/backwards-compatibility.md index 9f26ad03f8..a6db1f0946 100644 --- a/docs/src/developing/backwards-compatibility.md +++ b/docs/src/developing/backwards-compatibility.md @@ -76,6 +76,7 @@ Major releases: - [`solana-program`](https://docs.rs/solana-program/) - Rust SDK for writing programs - [`solana-client`](https://docs.rs/solana-client/) - Rust client for connecting to RPC API - [`solana-cli-config`](https://docs.rs/solana-cli-config/) - Rust client for managing Solana CLI config files +- [`solana-accountsdb-plugin-interface`](https://docs.rs/solana-accountsdb-plugin-interface/) - Rust interface for developing Solana AccountsDb plugins. Patch releases: diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index a269afcc89..b16f3bf78c 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -727,6 +727,7 @@ fn load_bank_forks( process_options, None, None, + None, ) } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index d654f499b7..39cd2b0a6d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -9,6 +9,7 @@ use crate::{ }; use log::*; use solana_runtime::{ + accounts_update_notifier_interface::AccountsUpdateNotifier, bank_forks::{ArchiveFormat, BankForks, SnapshotConfig}, snapshot_utils, }; @@ -33,6 +34,7 @@ fn to_loadresult( /// /// If a snapshot config is given, and a snapshot is found, it will be loaded. Otherwise, load /// from genesis. +#[allow(clippy::too_many_arguments)] pub fn load( genesis_config: &GenesisConfig, blockstore: &Blockstore, @@ -42,6 +44,7 @@ pub fn load( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_update_notifier: Option, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config.as_ref() { info!( @@ -70,6 +73,7 @@ pub fn load( archive_slot, archive_hash, archive_format, + accounts_update_notifier, ); } else { info!("No snapshot package available; will load from genesis"); @@ -84,6 +88,7 @@ pub fn load( account_paths, process_options, cache_block_meta_sender, + accounts_update_notifier, ) } @@ -93,6 +98,7 @@ fn load_from_genesis( account_paths: Vec, process_options: ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_update_notifier: Option, ) -> LoadResult { info!("Processing ledger from genesis"); to_loadresult( @@ -102,6 +108,7 @@ fn load_from_genesis( account_paths, process_options, cache_block_meta_sender, + accounts_update_notifier, ), None, ) @@ -121,6 +128,7 @@ fn load_from_snapshot( archive_slot: Slot, archive_hash: Hash, archive_format: ArchiveFormat, + accounts_update_notifier: Option, ) -> LoadResult { info!("Loading snapshot package: {:?}", archive_filename); @@ -145,6 +153,7 @@ fn load_from_snapshot( process_options.shrink_ratio, process_options.accounts_db_test_hash_calculation, process_options.accounts_db_skip_shrink, + accounts_update_notifier, ) .expect("Load from snapshot failed"); if let Some(shrink_paths) = shrink_paths { diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index daf4d18306..a01a68931d 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -19,6 +19,7 @@ use solana_rayon_threadlimit::get_thread_count; use solana_runtime::{ accounts_db::AccountShrinkThreshold, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::{ Bank, ExecuteTimings, InnerInstructionsList, RentDebits, TransactionBalancesSet, TransactionExecutionResult, TransactionLogMessages, TransactionResults, @@ -464,6 +465,7 @@ pub fn process_blockstore( account_paths: Vec, opts: ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_update_notifier: Option, ) -> BlockstoreProcessorResult { if let Some(num_threads) = opts.override_num_threads { PAR_THREAD_POOL.with(|pool| { @@ -485,6 +487,7 @@ pub fn process_blockstore( opts.accounts_db_caching_enabled, opts.shrink_ratio, false, + accounts_update_notifier, ); let bank0 = Arc::new(bank0); info!("processing ledger for slot 0..."); @@ -1456,6 +1459,7 @@ pub mod tests { ..ProcessOptions::default() }, None, + None, ) .unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); @@ -1501,6 +1505,7 @@ pub mod tests { ..ProcessOptions::default() }, None, + None, ) .unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); @@ -1518,6 +1523,7 @@ pub mod tests { ..ProcessOptions::default() }, None, + None, ) .unwrap(); @@ -1574,7 +1580,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); } @@ -1640,7 +1646,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); // slot 1 isn't "full", we stop at slot zero @@ -1660,7 +1666,7 @@ pub mod tests { fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 0, blockhash); // Slot 0 should not show up in the ending bank_forks_info let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // slot 1 isn't "full", we stop at slot zero assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 3]); @@ -1728,7 +1734,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // One fork, other one is ignored b/c not a descendant of the root assert_eq!(frozen_bank_slots(&bank_forks), vec![4]); @@ -1808,7 +1814,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![1, 2, 3, 4]); assert_eq!(bank_forks.working_bank().slot(), 4); @@ -1869,6 +1875,7 @@ pub mod tests { Vec::new(), ProcessOptions::default(), None, + None, ) .unwrap(); @@ -1919,6 +1926,7 @@ pub mod tests { Vec::new(), ProcessOptions::default(), None, + None, ) .unwrap(); @@ -1972,6 +1980,7 @@ pub mod tests { Vec::new(), ProcessOptions::default(), None, + None, ) .unwrap(); @@ -2025,7 +2034,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // There is one fork, head is last_slot + 1 assert_eq!(frozen_bank_slots(&bank_forks), vec![last_slot + 1]); @@ -2170,7 +2179,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 1]); assert_eq!(bank_forks.root(), 0); @@ -2200,7 +2209,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); let bank = bank_forks[0].clone(); @@ -2218,7 +2227,7 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); PAR_THREAD_POOL.with(|pool| { assert_eq!(pool.borrow().current_num_threads(), 1); }); @@ -2236,7 +2245,7 @@ pub mod tests { ..ProcessOptions::default() }; let (_bank_forks, leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(leader_schedule.max_schedules(), std::usize::MAX); } @@ -2297,7 +2306,7 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(*callback_counter.write().unwrap(), 2); } @@ -2952,7 +2961,7 @@ pub mod tests { ..ProcessOptions::default() }; let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); // Should be able to fetch slot 0 because we specified halting at slot 0, even // if there is a greater root at slot 1. @@ -3202,6 +3211,7 @@ pub mod tests { false, AccountShrinkThreshold::default(), false, + None, ); *bank.epoch_schedule() } @@ -3459,9 +3469,15 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts.clone(), None) - .unwrap(); + let (bank_forks, _leader_schedule) = process_blockstore( + &genesis_config, + &blockstore, + Vec::new(), + opts.clone(), + None, + None, + ) + .unwrap(); // prepare to add votes let last_vote_bank_hash = bank_forks.get(last_main_fork_slot - 1).unwrap().hash(); @@ -3492,9 +3508,15 @@ pub mod tests { &leader_keypair, ); - let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts.clone(), None) - .unwrap(); + let (bank_forks, _leader_schedule) = process_blockstore( + &genesis_config, + &blockstore, + Vec::new(), + opts.clone(), + None, + None, + ) + .unwrap(); assert_eq!(bank_forks.root(), expected_root_slot); assert_eq!( @@ -3549,7 +3571,7 @@ pub mod tests { ); let (bank_forks, _leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None, None).unwrap(); assert_eq!(bank_forks.root(), really_expected_root_slot); } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 0348e5d960..0fcc985e2a 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -12,6 +12,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_paths: config.account_paths.clone(), account_shrink_paths: config.account_shrink_paths.clone(), rpc_config: config.rpc_config.clone(), + accountsdb_plugin_config_file: config.accountsdb_plugin_config_file.clone(), rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index a7a9d2d1a9..720692f0d7 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -133,6 +133,7 @@ fn initialize_from_snapshot( false, process_options.verify_index, process_options.accounts_db_config, + None, ) .unwrap(); diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index c7eab7dc00..5261e2298d 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -31,6 +31,7 @@ impl OptimisticallyConfirmedBank { } } +#[derive(Clone)] pub enum BankNotification { OptimisticallyConfirmed(Slot), Frozen(Arc), @@ -63,6 +64,7 @@ impl OptimisticallyConfirmedBankTracker { bank_forks: Arc>, optimistically_confirmed_bank: Arc>, subscriptions: Arc, + bank_notification_subscribers: Option>>>, ) -> Self { let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); @@ -83,6 +85,7 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &bank_notification_subscribers, ) { break; } @@ -99,6 +102,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, mut highest_confirmed_slot: &mut Slot, + bank_notification_subscribers: &Option>>>, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -109,16 +113,37 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + bank_notification_subscribers, ); Ok(()) } + fn notify_slot_status( + bank_notification_subscribers: &Option>>>, + notifcation: BankNotification, + ) { + if let Some(bank_notification_subscribers) = bank_notification_subscribers { + for sender in bank_notification_subscribers.read().unwrap().iter() { + match sender.send(notifcation.clone()) { + Ok(_) => {} + Err(err) => { + info!( + "Failed to send notification {:?}, error: {:?}", + notifcation, err + ); + } + } + } + } + } + fn notify_or_defer( subscriptions: &Arc, bank_forks: &Arc>, bank: &Arc, last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, + bank_notification_subscribers: &Option>>>, ) { if bank.is_frozen() { if bank.slot() > *last_notified_confirmed_slot { @@ -128,6 +153,10 @@ impl OptimisticallyConfirmedBankTracker { ); subscriptions.notify_gossip_subscribers(bank.slot()); *last_notified_confirmed_slot = bank.slot(); + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::OptimisticallyConfirmed(bank.slot()), + ); } } else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() { pending_optimistically_confirmed_banks.insert(bank.slot()); @@ -142,6 +171,7 @@ impl OptimisticallyConfirmedBankTracker { slot_threshold: Slot, mut last_notified_confirmed_slot: &mut Slot, mut pending_optimistically_confirmed_banks: &mut HashSet, + bank_notification_subscribers: &Option>>>, ) { for confirmed_bank in bank.clone().parents_inclusive().iter().rev() { if confirmed_bank.slot() > slot_threshold { @@ -155,6 +185,7 @@ impl OptimisticallyConfirmedBankTracker { confirmed_bank, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + bank_notification_subscribers, ); } } @@ -168,6 +199,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, highest_confirmed_slot: &mut Slot, + bank_notification_subscribers: &Option>>>, ) { debug!("received bank notification: {:?}", notification); match notification { @@ -189,6 +221,7 @@ impl OptimisticallyConfirmedBankTracker { *highest_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + bank_notification_subscribers, ); *highest_confirmed_slot = slot; @@ -222,6 +255,11 @@ impl OptimisticallyConfirmedBankTracker { max_transactions_per_entry: bank.transactions_per_entry_max(), }, }); + + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::Frozen(bank.clone()), + ); } if pending_optimistically_confirmed_banks.remove(&bank.slot()) { @@ -237,6 +275,7 @@ impl OptimisticallyConfirmedBankTracker { *last_notified_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + bank_notification_subscribers, ); let mut w_optimistically_confirmed_bank = @@ -248,6 +287,10 @@ impl OptimisticallyConfirmedBankTracker { } } BankNotification::Root(bank) => { + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::Root(bank.clone()), + ); let root_slot = bank.slot(); let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); @@ -320,6 +363,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -333,6 +377,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -346,6 +391,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -364,6 +410,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(highest_confirmed_slot, 3); @@ -381,6 +428,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -399,6 +447,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); @@ -424,6 +473,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 9c0e20d333..da201a43c2 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -7515,6 +7515,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7532,6 +7533,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7549,6 +7551,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7567,6 +7570,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 7adee218b2..cd80af723e 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1290,6 +1290,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // a closure to reduce code duplications in building expected responses: @@ -1339,6 +1340,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver.recv(); @@ -1454,6 +1456,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // The following should panic @@ -1565,6 +1568,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // a closure to reduce code duplications in building expected responses: @@ -1616,6 +1620,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver.recv(); @@ -2031,6 +2036,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // Now, notify the frozen bank and ensure its notifications are processed @@ -2043,6 +2049,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver0.recv(); @@ -2091,6 +2098,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let response = receiver1.recv(); let expected = json!({ diff --git a/runtime/benches/accounts.rs b/runtime/benches/accounts.rs index 364707ddde..e94b595a5d 100644 --- a/runtime/benches/accounts.rs +++ b/runtime/benches/accounts.rs @@ -62,6 +62,7 @@ fn test_accounts_create(bencher: &mut Bencher) { false, AccountShrinkThreshold::default(), false, + None, ); bencher.iter(|| { let mut pubkeys: Vec = vec![]; @@ -83,6 +84,7 @@ fn test_accounts_squash(bencher: &mut Bencher) { false, AccountShrinkThreshold::default(), false, + None, )); let mut pubkeys: Vec = vec![]; deposit_many(&prev_bank, &mut pubkeys, 250_000).unwrap(); @@ -109,6 +111,7 @@ fn test_accounts_hash_bank_hash(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; let num_accounts = 60_000; @@ -136,6 +139,7 @@ fn test_update_accounts_hash(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; create_test_accounts(&accounts, &mut pubkeys, 50_000, 0); @@ -154,6 +158,7 @@ fn test_accounts_delta_hash(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; create_test_accounts(&accounts, &mut pubkeys, 100_000, 0); @@ -171,6 +176,7 @@ fn bench_delete_dependencies(bencher: &mut Bencher) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut old_pubkey = Pubkey::default(); let zero_account = AccountSharedData::new(0, 0, AccountSharedData::default().owner()); @@ -205,6 +211,7 @@ fn store_accounts_with_possible_contention( AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, )); let num_keys = 1000; let slot = 0; @@ -341,6 +348,7 @@ fn setup_bench_dashmap_iter() -> (Arc, DashMap, ) -> Self { Self { accounts_db: Arc::new(AccountsDb::new_with_config( @@ -145,6 +148,7 @@ impl Accounts { account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, )), account_locks: Mutex::new(AccountLocks::default()), } @@ -1185,6 +1189,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); for ka in ka.iter() { accounts.store_slow_uncached(0, &ka.0, &ka.1); @@ -1723,6 +1728,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); // Load accounts owned by various programs into AccountsDb @@ -1993,6 +1999,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut error_counters = ErrorCounters::default(); let ancestors = vec![(0, 0)].into_iter().collect(); @@ -2017,6 +2024,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.bank_hash_at(1); } @@ -2039,6 +2047,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.store_slow_uncached(0, &keypair0.pubkey(), &account0); accounts.store_slow_uncached(0, &keypair1.pubkey(), &account1); @@ -2151,6 +2160,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.store_slow_uncached(0, &keypair0.pubkey(), &account0); accounts.store_slow_uncached(0, &keypair1.pubkey(), &account1); @@ -2239,6 +2249,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); accounts.store_slow_uncached(0, &keypair0.pubkey(), &account0); accounts.store_slow_uncached(0, &keypair1.pubkey(), &account1); @@ -2361,6 +2372,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); { accounts @@ -2417,6 +2429,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut old_pubkey = Pubkey::default(); let zero_account = AccountSharedData::new(0, 0, AccountSharedData::default().owner()); @@ -2465,6 +2478,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let instructions_key = solana_sdk::sysvar::instructions::id(); @@ -2755,6 +2769,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let txs = &[tx]; let collected_accounts = accounts.collect_accounts_to_store( @@ -2874,6 +2889,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let txs = &[tx]; let collected_accounts = accounts.collect_accounts_to_store( @@ -2912,6 +2928,7 @@ mod tests { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let pubkey0 = Pubkey::new_unique(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 51f311d2a1..1446db4f91 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -26,6 +26,7 @@ use crate::{ AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexRootsStats, IndexKey, IsCached, ScanResult, SlotList, SlotSlice, ZeroLamport, }, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion}, contains::Contains, @@ -954,6 +955,7 @@ pub struct AccountsDb { /// such that potentially a 0-lamport account update could be present which /// means we can remove the account from the index entirely. dirty_stores: DashMap<(Slot, AppendVecId), Arc>, + accounts_update_notifier: Option, } #[derive(Debug, Default)] @@ -1375,6 +1377,7 @@ impl Default for AccountsDb { remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(), shrink_ratio: AccountShrinkThreshold::default(), dirty_stores: DashMap::default(), + accounts_update_notifier: None, } } } @@ -1390,6 +1393,7 @@ impl AccountsDb { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ) } @@ -1399,6 +1403,7 @@ impl AccountsDb { account_indexes: AccountSecondaryIndexes, caching_enabled: bool, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Self { let mut new = if !paths.is_empty() { Self { @@ -1408,6 +1413,7 @@ impl AccountsDb { account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, ..Self::default() } } else { @@ -5462,6 +5468,16 @@ impl AccountsDb { pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, self.caching_enabled); + + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + + for account in accounts { + let pubkey = account.0; + let account = account.1; + notifier.notify_account_update(slot, pubkey, account); + } + } } /// Store the account update. @@ -6080,6 +6096,24 @@ impl AccountsDb { } } } + + pub fn notify_account_restore_from_snapshot(&self) { + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + let slots = self.storage.all_slots(); + for slot in &slots { + let slot_stores = self.storage.get_slot_stores(*slot).unwrap(); + + let slot_stores = slot_stores.read().unwrap(); + for (_, storage_entry) in slot_stores.iter() { + let accounts = storage_entry.all_accounts(); + for account in &accounts { + notifier.notify_account_restore_from_snapshot(*slot, account); + } + } + } + } + } } #[cfg(test)] @@ -7753,6 +7787,7 @@ pub mod tests { spl_token_mint_index_enabled(), false, AccountShrinkThreshold::default(), + None, ); let pubkey1 = solana_sdk::pubkey::new_rand(); let pubkey2 = solana_sdk::pubkey::new_rand(); @@ -9901,6 +9936,7 @@ pub mod tests { AccountSecondaryIndexes::default(), true, AccountShrinkThreshold::default(), + None, ); let account = AccountSharedData::new(1, 16 * 4096, &Pubkey::default()); @@ -10212,6 +10248,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let account_key = Pubkey::new_unique(); @@ -10260,6 +10297,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let account_key = Pubkey::new_unique(); @@ -10309,6 +10347,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let zero_lamport_account_key = Pubkey::new_unique(); @@ -10444,6 +10483,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let account_key = Pubkey::new_unique(); let account_key2 = Pubkey::new_unique(); @@ -10551,6 +10591,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); let slot: Slot = 0; let num_keys = 10; @@ -10606,6 +10647,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, )); let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect(); let stall_slot = num_slots as Slot; @@ -11011,6 +11053,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); let account_key1 = Pubkey::new_unique(); let account_key2 = Pubkey::new_unique(); @@ -11268,6 +11311,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); db.load_delay = RACY_SLEEP_MS; let db = Arc::new(db); @@ -11340,6 +11384,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); db.load_delay = RACY_SLEEP_MS; let db = Arc::new(db); @@ -11416,6 +11461,7 @@ pub mod tests { AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), + None, ); let db = Arc::new(db); let num_cached_slots = 100; diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs new file mode 100644 index 0000000000..70730ef01c --- /dev/null +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -0,0 +1,25 @@ +use { + crate::append_vec::StoredAccountMeta, + solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, + std::sync::{Arc, RwLock}, +}; + +pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { + /// Notified when an account is updated at runtime, due to transaction activities + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData); + + /// Notified when the AccountsDb is initialized at start when restored + /// from a snapshot. + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta); + + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, slot: Slot, parent: Option); +} + +pub type AccountsUpdateNotifier = Arc>; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d735570f09..a0884245aa 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -40,6 +40,7 @@ use crate::{ }, accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages}, accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::{Ancestors, AncestorsForSerialization}, blockhash_queue::BlockhashQueue, builtins::{self, ActivationType}, @@ -1069,6 +1070,7 @@ impl Bank { false, AccountShrinkThreshold::default(), false, + None, ) } @@ -1083,6 +1085,7 @@ impl Bank { false, AccountShrinkThreshold::default(), false, + None, ); bank.ns_per_slot = std::u128::MAX; @@ -1106,9 +1109,11 @@ impl Bank { accounts_db_caching_enabled, shrink_ratio, false, + None, ) } + #[allow(clippy::too_many_arguments)] pub fn new_with_paths( genesis_config: &GenesisConfig, paths: Vec, @@ -1119,6 +1124,7 @@ impl Bank { accounts_db_caching_enabled: bool, shrink_ratio: AccountShrinkThreshold, debug_do_not_add_builtins: bool, + accounts_update_notifier: Option, ) -> Self { let mut bank = Self::default(); bank.ancestors = Ancestors::from(vec![bank.slot()]); @@ -1131,6 +1137,7 @@ impl Bank { account_indexes, accounts_db_caching_enabled, shrink_ratio, + accounts_update_notifier, )); bank.process_genesis_config(genesis_config); bank.finish_init( @@ -12542,6 +12549,7 @@ pub(crate) mod tests { false, AccountShrinkThreshold::default(), false, + None, )); // move to next epoch to create now deprecated rewards sysvar intentionally let bank1 = Arc::new(Bank::new_from_parent( diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 0d1570b38f..8a1844467a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -6,6 +6,7 @@ pub mod accounts_cache; pub mod accounts_db; pub mod accounts_hash; pub mod accounts_index; +pub mod accounts_update_notifier_interface; pub mod ancestors; pub mod append_vec; pub mod bank; diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 999bbe3840..7954bddcdd 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -5,6 +5,7 @@ use { AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AppendVecId, BankHashInfo, }, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, append_vec::{AppendVec, StoredMetaWriteVersion}, bank::{Bank, BankFieldsToDeserialize, BankRc, Builtins}, @@ -21,6 +22,7 @@ use { log::*, rayon::prelude::*, serde::{de::DeserializeOwned, Deserialize, Serialize}, + solana_measure::measure::Measure, solana_sdk::{ clock::{Epoch, Slot, UnixTimestamp}, epoch_schedule::EpochSchedule, @@ -140,6 +142,7 @@ pub(crate) fn bank_from_stream( caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> std::result::Result where R: Read, @@ -161,6 +164,7 @@ where caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, )?; Ok(bank) }}; @@ -252,6 +256,7 @@ fn reconstruct_bank_from_fields( caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Result where E: SerializableStorage + std::marker::Sync, @@ -265,6 +270,7 @@ where caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, )?; accounts_db.freeze_accounts( &Ancestors::from(&bank_fields.ancestors), @@ -314,6 +320,7 @@ fn reconstruct_accountsdb_from_fields( caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Result where E: SerializableStorage + std::marker::Sync, @@ -324,6 +331,7 @@ where account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, ); let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields; @@ -388,6 +396,15 @@ where ); } + let mut measure_notify = Measure::start("accounts_notify"); + accounts_db.notify_account_restore_from_snapshot(); + measure_notify.stop(); + + datapoint_info!( + "reconstruct_accountsdb_from_fields()", + ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64), + ); + if max_id > AppendVecId::MAX / 2 { panic!("Storage id {} larger than allowed max", max_id); } diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index c7bf8a9bda..8767701896 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -75,6 +75,7 @@ where false, None, AccountShrinkThreshold::default(), + None, ) } @@ -131,6 +132,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) { AccountSecondaryIndexes::default(), false, AccountShrinkThreshold::default(), + None, ); let mut pubkeys: Vec = vec![]; @@ -231,6 +233,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) { false, None, AccountShrinkThreshold::default(), + None, ) .unwrap(); dbank.src = ref_sc; diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index af468a8b4e..1e54a08e75 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -2,6 +2,7 @@ use { crate::{ accounts_db::{AccountShrinkThreshold, AccountsDb}, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::{Bank, BankSlotDelta, Builtins}, bank_forks::ArchiveFormat, hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, @@ -633,6 +634,7 @@ pub fn bank_from_archive + std::marker::Sync>( shrink_ratio: AccountShrinkThreshold, test_hash_calculation: bool, accounts_db_skip_shrink: bool, + accounts_update_notifier: Option, ) -> Result<(Bank, BankFromArchiveTimings)> { let unpack_dir = tempfile::Builder::new() .prefix(TMP_SNAPSHOT_PREFIX) @@ -673,6 +675,7 @@ pub fn bank_from_archive + std::marker::Sync>( accounts_db_caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, )?; measure.stop(); @@ -900,6 +903,7 @@ fn rebuild_bank_from_snapshots( accounts_db_caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, + accounts_update_notifier: Option, ) -> Result { let (snapshot_version_enum, root_paths) = verify_snapshot_version_and_folder(snapshot_version, unpacked_snapshots_dir)?; @@ -922,6 +926,7 @@ fn rebuild_bank_from_snapshots( accounts_db_caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, + accounts_update_notifier, ), }?) })?; diff --git a/upload-perf/src/upload-perf.rs b/upload-perf/src/upload-perf.rs index d983596884..8018564827 100644 --- a/upload-perf/src/upload-perf.rs +++ b/upload-perf/src/upload-perf.rs @@ -64,17 +64,14 @@ fn main() { let deviation: i64 = v["deviation"].to_string().parse().unwrap(); if upload_metrics { panic!("TODO..."); - /* - solana_metrics::datapoint_info!( - &v["name"].as_str().unwrap().trim_matches('\"'), - ("test", "bench", String), - ("branch", branch.to_string(), String), - ("median", median, i64), - ("deviation", deviation, i64), - ("commit", git_commit_hash.trim().to_string(), String) - ); - */ - + // solana_metrics::datapoint_info!( + // &v["name"].as_str().unwrap().trim_matches('\"'), + // ("test", "bench", String), + // ("branch", branch.to_string(), String), + // ("median", median, i64), + // ("deviation", deviation, i64), + // ("commit", git_commit_hash.trim().to_string(), String) + // ); } let last_median = get_last_metrics(&"median".to_string(), &db, &name, branch).unwrap_or_default(); diff --git a/validator/src/main.rs b/validator/src/main.rs index dbd0eb007e..cba2a05188 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1711,6 +1711,14 @@ pub fn main() { .requires("enable_rpc_transaction_history") .help("Verifies blockstore roots on boot and fixes any gaps"), ) + .arg( + Arg::with_name("accountsdb_plugin_config") + .long("accountsdb-plugin-config") + .value_name("FILE") + .takes_value(true) + .hidden(true) + .help("Specify the configuration file for the AccountsDb plugin."), + ) .arg( Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") .alias("halt-on-trusted-validators-accounts-hash-mismatch") @@ -2269,6 +2277,10 @@ pub fn main() { .ok() .or_else(|| get_cluster_shred_version(&entrypoint_addrs)); + let accountsdb_plugin_config_file = matches + .value_of("accountsdb_plugin_config") + .map(PathBuf::from); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_path: value_t!(matches, "tower", PathBuf).ok(), @@ -2310,6 +2322,7 @@ pub fn main() { account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, + accountsdb_plugin_config_file, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),