diff --git a/Cargo.lock b/Cargo.lock index fa9e8c3564..cb387e1595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3042,6 +3042,17 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "postgres-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c857dd221cb0e7d8414b894a0ce29eae44d453dda0baa132447878e75e701477" +dependencies = [ + "proc-macro2 1.0.32", + "quote 1.0.10", + "syn 1.0.81", +] + [[package]] name = "postgres-protocol" version = "0.6.2" @@ -3069,6 +3080,7 @@ dependencies = [ "bytes 1.1.0", "chrono", "fallible-iterator", + "postgres-derive", "postgres-protocol", ] @@ -4310,14 +4322,18 @@ dependencies = [ "crossbeam-channel", "log 0.4.14", "postgres", + "postgres-types", "serde", "serde_derive", "serde_json", + "solana-account-decoder", "solana-accountsdb-plugin-interface", "solana-logger 1.9.0", "solana-measure", "solana-metrics", + "solana-runtime", "solana-sdk", + "solana-transaction-status", "thiserror", "tokio-postgres", ] diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml index b5b7b2cd92..d094d86337 100644 --- a/accountsdb-plugin-postgres/Cargo.toml +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -18,6 +18,7 @@ chrono = { version = "0.4.11", features = ["serde"] } crossbeam-channel = "0.5" log = "0.4.14" postgres = { version = "0.19.2", features = ["with-chrono-0_4"] } +postgres-types = { version = "0.2.2", features = ["derive"] } serde = "1.0.130" serde_derive = "1.0.103" serde_json = "1.0.72" @@ -25,8 +26,14 @@ solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", solana-logger = { path = "../logger", version = "=1.9.0" } solana-measure = { path = "../measure", version = "=1.9.0" } solana-metrics = { path = "../metrics", version = "=1.9.0" } +solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } +solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" } thiserror = "1.0.30" tokio-postgres = "0.7.4" + +[dev-dependencies] +solana-account-decoder = { path = "../account-decoder", version = "=1.9.0" } + [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql index 1c02253ac7..ec0de205de 100644 --- a/accountsdb-plugin-postgres/scripts/create_schema.sql +++ b/accountsdb-plugin-postgres/scripts/create_schema.sql @@ -20,10 +20,137 @@ CREATE TABLE account ( CREATE TABLE slot ( slot BIGINT PRIMARY KEY, parent BIGINT, - status varchar(16) NOT NULL, + status VARCHAR(16) NOT NULL, updated_on TIMESTAMP NOT NULL ); +-- Types for Transactions + +Create TYPE "TransactionErrorCode" AS ENUM ( + 'AccountInUse', + 'AccountLoadedTwice', + 'AccountNotFound', + 'ProgramAccountNotFound', + 'InsufficientFundsForFee', + 'InvalidAccountForFee', + 'AlreadyProcessed', + 'BlockhashNotFound', + 'InstructionError', + 'CallChainTooDeep', + 'MissingSignatureForFee', + 'InvalidAccountIndex', + 'SignatureFailure', + 'InvalidProgramForExecution', + 'SanitizeFailure', + 'ClusterMaintenance', + 'AccountBorrowOutstanding', + 'WouldExceedMaxAccountCostLimit', + 'WouldExceedMaxBlockCostLimit', + 'UnsupportedVersion', + 'InvalidWritableAccount' +); + +CREATE TYPE "TransactionError" AS ( + error_code "TransactionErrorCode", + error_detail VARCHAR(256) +); + +CREATE TYPE "CompiledInstruction" AS ( + program_id_index SMALLINT, + accounts SMALLINT[], + data BYTEA +); + +CREATE TYPE "InnerInstructions" AS ( + index SMALLINT, + instructions "CompiledInstruction"[] +); + +CREATE TYPE "TransactionTokenBalance" AS ( + account_index SMALLINT, + mint VARCHAR(44), + ui_token_amount DOUBLE PRECISION, + owner VARCHAR(44) +); + +Create TYPE "RewardType" AS ENUM ( + 'Fee', + 'Rent', + 'Staking', + 'Voting' +); + +CREATE TYPE "Reward" AS ( + pubkey VARCHAR(44), + lamports BIGINT, + post_balance BIGINT, + reward_type "RewardType", + commission SMALLINT +); + +CREATE TYPE "TransactionStatusMeta" AS ( + error "TransactionError", + fee BIGINT, + pre_balances BIGINT[], + post_balances BIGINT[], + inner_instructions "InnerInstructions"[], + log_messages TEXT[], + pre_token_balances "TransactionTokenBalance"[], + post_token_balances "TransactionTokenBalance"[], + rewards "Reward"[] +); + +CREATE TYPE "TransactionMessageHeader" AS ( + num_required_signatures SMALLINT, + num_readonly_signed_accounts SMALLINT, + num_readonly_unsigned_accounts SMALLINT +); + +CREATE TYPE "TransactionMessage" AS ( + header "TransactionMessageHeader", + account_keys BYTEA[], + recent_blockhash BYTEA, + instructions "CompiledInstruction"[] +); + +CREATE TYPE "AddressMapIndexes" AS ( + writable SMALLINT[], + readonly SMALLINT[] +); + +CREATE TYPE "TransactionMessageV0" AS ( + header "TransactionMessageHeader", + account_keys BYTEA[], + recent_blockhash BYTEA, + instructions "CompiledInstruction"[], + address_map_indexes "AddressMapIndexes"[] +); + +CREATE TYPE "MappedAddresses" AS ( + writable BYTEA[], + readonly BYTEA[] +); + +CREATE TYPE "MappedMessage" AS ( + message "TransactionMessageV0", + mapped_addresses "MappedAddresses" +); + +-- The table storing transactions +CREATE TABLE transaction ( + slot BIGINT NOT NULL, + signature BYTEA NOT NULL, + is_vote BOOL NOT NULL, + message_type SMALLINT, -- 0: legacy, 1: v0 message + legacy_message "TransactionMessage", + v0_mapped_message "MappedMessage", + signatures BYTEA[], + message_hash BYTEA, + meta "TransactionStatusMeta", + updated_on TIMESTAMP NOT NULL, + CONSTRAINT transaction_pk PRIMARY KEY (slot, signature) +); + /** * The following is for keeping historical data for accounts and is not required for plugin to work. */ @@ -40,6 +167,8 @@ CREATE TABLE account_audit ( updated_on TIMESTAMP NOT NULL ); +CREATE INDEX account_audit_account_key ON account_audit (pubkey, write_version); + CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ BEGIN INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, write_version, updated_on) diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql index db8aeae552..419ab44169 100644 --- a/accountsdb-plugin-postgres/scripts/drop_schema.sql +++ b/accountsdb-plugin-postgres/scripts/drop_schema.sql @@ -7,3 +7,19 @@ DROP FUNCTION audit_account_update; DROP TABLE account_audit; DROP TABLE account; DROP TABLE slot; +DROP TABLE transaction; + +DROP TYPE "TransactionError" CASCADE; +DROP TYPE "TransactionErrorCode" CASCADE; +DROP TYPE "MappedMessage" CASCADE; +DROP TYPE "MappedAddresses" CASCADE; +DROP TYPE "TransactionMessageV0" CASCADE; +DROP TYPE "AddressMapIndexes" CASCADE; +DROP TYPE "TransactionMessage" CASCADE; +DROP TYPE "TransactionMessageHeader" CASCADE; +DROP TYPE "TransactionStatusMeta" CASCADE; +DROP TYPE "RewardType" CASCADE; +DROP TYPE "Reward" CASCADE; +DROP TYPE "TransactionTokenBalance" CASCADE; +DROP TYPE "InnerInstructions" CASCADE; +DROP TYPE "CompiledInstruction" CASCADE; diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 111b5f6dd9..698a0512a5 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -5,13 +5,15 @@ use { crate::{ accounts_selector::AccountsSelector, postgres_client::{ParallelPostgresClient, PostgresClientBuilder}, + transaction_selector::TransactionSelector, }, bs58, log::*, serde_derive::{Deserialize, Serialize}, serde_json, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, + AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, + ReplicaTransactionInfoVersions, Result, SlotStatus, }, solana_metrics::*, std::{fs::File, io::Read}, @@ -22,6 +24,7 @@ use { pub struct AccountsDbPluginPostgres { client: Option, accounts_selector: Option, + transaction_selector: Option, } impl std::fmt::Debug for AccountsDbPluginPostgres { @@ -89,6 +92,20 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// from restoring a snapshot. The default is '10'. /// * "panic_on_db_errors", optional, contols if to panic when there are errors replicating data to the /// PostgreSQL database. The default is 'false'. + /// * "transaction_selector", optional, controls if and what transaction to store. If this field is missing + /// None of the transction is stored. + /// "transaction_selector" : { + /// "mentions" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\], + /// } + /// The `mentions` field support wildcard to select all transaction or all 'vote' transactions: + /// For example, to select all transactions: + /// "transaction_selector" : { + /// "mentions" : \["*"\], + /// } + /// To select all vote transactions: + /// "transaction_selector" : { + /// "mentions" : \["all_votes"\], + /// } /// # Examples /// /// { @@ -114,6 +131,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { let result: serde_json::Value = serde_json::from_str(&contents).unwrap(); self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result)); + self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result)); let result: serde_json::Result = serde_json::from_str(&contents); @@ -277,6 +295,46 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { Ok(()) } + fn notify_transaction( + &mut self, + transaction_info: ReplicaTransactionInfoVersions, + slot: u64, + ) -> Result<()> { + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => match transaction_info { + ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => { + if let Some(transaction_selector) = &self.transaction_selector { + if !transaction_selector.is_transaction_selected( + transaction_info.is_vote, + transaction_info.transaction.message().account_keys_iter(), + ) { + return Ok(()); + } + } else { + return Ok(()); + } + + let result = client.log_transaction_info(transaction_info, slot); + + if let Err(err) = result { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err) + }); + } + } + }, + } + + Ok(()) + } + /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in /// account data, please return false. @@ -285,6 +343,13 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { .as_ref() .map_or_else(|| false, |selector| selector.is_enabled()) } + + /// Check if the plugin is interested in transaction data + fn transaction_notifications_enabled(&self) -> bool { + self.transaction_selector + .as_ref() + .map_or_else(|| false, |selector| selector.is_enabled()) + } } impl AccountsDbPluginPostgres { @@ -320,12 +385,30 @@ impl AccountsDbPluginPostgres { } } - pub fn new() -> Self { - AccountsDbPluginPostgres { - client: None, - accounts_selector: None, + fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector { + let transaction_selector = &config["transaction_selector"]; + + if transaction_selector.is_null() { + TransactionSelector::default() + } else { + let accounts = &transaction_selector["mentions"]; + let accounts: Vec = if accounts.is_array() { + accounts + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + TransactionSelector::new(&accounts) } } + + pub fn new() -> Self { + Self::default() + } } #[no_mangle] diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs index 51f5d407c2..1ae7c7ad7c 100644 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -1,4 +1,5 @@ #![allow(clippy::integer_arithmetic)] +mod postgres_client_transaction; /// A concurrent implementation for writing accounts into the PostgreSQL in parallel. use { @@ -9,6 +10,7 @@ use { crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}, log::*, postgres::{Client, NoTls, Statement}, + postgres_client_transaction::LogTransactionRequest, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPluginError, ReplicaAccountInfo, SlotStatus, }, @@ -23,7 +25,7 @@ use { thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, - tokio_postgres::types::ToSql, + tokio_postgres::types, }; /// The maximum asynchronous requests allowed in the channel to avoid excessive @@ -41,6 +43,7 @@ struct PostgresSqlClientWrapper { bulk_account_insert_stmt: Statement, update_slot_with_parent_stmt: Statement, update_slot_without_parent_stmt: Statement, + update_transaction_log_stmt: Statement, } pub struct SimplePostgresClient { @@ -187,6 +190,11 @@ pub trait PostgresClient { ) -> Result<(), AccountsDbPluginError>; fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>; + + fn log_transaction( + &mut self, + transaction_log_info: LogTransactionRequest, + ) -> Result<(), AccountsDbPluginError>; } impl SimplePostgresClient { @@ -407,7 +415,7 @@ impl SimplePostgresClient { if self.pending_account_updates.len() == self.batch_size { let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values"); - let mut values: Vec<&(dyn ToSql + Sync)> = + let mut values: Vec<&(dyn types::ToSql + Sync)> = Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT); let updated_on = Utc::now().naive_utc(); for j in 0..self.batch_size { @@ -491,6 +499,8 @@ impl SimplePostgresClient { Self::build_slot_upsert_statement_with_parent(&mut client, config)?; let update_slot_without_parent_stmt = Self::build_slot_upsert_statement_without_parent(&mut client, config)?; + let update_transaction_log_stmt = + Self::build_transaction_info_upsert_statement(&mut client, config)?; let batch_size = config .batch_size @@ -505,6 +515,7 @@ impl SimplePostgresClient { bulk_account_insert_stmt, update_slot_with_parent_stmt, update_slot_without_parent_stmt, + update_transaction_log_stmt, }), }) } @@ -573,6 +584,13 @@ impl PostgresClient for SimplePostgresClient { fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { self.flush_buffered_writes() } + + fn log_transaction( + &mut self, + transaction_log_info: LogTransactionRequest, + ) -> Result<(), AccountsDbPluginError> { + self.log_transaction_impl(transaction_log_info) + } } struct UpdateAccountRequest { @@ -586,9 +604,11 @@ struct UpdateSlotRequest { slot_status: SlotStatus, } +#[warn(clippy::large_enum_variant)] enum DbWorkItem { - UpdateAccount(UpdateAccountRequest), - UpdateSlot(UpdateSlotRequest), + UpdateAccount(Box), + UpdateSlot(Box), + LogTransaction(Box), } impl PostgresClientWorker { @@ -649,6 +669,9 @@ impl PostgresClientWorker { } } } + DbWorkItem::LogTransaction(transaction_log_info) => { + self.client.log_transaction(*transaction_log_info)?; + } }, Err(err) => match err { RecvTimeoutError::Timeout => { @@ -782,10 +805,10 @@ impl ParallelPostgresClient { ); } let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item"); - let wrk_item = DbWorkItem::UpdateAccount(UpdateAccountRequest { + let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest { account: DbAccountInfo::new(account, slot), is_startup, - }); + })); measure.stop(); @@ -825,11 +848,14 @@ impl ParallelPostgresClient { parent: Option, status: SlotStatus, ) -> Result<(), AccountsDbPluginError> { - if let Err(err) = self.sender.send(DbWorkItem::UpdateSlot(UpdateSlotRequest { - slot, - parent, - slot_status: status, - })) { + if let Err(err) = self + .sender + .send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest { + slot, + parent, + slot_status: status, + }))) + { return Err(AccountsDbPluginError::SlotStatusUpdateError { msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err), }); diff --git a/accountsdb-plugin-postgres/src/postgres_client/postgres_client_transaction.rs b/accountsdb-plugin-postgres/src/postgres_client/postgres_client_transaction.rs new file mode 100644 index 0000000000..bdc30b158c --- /dev/null +++ b/accountsdb-plugin-postgres/src/postgres_client/postgres_client_transaction.rs @@ -0,0 +1,1335 @@ +/// Module responsible for handling persisting transaction data to the PostgreSQL +/// database. +use { + crate::{ + accountsdb_plugin_postgres::{ + AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError, + }, + postgres_client::{DbWorkItem, ParallelPostgresClient, SimplePostgresClient}, + }, + chrono::Utc, + log::*, + postgres::{Client, Statement}, + postgres_types::ToSql, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + AccountsDbPluginError, ReplicaTransactionInfo, + }, + solana_runtime::bank::RewardType, + solana_sdk::{ + instruction::CompiledInstruction, + message::{ + v0::{self, AddressMapIndexes}, + MappedAddresses, MappedMessage, Message, MessageHeader, SanitizedMessage, + }, + transaction::TransactionError, + }, + solana_transaction_status::{ + InnerInstructions, Reward, TransactionStatusMeta, TransactionTokenBalance, + }, +}; + +const MAX_TRANSACTION_STATUS_LEN: usize = 256; + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "CompiledInstruction")] +pub struct DbCompiledInstruction { + pub program_id_index: i16, + pub accounts: Vec, + pub data: Vec, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "InnerInstructions")] +pub struct DbInnerInstructions { + pub index: i16, + pub instructions: Vec, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "TransactionTokenBalance")] +pub struct DbTransactionTokenBalance { + pub account_index: i16, + pub mint: String, + pub ui_token_amount: Option, + pub owner: String, +} + +#[derive(Clone, Debug, ToSql, PartialEq)] +#[postgres(name = "RewardType")] +pub enum DbRewardType { + Fee, + Rent, + Staking, + Voting, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "Reward")] +pub struct DbReward { + pub pubkey: String, + pub lamports: i64, + pub post_balance: i64, + pub reward_type: Option, + pub commission: Option, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "TransactionStatusMeta")] +pub struct DbTransactionStatusMeta { + pub error: Option, + pub fee: i64, + pub pre_balances: Vec, + pub post_balances: Vec, + pub inner_instructions: Option>, + pub log_messages: Option>, + pub pre_token_balances: Option>, + pub post_token_balances: Option>, + pub rewards: Option>, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "TransactionMessageHeader")] +pub struct DbTransactionMessageHeader { + pub num_required_signatures: i16, + pub num_readonly_signed_accounts: i16, + pub num_readonly_unsigned_accounts: i16, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "TransactionMessage")] +pub struct DbTransactionMessage { + pub header: DbTransactionMessageHeader, + pub account_keys: Vec>, + pub recent_blockhash: Vec, + pub instructions: Vec, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "AddressMapIndexes")] +pub struct DbAddressMapIndexes { + pub writable: Vec, + pub readonly: Vec, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "TransactionMessageV0")] +pub struct DbTransactionMessageV0 { + pub header: DbTransactionMessageHeader, + pub account_keys: Vec>, + pub recent_blockhash: Vec, + pub instructions: Vec, + pub address_map_indexes: Vec, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "MappedAddresses")] +pub struct DbMappedAddresses { + pub writable: Vec>, + pub readonly: Vec>, +} + +#[derive(Clone, Debug, ToSql)] +#[postgres(name = "MappedMessage")] +pub struct DbMappedMessage { + pub message: DbTransactionMessageV0, + pub mapped_addresses: DbMappedAddresses, +} + +pub struct DbTransaction { + pub signature: Vec, + pub is_vote: bool, + pub slot: i64, + pub message_type: i16, + pub legacy_message: Option, + pub v0_mapped_message: Option, + pub message_hash: Vec, + pub meta: DbTransactionStatusMeta, + pub signatures: Vec>, +} + +pub struct LogTransactionRequest { + pub transaction_info: DbTransaction, +} + +impl From<&AddressMapIndexes> for DbAddressMapIndexes { + fn from(address_map_indexes: &AddressMapIndexes) -> Self { + Self { + writable: address_map_indexes + .writable + .iter() + .map(|address_idx| *address_idx as i16) + .collect(), + readonly: address_map_indexes + .readonly + .iter() + .map(|address_idx| *address_idx as i16) + .collect(), + } + } +} + +impl From<&MappedAddresses> for DbMappedAddresses { + fn from(mapped_addresses: &MappedAddresses) -> Self { + Self { + writable: mapped_addresses + .writable + .iter() + .map(|pubkey| pubkey.as_ref().to_vec()) + .collect(), + readonly: mapped_addresses + .readonly + .iter() + .map(|pubkey| pubkey.as_ref().to_vec()) + .collect(), + } + } +} + +impl From<&MessageHeader> for DbTransactionMessageHeader { + fn from(header: &MessageHeader) -> Self { + Self { + num_required_signatures: header.num_required_signatures as i16, + num_readonly_signed_accounts: header.num_readonly_signed_accounts as i16, + num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as i16, + } + } +} + +impl From<&CompiledInstruction> for DbCompiledInstruction { + fn from(instruction: &CompiledInstruction) -> Self { + Self { + program_id_index: instruction.program_id_index as i16, + accounts: instruction + .accounts + .iter() + .map(|account_idx| *account_idx as i16) + .collect(), + data: instruction.data.clone(), + } + } +} + +impl From<&Message> for DbTransactionMessage { + fn from(message: &Message) -> Self { + Self { + header: DbTransactionMessageHeader::from(&message.header), + account_keys: message + .account_keys + .iter() + .map(|key| key.as_ref().to_vec()) + .collect(), + recent_blockhash: message.recent_blockhash.as_ref().to_vec(), + instructions: message + .instructions + .iter() + .map(DbCompiledInstruction::from) + .collect(), + } + } +} + +impl From<&v0::Message> for DbTransactionMessageV0 { + fn from(message: &v0::Message) -> Self { + Self { + header: DbTransactionMessageHeader::from(&message.header), + account_keys: message + .account_keys + .iter() + .map(|key| key.as_ref().to_vec()) + .collect(), + recent_blockhash: message.recent_blockhash.as_ref().to_vec(), + instructions: message + .instructions + .iter() + .map(DbCompiledInstruction::from) + .collect(), + address_map_indexes: message + .address_map_indexes + .iter() + .map(DbAddressMapIndexes::from) + .collect(), + } + } +} + +impl From<&MappedMessage> for DbMappedMessage { + fn from(message: &MappedMessage) -> Self { + Self { + message: DbTransactionMessageV0::from(&message.message), + mapped_addresses: DbMappedAddresses::from(&message.mapped_addresses), + } + } +} + +impl From<&InnerInstructions> for DbInnerInstructions { + fn from(instructions: &InnerInstructions) -> Self { + Self { + index: instructions.index as i16, + instructions: instructions + .instructions + .iter() + .map(DbCompiledInstruction::from) + .collect(), + } + } +} + +impl From<&RewardType> for DbRewardType { + fn from(reward_type: &RewardType) -> Self { + match reward_type { + RewardType::Fee => Self::Fee, + RewardType::Rent => Self::Rent, + RewardType::Staking => Self::Staking, + RewardType::Voting => Self::Voting, + } + } +} + +fn get_reward_type(reward: &Option) -> Option { + reward.as_ref().map(DbRewardType::from) +} + +impl From<&Reward> for DbReward { + fn from(reward: &Reward) -> Self { + Self { + pubkey: reward.pubkey.clone(), + lamports: reward.lamports as i64, + post_balance: reward.post_balance as i64, + reward_type: get_reward_type(&reward.reward_type), + commission: reward + .commission + .as_ref() + .map(|commission| *commission as i16), + } + } +} + +#[derive(Clone, Debug, ToSql, PartialEq)] +#[postgres(name = "TransactionErrorCode")] +pub enum DbTransactionErrorCode { + AccountInUse, + AccountLoadedTwice, + AccountNotFound, + ProgramAccountNotFound, + InsufficientFundsForFee, + InvalidAccountForFee, + AlreadyProcessed, + BlockhashNotFound, + InstructionError, + CallChainTooDeep, + MissingSignatureForFee, + InvalidAccountIndex, + SignatureFailure, + InvalidProgramForExecution, + SanitizeFailure, + ClusterMaintenance, + AccountBorrowOutstanding, + WouldExceedMaxAccountCostLimit, + WouldExceedMaxBlockCostLimit, + UnsupportedVersion, + InvalidWritableAccount, +} + +impl From<&TransactionError> for DbTransactionErrorCode { + fn from(err: &TransactionError) -> Self { + match err { + TransactionError::AccountInUse => Self::AccountInUse, + TransactionError::AccountLoadedTwice => Self::AccountLoadedTwice, + TransactionError::AccountNotFound => Self::AccountNotFound, + TransactionError::ProgramAccountNotFound => Self::ProgramAccountNotFound, + TransactionError::InsufficientFundsForFee => Self::InsufficientFundsForFee, + TransactionError::InvalidAccountForFee => Self::InvalidAccountForFee, + TransactionError::AlreadyProcessed => Self::AlreadyProcessed, + TransactionError::BlockhashNotFound => Self::BlockhashNotFound, + TransactionError::InstructionError(_idx, _error) => Self::InstructionError, + TransactionError::CallChainTooDeep => Self::CallChainTooDeep, + TransactionError::MissingSignatureForFee => Self::MissingSignatureForFee, + TransactionError::InvalidAccountIndex => Self::InvalidAccountIndex, + TransactionError::SignatureFailure => Self::SignatureFailure, + TransactionError::InvalidProgramForExecution => Self::InvalidProgramForExecution, + TransactionError::SanitizeFailure => Self::SanitizeFailure, + TransactionError::ClusterMaintenance => Self::ClusterMaintenance, + TransactionError::AccountBorrowOutstanding => Self::AccountBorrowOutstanding, + TransactionError::WouldExceedMaxAccountCostLimit => { + Self::WouldExceedMaxAccountCostLimit + } + TransactionError::WouldExceedMaxBlockCostLimit => Self::WouldExceedMaxBlockCostLimit, + TransactionError::UnsupportedVersion => Self::UnsupportedVersion, + TransactionError::InvalidWritableAccount => Self::InvalidWritableAccount, + } + } +} + +#[derive(Clone, Debug, ToSql, PartialEq)] +#[postgres(name = "TransactionError")] +pub struct DbTransactionError { + error_code: DbTransactionErrorCode, + error_detail: Option, +} + +fn get_transaction_error(result: &Result<(), TransactionError>) -> Option { + if result.is_ok() { + return None; + } + + let error = result.as_ref().err().unwrap(); + Some(DbTransactionError { + error_code: DbTransactionErrorCode::from(error), + error_detail: { + if let TransactionError::InstructionError(idx, instruction_error) = error { + let mut error_detail = format!( + "InstructionError: idx ({}), error: ({})", + idx, instruction_error + ); + if error_detail.len() > MAX_TRANSACTION_STATUS_LEN { + error_detail = error_detail + .to_string() + .split_off(MAX_TRANSACTION_STATUS_LEN); + } + Some(error_detail) + } else { + None + } + }, + }) +} + +impl From<&TransactionTokenBalance> for DbTransactionTokenBalance { + fn from(token_balance: &TransactionTokenBalance) -> Self { + Self { + account_index: token_balance.account_index as i16, + mint: token_balance.mint.clone(), + ui_token_amount: token_balance.ui_token_amount.ui_amount, + owner: token_balance.owner.clone(), + } + } +} + +impl From<&TransactionStatusMeta> for DbTransactionStatusMeta { + fn from(meta: &TransactionStatusMeta) -> Self { + Self { + error: get_transaction_error(&meta.status), + fee: meta.fee as i64, + pre_balances: meta + .pre_balances + .iter() + .map(|balance| *balance as i64) + .collect(), + post_balances: meta + .post_balances + .iter() + .map(|balance| *balance as i64) + .collect(), + inner_instructions: meta + .inner_instructions + .as_ref() + .map(|instructions| instructions.iter().map(DbInnerInstructions::from).collect()), + log_messages: meta.log_messages.clone(), + pre_token_balances: meta.pre_token_balances.as_ref().map(|balances| { + balances + .iter() + .map(DbTransactionTokenBalance::from) + .collect() + }), + post_token_balances: meta.post_token_balances.as_ref().map(|balances| { + balances + .iter() + .map(DbTransactionTokenBalance::from) + .collect() + }), + rewards: meta + .rewards + .as_ref() + .map(|rewards| rewards.iter().map(DbReward::from).collect()), + } + } +} + +fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> DbTransaction { + DbTransaction { + signature: transaction_info.signature.as_ref().to_vec(), + is_vote: transaction_info.is_vote, + slot: slot as i64, + message_type: match transaction_info.transaction.message() { + SanitizedMessage::Legacy(_) => 0, + SanitizedMessage::V0(_) => 1, + }, + legacy_message: match transaction_info.transaction.message() { + SanitizedMessage::Legacy(legacy_message) => { + Some(DbTransactionMessage::from(legacy_message)) + } + _ => None, + }, + v0_mapped_message: match transaction_info.transaction.message() { + SanitizedMessage::V0(mapped_message) => Some(DbMappedMessage::from(mapped_message)), + _ => None, + }, + signatures: transaction_info + .transaction + .signatures() + .iter() + .map(|signature| signature.as_ref().to_vec()) + .collect(), + message_hash: transaction_info + .transaction + .message_hash() + .as_ref() + .to_vec(), + meta: DbTransactionStatusMeta::from(transaction_info.transaction_status_meta), + } +} + +impl SimplePostgresClient { + pub(crate) fn build_transaction_info_upsert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let stmt = "INSERT INTO transaction AS txn (signature, is_vote, slot, message_type, legacy_message, \ + v0_mapped_message, signatures, message_hash, meta, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"; + + let stmt = client.prepare(stmt); + + match stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the transaction update PostgreSQL database: ({}) host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, config + ), + }))); + } + Ok(stmt) => Ok(stmt), + } + } + + pub(crate) fn log_transaction_impl( + &mut self, + transaction_log_info: LogTransactionRequest, + ) -> Result<(), AccountsDbPluginError> { + let client = self.client.get_mut().unwrap(); + let statement = &client.update_transaction_log_stmt; + let client = &mut client.client; + let updated_on = Utc::now().naive_utc(); + + let transaction_info = transaction_log_info.transaction_info; + let result = client.query( + statement, + &[ + &transaction_info.signature, + &transaction_info.is_vote, + &transaction_info.slot, + &transaction_info.message_type, + &transaction_info.legacy_message, + &transaction_info.v0_mapped_message, + &transaction_info.signatures, + &transaction_info.message_hash, + &transaction_info.meta, + &updated_on, + ], + ); + + if let Err(err) = result { + let msg = format!( + "Failed to persist the update of transaction info to the PostgreSQL database. Error: {:?}", + err + ); + error!("{}", msg); + return Err(AccountsDbPluginError::AccountsUpdateError { msg }); + } + + Ok(()) + } +} + +impl ParallelPostgresClient { + fn build_transaction_request( + slot: u64, + transaction_info: &ReplicaTransactionInfo, + ) -> LogTransactionRequest { + LogTransactionRequest { + transaction_info: build_db_transaction(slot, transaction_info), + } + } + + pub fn log_transaction_info( + &mut self, + transaction_info: &ReplicaTransactionInfo, + slot: u64, + ) -> Result<(), AccountsDbPluginError> { + let wrk_item = DbWorkItem::LogTransaction(Box::new(Self::build_transaction_request( + slot, + transaction_info, + ))); + + if let Err(err) = self.sender.send(wrk_item) { + return Err(AccountsDbPluginError::SlotStatusUpdateError { + msg: format!("Failed to update the transaction, error: {:?}", err), + }); + } + Ok(()) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use { + super::*, + solana_account_decoder::parse_token::UiTokenAmount, + solana_sdk::{ + hash::Hash, + message::VersionedMessage, + pubkey::Pubkey, + sanitize::Sanitize, + signature::{Keypair, Signature, Signer}, + system_transaction, + transaction::{SanitizedTransaction, Transaction, VersionedTransaction}, + }, + }; + + fn check_compiled_instruction_equality( + compiled_instruction: &CompiledInstruction, + db_compiled_instruction: &DbCompiledInstruction, + ) { + assert_eq!( + compiled_instruction.program_id_index, + db_compiled_instruction.program_id_index as u8 + ); + assert_eq!( + compiled_instruction.accounts.len(), + db_compiled_instruction.accounts.len() + ); + assert_eq!( + compiled_instruction.data.len(), + db_compiled_instruction.data.len() + ); + + for i in 0..compiled_instruction.accounts.len() { + assert_eq!( + compiled_instruction.accounts[i], + db_compiled_instruction.accounts[i] as u8 + ) + } + for i in 0..compiled_instruction.data.len() { + assert_eq!( + compiled_instruction.data[i], + db_compiled_instruction.data[i] as u8 + ) + } + } + + #[test] + fn test_transform_compiled_instruction() { + let compiled_instruction = CompiledInstruction { + program_id_index: 0, + accounts: vec![1, 2, 3], + data: vec![4, 5, 6], + }; + + let db_compiled_instruction = DbCompiledInstruction::from(&compiled_instruction); + check_compiled_instruction_equality(&compiled_instruction, &db_compiled_instruction); + } + + fn check_inner_instructions_equality( + inner_instructions: &InnerInstructions, + db_inner_instructions: &DbInnerInstructions, + ) { + assert_eq!(inner_instructions.index, db_inner_instructions.index as u8); + assert_eq!( + inner_instructions.instructions.len(), + db_inner_instructions.instructions.len() + ); + + for i in 0..inner_instructions.instructions.len() { + check_compiled_instruction_equality( + &inner_instructions.instructions[i], + &db_inner_instructions.instructions[i], + ) + } + } + + #[test] + fn test_transform_inner_instructions() { + let inner_instructions = InnerInstructions { + index: 0, + instructions: vec![ + CompiledInstruction { + program_id_index: 0, + accounts: vec![1, 2, 3], + data: vec![4, 5, 6], + }, + CompiledInstruction { + program_id_index: 1, + accounts: vec![12, 13, 14], + data: vec![24, 25, 26], + }, + ], + }; + + let db_inner_instructions = DbInnerInstructions::from(&inner_instructions); + check_inner_instructions_equality(&inner_instructions, &db_inner_instructions); + } + + fn check_address_map_indexes_equality( + address_map_indexes: &AddressMapIndexes, + db_address_map_indexes: &DbAddressMapIndexes, + ) { + assert_eq!( + address_map_indexes.writable.len(), + db_address_map_indexes.writable.len() + ); + assert_eq!( + address_map_indexes.readonly.len(), + db_address_map_indexes.readonly.len() + ); + + for i in 0..address_map_indexes.writable.len() { + assert_eq!( + address_map_indexes.writable[i], + db_address_map_indexes.writable[i] as u8 + ) + } + for i in 0..address_map_indexes.readonly.len() { + assert_eq!( + address_map_indexes.readonly[i], + db_address_map_indexes.readonly[i] as u8 + ) + } + } + + #[test] + fn test_transform_address_map_indexes() { + let address_map_indexes = AddressMapIndexes { + writable: vec![1, 2, 3], + readonly: vec![4, 5, 6], + }; + + let db_address_map_indexes = DbAddressMapIndexes::from(&address_map_indexes); + check_address_map_indexes_equality(&address_map_indexes, &db_address_map_indexes); + } + + fn check_reward_equality(reward: &Reward, db_reward: &DbReward) { + assert_eq!(reward.pubkey, db_reward.pubkey); + assert_eq!(reward.lamports, db_reward.lamports); + assert_eq!(reward.post_balance, db_reward.post_balance as u64); + assert_eq!(get_reward_type(&reward.reward_type), db_reward.reward_type); + assert_eq!( + reward.commission, + db_reward + .commission + .as_ref() + .map(|commission| *commission as u8) + ); + } + + #[test] + fn test_transform_reward() { + let reward = Reward { + pubkey: Pubkey::new_unique().to_string(), + lamports: 1234, + post_balance: 45678, + reward_type: Some(RewardType::Fee), + commission: Some(12), + }; + + let db_reward = DbReward::from(&reward); + check_reward_equality(&reward, &db_reward); + } + + fn check_transaction_token_balance_equality( + transaction_token_balance: &TransactionTokenBalance, + db_transaction_token_balance: &DbTransactionTokenBalance, + ) { + assert_eq!( + transaction_token_balance.account_index, + db_transaction_token_balance.account_index as u8 + ); + assert_eq!( + transaction_token_balance.mint, + db_transaction_token_balance.mint + ); + assert_eq!( + transaction_token_balance.ui_token_amount.ui_amount, + db_transaction_token_balance.ui_token_amount + ); + assert_eq!( + transaction_token_balance.owner, + db_transaction_token_balance.owner + ); + } + + #[test] + fn test_transform_transaction_token_balance() { + let transaction_token_balance = TransactionTokenBalance { + account_index: 3, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: UiTokenAmount { + ui_amount: Some(0.42), + decimals: 2, + amount: "42".to_string(), + ui_amount_string: "0.42".to_string(), + }, + owner: Pubkey::new_unique().to_string(), + }; + + let db_transaction_token_balance = + DbTransactionTokenBalance::from(&transaction_token_balance); + + check_transaction_token_balance_equality( + &transaction_token_balance, + &db_transaction_token_balance, + ); + } + + fn check_token_balances( + token_balances: &Option>, + db_token_balances: &Option>, + ) { + assert_eq!( + token_balances + .as_ref() + .map(|token_balances| token_balances.len()), + db_token_balances + .as_ref() + .map(|token_balances| token_balances.len()), + ); + + if token_balances.is_some() { + for i in 0..token_balances.as_ref().unwrap().len() { + check_transaction_token_balance_equality( + &token_balances.as_ref().unwrap()[i], + &db_token_balances.as_ref().unwrap()[i], + ); + } + } + } + + fn check_transaction_status_meta( + transaction_status_meta: &TransactionStatusMeta, + db_transaction_status_meta: &DbTransactionStatusMeta, + ) { + assert_eq!( + get_transaction_error(&transaction_status_meta.status), + db_transaction_status_meta.error + ); + assert_eq!( + transaction_status_meta.fee, + db_transaction_status_meta.fee as u64 + ); + assert_eq!( + transaction_status_meta.pre_balances.len(), + db_transaction_status_meta.pre_balances.len() + ); + + for i in 0..transaction_status_meta.pre_balances.len() { + assert_eq!( + transaction_status_meta.pre_balances[i], + db_transaction_status_meta.pre_balances[i] as u64 + ); + } + assert_eq!( + transaction_status_meta.post_balances.len(), + db_transaction_status_meta.post_balances.len() + ); + for i in 0..transaction_status_meta.post_balances.len() { + assert_eq!( + transaction_status_meta.post_balances[i], + db_transaction_status_meta.post_balances[i] as u64 + ); + } + assert_eq!( + transaction_status_meta + .inner_instructions + .as_ref() + .map(|inner_instructions| inner_instructions.len()), + db_transaction_status_meta + .inner_instructions + .as_ref() + .map(|inner_instructions| inner_instructions.len()), + ); + + if transaction_status_meta.inner_instructions.is_some() { + for i in 0..transaction_status_meta + .inner_instructions + .as_ref() + .unwrap() + .len() + { + check_inner_instructions_equality( + &transaction_status_meta.inner_instructions.as_ref().unwrap()[i], + &db_transaction_status_meta + .inner_instructions + .as_ref() + .unwrap()[i], + ); + } + } + + assert_eq!( + transaction_status_meta + .log_messages + .as_ref() + .map(|log_messages| log_messages.len()), + db_transaction_status_meta + .log_messages + .as_ref() + .map(|log_messages| log_messages.len()), + ); + + if transaction_status_meta.log_messages.is_some() { + for i in 0..transaction_status_meta.log_messages.as_ref().unwrap().len() { + assert_eq!( + &transaction_status_meta.log_messages.as_ref().unwrap()[i], + &db_transaction_status_meta.log_messages.as_ref().unwrap()[i] + ); + } + } + + check_token_balances( + &transaction_status_meta.pre_token_balances, + &db_transaction_status_meta.pre_token_balances, + ); + + check_token_balances( + &transaction_status_meta.post_token_balances, + &db_transaction_status_meta.post_token_balances, + ); + + assert_eq!( + transaction_status_meta + .rewards + .as_ref() + .map(|rewards| rewards.len()), + db_transaction_status_meta + .rewards + .as_ref() + .map(|rewards| rewards.len()), + ); + + if transaction_status_meta.rewards.is_some() { + for i in 0..transaction_status_meta.rewards.as_ref().unwrap().len() { + check_reward_equality( + &transaction_status_meta.rewards.as_ref().unwrap()[i], + &db_transaction_status_meta.rewards.as_ref().unwrap()[i], + ); + } + } + } + + fn build_transaction_status_meta() -> TransactionStatusMeta { + TransactionStatusMeta { + status: Ok(()), + fee: 23456, + pre_balances: vec![11, 22, 33], + post_balances: vec![44, 55, 66], + inner_instructions: Some(vec![InnerInstructions { + index: 0, + instructions: vec![ + CompiledInstruction { + program_id_index: 0, + accounts: vec![1, 2, 3], + data: vec![4, 5, 6], + }, + CompiledInstruction { + program_id_index: 1, + accounts: vec![12, 13, 14], + data: vec![24, 25, 26], + }, + ], + }]), + log_messages: Some(vec!["message1".to_string(), "message2".to_string()]), + pre_token_balances: Some(vec![ + TransactionTokenBalance { + account_index: 3, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: UiTokenAmount { + ui_amount: Some(0.42), + decimals: 2, + amount: "42".to_string(), + ui_amount_string: "0.42".to_string(), + }, + owner: Pubkey::new_unique().to_string(), + }, + TransactionTokenBalance { + account_index: 2, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: UiTokenAmount { + ui_amount: Some(0.38), + decimals: 2, + amount: "38".to_string(), + ui_amount_string: "0.38".to_string(), + }, + owner: Pubkey::new_unique().to_string(), + }, + ]), + post_token_balances: Some(vec![ + TransactionTokenBalance { + account_index: 3, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: UiTokenAmount { + ui_amount: Some(0.82), + decimals: 2, + amount: "82".to_string(), + ui_amount_string: "0.82".to_string(), + }, + owner: Pubkey::new_unique().to_string(), + }, + TransactionTokenBalance { + account_index: 2, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: UiTokenAmount { + ui_amount: Some(0.48), + decimals: 2, + amount: "48".to_string(), + ui_amount_string: "0.48".to_string(), + }, + owner: Pubkey::new_unique().to_string(), + }, + ]), + rewards: Some(vec![ + Reward { + pubkey: Pubkey::new_unique().to_string(), + lamports: 1234, + post_balance: 45678, + reward_type: Some(RewardType::Fee), + commission: Some(12), + }, + Reward { + pubkey: Pubkey::new_unique().to_string(), + lamports: 234, + post_balance: 324, + reward_type: Some(RewardType::Staking), + commission: Some(11), + }, + ]), + } + } + + #[test] + fn test_transform_transaction_status_meta() { + let transaction_status_meta = build_transaction_status_meta(); + let db_transaction_status_meta = DbTransactionStatusMeta::from(&transaction_status_meta); + check_transaction_status_meta(&transaction_status_meta, &db_transaction_status_meta); + } + + fn check_message_header_equality( + message_header: &MessageHeader, + db_message_header: &DbTransactionMessageHeader, + ) { + assert_eq!( + message_header.num_readonly_signed_accounts, + db_message_header.num_readonly_signed_accounts as u8 + ); + assert_eq!( + message_header.num_readonly_unsigned_accounts, + db_message_header.num_readonly_unsigned_accounts as u8 + ); + assert_eq!( + message_header.num_required_signatures, + db_message_header.num_required_signatures as u8 + ); + } + + #[test] + fn test_transform_transaction_message_header() { + let message_header = MessageHeader { + num_readonly_signed_accounts: 1, + num_readonly_unsigned_accounts: 2, + num_required_signatures: 3, + }; + + let db_message_header = DbTransactionMessageHeader::from(&message_header); + check_message_header_equality(&message_header, &db_message_header) + } + + fn check_transaction_message_equality(message: &Message, db_message: &DbTransactionMessage) { + check_message_header_equality(&message.header, &db_message.header); + assert_eq!(message.account_keys.len(), db_message.account_keys.len()); + for i in 0..message.account_keys.len() { + assert_eq!(message.account_keys[i].as_ref(), db_message.account_keys[i]); + } + assert_eq!(message.instructions.len(), db_message.instructions.len()); + for i in 0..message.instructions.len() { + check_compiled_instruction_equality( + &message.instructions[i], + &db_message.instructions[i], + ); + } + } + + fn build_message() -> Message { + Message { + header: MessageHeader { + num_readonly_signed_accounts: 11, + num_readonly_unsigned_accounts: 12, + num_required_signatures: 13, + }, + account_keys: vec![Pubkey::new_unique(), Pubkey::new_unique()], + recent_blockhash: Hash::new_unique(), + instructions: vec![ + CompiledInstruction { + program_id_index: 0, + accounts: vec![1, 2, 3], + data: vec![4, 5, 6], + }, + CompiledInstruction { + program_id_index: 3, + accounts: vec![11, 12, 13], + data: vec![14, 15, 16], + }, + ], + } + } + + #[test] + fn test_transform_transaction_message() { + let message = build_message(); + + let db_message = DbTransactionMessage::from(&message); + check_transaction_message_equality(&message, &db_message); + } + + fn check_transaction_messagev0_equality( + message: &v0::Message, + db_message: &DbTransactionMessageV0, + ) { + check_message_header_equality(&message.header, &db_message.header); + assert_eq!(message.account_keys.len(), db_message.account_keys.len()); + for i in 0..message.account_keys.len() { + assert_eq!(message.account_keys[i].as_ref(), db_message.account_keys[i]); + } + assert_eq!(message.instructions.len(), db_message.instructions.len()); + for i in 0..message.instructions.len() { + check_compiled_instruction_equality( + &message.instructions[i], + &db_message.instructions[i], + ); + } + assert_eq!( + message.address_map_indexes.len(), + db_message.address_map_indexes.len() + ); + for i in 0..message.address_map_indexes.len() { + check_address_map_indexes_equality( + &message.address_map_indexes[i], + &db_message.address_map_indexes[i], + ); + } + } + + fn build_transaction_messagev0() -> v0::Message { + v0::Message { + header: MessageHeader { + num_readonly_signed_accounts: 2, + num_readonly_unsigned_accounts: 2, + num_required_signatures: 3, + }, + account_keys: vec![ + Pubkey::new_unique(), + Pubkey::new_unique(), + Pubkey::new_unique(), + Pubkey::new_unique(), + Pubkey::new_unique(), + ], + recent_blockhash: Hash::new_unique(), + instructions: vec![ + CompiledInstruction { + program_id_index: 1, + accounts: vec![1, 2, 3], + data: vec![4, 5, 6], + }, + CompiledInstruction { + program_id_index: 2, + accounts: vec![0, 1, 2], + data: vec![14, 15, 16], + }, + ], + address_map_indexes: vec![ + AddressMapIndexes { + writable: vec![0], + readonly: vec![1, 2], + }, + AddressMapIndexes { + writable: vec![1], + readonly: vec![0, 2], + }, + ], + } + } + + #[test] + fn test_transform_transaction_messagev0() { + let message = build_transaction_messagev0(); + + let db_message = DbTransactionMessageV0::from(&message); + check_transaction_messagev0_equality(&message, &db_message); + } + + fn check_mapped_addresses( + mapped_addresses: &MappedAddresses, + db_mapped_addresses: &DbMappedAddresses, + ) { + assert_eq!( + mapped_addresses.writable.len(), + db_mapped_addresses.writable.len() + ); + for i in 0..mapped_addresses.writable.len() { + assert_eq!( + mapped_addresses.writable[i].as_ref(), + db_mapped_addresses.writable[i] + ); + } + + assert_eq!( + mapped_addresses.readonly.len(), + db_mapped_addresses.readonly.len() + ); + for i in 0..mapped_addresses.readonly.len() { + assert_eq!( + mapped_addresses.readonly[i].as_ref(), + db_mapped_addresses.readonly[i] + ); + } + } + + fn check_mapped_message_equality(message: &MappedMessage, db_message: &DbMappedMessage) { + check_transaction_messagev0_equality(&message.message, &db_message.message); + check_mapped_addresses(&message.mapped_addresses, &db_message.mapped_addresses); + } + + #[test] + fn test_transform_mapped_message() { + let message = MappedMessage { + message: build_transaction_messagev0(), + mapped_addresses: MappedAddresses { + writable: vec![Pubkey::new_unique(), Pubkey::new_unique()], + readonly: vec![Pubkey::new_unique(), Pubkey::new_unique()], + }, + }; + + let db_message = DbMappedMessage::from(&message); + check_mapped_message_equality(&message, &db_message); + } + + fn check_transaction( + slot: u64, + transaction: &ReplicaTransactionInfo, + db_transaction: &DbTransaction, + ) { + assert_eq!(transaction.signature.as_ref(), db_transaction.signature); + assert_eq!(transaction.is_vote, db_transaction.is_vote); + assert_eq!(slot, db_transaction.slot as u64); + match transaction.transaction.message() { + SanitizedMessage::Legacy(message) => { + assert_eq!(db_transaction.message_type, 0); + check_transaction_message_equality( + message, + db_transaction.legacy_message.as_ref().unwrap(), + ); + } + SanitizedMessage::V0(message) => { + assert_eq!(db_transaction.message_type, 1); + check_mapped_message_equality( + message, + db_transaction.v0_mapped_message.as_ref().unwrap(), + ); + } + } + + assert_eq!( + transaction.transaction.signatures().len(), + db_transaction.signatures.len() + ); + + for i in 0..transaction.transaction.signatures().len() { + assert_eq!( + transaction.transaction.signatures()[i].as_ref(), + db_transaction.signatures[i] + ); + } + + assert_eq!( + transaction.transaction.message_hash().as_ref(), + db_transaction.message_hash + ); + + check_transaction_status_meta(transaction.transaction_status_meta, &db_transaction.meta); + } + + fn build_test_transaction_legacy() -> Transaction { + let keypair1 = Keypair::new(); + let pubkey1 = keypair1.pubkey(); + let zero = Hash::default(); + system_transaction::transfer(&keypair1, &pubkey1, 42, zero) + } + + #[test] + fn test_build_db_transaction_legacy() { + let signature = Signature::new(&[1u8; 64]); + + let message_hash = Hash::new_unique(); + let transaction = build_test_transaction_legacy(); + + let transaction = VersionedTransaction::from(transaction); + + let transaction = + SanitizedTransaction::try_create(transaction, message_hash, Some(true), |_| { + Err(TransactionError::UnsupportedVersion) + }) + .unwrap(); + + let transaction_status_meta = build_transaction_status_meta(); + let transaction_info = ReplicaTransactionInfo { + signature: &signature, + is_vote: false, + transaction: &transaction, + transaction_status_meta: &transaction_status_meta, + }; + + let slot = 54; + let db_transaction = build_db_transaction(slot, &transaction_info); + check_transaction(slot, &transaction_info, &db_transaction); + } + + fn build_test_transaction_v0() -> VersionedTransaction { + VersionedTransaction { + signatures: vec![ + Signature::new(&[1u8; 64]), + Signature::new(&[2u8; 64]), + Signature::new(&[3u8; 64]), + ], + message: VersionedMessage::V0(build_transaction_messagev0()), + } + } + + #[test] + fn test_build_db_transaction_v0() { + let signature = Signature::new(&[1u8; 64]); + + let message_hash = Hash::new_unique(); + let transaction = build_test_transaction_v0(); + + transaction.sanitize().unwrap(); + + let transaction = + SanitizedTransaction::try_create(transaction, message_hash, Some(true), |_message| { + Ok(MappedAddresses { + writable: vec![Pubkey::new_unique(), Pubkey::new_unique()], + readonly: vec![Pubkey::new_unique(), Pubkey::new_unique()], + }) + }) + .unwrap(); + + let transaction_status_meta = build_transaction_status_meta(); + let transaction_info = ReplicaTransactionInfo { + signature: &signature, + is_vote: true, + transaction: &transaction, + transaction_status_meta: &transaction_status_meta, + }; + + let slot = 54; + let db_transaction = build_db_transaction(slot, &transaction_info); + check_transaction(slot, &transaction_info, &db_transaction); + } +}