diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index 6e5c8637c9..7b0163e784 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -17,6 +17,7 @@ pub struct ReplicaAccountInfo<'a> { pub executable: bool, pub rent_epoch: u64, pub data: &'a [u8], + pub write_version: u64, } pub enum ReplicaAccountInfoVersions<'a> { diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs index 7a835e09fb..d3a0d85143 100644 --- a/accountsdb-plugin-manager/src/accounts_update_notifier.rs +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -9,12 +9,11 @@ use { solana_metrics::*, solana_runtime::{ accounts_update_notifier_interface::AccountsUpdateNotifierInterface, - append_vec::StoredAccountMeta, + append_vec::{StoredAccountMeta, StoredMeta}, }, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, clock::Slot, - pubkey::Pubkey, }, std::sync::{Arc, RwLock}, }; @@ -24,8 +23,8 @@ pub(crate) struct AccountsUpdateNotifierImpl { } impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { - fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { - if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) { + fn notify_account_update(&self, slot: Slot, meta: &StoredMeta, account: &AccountSharedData) { + if let Some(account_info) = self.accountinfo_from_shared_account_data(meta, account) { self.notify_plugins_of_account_update(account_info, slot, false); } } @@ -108,16 +107,17 @@ impl AccountsUpdateNotifierImpl { fn accountinfo_from_shared_account_data<'a>( &self, - pubkey: &'a Pubkey, + meta: &'a StoredMeta, account: &'a AccountSharedData, ) -> Option> { Some(ReplicaAccountInfo { - pubkey: pubkey.as_ref(), + pubkey: meta.pubkey.as_ref(), lamports: account.lamports(), owner: account.owner().as_ref(), executable: account.executable(), rent_epoch: account.rent_epoch(), data: account.data(), + write_version: meta.write_version, }) } @@ -132,6 +132,7 @@ impl AccountsUpdateNotifierImpl { executable: stored_account_meta.account_meta.executable, rent_epoch: stored_account_meta.account_meta.rent_epoch, data: stored_account_meta.data, + write_version: stored_account_meta.meta.write_version, }) } diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql index 41be52f5b3..1c02253ac7 100644 --- a/accountsdb-plugin-postgres/scripts/create_schema.sql +++ b/accountsdb-plugin-postgres/scripts/create_schema.sql @@ -12,6 +12,7 @@ CREATE TABLE account ( executable BOOL NOT NULL, rent_epoch BIGINT NOT NULL, data BYTEA, + write_version BIGINT NOT NULL, updated_on TIMESTAMP NOT NULL ); @@ -35,14 +36,15 @@ CREATE TABLE account_audit ( executable BOOL NOT NULL, rent_epoch BIGINT NOT NULL, data BYTEA, + write_version BIGINT NOT NULL, updated_on TIMESTAMP NOT NULL ); CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ BEGIN - INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, updated_on) + INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, write_version, updated_on) VALUES (OLD.pubkey, OLD.owner, OLD.lamports, OLD.slot, - OLD.executable, OLD.rent_epoch, OLD.data, OLD.updated_on); + OLD.executable, OLD.rent_epoch, OLD.data, OLD.write_version, OLD.updated_on); RETURN NEW; END; diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs index 1474a6858d..10d22e8cf8 100644 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -32,7 +32,7 @@ const MAX_ASYNC_REQUESTS: usize = 40960; const DEFAULT_POSTGRES_PORT: u16 = 5432; const DEFAULT_THREADS_COUNT: usize = 100; const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10; -const ACCOUNT_COLUMN_COUNT: usize = 8; +const ACCOUNT_COLUMN_COUNT: usize = 9; struct PostgresSqlClientWrapper { client: Client, @@ -63,6 +63,7 @@ pub struct DbAccountInfo { pub rent_epoch: i64, pub data: Vec, pub slot: i64, + pub write_version: i64, } impl DbAccountInfo { @@ -76,6 +77,7 @@ impl DbAccountInfo { rent_epoch: account.rent_epoch() as i64, data, slot: slot as i64, + write_version: account.write_version(), } } } @@ -87,6 +89,7 @@ pub trait ReadableAccountInfo: Sized { fn executable(&self) -> bool; fn rent_epoch(&self) -> i64; fn data(&self) -> &[u8]; + fn write_version(&self) -> i64; } impl ReadableAccountInfo for DbAccountInfo { @@ -113,6 +116,10 @@ impl ReadableAccountInfo for DbAccountInfo { fn data(&self) -> &[u8] { &self.data } + + fn write_version(&self) -> i64 { + self.write_version + } } impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> { @@ -139,6 +146,10 @@ impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> { fn data(&self) -> &[u8] { self.data } + + fn write_version(&self) -> i64 { + self.write_version as i64 + } } pub trait PostgresClient { @@ -191,11 +202,11 @@ impl SimplePostgresClient { let batch_size = config .batch_size .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); - let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) VALUES"); + let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES"); for j in 0..batch_size { let row = j * ACCOUNT_COLUMN_COUNT; let val_str = format!( - "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})", + "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})", row + 1, row + 2, row + 3, @@ -204,6 +215,7 @@ impl SimplePostgresClient { row + 6, row + 7, row + 8, + row + 9, ); if j == 0 { @@ -214,7 +226,8 @@ impl SimplePostgresClient { } let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ - data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot"; + data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\ + acct.slot = excluded.slot AND acct.write_version < excluded.write_version)"; stmt = format!("{} {}", stmt, handle_conflict); @@ -238,10 +251,11 @@ impl SimplePostgresClient { client: &mut Client, config: &AccountsDbPluginPostgresConfig, ) -> Result { - let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ - data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot"; + data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\ + acct.slot = excluded.slot AND acct.write_version < excluded.write_version)"; let stmt = client.prepare(stmt); @@ -277,6 +291,7 @@ impl SimplePostgresClient { &account.executable(), &rent_epoch, &account.data(), + &account.write_version(), &updated_on, ], ); @@ -324,6 +339,7 @@ impl SimplePostgresClient { values.push(&account.executable); values.push(&account.rent_epoch); values.push(&account.data); + values.push(&account.write_version); values.push(&updated_on); } measure.stop(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 9c5f009aa0..cd6c51b03e 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -1601,6 +1601,18 @@ impl AccountsDb { ) } + pub fn new_for_tests_with_caching(paths: Vec, cluster_type: &ClusterType) -> Self { + AccountsDb::new_with_config( + paths, + cluster_type, + AccountSecondaryIndexes::default(), + true, + AccountShrinkThreshold::default(), + Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, + ) + } + pub fn new_with_config( paths: Vec, cluster_type: &ClusterType, @@ -1680,6 +1692,13 @@ impl AccountsDb { } } + pub fn new_single_for_tests_with_caching() -> Self { + AccountsDb { + min_num_stores: 0, + ..AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development) + } + } + fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry { AccountStorageEntry::new( path, @@ -4833,6 +4852,8 @@ impl AccountsDb { lamports: account.lamports(), }; + self.notify_account_at_accounts_update(slot, meta, &account); + let cached_account = self.accounts_cache.store(slot, &meta.pubkey, account, hash); // hash this account in the bg match &self.sender_bg_hasher { @@ -6227,8 +6248,6 @@ impl AccountsDb { pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, self.caching_enabled); - - self.notify_account_at_accounts_update(slot, accounts); } /// Store the account update. diff --git a/runtime/src/accounts_db/accountsdb_plugin_utils.rs b/runtime/src/accounts_db/accountsdb_plugin_utils.rs index 2223c8d2e6..f57894ffdb 100644 --- a/runtime/src/accounts_db/accountsdb_plugin_utils.rs +++ b/runtime/src/accounts_db/accountsdb_plugin_utils.rs @@ -1,5 +1,8 @@ use { - crate::{accounts_db::AccountsDb, append_vec::StoredAccountMeta}, + crate::{ + accounts_db::AccountsDb, + append_vec::{StoredAccountMeta, StoredMeta}, + }, solana_measure::measure::Measure, solana_metrics::*, solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, @@ -59,16 +62,12 @@ impl AccountsDb { pub fn notify_account_at_accounts_update( &self, slot: Slot, - accounts: &[(&Pubkey, &AccountSharedData)], + meta: &StoredMeta, + account: &AccountSharedData, ) { if let Some(accounts_update_notifier) = &self.accounts_update_notifier { let notifier = &accounts_update_notifier.read().unwrap(); - - for account in accounts { - let pubkey = account.0; - let account = account.1; - notifier.notify_account_update(slot, pubkey, account); - } + notifier.notify_account_update(slot, meta, account); } } @@ -154,7 +153,7 @@ pub mod tests { accounts_update_notifier_interface::{ AccountsUpdateNotifier, AccountsUpdateNotifierInterface, }, - append_vec::StoredAccountMeta, + append_vec::{StoredAccountMeta, StoredMeta}, }, dashmap::DashMap, solana_sdk::{ @@ -176,15 +175,20 @@ pub mod tests { #[derive(Debug, Default)] struct AccountsDbTestPlugin { - pub accounts_at_snapshot_restore: DashMap>, + pub accounts_notified: DashMap>, pub is_startup_done: AtomicBool, } impl AccountsUpdateNotifierInterface for AccountsDbTestPlugin { /// Notified when an account is updated at runtime, due to transaction activities - fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { - self.accounts_at_snapshot_restore - .entry(*pubkey) + fn notify_account_update( + &self, + slot: Slot, + meta: &StoredMeta, + account: &AccountSharedData, + ) { + self.accounts_notified + .entry(meta.pubkey) .or_insert(Vec::default()) .push((slot, account.clone())); } @@ -192,7 +196,7 @@ pub mod tests { /// Notified when the AccountsDb is initialized at start when restored /// from a snapshot. fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { - self.accounts_at_snapshot_restore + self.accounts_notified .entry(account.meta.pubkey) .or_insert(Vec::default()) .push((slot, account.clone_account())); @@ -241,42 +245,22 @@ pub mod tests { accounts.notify_account_restore_from_snapshot(); let notifier = notifier.write().unwrap(); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1); assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key1) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + notifier.accounts_notified.get(&key1).unwrap()[0] .1 .lamports(), account1_lamports ); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0); + assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1); assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, - slot0 - ); - assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key2) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + notifier.accounts_notified.get(&key2).unwrap()[0] .1 .lamports(), account2_lamports ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, - slot0 - ); + assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0); assert!(notifier.is_startup_done.load(Ordering::Relaxed)); } @@ -318,66 +302,37 @@ pub mod tests { accounts.notify_account_restore_from_snapshot(); let notifier = notifier.write().unwrap(); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1); assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key1) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + notifier.accounts_notified.get(&key1).unwrap()[0] .1 .lamports(), account1_lamports ); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot1); + assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1); assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, - slot1 - ); - assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key2) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + notifier.accounts_notified.get(&key2).unwrap()[0] .1 .lamports(), account2_lamports ); + assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0); + assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1); assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, - slot0 - ); - assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key3) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0] + notifier.accounts_notified.get(&key3).unwrap()[0] .1 .lamports(), account3_lamports ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0, - slot1 - ); + assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1); assert!(notifier.is_startup_done.load(Ordering::Relaxed)); } #[test] fn test_notify_account_at_accounts_update() { - let mut accounts = AccountsDb::new_single_for_tests(); + let mut accounts = AccountsDb::new_single_for_tests_with_caching(); + let notifier = AccountsDbTestPlugin::default(); let notifier = Arc::new(RwLock::new(notifier)); @@ -411,70 +366,37 @@ pub mod tests { accounts.store_cached(slot1, &[(&key3, &account3)]); let notifier = notifier.write().unwrap(); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2); assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key1) - .unwrap() - .len(), - 2 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + notifier.accounts_notified.get(&key1).unwrap()[0] .1 .lamports(), account1_lamports1 ); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0); assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, - slot0 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1] + notifier.accounts_notified.get(&key1).unwrap()[1] .1 .lamports(), account1_lamports2 ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1].0, - slot1 - ); + assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1); + assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1); assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key2) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + notifier.accounts_notified.get(&key2).unwrap()[0] .1 .lamports(), account2_lamports ); + assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0); + assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1); assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, - slot0 - ); - assert_eq!( - notifier - .accounts_at_snapshot_restore - .get(&key3) - .unwrap() - .len(), - 1 - ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0] + notifier.accounts_notified.get(&key3).unwrap()[0] .1 .lamports(), account3_lamports ); - assert_eq!( - notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0, - slot1 - ); + assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1); } } diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs index bfd62e9b15..b42c6d5c22 100644 --- a/runtime/src/accounts_update_notifier_interface.rs +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -1,12 +1,12 @@ use { - crate::append_vec::StoredAccountMeta, - solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, + crate::append_vec::{StoredAccountMeta, StoredMeta}, + solana_sdk::{account::AccountSharedData, clock::Slot}, std::sync::{Arc, RwLock}, }; pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { /// Notified when an account is updated at runtime, due to transaction activities - fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData); + fn notify_account_update(&self, slot: Slot, meta: &StoredMeta, account: &AccountSharedData); /// Notified when the AccountsDb is initialized at start when restored /// from a snapshot.