diff --git a/Cargo.lock b/Cargo.lock index f9cf99d9dc..4703a22e8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4198,7 +4198,7 @@ version = "1.8.1" dependencies = [ "bs58 0.4.0", "chrono", - "libloading 0.7.0", + "crossbeam-channel 0.5.0", "log 0.4.14", "postgres", "serde", diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs index ed10433097..0af928838b 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -12,7 +12,7 @@ use { std::{ fs::File, io::Read, - path::Path, + path::{Path, PathBuf}, sync::{Arc, RwLock}, thread, }, @@ -63,15 +63,37 @@ impl AccountsDbPluginService { pub fn new( confirmed_bank_receiver: Receiver, - accountsdb_plugin_config_file: &Path, + accountsdb_plugin_config_files: &[PathBuf], ) -> Result { info!( - "Starting AccountsDbPluginService from config file: {:?}", - accountsdb_plugin_config_file + "Starting AccountsDbPluginService from config files: {:?}", + accountsdb_plugin_config_files ); - let plugin_manager = AccountsDbPluginManager::new(); - let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + let mut plugin_manager = AccountsDbPluginManager::new(); + for accountsdb_plugin_config_file in accountsdb_plugin_config_files { + Self::load_plugin(&mut plugin_manager, accountsdb_plugin_config_file)?; + } + + let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( + plugin_manager.clone(), + ))); + let slot_status_observer = + SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); + + info!("Started AccountsDbPluginService"); + Ok(AccountsDbPluginService { + slot_status_observer, + plugin_manager, + accounts_update_notifier, + }) + } + + fn load_plugin( + plugin_manager: &mut AccountsDbPluginManager, + accountsdb_plugin_config_file: &Path, + ) -> Result<(), AccountsdbPluginServiceError> { let mut file = match File::open(accountsdb_plugin_config_file) { Ok(file) => file, Err(err) => { @@ -102,12 +124,6 @@ impl AccountsDbPluginService { } }; - let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( - plugin_manager.clone(), - ))); - let slot_status_observer = - SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); - let libpath = result["libpath"] .as_str() .ok_or(AccountsdbPluginServiceError::LibPathNotSet)?; @@ -117,10 +133,7 @@ impl AccountsDbPluginService { .ok_or(AccountsdbPluginServiceError::InvalidPluginPath)?; unsafe { - let result = plugin_manager - .write() - .unwrap() - .load_plugin(libpath, config_file); + let result = plugin_manager.load_plugin(libpath, config_file); if let Err(err) = result { let msg = format!( "Failed to load the plugin library: {:?}, error: {:?}", @@ -129,13 +142,7 @@ impl AccountsDbPluginService { return Err(AccountsdbPluginServiceError::PluginLoadError(msg)); } } - - info!("Started AccountsDbPluginService"); - Ok(AccountsDbPluginService { - slot_status_observer, - plugin_manager, - accounts_update_notifier, - }) + Ok(()) } pub fn get_accounts_update_notifier(&self) -> AccountsUpdateNotifier { diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml index 59438c884c..6f4b2d3cb5 100644 --- a/accountsdb-plugin-postgres/Cargo.toml +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -15,7 +15,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] bs58 = "0.4.0" chrono = { version = "0.4.11", features = ["serde"] } -libloading = "0.7.0" +crossbeam-channel = "0.5" log = "0.4.14" postgres = { version = "0.19.1", features = ["with-chrono-0_4"] } serde = "1.0.130" diff --git a/accountsdb-plugin-postgres/README.md b/accountsdb-plugin-postgres/README.md new file mode 100644 index 0000000000..e43c327363 --- /dev/null +++ b/accountsdb-plugin-postgres/README.md @@ -0,0 +1,5 @@ +This is an example implementing the AccountsDb plugin for PostgreSQL database. +Please see the `src/accountsdb_plugin_postgres.rs` for the format of the plugin's configuration file. + +To create the schema objects for the database, please use `scripts/create_schema.sql`. +`scripts/drop_schema.sql` can be used to tear down the schema objects. \ No newline at end of file diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql index e76a0dc165..c73dd9f050 100644 --- a/accountsdb-plugin-postgres/scripts/drop_schema.sql +++ b/accountsdb-plugin-postgres/scripts/drop_schema.sql @@ -2,6 +2,8 @@ * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin. */ +DROP TRIGGER account_update_trigger; DROP FUNCTION audit_account_update; DROP TABLE account_audit; -DROP TABLE account; \ No newline at end of file +DROP TABLE account; +DROP TABLE slot; diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 1663a0e2d4..d06a2fd316 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -1,27 +1,30 @@ /// Main entry for the PostgreSQL plugin use { - crate::accounts_selector::AccountsSelector, + crate::{ + accounts_selector::AccountsSelector, + postgres_client::{ + ParallelPostgresClient, PostgresClient, PostgresClientBuilder, SimplePostgresClient, + }, + }, bs58, - chrono::Utc, log::*, - postgres::{Client, NoTls, Statement}, serde_derive::{Deserialize, Serialize}, serde_json, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, }, - std::{fs::File, io::Read, sync::Mutex}, + std::{fs::File, io::Read}, thiserror::Error, }; -struct PostgresSqlClientWrapper { - client: Client, - update_account_stmt: Statement, +enum PostgresClientEnum { + Simple(SimplePostgresClient), + Parallel(ParallelPostgresClient), } #[derive(Default)] pub struct AccountsDbPluginPostgres { - client: Option>, + client: Option, accounts_selector: Option, } @@ -32,13 +35,14 @@ impl std::fmt::Debug for AccountsDbPluginPostgres { } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -struct AccountsDbPluginPostgresConfig { - host: String, - user: String, +pub struct AccountsDbPluginPostgresConfig { + pub host: String, + pub user: String, + pub threads: Option, } #[derive(Error, Debug)] -enum AccountsDbPluginPostgresError { +pub enum AccountsDbPluginPostgresError { #[error("Error connecting to the backend data store.")] DataStoreConnectionError { msg: String }, @@ -61,7 +65,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// } /// or: /// "accounts_selector" = { - /// "owners" : \["pubkey-1', 'pubkey-2", ..., "pubkey-m"\] + /// "owners" : \["pubkey-1", "pubkey-2", ..., "pubkey-m"\] /// } /// Accounts either satisyfing the accounts condition or owners condition will be selected. /// When only owners is specified, @@ -72,11 +76,14 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// } /// "host" specifies the PostgreSQL server. /// "user" specifies the PostgreSQL user. + /// "threads" optional, specifies the number of worker threads for the plugin. A thread + /// maintains a PostgreSQL connection to the server. /// # Examples /// { /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", /// "host": "host_foo", /// "user": "solana", + /// "threads": 10, /// "accounts_selector" : { /// "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"] /// } @@ -107,51 +114,31 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { }) } Ok(config) => { - let connection_str = format!("host={} user={}", config.host, config.user); - match Client::connect(&connection_str, NoTls) { - Err(err) => { - return Err(AccountsDbPluginError::Custom( - Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: format!( - "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, connection_str), - }))); - } - Ok(mut client) => { - let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ - ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ - data=$7, updated_on=$8"); - - match result { - Err(err) => { - return Err(AccountsDbPluginError::Custom( - Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, connection_str - ), - }))); - } - Ok(update_account_stmt) => { - self.client = Some(Mutex::new(PostgresSqlClientWrapper { - client, - update_account_stmt, - })); - } - } - } - } + self.client = if config.threads.is_some() && config.threads.unwrap() > 1 { + let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?; + Some(PostgresClientEnum::Parallel(client)) + } else { + let client = PostgresClientBuilder::build_simple_postgres_client(&config)?; + Some(PostgresClientEnum::Simple(client)) + }; } } Ok(()) } - /// Unload all plugins and loaded plugin libraries, making sure to fire - /// their `on_plugin_unload()` methods so they can do any necessary cleanup. fn on_unload(&mut self) { info!("Unloading plugin: {:?}", self.name()); + + match &mut self.client { + None => {} + Some(client) => match client { + PostgresClientEnum::Parallel(client) => client.join().unwrap(), + PostgresClientEnum::Simple(client) => { + client.join().unwrap(); + } + }, + } } fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> { @@ -183,24 +170,14 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { ))); } Some(client) => { - let slot = slot as i64; // postgres only supports i64 - let lamports = account.lamports as i64; - let rent_epoch = account.rent_epoch as i64; - let updated_on = Utc::now().naive_utc(); - let client = client.get_mut().unwrap(); - let result = client.client.query( - &client.update_account_stmt, - &[ - &account.pubkey, - &slot, - &account.owner, - &lamports, - &account.executable, - &rent_epoch, - &account.data, - &updated_on, - ], - ); + let result = match client { + PostgresClientEnum::Parallel(client) => { + client.update_account(account, slot) + } + PostgresClientEnum::Simple(client) => { + client.update_account(account, slot) + } + }; if let Err(err) = result { return Err(AccountsDbPluginError::AccountsUpdateError { @@ -231,48 +208,19 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { ))); } Some(client) => { - let slot = slot as i64; // postgres only supports i64 - let parent = parent.map(|parent| parent as i64); - let updated_on = Utc::now().naive_utc(); - let status_str = status.as_str(); - - let result = match parent { - Some(parent) => { - client.get_mut().unwrap().client.execute( - "INSERT INTO slot (slot, parent, status, updated_on) \ - VALUES ($1, $2, $3, $4) \ - ON CONFLICT (slot) DO UPDATE SET parent=$2, status=$3, updated_on=$4", - &[ - &slot, - &parent, - &status_str, - &updated_on, - ], - ) - } - None => { - client.get_mut().unwrap().client.execute( - "INSERT INTO slot (slot, status, updated_on) \ - VALUES ($1, $2, $3) \ - ON CONFLICT (slot) DO UPDATE SET status=$2, updated_on=$3", - &[ - &slot, - &status_str, - &updated_on, - ], - ) - } + let result = match client { + PostgresClientEnum::Parallel(client) => { + client.update_slot_status(slot, parent, status) + } + PostgresClientEnum::Simple(client) => { + client.update_slot_status(slot, parent, status) + } }; - match result { - Err(err) => { - return Err(AccountsDbPluginError::SlotStatusUpdateError{ - msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) - }); - } - Ok(rows) => { - assert_eq!(1, rows, "Expected one rows to be updated a time"); - } + if let Err(err) = result { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) + }); } } } diff --git a/accountsdb-plugin-postgres/src/lib.rs b/accountsdb-plugin-postgres/src/lib.rs index ccc4f17902..55eb28af00 100644 --- a/accountsdb-plugin-postgres/src/lib.rs +++ b/accountsdb-plugin-postgres/src/lib.rs @@ -1,2 +1,3 @@ pub mod accounts_selector; pub mod accountsdb_plugin_postgres; +pub mod postgres_client; diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs new file mode 100644 index 0000000000..3cc8310eae --- /dev/null +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -0,0 +1,449 @@ +/// A concurrent implementation for writing accounts into the PostgreSQL in parallel. +use { + crate::accountsdb_plugin_postgres::{ + AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError, + }, + chrono::Utc, + crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}, + log::*, + postgres::{Client, NoTls, Statement}, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + AccountsDbPluginError, ReplicaAccountInfo, SlotStatus, + }, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +/// The maximum asynchronous requests allowed in the channel to avoid excessive +/// memory usage. The downside -- calls after this threshold is reached can get blocked. +const MAX_ASYNC_REQUESTS: usize = 10240; + +struct PostgresSqlClientWrapper { + client: Client, + update_account_stmt: Statement, +} + +pub struct SimplePostgresClient { + client: Mutex, +} + +struct PostgresClientWorker { + client: SimplePostgresClient, +} + +impl Eq for DbAccountInfo {} + +#[derive(Clone, PartialEq, Debug)] +pub struct DbAccountInfo { + pub pubkey: Vec, + pub lamports: u64, + pub owner: Vec, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, +} + +impl DbAccountInfo { + fn new(account: &T) -> DbAccountInfo { + let data = account.data().to_vec(); + Self { + pubkey: account.pubkey().to_vec(), + lamports: account.lamports(), + owner: account.owner().to_vec(), + executable: account.executable(), + rent_epoch: account.rent_epoch(), + data, + } + } +} + +pub trait ReadableAccountInfo: Sized { + fn pubkey(&self) -> &[u8]; + fn owner(&self) -> &[u8]; + fn lamports(&self) -> u64; + fn executable(&self) -> bool; + fn rent_epoch(&self) -> u64; + fn data(&self) -> &[u8]; +} + +impl ReadableAccountInfo for DbAccountInfo { + fn pubkey(&self) -> &[u8] { + &self.pubkey + } + + fn owner(&self) -> &[u8] { + &self.owner + } + + fn lamports(&self) -> u64 { + self.lamports + } + + fn executable(&self) -> bool { + self.executable + } + + fn rent_epoch(&self) -> u64 { + self.rent_epoch + } + + fn data(&self) -> &[u8] { + &self.data + } +} + +impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> { + fn pubkey(&self) -> &[u8] { + self.pubkey + } + + fn owner(&self) -> &[u8] { + self.owner + } + + fn lamports(&self) -> u64 { + self.lamports + } + + fn executable(&self) -> bool { + self.executable + } + + fn rent_epoch(&self) -> u64 { + self.rent_epoch + } + + fn data(&self) -> &[u8] { + self.data + } +} + +pub trait PostgresClient { + fn join(&mut self) -> thread::Result<()> { + Ok(()) + } + + fn update_account( + &mut self, + account: &T, + slot: u64, + ) -> Result<(), AccountsDbPluginError>; + + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<(), AccountsDbPluginError>; +} + +impl SimplePostgresClient { + pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + let connection_str = format!("host={} user={}", config.host, config.user); + match Client::connect(&connection_str, NoTls) { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: format!( + "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str + ), + }))); + } + Ok(mut client) => { + let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ + data=$7, updated_on=$8"); + + match result { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str + ), + }))); + } + Ok(update_account_stmt) => Ok(Self { + client: Mutex::new(PostgresSqlClientWrapper { + client, + update_account_stmt, + }), + }), + } + } + } + } +} + +impl PostgresClient for SimplePostgresClient { + fn update_account( + &mut self, + account: &T, + slot: u64, + ) -> Result<(), AccountsDbPluginError> { + debug!( + "Updating account {:?} {:?} at slot {:?}", + account.pubkey(), + account.owner(), + slot, + ); + + let slot = slot as i64; // postgres only supports i64 + let lamports = account.lamports() as i64; + let rent_epoch = account.rent_epoch() as i64; + let updated_on = Utc::now().naive_utc(); + let client = self.client.get_mut().unwrap(); + let result = client.client.query( + &client.update_account_stmt, + &[ + &account.pubkey(), + &slot, + &account.owner(), + &lamports, + &account.executable(), + &rent_epoch, + &account.data(), + &updated_on, + ], + ); + + if let Err(err) = result { + return Err(AccountsDbPluginError::AccountsUpdateError { + msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err) + }); + } + Ok(()) + } + + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<(), AccountsDbPluginError> { + info!("Updating slot {:?} at with status {:?}", slot, status); + + let slot = slot as i64; // postgres only supports i64 + let parent = parent.map(|parent| parent as i64); + let updated_on = Utc::now().naive_utc(); + let status_str = status.as_str(); + let client = self.client.get_mut().unwrap(); + + let result = match parent { + Some(parent) => { + client.client.execute( + "INSERT INTO slot (slot, parent, status, updated_on) \ + VALUES ($1, $2, $3, $4) \ + ON CONFLICT (slot) DO UPDATE SET parent=$2, status=$3, updated_on=$4", + &[ + &slot, + &parent, + &status_str, + &updated_on, + ], + ) + } + None => { + client.client.execute( + "INSERT INTO slot (slot, status, updated_on) \ + VALUES ($1, $2, $3) \ + ON CONFLICT (slot) DO UPDATE SET status=$2, updated_on=$3", + &[ + &slot, + &status_str, + &updated_on, + ], + ) + } + }; + + match result { + Err(err) => { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) + }); + } + Ok(rows) => { + assert_eq!(1, rows, "Expected one rows to be updated a time"); + } + } + + Ok(()) + } +} + +struct UpdateAccountRequest { + account: DbAccountInfo, + slot: u64, +} + +struct UpdateSlotRequest { + slot: u64, + parent: Option, + slot_status: SlotStatus, +} + +enum DbWorkItem { + UpdateAccount(UpdateAccountRequest), + UpdateSlot(UpdateSlotRequest), +} + +impl PostgresClientWorker { + fn new(config: AccountsDbPluginPostgresConfig) -> Result { + let client = SimplePostgresClient::new(&config)?; + Ok(PostgresClientWorker { client }) + } + + fn do_work( + &mut self, + receiver: Receiver, + exit_worker: Arc, + ) -> Result<(), AccountsDbPluginError> { + while !exit_worker.load(Ordering::Relaxed) { + let work = receiver.recv_timeout(Duration::from_millis(500)); + + match work { + Ok(work) => match work { + DbWorkItem::UpdateAccount(request) => { + self.client.update_account(&request.account, request.slot)?; + } + DbWorkItem::UpdateSlot(request) => { + self.client.update_slot_status( + request.slot, + request.parent, + request.slot_status, + )?; + } + }, + Err(err) => match err { + RecvTimeoutError::Timeout => { + continue; + } + _ => { + error!("Error in receiving the item {:?}", err); + break; + } + }, + } + } + Ok(()) + } +} +pub struct ParallelPostgresClient { + workers: Vec>>, + exit_worker: Arc, + sender: Sender, +} + +impl ParallelPostgresClient { + pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS); + let exit_worker = Arc::new(AtomicBool::new(false)); + let mut workers = Vec::default(); + + for i in 0..config.threads.unwrap() { + let cloned_receiver = receiver.clone(); + let exit_clone = exit_worker.clone(); + let config = config.clone(); + let worker = Builder::new() + .name(format!("worker-{}", i)) + .spawn(move || -> Result<(), AccountsDbPluginError> { + let mut worker = PostgresClientWorker::new(config)?; + worker.do_work(cloned_receiver, exit_clone)?; + Ok(()) + }) + .unwrap(); + + workers.push(worker); + } + + Ok(Self { + workers, + exit_worker, + sender, + }) + } +} + +impl PostgresClient for ParallelPostgresClient { + fn join(&mut self) -> thread::Result<()> { + self.exit_worker.store(true, Ordering::Relaxed); + while !self.workers.is_empty() { + let worker = self.workers.pop(); + if worker.is_none() { + break; + } + let worker = worker.unwrap(); + let result = worker.join().unwrap(); + if result.is_err() { + error!("The worker thread has failed: {:?}", result); + } + } + + Ok(()) + } + + fn update_account( + &mut self, + account: &T, + slot: u64, + ) -> Result<(), AccountsDbPluginError> { + if let Err(err) = self + .sender + .send(DbWorkItem::UpdateAccount(UpdateAccountRequest { + account: DbAccountInfo::new(account), + slot, + })) + { + return Err(AccountsDbPluginError::AccountsUpdateError { + msg: format!( + "Failed to update the account {:?}, error: {:?}", + account.pubkey(), + err + ), + }); + } + Ok(()) + } + + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<(), AccountsDbPluginError> { + if let Err(err) = self.sender.send(DbWorkItem::UpdateSlot(UpdateSlotRequest { + slot, + parent, + slot_status: status, + })) { + return Err(AccountsDbPluginError::SlotStatusUpdateError { + msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err), + }); + } + Ok(()) + } +} + +pub struct PostgresClientBuilder {} + +impl PostgresClientBuilder { + pub fn build_pararallel_postgres_client( + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + ParallelPostgresClient::new(config) + } + + pub fn build_simple_postgres_client( + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + SimplePostgresClient::new(config) + } +} diff --git a/core/src/validator.rs b/core/src/validator.rs index 5f818f0a87..ebf21cae42 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -106,7 +106,7 @@ pub struct ValidatorConfig { pub account_paths: Vec, pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, - pub accountsdb_plugin_config_file: Option, + pub accountsdb_plugin_config_files: Option>, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: Option, @@ -166,7 +166,7 @@ impl Default for ValidatorConfig { account_paths: Vec::new(), account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), - accountsdb_plugin_config_file: None, + accountsdb_plugin_config_files: None, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: None, @@ -311,12 +311,12 @@ impl Validator { let mut bank_notification_senders = Vec::new(); let accountsdb_plugin_service = - if let Some(accountsdb_plugin_config_file) = &config.accountsdb_plugin_config_file { + if let Some(accountsdb_plugin_config_files) = &config.accountsdb_plugin_config_files { let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); bank_notification_senders.push(confirmed_bank_sender); let result = AccountsDbPluginService::new( confirmed_bank_receiver, - accountsdb_plugin_config_file, + accountsdb_plugin_config_files, ); match result { Ok(accountsdb_plugin_service) => Some(accountsdb_plugin_service), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 0fcc985e2a..1d3927ed71 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -12,7 +12,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_paths: config.account_paths.clone(), account_shrink_paths: config.account_shrink_paths.clone(), rpc_config: config.rpc_config.clone(), - accountsdb_plugin_config_file: config.accountsdb_plugin_config_file.clone(), + accountsdb_plugin_config_files: config.accountsdb_plugin_config_files.clone(), rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), diff --git a/validator/src/main.rs b/validator/src/main.rs index cba2a05188..da89906b14 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1716,6 +1716,7 @@ pub fn main() { .long("accountsdb-plugin-config") .value_name("FILE") .takes_value(true) + .multiple(true) .hidden(true) .help("Specify the configuration file for the AccountsDb plugin."), ) @@ -2277,9 +2278,16 @@ pub fn main() { .ok() .or_else(|| get_cluster_shred_version(&entrypoint_addrs)); - let accountsdb_plugin_config_file = matches - .value_of("accountsdb_plugin_config") - .map(PathBuf::from); + let accountsdb_plugin_config_files = if matches.is_present("accountsdb_plugin_config") { + Some( + values_t_or_exit!(matches, "accountsdb_plugin_config", String) + .into_iter() + .map(PathBuf::from) + .collect(), + ) + } else { + None + }; let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), @@ -2322,7 +2330,7 @@ pub fn main() { account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, - accountsdb_plugin_config_file, + accountsdb_plugin_config_files, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),