Fixed issue #22124 -- missing historical data if slot updated later. (#22193) (#22259)

* Fixed issue #22124 -- missing historical data if slot updated later.

* Fixed a couple of comments

(cherry picked from commit 5b6027bef0)

Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2022-01-04 07:18:58 +00:00
committed by GitHub
parent 464d533da3
commit 77558c315d
3 changed files with 95 additions and 12 deletions

View File

@ -41,6 +41,8 @@ pub struct AccountsDbPluginPostgresConfig {
pub threads: Option<usize>,
pub batch_size: Option<usize>,
pub panic_on_db_errors: Option<bool>,
/// Indicates if to store historical data for accounts
pub store_account_historical_data: Option<bool>,
}
#[derive(Error, Debug)]
@ -74,7 +76,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Accounts either satisyfing the accounts condition or owners condition will be selected.
/// When only owners is specified,
/// all accounts belonging to the owners will be streamed.
/// The accounts field support wildcard to select all accounts:
/// The accounts field supports wildcard to select all accounts:
/// "accounts_selector" : {
/// "accounts" : \["*"\],
/// }
@ -85,6 +87,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration.
/// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given,
/// `host` and `user` must be given.
/// "store_account_historical_data", optional, set it to 'true', to store historical account data to account_audit
/// table.
/// * "threads" optional, specifies the number of worker threads for the plugin. A thread
/// maintains a PostgreSQL connection to the server. The default is '10'.
/// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created

View File

@ -39,6 +39,7 @@ const DEFAULT_THREADS_COUNT: usize = 100;
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
const ACCOUNT_COLUMN_COUNT: usize = 9;
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
struct PostgresSqlClientWrapper {
client: Client,
@ -48,6 +49,7 @@ struct PostgresSqlClientWrapper {
update_slot_without_parent_stmt: Statement,
update_transaction_log_stmt: Statement,
update_block_metadata_stmt: Statement,
insert_account_audit_stmt: Option<Statement>,
}
pub struct SimplePostgresClient {
@ -324,6 +326,28 @@ impl SimplePostgresClient {
}
}
fn build_account_audit_insert_statement(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
let stmt = client.prepare(stmt);
match stmt {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
msg: format!(
"Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
err, config.host, config.user, config
),
})));
}
Ok(stmt) => Ok(stmt),
}
}
fn build_slot_upsert_statement_with_parent(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
@ -370,8 +394,8 @@ impl SimplePostgresClient {
}
}
/// Internal function for updating or inserting a single account
fn upsert_account_internal(
/// Internal function for inserting an account into account_audit table.
fn insert_account_audit(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
@ -379,7 +403,43 @@ impl SimplePostgresClient {
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let result = client.query(
let result = client.execute(
statement,
&[
&account.pubkey(),
&account.slot,
&account.owner(),
&lamports,
&account.executable(),
&rent_epoch,
&account.data(),
&account.write_version(),
&updated_on,
],
);
if let Err(err) = result {
let msg = format!(
"Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
err
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
}
Ok(())
}
/// Internal function for updating or inserting a single account
fn upsert_account_internal(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
insert_account_audit_stmt: &Option<Statement>,
) -> Result<(), AccountsDbPluginError> {
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let result = client.execute(
statement,
&[
&account.pubkey(),
@ -401,6 +461,11 @@ impl SimplePostgresClient {
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
} else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
// If no records modified (inserted or updated), it is because the account is updated
// at an older slot, insert the record directly into the account_audit table.
let statement = insert_account_audit_stmt.as_ref().unwrap();
Self::insert_account_audit(account, statement, client)?;
}
Ok(())
@ -409,9 +474,10 @@ impl SimplePostgresClient {
/// Update or insert a single account
fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
let client = self.client.get_mut().unwrap();
let insert_account_audit_stmt = &client.insert_account_audit_stmt;
let statement = &client.update_account_stmt;
let client = &mut client.client;
Self::upsert_account_internal(account, statement, client)
Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt)
}
/// Insert accounts in batch to reduce network overhead
@ -487,11 +553,12 @@ impl SimplePostgresClient {
}
let client = self.client.get_mut().unwrap();
let insert_account_audit_stmt = &client.insert_account_audit_stmt;
let statement = &client.update_account_stmt;
let client = &mut client.client;
for account in self.pending_account_updates.drain(..) {
Self::upsert_account_internal(&account, statement, client)?;
Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?;
}
Ok(())
@ -516,6 +583,18 @@ impl SimplePostgresClient {
let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
let store_account_historical_data = config
.store_account_historical_data
.unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
let insert_account_audit_stmt = if store_account_historical_data {
let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
Some(stmt)
} else {
None
};
info!("Created SimplePostgresClient.");
Ok(Self {
batch_size,
@ -528,6 +607,7 @@ impl SimplePostgresClient {
update_slot_without_parent_stmt,
update_transaction_log_stmt,
update_block_metadata_stmt,
insert_account_audit_stmt,
}),
})
}