Summary of Changes Added the reference postgresql.conf Prepare slot update statement to reduce overhead in updating slot Support custom connection string Allow the plugin to panic on replication issues to ensure consistency
880 lines
30 KiB
Rust
880 lines
30 KiB
Rust
#![allow(clippy::integer_arithmetic)]
|
|
|
|
/// A concurrent implementation for writing accounts into the PostgreSQL in parallel.
|
|
use {
|
|
crate::accountsdb_plugin_postgres::{
|
|
AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError,
|
|
},
|
|
chrono::Utc,
|
|
crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
|
|
log::*,
|
|
postgres::{Client, NoTls, Statement},
|
|
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
|
|
AccountsDbPluginError, ReplicaAccountInfo, SlotStatus,
|
|
},
|
|
solana_measure::measure::Measure,
|
|
solana_metrics::*,
|
|
solana_sdk::timing::AtomicInterval,
|
|
std::{
|
|
sync::{
|
|
atomic::{AtomicBool, AtomicUsize, Ordering},
|
|
Arc, Mutex,
|
|
},
|
|
thread::{self, sleep, Builder, JoinHandle},
|
|
time::Duration,
|
|
},
|
|
tokio_postgres::types::ToSql,
|
|
};
|
|
|
|
/// The maximum asynchronous requests allowed in the channel to avoid excessive
|
|
/// memory usage. The downside -- calls after this threshold is reached can get blocked.
|
|
const MAX_ASYNC_REQUESTS: usize = 40960;
|
|
const DEFAULT_POSTGRES_PORT: u16 = 5432;
|
|
const DEFAULT_THREADS_COUNT: usize = 100;
|
|
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
|
|
const ACCOUNT_COLUMN_COUNT: usize = 9;
|
|
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
|
|
|
|
struct PostgresSqlClientWrapper {
|
|
client: Client,
|
|
update_account_stmt: Statement,
|
|
bulk_account_insert_stmt: Statement,
|
|
update_slot_with_parent_stmt: Statement,
|
|
update_slot_without_parent_stmt: Statement,
|
|
}
|
|
|
|
pub struct SimplePostgresClient {
|
|
batch_size: usize,
|
|
pending_account_updates: Vec<DbAccountInfo>,
|
|
client: Mutex<PostgresSqlClientWrapper>,
|
|
}
|
|
|
|
struct PostgresClientWorker {
|
|
client: SimplePostgresClient,
|
|
/// Indicating if accounts notification during startup is done.
|
|
is_startup_done: bool,
|
|
}
|
|
|
|
impl Eq for DbAccountInfo {}
|
|
|
|
#[derive(Clone, PartialEq, Debug)]
|
|
pub struct DbAccountInfo {
|
|
pub pubkey: Vec<u8>,
|
|
pub lamports: i64,
|
|
pub owner: Vec<u8>,
|
|
pub executable: bool,
|
|
pub rent_epoch: i64,
|
|
pub data: Vec<u8>,
|
|
pub slot: i64,
|
|
pub write_version: i64,
|
|
}
|
|
|
|
pub(crate) fn abort() -> ! {
|
|
#[cfg(not(test))]
|
|
{
|
|
// standard error is usually redirected to a log file, cry for help on standard output as
|
|
// well
|
|
eprintln!("Validator process aborted. The validator log may contain further details");
|
|
std::process::exit(1);
|
|
}
|
|
|
|
#[cfg(test)]
|
|
panic!("process::exit(1) is intercepted for friendly test failure...");
|
|
}
|
|
|
|
impl DbAccountInfo {
|
|
fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
|
|
let data = account.data().to_vec();
|
|
Self {
|
|
pubkey: account.pubkey().to_vec(),
|
|
lamports: account.lamports() as i64,
|
|
owner: account.owner().to_vec(),
|
|
executable: account.executable(),
|
|
rent_epoch: account.rent_epoch() as i64,
|
|
data,
|
|
slot: slot as i64,
|
|
write_version: account.write_version(),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub trait ReadableAccountInfo: Sized {
|
|
fn pubkey(&self) -> &[u8];
|
|
fn owner(&self) -> &[u8];
|
|
fn lamports(&self) -> i64;
|
|
fn executable(&self) -> bool;
|
|
fn rent_epoch(&self) -> i64;
|
|
fn data(&self) -> &[u8];
|
|
fn write_version(&self) -> i64;
|
|
}
|
|
|
|
impl ReadableAccountInfo for DbAccountInfo {
|
|
fn pubkey(&self) -> &[u8] {
|
|
&self.pubkey
|
|
}
|
|
|
|
fn owner(&self) -> &[u8] {
|
|
&self.owner
|
|
}
|
|
|
|
fn lamports(&self) -> i64 {
|
|
self.lamports
|
|
}
|
|
|
|
fn executable(&self) -> bool {
|
|
self.executable
|
|
}
|
|
|
|
fn rent_epoch(&self) -> i64 {
|
|
self.rent_epoch
|
|
}
|
|
|
|
fn data(&self) -> &[u8] {
|
|
&self.data
|
|
}
|
|
|
|
fn write_version(&self) -> i64 {
|
|
self.write_version
|
|
}
|
|
}
|
|
|
|
impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
|
|
fn pubkey(&self) -> &[u8] {
|
|
self.pubkey
|
|
}
|
|
|
|
fn owner(&self) -> &[u8] {
|
|
self.owner
|
|
}
|
|
|
|
fn lamports(&self) -> i64 {
|
|
self.lamports as i64
|
|
}
|
|
|
|
fn executable(&self) -> bool {
|
|
self.executable
|
|
}
|
|
|
|
fn rent_epoch(&self) -> i64 {
|
|
self.rent_epoch as i64
|
|
}
|
|
|
|
fn data(&self) -> &[u8] {
|
|
self.data
|
|
}
|
|
|
|
fn write_version(&self) -> i64 {
|
|
self.write_version as i64
|
|
}
|
|
}
|
|
|
|
pub trait PostgresClient {
|
|
fn join(&mut self) -> thread::Result<()> {
|
|
Ok(())
|
|
}
|
|
|
|
fn update_account(
|
|
&mut self,
|
|
account: DbAccountInfo,
|
|
is_startup: bool,
|
|
) -> Result<(), AccountsDbPluginError>;
|
|
|
|
fn update_slot_status(
|
|
&mut self,
|
|
slot: u64,
|
|
parent: Option<u64>,
|
|
status: SlotStatus,
|
|
) -> Result<(), AccountsDbPluginError>;
|
|
|
|
fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>;
|
|
}
|
|
|
|
impl SimplePostgresClient {
|
|
fn connect_to_db(
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<Client, AccountsDbPluginError> {
|
|
let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
|
|
|
|
let connection_str = if let Some(connection_str) = &config.connection_str {
|
|
connection_str.clone()
|
|
} else {
|
|
if config.host.is_none() || config.user.is_none() {
|
|
let msg = format!(
|
|
"\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified",
|
|
config.connection_str, config.host, config.user
|
|
);
|
|
return Err(AccountsDbPluginError::Custom(Box::new(
|
|
AccountsDbPluginPostgresError::ConfigurationError { msg },
|
|
)));
|
|
}
|
|
format!(
|
|
"host={} user={} port={}",
|
|
config.host.as_ref().unwrap(),
|
|
config.user.as_ref().unwrap(),
|
|
port
|
|
)
|
|
};
|
|
|
|
match Client::connect(&connection_str, NoTls) {
|
|
Err(err) => {
|
|
let msg = format!(
|
|
"Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}",
|
|
err, connection_str
|
|
);
|
|
error!("{}", msg);
|
|
Err(AccountsDbPluginError::Custom(Box::new(
|
|
AccountsDbPluginPostgresError::DataStoreConnectionError { msg },
|
|
)))
|
|
}
|
|
Ok(client) => Ok(client),
|
|
}
|
|
}
|
|
|
|
fn build_bulk_account_insert_statement(
|
|
client: &mut Client,
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<Statement, AccountsDbPluginError> {
|
|
let batch_size = config
|
|
.batch_size
|
|
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
|
|
let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES");
|
|
for j in 0..batch_size {
|
|
let row = j * ACCOUNT_COLUMN_COUNT;
|
|
let val_str = format!(
|
|
"(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
|
|
row + 1,
|
|
row + 2,
|
|
row + 3,
|
|
row + 4,
|
|
row + 5,
|
|
row + 6,
|
|
row + 7,
|
|
row + 8,
|
|
row + 9,
|
|
);
|
|
|
|
if j == 0 {
|
|
stmt = format!("{} {}", &stmt, val_str);
|
|
} else {
|
|
stmt = format!("{}, {}", &stmt, val_str);
|
|
}
|
|
}
|
|
|
|
let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
|
|
data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
|
|
acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
|
|
|
|
stmt = format!("{} {}", stmt, handle_conflict);
|
|
|
|
info!("{}", stmt);
|
|
let bulk_stmt = client.prepare(&stmt);
|
|
|
|
match bulk_stmt {
|
|
Err(err) => {
|
|
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
|
msg: format!(
|
|
"Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
|
err, config.host, config.user, config
|
|
),
|
|
})));
|
|
}
|
|
Ok(update_account_stmt) => Ok(update_account_stmt),
|
|
}
|
|
}
|
|
|
|
fn build_single_account_upsert_statement(
|
|
client: &mut Client,
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<Statement, AccountsDbPluginError> {
|
|
let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
|
|
ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
|
|
data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
|
|
acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
|
|
|
|
let stmt = client.prepare(stmt);
|
|
|
|
match stmt {
|
|
Err(err) => {
|
|
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
|
msg: format!(
|
|
"Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
|
err, config.host, config.user, config
|
|
),
|
|
})));
|
|
}
|
|
Ok(update_account_stmt) => Ok(update_account_stmt),
|
|
}
|
|
}
|
|
|
|
fn build_slot_upsert_statement_with_parent(
|
|
client: &mut Client,
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<Statement, AccountsDbPluginError> {
|
|
let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \
|
|
VALUES ($1, $2, $3, $4) \
|
|
ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on";
|
|
|
|
let stmt = client.prepare(stmt);
|
|
|
|
match stmt {
|
|
Err(err) => {
|
|
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
|
msg: format!(
|
|
"Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
|
err, config.host, config.user, config
|
|
),
|
|
})));
|
|
}
|
|
Ok(stmt) => Ok(stmt),
|
|
}
|
|
}
|
|
|
|
fn build_slot_upsert_statement_without_parent(
|
|
client: &mut Client,
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<Statement, AccountsDbPluginError> {
|
|
let stmt = "INSERT INTO slot (slot, status, updated_on) \
|
|
VALUES ($1, $2, $3) \
|
|
ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on";
|
|
|
|
let stmt = client.prepare(stmt);
|
|
|
|
match stmt {
|
|
Err(err) => {
|
|
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
|
|
msg: format!(
|
|
"Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
|
|
err, config.host, config.user, config
|
|
),
|
|
})));
|
|
}
|
|
Ok(stmt) => Ok(stmt),
|
|
}
|
|
}
|
|
|
|
/// Internal function for updating or inserting a single account
|
|
fn upsert_account_internal(
|
|
account: &DbAccountInfo,
|
|
statement: &Statement,
|
|
client: &mut Client,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
let lamports = account.lamports() as i64;
|
|
let rent_epoch = account.rent_epoch() as i64;
|
|
let updated_on = Utc::now().naive_utc();
|
|
let result = client.query(
|
|
statement,
|
|
&[
|
|
&account.pubkey(),
|
|
&account.slot,
|
|
&account.owner(),
|
|
&lamports,
|
|
&account.executable(),
|
|
&rent_epoch,
|
|
&account.data(),
|
|
&account.write_version(),
|
|
&updated_on,
|
|
],
|
|
);
|
|
|
|
if let Err(err) = result {
|
|
let msg = format!(
|
|
"Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
|
|
err
|
|
);
|
|
error!("{}", msg);
|
|
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Update or insert a single account
|
|
fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
|
|
let client = self.client.get_mut().unwrap();
|
|
let statement = &client.update_account_stmt;
|
|
let client = &mut client.client;
|
|
Self::upsert_account_internal(account, statement, client)
|
|
}
|
|
|
|
/// Insert accounts in batch to reduce network overhead
|
|
fn insert_accounts_in_batch(
|
|
&mut self,
|
|
account: DbAccountInfo,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
self.pending_account_updates.push(account);
|
|
|
|
if self.pending_account_updates.len() == self.batch_size {
|
|
let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values");
|
|
|
|
let mut values: Vec<&(dyn ToSql + Sync)> =
|
|
Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
|
|
let updated_on = Utc::now().naive_utc();
|
|
for j in 0..self.batch_size {
|
|
let account = &self.pending_account_updates[j];
|
|
|
|
values.push(&account.pubkey);
|
|
values.push(&account.slot);
|
|
values.push(&account.owner);
|
|
values.push(&account.lamports);
|
|
values.push(&account.executable);
|
|
values.push(&account.rent_epoch);
|
|
values.push(&account.data);
|
|
values.push(&account.write_version);
|
|
values.push(&updated_on);
|
|
}
|
|
measure.stop();
|
|
inc_new_counter_debug!(
|
|
"accountsdb-plugin-postgres-prepare-values-us",
|
|
measure.as_us() as usize,
|
|
10000,
|
|
10000
|
|
);
|
|
|
|
let mut measure = Measure::start("accountsdb-plugin-postgres-update-account");
|
|
let client = self.client.get_mut().unwrap();
|
|
let result = client
|
|
.client
|
|
.query(&client.bulk_account_insert_stmt, &values);
|
|
|
|
self.pending_account_updates.clear();
|
|
if let Err(err) = result {
|
|
let msg = format!(
|
|
"Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
|
|
err
|
|
);
|
|
error!("{}", msg);
|
|
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
|
|
}
|
|
measure.stop();
|
|
inc_new_counter_debug!(
|
|
"accountsdb-plugin-postgres-update-account-us",
|
|
measure.as_us() as usize,
|
|
10000,
|
|
10000
|
|
);
|
|
inc_new_counter_debug!(
|
|
"accountsdb-plugin-postgres-update-account-count",
|
|
self.batch_size,
|
|
10000,
|
|
10000
|
|
);
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Flush any left over accounts in batch which are not processed in the last batch
|
|
fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> {
|
|
if self.pending_account_updates.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
let client = self.client.get_mut().unwrap();
|
|
let statement = &client.update_account_stmt;
|
|
let client = &mut client.client;
|
|
|
|
for account in self.pending_account_updates.drain(..) {
|
|
Self::upsert_account_internal(&account, statement, client)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
|
|
info!("Creating SimplePostgresClient...");
|
|
let mut client = Self::connect_to_db(config)?;
|
|
let bulk_account_insert_stmt =
|
|
Self::build_bulk_account_insert_statement(&mut client, config)?;
|
|
let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
|
|
|
|
let update_slot_with_parent_stmt =
|
|
Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
|
|
let update_slot_without_parent_stmt =
|
|
Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
|
|
|
|
let batch_size = config
|
|
.batch_size
|
|
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
|
|
info!("Created SimplePostgresClient.");
|
|
Ok(Self {
|
|
batch_size,
|
|
pending_account_updates: Vec::with_capacity(batch_size),
|
|
client: Mutex::new(PostgresSqlClientWrapper {
|
|
client,
|
|
update_account_stmt,
|
|
bulk_account_insert_stmt,
|
|
update_slot_with_parent_stmt,
|
|
update_slot_without_parent_stmt,
|
|
}),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl PostgresClient for SimplePostgresClient {
|
|
fn update_account(
|
|
&mut self,
|
|
account: DbAccountInfo,
|
|
is_startup: bool,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
trace!(
|
|
"Updating account {} with owner {} at slot {}",
|
|
bs58::encode(account.pubkey()).into_string(),
|
|
bs58::encode(account.owner()).into_string(),
|
|
account.slot,
|
|
);
|
|
if !is_startup {
|
|
return self.upsert_account(&account);
|
|
}
|
|
self.insert_accounts_in_batch(account)
|
|
}
|
|
|
|
fn update_slot_status(
|
|
&mut self,
|
|
slot: u64,
|
|
parent: Option<u64>,
|
|
status: SlotStatus,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
info!("Updating slot {:?} at with status {:?}", slot, status);
|
|
|
|
let slot = slot as i64; // postgres only supports i64
|
|
let parent = parent.map(|parent| parent as i64);
|
|
let updated_on = Utc::now().naive_utc();
|
|
let status_str = status.as_str();
|
|
let client = self.client.get_mut().unwrap();
|
|
|
|
let result = match parent {
|
|
Some(parent) => client.client.execute(
|
|
&client.update_slot_with_parent_stmt,
|
|
&[&slot, &parent, &status_str, &updated_on],
|
|
),
|
|
None => client.client.execute(
|
|
&client.update_slot_without_parent_stmt,
|
|
&[&slot, &status_str, &updated_on],
|
|
),
|
|
};
|
|
|
|
match result {
|
|
Err(err) => {
|
|
let msg = format!(
|
|
"Failed to persist the update of slot to the PostgreSQL database. Error: {:?}",
|
|
err
|
|
);
|
|
error!("{:?}", msg);
|
|
return Err(AccountsDbPluginError::SlotStatusUpdateError { msg });
|
|
}
|
|
Ok(rows) => {
|
|
assert_eq!(1, rows, "Expected one rows to be updated a time");
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
|
|
self.flush_buffered_writes()
|
|
}
|
|
}
|
|
|
|
struct UpdateAccountRequest {
|
|
account: DbAccountInfo,
|
|
is_startup: bool,
|
|
}
|
|
|
|
struct UpdateSlotRequest {
|
|
slot: u64,
|
|
parent: Option<u64>,
|
|
slot_status: SlotStatus,
|
|
}
|
|
|
|
enum DbWorkItem {
|
|
UpdateAccount(UpdateAccountRequest),
|
|
UpdateSlot(UpdateSlotRequest),
|
|
}
|
|
|
|
impl PostgresClientWorker {
|
|
fn new(config: AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
|
|
let result = SimplePostgresClient::new(&config);
|
|
match result {
|
|
Ok(client) => Ok(PostgresClientWorker {
|
|
client,
|
|
is_startup_done: false,
|
|
}),
|
|
Err(err) => {
|
|
error!("Error in creating SimplePostgresClient: {}", err);
|
|
Err(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn do_work(
|
|
&mut self,
|
|
receiver: Receiver<DbWorkItem>,
|
|
exit_worker: Arc<AtomicBool>,
|
|
is_startup_done: Arc<AtomicBool>,
|
|
startup_done_count: Arc<AtomicUsize>,
|
|
panic_on_db_errors: bool,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
while !exit_worker.load(Ordering::Relaxed) {
|
|
let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv");
|
|
let work = receiver.recv_timeout(Duration::from_millis(500));
|
|
measure.stop();
|
|
inc_new_counter_debug!(
|
|
"accountsdb-plugin-postgres-worker-recv-us",
|
|
measure.as_us() as usize,
|
|
100000,
|
|
100000
|
|
);
|
|
match work {
|
|
Ok(work) => match work {
|
|
DbWorkItem::UpdateAccount(request) => {
|
|
if let Err(err) = self
|
|
.client
|
|
.update_account(request.account, request.is_startup)
|
|
{
|
|
error!("Failed to update account: ({})", err);
|
|
if panic_on_db_errors {
|
|
abort();
|
|
}
|
|
}
|
|
}
|
|
DbWorkItem::UpdateSlot(request) => {
|
|
if let Err(err) = self.client.update_slot_status(
|
|
request.slot,
|
|
request.parent,
|
|
request.slot_status,
|
|
) {
|
|
error!("Failed to update slot: ({})", err);
|
|
if panic_on_db_errors {
|
|
abort();
|
|
}
|
|
}
|
|
}
|
|
},
|
|
Err(err) => match err {
|
|
RecvTimeoutError::Timeout => {
|
|
if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
|
|
if let Err(err) = self.client.notify_end_of_startup() {
|
|
error!("Error in notifying end of startup: ({})", err);
|
|
if panic_on_db_errors {
|
|
abort();
|
|
}
|
|
}
|
|
self.is_startup_done = true;
|
|
startup_done_count.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
continue;
|
|
}
|
|
_ => {
|
|
error!("Error in receiving the item {:?}", err);
|
|
if panic_on_db_errors {
|
|
abort();
|
|
}
|
|
break;
|
|
}
|
|
},
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
pub struct ParallelPostgresClient {
|
|
workers: Vec<JoinHandle<Result<(), AccountsDbPluginError>>>,
|
|
exit_worker: Arc<AtomicBool>,
|
|
is_startup_done: Arc<AtomicBool>,
|
|
startup_done_count: Arc<AtomicUsize>,
|
|
initialized_worker_count: Arc<AtomicUsize>,
|
|
sender: Sender<DbWorkItem>,
|
|
last_report: AtomicInterval,
|
|
}
|
|
|
|
impl ParallelPostgresClient {
|
|
pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
|
|
info!("Creating ParallelPostgresClient...");
|
|
let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS);
|
|
let exit_worker = Arc::new(AtomicBool::new(false));
|
|
let mut workers = Vec::default();
|
|
let is_startup_done = Arc::new(AtomicBool::new(false));
|
|
let startup_done_count = Arc::new(AtomicUsize::new(0));
|
|
let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT);
|
|
let initialized_worker_count = Arc::new(AtomicUsize::new(0));
|
|
for i in 0..worker_count {
|
|
let cloned_receiver = receiver.clone();
|
|
let exit_clone = exit_worker.clone();
|
|
let is_startup_done_clone = is_startup_done.clone();
|
|
let startup_done_count_clone = startup_done_count.clone();
|
|
let initialized_worker_count_clone = initialized_worker_count.clone();
|
|
let config = config.clone();
|
|
let worker = Builder::new()
|
|
.name(format!("worker-{}", i))
|
|
.spawn(move || -> Result<(), AccountsDbPluginError> {
|
|
let panic_on_db_errors = *config
|
|
.panic_on_db_errors
|
|
.as_ref()
|
|
.unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR);
|
|
let result = PostgresClientWorker::new(config);
|
|
|
|
match result {
|
|
Ok(mut worker) => {
|
|
initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed);
|
|
worker.do_work(
|
|
cloned_receiver,
|
|
exit_clone,
|
|
is_startup_done_clone,
|
|
startup_done_count_clone,
|
|
panic_on_db_errors,
|
|
)?;
|
|
Ok(())
|
|
}
|
|
Err(err) => {
|
|
error!("Error when making connection to database: ({})", err);
|
|
if panic_on_db_errors {
|
|
abort();
|
|
}
|
|
Err(err)
|
|
}
|
|
}
|
|
})
|
|
.unwrap();
|
|
|
|
workers.push(worker);
|
|
}
|
|
|
|
info!("Created ParallelPostgresClient.");
|
|
Ok(Self {
|
|
last_report: AtomicInterval::default(),
|
|
workers,
|
|
exit_worker,
|
|
is_startup_done,
|
|
startup_done_count,
|
|
initialized_worker_count,
|
|
sender,
|
|
})
|
|
}
|
|
|
|
pub fn join(&mut self) -> thread::Result<()> {
|
|
self.exit_worker.store(true, Ordering::Relaxed);
|
|
while !self.workers.is_empty() {
|
|
let worker = self.workers.pop();
|
|
if worker.is_none() {
|
|
break;
|
|
}
|
|
let worker = worker.unwrap();
|
|
let result = worker.join().unwrap();
|
|
if result.is_err() {
|
|
error!("The worker thread has failed: {:?}", result);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn update_account(
|
|
&mut self,
|
|
account: &ReplicaAccountInfo,
|
|
slot: u64,
|
|
is_startup: bool,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
if self.last_report.should_update(30000) {
|
|
datapoint_debug!(
|
|
"postgres-plugin-stats",
|
|
("message-queue-length", self.sender.len() as i64, i64),
|
|
);
|
|
}
|
|
let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item");
|
|
let wrk_item = DbWorkItem::UpdateAccount(UpdateAccountRequest {
|
|
account: DbAccountInfo::new(account, slot),
|
|
is_startup,
|
|
});
|
|
|
|
measure.stop();
|
|
|
|
inc_new_counter_debug!(
|
|
"accountsdb-plugin-posgres-create-work-item-us",
|
|
measure.as_us() as usize,
|
|
100000,
|
|
100000
|
|
);
|
|
|
|
let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg");
|
|
|
|
if let Err(err) = self.sender.send(wrk_item) {
|
|
return Err(AccountsDbPluginError::AccountsUpdateError {
|
|
msg: format!(
|
|
"Failed to update the account {:?}, error: {:?}",
|
|
bs58::encode(account.pubkey()).into_string(),
|
|
err
|
|
),
|
|
});
|
|
}
|
|
|
|
measure.stop();
|
|
inc_new_counter_debug!(
|
|
"accountsdb-plugin-posgres-send-msg-us",
|
|
measure.as_us() as usize,
|
|
100000,
|
|
100000
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn update_slot_status(
|
|
&mut self,
|
|
slot: u64,
|
|
parent: Option<u64>,
|
|
status: SlotStatus,
|
|
) -> Result<(), AccountsDbPluginError> {
|
|
if let Err(err) = self.sender.send(DbWorkItem::UpdateSlot(UpdateSlotRequest {
|
|
slot,
|
|
parent,
|
|
slot_status: status,
|
|
})) {
|
|
return Err(AccountsDbPluginError::SlotStatusUpdateError {
|
|
msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err),
|
|
});
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
|
|
info!("Notifying the end of startup");
|
|
// Ensure all items in the queue has been received by the workers
|
|
while !self.sender.is_empty() {
|
|
sleep(Duration::from_millis(100));
|
|
}
|
|
self.is_startup_done.store(true, Ordering::Relaxed);
|
|
|
|
// Wait for all worker threads to be done with flushing
|
|
while self.startup_done_count.load(Ordering::Relaxed)
|
|
!= self.initialized_worker_count.load(Ordering::Relaxed)
|
|
{
|
|
info!(
|
|
"Startup done count: {}, good worker thread count: {}",
|
|
self.startup_done_count.load(Ordering::Relaxed),
|
|
self.initialized_worker_count.load(Ordering::Relaxed)
|
|
);
|
|
sleep(Duration::from_millis(100));
|
|
}
|
|
|
|
info!("Done with notifying the end of startup");
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub struct PostgresClientBuilder {}
|
|
|
|
impl PostgresClientBuilder {
|
|
pub fn build_pararallel_postgres_client(
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<ParallelPostgresClient, AccountsDbPluginError> {
|
|
ParallelPostgresClient::new(config)
|
|
}
|
|
|
|
pub fn build_simple_postgres_client(
|
|
config: &AccountsDbPluginPostgresConfig,
|
|
) -> Result<SimplePostgresClient, AccountsDbPluginError> {
|
|
SimplePostgresClient::new(config)
|
|
}
|
|
}
|