Accountsdb plugin transaction part 4 -- postgres plugin implementations -- DB models (#21407)

AccountsDb plugin for transactions -- part 4 -- persisting to the Db.
1. DB models for transactions
2. Rust models for transactions
3. Transform from SDK models to rust db models
4. Unit tests
This commit is contained in:
Lijun Wang
2021-12-01 09:23:26 -08:00
committed by GitHub
parent 3d21b062cc
commit f5b0764795
7 changed files with 1629 additions and 17 deletions

16
Cargo.lock generated
View File

@ -3042,6 +3042,17 @@ dependencies = [
"tokio-postgres",
]
[[package]]
name = "postgres-derive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c857dd221cb0e7d8414b894a0ce29eae44d453dda0baa132447878e75e701477"
dependencies = [
"proc-macro2 1.0.32",
"quote 1.0.10",
"syn 1.0.81",
]
[[package]]
name = "postgres-protocol"
version = "0.6.2"
@ -3069,6 +3080,7 @@ dependencies = [
"bytes 1.1.0",
"chrono",
"fallible-iterator",
"postgres-derive",
"postgres-protocol",
]
@ -4310,14 +4322,18 @@ dependencies = [
"crossbeam-channel",
"log 0.4.14",
"postgres",
"postgres-types",
"serde",
"serde_derive",
"serde_json",
"solana-account-decoder",
"solana-accountsdb-plugin-interface",
"solana-logger 1.9.0",
"solana-measure",
"solana-metrics",
"solana-runtime",
"solana-sdk",
"solana-transaction-status",
"thiserror",
"tokio-postgres",
]

View File

@ -18,6 +18,7 @@ chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-channel = "0.5"
log = "0.4.14"
postgres = { version = "0.19.2", features = ["with-chrono-0_4"] }
postgres-types = { version = "0.2.2", features = ["derive"] }
serde = "1.0.130"
serde_derive = "1.0.103"
serde_json = "1.0.72"
@ -25,8 +26,14 @@ solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface",
solana-logger = { path = "../logger", version = "=1.9.0" }
solana-measure = { path = "../measure", version = "=1.9.0" }
solana-metrics = { path = "../metrics", version = "=1.9.0" }
solana-runtime = { path = "../runtime", version = "=1.9.0" }
solana-sdk = { path = "../sdk", version = "=1.9.0" }
solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" }
thiserror = "1.0.30"
tokio-postgres = "0.7.4"
[dev-dependencies]
solana-account-decoder = { path = "../account-decoder", version = "=1.9.0" }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -20,10 +20,137 @@ CREATE TABLE account (
CREATE TABLE slot (
slot BIGINT PRIMARY KEY,
parent BIGINT,
status varchar(16) NOT NULL,
status VARCHAR(16) NOT NULL,
updated_on TIMESTAMP NOT NULL
);
-- Types for Transactions
Create TYPE "TransactionErrorCode" AS ENUM (
'AccountInUse',
'AccountLoadedTwice',
'AccountNotFound',
'ProgramAccountNotFound',
'InsufficientFundsForFee',
'InvalidAccountForFee',
'AlreadyProcessed',
'BlockhashNotFound',
'InstructionError',
'CallChainTooDeep',
'MissingSignatureForFee',
'InvalidAccountIndex',
'SignatureFailure',
'InvalidProgramForExecution',
'SanitizeFailure',
'ClusterMaintenance',
'AccountBorrowOutstanding',
'WouldExceedMaxAccountCostLimit',
'WouldExceedMaxBlockCostLimit',
'UnsupportedVersion',
'InvalidWritableAccount'
);
CREATE TYPE "TransactionError" AS (
error_code "TransactionErrorCode",
error_detail VARCHAR(256)
);
CREATE TYPE "CompiledInstruction" AS (
program_id_index SMALLINT,
accounts SMALLINT[],
data BYTEA
);
CREATE TYPE "InnerInstructions" AS (
index SMALLINT,
instructions "CompiledInstruction"[]
);
CREATE TYPE "TransactionTokenBalance" AS (
account_index SMALLINT,
mint VARCHAR(44),
ui_token_amount DOUBLE PRECISION,
owner VARCHAR(44)
);
Create TYPE "RewardType" AS ENUM (
'Fee',
'Rent',
'Staking',
'Voting'
);
CREATE TYPE "Reward" AS (
pubkey VARCHAR(44),
lamports BIGINT,
post_balance BIGINT,
reward_type "RewardType",
commission SMALLINT
);
CREATE TYPE "TransactionStatusMeta" AS (
error "TransactionError",
fee BIGINT,
pre_balances BIGINT[],
post_balances BIGINT[],
inner_instructions "InnerInstructions"[],
log_messages TEXT[],
pre_token_balances "TransactionTokenBalance"[],
post_token_balances "TransactionTokenBalance"[],
rewards "Reward"[]
);
CREATE TYPE "TransactionMessageHeader" AS (
num_required_signatures SMALLINT,
num_readonly_signed_accounts SMALLINT,
num_readonly_unsigned_accounts SMALLINT
);
CREATE TYPE "TransactionMessage" AS (
header "TransactionMessageHeader",
account_keys BYTEA[],
recent_blockhash BYTEA,
instructions "CompiledInstruction"[]
);
CREATE TYPE "AddressMapIndexes" AS (
writable SMALLINT[],
readonly SMALLINT[]
);
CREATE TYPE "TransactionMessageV0" AS (
header "TransactionMessageHeader",
account_keys BYTEA[],
recent_blockhash BYTEA,
instructions "CompiledInstruction"[],
address_map_indexes "AddressMapIndexes"[]
);
CREATE TYPE "MappedAddresses" AS (
writable BYTEA[],
readonly BYTEA[]
);
CREATE TYPE "MappedMessage" AS (
message "TransactionMessageV0",
mapped_addresses "MappedAddresses"
);
-- The table storing transactions
CREATE TABLE transaction (
slot BIGINT NOT NULL,
signature BYTEA NOT NULL,
is_vote BOOL NOT NULL,
message_type SMALLINT, -- 0: legacy, 1: v0 message
legacy_message "TransactionMessage",
v0_mapped_message "MappedMessage",
signatures BYTEA[],
message_hash BYTEA,
meta "TransactionStatusMeta",
updated_on TIMESTAMP NOT NULL,
CONSTRAINT transaction_pk PRIMARY KEY (slot, signature)
);
/**
* The following is for keeping historical data for accounts and is not required for plugin to work.
*/
@ -40,6 +167,8 @@ CREATE TABLE account_audit (
updated_on TIMESTAMP NOT NULL
);
CREATE INDEX account_audit_account_key ON account_audit (pubkey, write_version);
CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$
BEGIN
INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, write_version, updated_on)

View File

@ -7,3 +7,19 @@ DROP FUNCTION audit_account_update;
DROP TABLE account_audit;
DROP TABLE account;
DROP TABLE slot;
DROP TABLE transaction;
DROP TYPE "TransactionError" CASCADE;
DROP TYPE "TransactionErrorCode" CASCADE;
DROP TYPE "MappedMessage" CASCADE;
DROP TYPE "MappedAddresses" CASCADE;
DROP TYPE "TransactionMessageV0" CASCADE;
DROP TYPE "AddressMapIndexes" CASCADE;
DROP TYPE "TransactionMessage" CASCADE;
DROP TYPE "TransactionMessageHeader" CASCADE;
DROP TYPE "TransactionStatusMeta" CASCADE;
DROP TYPE "RewardType" CASCADE;
DROP TYPE "Reward" CASCADE;
DROP TYPE "TransactionTokenBalance" CASCADE;
DROP TYPE "InnerInstructions" CASCADE;
DROP TYPE "CompiledInstruction" CASCADE;

View File

@ -5,13 +5,15 @@ use {
crate::{
accounts_selector::AccountsSelector,
postgres_client::{ParallelPostgresClient, PostgresClientBuilder},
transaction_selector::TransactionSelector,
},
bs58,
log::*,
serde_derive::{Deserialize, Serialize},
serde_json,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus,
AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
ReplicaTransactionInfoVersions, Result, SlotStatus,
},
solana_metrics::*,
std::{fs::File, io::Read},
@ -22,6 +24,7 @@ use {
pub struct AccountsDbPluginPostgres {
client: Option<ParallelPostgresClient>,
accounts_selector: Option<AccountsSelector>,
transaction_selector: Option<TransactionSelector>,
}
impl std::fmt::Debug for AccountsDbPluginPostgres {
@ -89,6 +92,20 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// from restoring a snapshot. The default is '10'.
/// * "panic_on_db_errors", optional, contols if to panic when there are errors replicating data to the
/// PostgreSQL database. The default is 'false'.
/// * "transaction_selector", optional, controls if and what transaction to store. If this field is missing
/// None of the transction is stored.
/// "transaction_selector" : {
/// "mentions" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\],
/// }
/// The `mentions` field support wildcard to select all transaction or all 'vote' transactions:
/// For example, to select all transactions:
/// "transaction_selector" : {
/// "mentions" : \["*"\],
/// }
/// To select all vote transactions:
/// "transaction_selector" : {
/// "mentions" : \["all_votes"\],
/// }
/// # Examples
///
/// {
@ -114,6 +131,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result));
self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result));
let result: serde_json::Result<AccountsDbPluginPostgresConfig> =
serde_json::from_str(&contents);
@ -277,6 +295,46 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
Ok(())
}
fn notify_transaction(
&mut self,
transaction_info: ReplicaTransactionInfoVersions,
slot: u64,
) -> Result<()> {
match &mut self.client {
None => {
return Err(AccountsDbPluginError::Custom(Box::new(
AccountsDbPluginPostgresError::DataStoreConnectionError {
msg: "There is no connection to the PostgreSQL database.".to_string(),
},
)));
}
Some(client) => match transaction_info {
ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => {
if let Some(transaction_selector) = &self.transaction_selector {
if !transaction_selector.is_transaction_selected(
transaction_info.is_vote,
transaction_info.transaction.message().account_keys_iter(),
) {
return Ok(());
}
} else {
return Ok(());
}
let result = client.log_transaction_info(transaction_info, slot);
if let Err(err) = result {
return Err(AccountsDbPluginError::SlotStatusUpdateError{
msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err)
});
}
}
},
}
Ok(())
}
/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.
@ -285,6 +343,13 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
.as_ref()
.map_or_else(|| false, |selector| selector.is_enabled())
}
/// Check if the plugin is interested in transaction data
fn transaction_notifications_enabled(&self) -> bool {
self.transaction_selector
.as_ref()
.map_or_else(|| false, |selector| selector.is_enabled())
}
}
impl AccountsDbPluginPostgres {
@ -320,12 +385,30 @@ impl AccountsDbPluginPostgres {
}
}
pub fn new() -> Self {
AccountsDbPluginPostgres {
client: None,
accounts_selector: None,
fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector {
let transaction_selector = &config["transaction_selector"];
if transaction_selector.is_null() {
TransactionSelector::default()
} else {
let accounts = &transaction_selector["mentions"];
let accounts: Vec<String> = if accounts.is_array() {
accounts
.as_array()
.unwrap()
.iter()
.map(|val| val.as_str().unwrap().to_string())
.collect()
} else {
Vec::default()
};
TransactionSelector::new(&accounts)
}
}
pub fn new() -> Self {
Self::default()
}
}
#[no_mangle]

View File

@ -1,4 +1,5 @@
#![allow(clippy::integer_arithmetic)]
mod postgres_client_transaction;
/// A concurrent implementation for writing accounts into the PostgreSQL in parallel.
use {
@ -9,6 +10,7 @@ use {
crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
log::*,
postgres::{Client, NoTls, Statement},
postgres_client_transaction::LogTransactionRequest,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPluginError, ReplicaAccountInfo, SlotStatus,
},
@ -23,7 +25,7 @@ use {
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
tokio_postgres::types::ToSql,
tokio_postgres::types,
};
/// The maximum asynchronous requests allowed in the channel to avoid excessive
@ -41,6 +43,7 @@ struct PostgresSqlClientWrapper {
bulk_account_insert_stmt: Statement,
update_slot_with_parent_stmt: Statement,
update_slot_without_parent_stmt: Statement,
update_transaction_log_stmt: Statement,
}
pub struct SimplePostgresClient {
@ -187,6 +190,11 @@ pub trait PostgresClient {
) -> Result<(), AccountsDbPluginError>;
fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>;
fn log_transaction(
&mut self,
transaction_log_info: LogTransactionRequest,
) -> Result<(), AccountsDbPluginError>;
}
impl SimplePostgresClient {
@ -407,7 +415,7 @@ impl SimplePostgresClient {
if self.pending_account_updates.len() == self.batch_size {
let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values");
let mut values: Vec<&(dyn ToSql + Sync)> =
let mut values: Vec<&(dyn types::ToSql + Sync)> =
Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
let updated_on = Utc::now().naive_utc();
for j in 0..self.batch_size {
@ -491,6 +499,8 @@ impl SimplePostgresClient {
Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
let update_slot_without_parent_stmt =
Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
let update_transaction_log_stmt =
Self::build_transaction_info_upsert_statement(&mut client, config)?;
let batch_size = config
.batch_size
@ -505,6 +515,7 @@ impl SimplePostgresClient {
bulk_account_insert_stmt,
update_slot_with_parent_stmt,
update_slot_without_parent_stmt,
update_transaction_log_stmt,
}),
})
}
@ -573,6 +584,13 @@ impl PostgresClient for SimplePostgresClient {
fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
self.flush_buffered_writes()
}
fn log_transaction(
&mut self,
transaction_log_info: LogTransactionRequest,
) -> Result<(), AccountsDbPluginError> {
self.log_transaction_impl(transaction_log_info)
}
}
struct UpdateAccountRequest {
@ -586,9 +604,11 @@ struct UpdateSlotRequest {
slot_status: SlotStatus,
}
#[warn(clippy::large_enum_variant)]
enum DbWorkItem {
UpdateAccount(UpdateAccountRequest),
UpdateSlot(UpdateSlotRequest),
UpdateAccount(Box<UpdateAccountRequest>),
UpdateSlot(Box<UpdateSlotRequest>),
LogTransaction(Box<LogTransactionRequest>),
}
impl PostgresClientWorker {
@ -649,6 +669,9 @@ impl PostgresClientWorker {
}
}
}
DbWorkItem::LogTransaction(transaction_log_info) => {
self.client.log_transaction(*transaction_log_info)?;
}
},
Err(err) => match err {
RecvTimeoutError::Timeout => {
@ -782,10 +805,10 @@ impl ParallelPostgresClient {
);
}
let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item");
let wrk_item = DbWorkItem::UpdateAccount(UpdateAccountRequest {
let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest {
account: DbAccountInfo::new(account, slot),
is_startup,
});
}));
measure.stop();
@ -825,11 +848,14 @@ impl ParallelPostgresClient {
parent: Option<u64>,
status: SlotStatus,
) -> Result<(), AccountsDbPluginError> {
if let Err(err) = self.sender.send(DbWorkItem::UpdateSlot(UpdateSlotRequest {
if let Err(err) = self
.sender
.send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest {
slot,
parent,
slot_status: status,
})) {
})))
{
return Err(AccountsDbPluginError::SlotStatusUpdateError {
msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err),
});

File diff suppressed because it is too large Load Diff