Accountsdb plugin postgres improvement (#21034)
Summary of Changes Added the reference postgresql.conf Prepare slot update statement to reduce overhead in updating slot Support custom connection string Allow the plugin to panic on replication issues to ensure consistency
This commit is contained in:
@ -32,11 +32,13 @@ impl std::fmt::Debug for AccountsDbPluginPostgres {
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct AccountsDbPluginPostgresConfig {
|
||||
pub host: String,
|
||||
pub user: String,
|
||||
pub threads: Option<usize>,
|
||||
pub host: Option<String>,
|
||||
pub user: Option<String>,
|
||||
pub port: Option<u16>,
|
||||
pub connection_str: Option<String>,
|
||||
pub threads: Option<usize>,
|
||||
pub batch_size: Option<usize>,
|
||||
pub panic_on_db_errors: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@ -46,6 +48,9 @@ pub enum AccountsDbPluginPostgresError {
|
||||
|
||||
#[error("Error preparing data store schema. Error message: ({msg})")]
|
||||
DataSchemaError { msg: String },
|
||||
|
||||
#[error("Error preparing data store schema. Error message: ({msg})")]
|
||||
ConfigurationError { msg: String },
|
||||
}
|
||||
|
||||
impl AccountsDbPlugin for AccountsDbPluginPostgres {
|
||||
@ -54,10 +59,9 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
|
||||
}
|
||||
|
||||
/// Do initialization for the PostgreSQL plugin.
|
||||
/// # Arguments
|
||||
///
|
||||
/// Format of the config file:
|
||||
/// The `accounts_selector` section allows the user to controls accounts selections.
|
||||
/// # Format of the config file:
|
||||
/// * The `accounts_selector` section allows the user to controls accounts selections.
|
||||
/// "accounts_selector" : {
|
||||
/// "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\],
|
||||
/// }
|
||||
@ -72,13 +76,21 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
|
||||
/// "accounts_selector" : {
|
||||
/// "accounts" : \["*"\],
|
||||
/// }
|
||||
/// "host" specifies the PostgreSQL server.
|
||||
/// "user" specifies the PostgreSQL user.
|
||||
/// "threads" optional, specifies the number of worker threads for the plugin. A thread
|
||||
/// maintains a PostgreSQL connection to the server. The default is 10.
|
||||
/// "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created
|
||||
/// from restoring a snapshot. The default is "10".
|
||||
/// * "host", optional, specifies the PostgreSQL server.
|
||||
/// * "user", optional, specifies the PostgreSQL user.
|
||||
/// * "port", optional, specifies the PostgreSQL server's port.
|
||||
/// * "connection_str", optional, the custom PostgreSQL connection string.
|
||||
/// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration.
|
||||
/// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given,
|
||||
/// `host` and `user` must be given.
|
||||
/// * "threads" optional, specifies the number of worker threads for the plugin. A thread
|
||||
/// maintains a PostgreSQL connection to the server. The default is '10'.
|
||||
/// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created
|
||||
/// from restoring a snapshot. The default is '10'.
|
||||
/// * "panic_on_db_errors", optional, contols if to panic when there are errors replicating data to the
|
||||
/// PostgreSQL database. The default is 'false'.
|
||||
/// # Examples
|
||||
///
|
||||
/// {
|
||||
/// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so",
|
||||
/// "host": "host_foo",
|
||||
@ -86,6 +98,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
|
||||
/// "threads": 10,
|
||||
/// "accounts_selector" : {
|
||||
/// "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"]
|
||||
/// }
|
||||
/// }
|
||||
|
||||
fn on_load(&mut self, config_file: &str) -> Result<()> {
|
||||
|
@ -33,11 +33,14 @@ const DEFAULT_POSTGRES_PORT: u16 = 5432;
|
||||
const DEFAULT_THREADS_COUNT: usize = 100;
|
||||
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
|
||||
const ACCOUNT_COLUMN_COUNT: usize = 9;
|
||||
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
|
||||
|
||||
struct PostgresSqlClientWrapper {
|
||||
client: Client,
|
||||
update_account_stmt: Statement,
|
||||
bulk_account_insert_stmt: Statement,
|
||||
update_slot_with_parent_stmt: Statement,
|
||||
update_slot_without_parent_stmt: Statement,
|
||||
}
|
||||
|
||||
pub struct SimplePostgresClient {
|
||||
@ -66,6 +69,19 @@ pub struct DbAccountInfo {
|
||||
pub write_version: i64,
|
||||
}
|
||||
|
||||
pub(crate) fn abort() -> ! {
|
||||
#[cfg(not(test))]
|
||||
{
|
||||
// standard error is usually redirected to a log file, cry for help on standard output as
|
||||
// well
|
||||
eprintln!("Validator process aborted. The validator log may contain further details");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
panic!("process::exit(1) is intercepted for friendly test failure...");
|
||||
}
|
||||
|
||||
impl DbAccountInfo {
|
||||
fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
|
||||
let data = account.data().to_vec();
|
||||
@ -179,13 +195,32 @@ impl SimplePostgresClient {
|
||||
) -> Result<Client, AccountsDbPluginError> {
|
||||
let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
|
||||
|
||||
let connection_str = format!("host={} user={} port={}", config.host, config.user, port);
|
||||
let connection_str = if let Some(connection_str) = &config.connection_str {
|
||||
connection_str.clone()
|
||||
} else {
|
||||
if config.host.is_none() || config.user.is_none() {
|
||||
let msg = format!(
|
||||
"\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified",
|
||||
config.connection_str, config.host, config.user
|
||||
);
|
||||
return Err(AccountsDbPluginError::Custom(Box::new(
|
||||
AccountsDbPluginPostgresError::ConfigurationError { msg },
|
||||
)));
|
||||
}
|
||||
format!(
|
||||
"host={} user={} port={}",
|
||||
config.host.as_ref().unwrap(),
|
||||
config.user.as_ref().unwrap(),
|
||||
port
|
||||
)
|
||||
};
|
||||
|
||||
match Client::connect(&connection_str, NoTls) {
|
||||
Err(err) => {
|
||||
let msg = format!(
|
||||
"Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}",
|
||||
err, config.host, config.user, connection_str);
|
||||
"Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}",
|
||||
err, connection_str
|
||||
);
|
||||
error!("{}", msg);
|
||||
Err(AccountsDbPluginError::Custom(Box::new(
|
||||
AccountsDbPluginPostgresError::DataStoreConnectionError { msg },
|
||||
@ -238,7 +273,7 @@ impl SimplePostgresClient {
|
||||
Err(err) => {
|
||||
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
||||
msg: format!(
|
||||
"Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}",
|
||||
"Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
||||
err, config.host, config.user, config
|
||||
),
|
||||
})));
|
||||
@ -263,7 +298,7 @@ impl SimplePostgresClient {
|
||||
Err(err) => {
|
||||
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
||||
msg: format!(
|
||||
"Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}",
|
||||
"Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
||||
err, config.host, config.user, config
|
||||
),
|
||||
})));
|
||||
@ -272,6 +307,52 @@ impl SimplePostgresClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_slot_upsert_statement_with_parent(
|
||||
client: &mut Client,
|
||||
config: &AccountsDbPluginPostgresConfig,
|
||||
) -> Result<Statement, AccountsDbPluginError> {
|
||||
let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \
|
||||
VALUES ($1, $2, $3, $4) \
|
||||
ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on";
|
||||
|
||||
let stmt = client.prepare(stmt);
|
||||
|
||||
match stmt {
|
||||
Err(err) => {
|
||||
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
||||
msg: format!(
|
||||
"Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
||||
err, config.host, config.user, config
|
||||
),
|
||||
})));
|
||||
}
|
||||
Ok(stmt) => Ok(stmt),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_slot_upsert_statement_without_parent(
|
||||
client: &mut Client,
|
||||
config: &AccountsDbPluginPostgresConfig,
|
||||
) -> Result<Statement, AccountsDbPluginError> {
|
||||
let stmt = "INSERT INTO slot (slot, status, updated_on) \
|
||||
VALUES ($1, $2, $3) \
|
||||
ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on";
|
||||
|
||||
let stmt = client.prepare(stmt);
|
||||
|
||||
match stmt {
|
||||
Err(err) => {
|
||||
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
||||
msg: format!(
|
||||
"Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
||||
err, config.host, config.user, config
|
||||
),
|
||||
})));
|
||||
}
|
||||
Ok(stmt) => Ok(stmt),
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal function for updating or inserting a single account
|
||||
fn upsert_account_internal(
|
||||
account: &DbAccountInfo,
|
||||
@ -406,6 +487,11 @@ impl SimplePostgresClient {
|
||||
Self::build_bulk_account_insert_statement(&mut client, config)?;
|
||||
let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
|
||||
|
||||
let update_slot_with_parent_stmt =
|
||||
Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
|
||||
let update_slot_without_parent_stmt =
|
||||
Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
|
||||
|
||||
let batch_size = config
|
||||
.batch_size
|
||||
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
|
||||
@ -417,6 +503,8 @@ impl SimplePostgresClient {
|
||||
client,
|
||||
update_account_stmt,
|
||||
bulk_account_insert_stmt,
|
||||
update_slot_with_parent_stmt,
|
||||
update_slot_without_parent_stmt,
|
||||
}),
|
||||
})
|
||||
}
|
||||
@ -455,32 +543,15 @@ impl PostgresClient for SimplePostgresClient {
|
||||
let client = self.client.get_mut().unwrap();
|
||||
|
||||
let result = match parent {
|
||||
Some(parent) => {
|
||||
client.client.execute(
|
||||
"INSERT INTO slot (slot, parent, status, updated_on) \
|
||||
VALUES ($1, $2, $3, $4) \
|
||||
ON CONFLICT (slot) DO UPDATE SET parent=$2, status=$3, updated_on=$4",
|
||||
&[
|
||||
&slot,
|
||||
&parent,
|
||||
&status_str,
|
||||
&updated_on,
|
||||
],
|
||||
)
|
||||
}
|
||||
None => {
|
||||
client.client.execute(
|
||||
"INSERT INTO slot (slot, status, updated_on) \
|
||||
VALUES ($1, $2, $3) \
|
||||
ON CONFLICT (slot) DO UPDATE SET status=$2, updated_on=$3",
|
||||
&[
|
||||
&slot,
|
||||
&status_str,
|
||||
&updated_on,
|
||||
],
|
||||
)
|
||||
}
|
||||
};
|
||||
Some(parent) => client.client.execute(
|
||||
&client.update_slot_with_parent_stmt,
|
||||
&[&slot, &parent, &status_str, &updated_on],
|
||||
),
|
||||
None => client.client.execute(
|
||||
&client.update_slot_without_parent_stmt,
|
||||
&[&slot, &status_str, &updated_on],
|
||||
),
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(err) => {
|
||||
@ -541,6 +612,7 @@ impl PostgresClientWorker {
|
||||
exit_worker: Arc<AtomicBool>,
|
||||
is_startup_done: Arc<AtomicBool>,
|
||||
startup_done_count: Arc<AtomicUsize>,
|
||||
panic_on_db_errors: bool,
|
||||
) -> Result<(), AccountsDbPluginError> {
|
||||
while !exit_worker.load(Ordering::Relaxed) {
|
||||
let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv");
|
||||
@ -555,21 +627,38 @@ impl PostgresClientWorker {
|
||||
match work {
|
||||
Ok(work) => match work {
|
||||
DbWorkItem::UpdateAccount(request) => {
|
||||
self.client
|
||||
.update_account(request.account, request.is_startup)?;
|
||||
if let Err(err) = self
|
||||
.client
|
||||
.update_account(request.account, request.is_startup)
|
||||
{
|
||||
error!("Failed to update account: ({})", err);
|
||||
if panic_on_db_errors {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
DbWorkItem::UpdateSlot(request) => {
|
||||
self.client.update_slot_status(
|
||||
if let Err(err) = self.client.update_slot_status(
|
||||
request.slot,
|
||||
request.parent,
|
||||
request.slot_status,
|
||||
)?;
|
||||
) {
|
||||
error!("Failed to update slot: ({})", err);
|
||||
if panic_on_db_errors {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(err) => match err {
|
||||
RecvTimeoutError::Timeout => {
|
||||
if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
|
||||
self.client.notify_end_of_startup()?;
|
||||
if let Err(err) = self.client.notify_end_of_startup() {
|
||||
error!("Error in notifying end of startup: ({})", err);
|
||||
if panic_on_db_errors {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
self.is_startup_done = true;
|
||||
startup_done_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
@ -578,6 +667,9 @@ impl PostgresClientWorker {
|
||||
}
|
||||
_ => {
|
||||
error!("Error in receiving the item {:?}", err);
|
||||
if panic_on_db_errors {
|
||||
abort();
|
||||
}
|
||||
break;
|
||||
}
|
||||
},
|
||||
@ -616,6 +708,10 @@ impl ParallelPostgresClient {
|
||||
let worker = Builder::new()
|
||||
.name(format!("worker-{}", i))
|
||||
.spawn(move || -> Result<(), AccountsDbPluginError> {
|
||||
let panic_on_db_errors = *config
|
||||
.panic_on_db_errors
|
||||
.as_ref()
|
||||
.unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR);
|
||||
let result = PostgresClientWorker::new(config);
|
||||
|
||||
match result {
|
||||
@ -626,10 +722,17 @@ impl ParallelPostgresClient {
|
||||
exit_clone,
|
||||
is_startup_done_clone,
|
||||
startup_done_count_clone,
|
||||
panic_on_db_errors,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
Err(err) => {
|
||||
error!("Error when making connection to database: ({})", err);
|
||||
if panic_on_db_errors {
|
||||
abort();
|
||||
}
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
Reference in New Issue
Block a user