hash account state on store (#5573)
This commit is contained in:
@@ -21,6 +21,7 @@
|
||||
use crate::accounts_index::{AccountsIndex, Fork};
|
||||
use crate::append_vec::{AppendVec, StorageMeta, StoredAccount};
|
||||
use bincode::{deserialize_from, serialize_into};
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use fs_extra::dir::CopyOptions;
|
||||
use log::*;
|
||||
use rand::{thread_rng, Rng};
|
||||
@@ -32,7 +33,10 @@ use serde::{Deserialize, Serialize};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_rayon_threadlimit::get_thread_count;
|
||||
use solana_sdk::account::Account;
|
||||
use solana_sdk::bank_hash::BankHash;
|
||||
use solana_sdk::hash::{Hash, Hasher};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::sysvar;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult};
|
||||
@@ -333,6 +337,8 @@ impl<'a> Serialize for AccountsDBSerialize<'a> {
|
||||
let account_storage_serialize = AccountStorageSerialize::new(&*storage, self.slot);
|
||||
serialize_into(&mut wr, &account_storage_serialize).map_err(Error::custom)?;
|
||||
serialize_into(&mut wr, &version).map_err(Error::custom)?;
|
||||
let fork_hashes = self.accounts_db.fork_hashes.read().unwrap();
|
||||
serialize_into(&mut wr, &*fork_hashes).map_err(Error::custom)?;
|
||||
let len = wr.position() as usize;
|
||||
serializer.serialize_bytes(&wr.into_inner()[..len])
|
||||
}
|
||||
@@ -365,7 +371,12 @@ pub struct AccountsDB {
|
||||
/// Thread pool used for par_iter
|
||||
pub thread_pool: ThreadPool,
|
||||
|
||||
/// Number of append vecs to create to maximize parallelism when scanning
|
||||
/// the accounts
|
||||
min_num_stores: usize,
|
||||
|
||||
/// fork to BankHash and a status flag to indicate if the hash has been initialized or not
|
||||
pub fork_hashes: RwLock<HashMap<Fork, (bool, BankHash)>>,
|
||||
}
|
||||
|
||||
impl Default for AccountsDB {
|
||||
@@ -385,6 +396,7 @@ impl Default for AccountsDB {
|
||||
.build()
|
||||
.unwrap(),
|
||||
min_num_stores: num_threads,
|
||||
fork_hashes: RwLock::new(HashMap::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -489,6 +501,10 @@ impl AccountsDB {
|
||||
let version: u64 = deserialize_from(&mut stream)
|
||||
.map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?;
|
||||
|
||||
let fork_hashes: HashMap<Fork, (bool, BankHash)> = deserialize_from(&mut stream)
|
||||
.map_err(|_| AccountsDB::get_io_error("fork hashes deserialize error"))?;
|
||||
*self.fork_hashes.write().unwrap() = fork_hashes;
|
||||
|
||||
// Process deserialized data, set necessary fields in self
|
||||
*self.paths.write().unwrap() = local_account_paths;
|
||||
let max_id: usize = *storage
|
||||
@@ -500,12 +516,6 @@ impl AccountsDB {
|
||||
|
||||
{
|
||||
let mut stores = self.storage.write().unwrap();
|
||||
/*if let Some((_, store0)) = storage.0.remove_entry(&0) {
|
||||
let fork_storage0 = stores.0.entry(0).or_insert_with(HashMap::new);
|
||||
for (id, store) in store0.iter() {
|
||||
fork_storage0.insert(*id, store.clone());
|
||||
}
|
||||
}*/
|
||||
stores.0.extend(storage.0);
|
||||
}
|
||||
|
||||
@@ -599,6 +609,14 @@ impl AccountsDB {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_hash(&self, slot: Fork, parent_slot: Fork) {
|
||||
let mut fork_hashes = self.fork_hashes.write().unwrap();
|
||||
let hash = *fork_hashes
|
||||
.get(&parent_slot)
|
||||
.expect("accounts_db::set_hash::no parent slot");
|
||||
fork_hashes.insert(slot, (false, hash.1));
|
||||
}
|
||||
|
||||
pub fn load(
|
||||
storage: &AccountStorage,
|
||||
ancestors: &HashMap<Fork, usize>,
|
||||
@@ -702,7 +720,42 @@ impl AccountsDB {
|
||||
}
|
||||
}
|
||||
|
||||
fn store_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec<AccountInfo> {
|
||||
pub fn hash_stored_account(fork: Fork, account: &StoredAccount) -> Hash {
|
||||
Self::hash_account_data(
|
||||
fork,
|
||||
account.balance.lamports,
|
||||
account.data,
|
||||
&account.meta.pubkey,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn hash_account(fork: Fork, account: &Account, pubkey: &Pubkey) -> Hash {
|
||||
Self::hash_account_data(fork, account.lamports, &account.data, pubkey)
|
||||
}
|
||||
|
||||
pub fn hash_account_data(fork: Fork, lamports: u64, data: &[u8], pubkey: &Pubkey) -> Hash {
|
||||
let mut hasher = Hasher::default();
|
||||
let mut buf = [0u8; 8];
|
||||
|
||||
LittleEndian::write_u64(&mut buf[..], lamports);
|
||||
hasher.hash(&buf);
|
||||
|
||||
LittleEndian::write_u64(&mut buf[..], fork);
|
||||
hasher.hash(&buf);
|
||||
|
||||
hasher.hash(&data);
|
||||
|
||||
hasher.hash(&pubkey.as_ref());
|
||||
|
||||
hasher.result()
|
||||
}
|
||||
|
||||
fn store_accounts(
|
||||
&self,
|
||||
fork_id: Fork,
|
||||
accounts: &[(&Pubkey, &Account)],
|
||||
hashes: &[Hash],
|
||||
) -> Vec<AccountInfo> {
|
||||
let with_meta: Vec<(StorageMeta, &Account)> = accounts
|
||||
.iter()
|
||||
.map(|(pubkey, account)| {
|
||||
@@ -724,7 +777,9 @@ impl AccountsDB {
|
||||
let mut infos: Vec<AccountInfo> = vec![];
|
||||
while infos.len() < with_meta.len() {
|
||||
let storage = self.find_storage_candidate(fork_id);
|
||||
let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]);
|
||||
let rvs = storage
|
||||
.accounts
|
||||
.append_accounts(&with_meta[infos.len()..], &hashes);
|
||||
if rvs.is_empty() {
|
||||
storage.set_status(AccountStorageStatus::Full);
|
||||
|
||||
@@ -749,6 +804,40 @@ impl AccountsDB {
|
||||
infos
|
||||
}
|
||||
|
||||
pub fn verify_hash_internal_state(&self, fork: Fork, ancestors: &HashMap<Fork, usize>) -> bool {
|
||||
let mut hash_state = BankHash::default();
|
||||
let hashes: Vec<_> = self.scan_accounts(
|
||||
ancestors,
|
||||
|collector: &mut Vec<BankHash>, option: Option<(&Pubkey, Account, Fork)>| {
|
||||
if let Some((pubkey, account, fork)) = option {
|
||||
if !sysvar::check_id(&account.owner) {
|
||||
let hash = BankHash::from_hash(&Self::hash_account(fork, &account, pubkey));
|
||||
debug!("xoring..{} key: {}", hash, pubkey);
|
||||
collector.push(hash);
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
for hash in hashes {
|
||||
hash_state.xor(hash);
|
||||
}
|
||||
let fork_hashes = self.fork_hashes.read().unwrap();
|
||||
if let Some((_, state)) = fork_hashes.get(&fork) {
|
||||
hash_state == *state
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn xor_in_hash_state(&self, fork_id: Fork, hash: BankHash) {
|
||||
let mut fork_hashes = self.fork_hashes.write().unwrap();
|
||||
let fork_hash_state = fork_hashes
|
||||
.entry(fork_id)
|
||||
.or_insert((false, BankHash::default()));
|
||||
fork_hash_state.1.xor(hash);
|
||||
fork_hash_state.0 = true;
|
||||
}
|
||||
|
||||
fn update_index(
|
||||
&self,
|
||||
fork_id: Fork,
|
||||
@@ -820,10 +909,44 @@ impl AccountsDB {
|
||||
}
|
||||
}
|
||||
|
||||
fn hash_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec<Hash> {
|
||||
let mut hashes = vec![];
|
||||
let mut hash_state = BankHash::default();
|
||||
let mut had_account = false;
|
||||
for (pubkey, account) in accounts {
|
||||
if !sysvar::check_id(&account.owner) {
|
||||
let hash = BankHash::from_hash(&account.hash);
|
||||
let new_hash = Self::hash_account(fork_id, account, pubkey);
|
||||
let new_bank_hash = BankHash::from_hash(&new_hash);
|
||||
debug!(
|
||||
"hash_accounts: key: {} xor {} current: {}",
|
||||
pubkey, hash, hash_state
|
||||
);
|
||||
if !had_account {
|
||||
hash_state = hash;
|
||||
had_account = true;
|
||||
} else {
|
||||
hash_state.xor(hash);
|
||||
}
|
||||
hash_state.xor(new_bank_hash);
|
||||
hashes.push(new_hash);
|
||||
} else {
|
||||
hashes.push(Hash::default());
|
||||
}
|
||||
}
|
||||
|
||||
if had_account {
|
||||
self.xor_in_hash_state(fork_id, hash_state);
|
||||
}
|
||||
hashes
|
||||
}
|
||||
|
||||
/// Store the account update.
|
||||
pub fn store(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) {
|
||||
let hashes = self.hash_accounts(fork_id, accounts);
|
||||
|
||||
let mut store_accounts = Measure::start("store::store_accounts");
|
||||
let infos = self.store_accounts(fork_id, accounts);
|
||||
let infos = self.store_accounts(fork_id, accounts, &hashes);
|
||||
store_accounts.stop();
|
||||
|
||||
let mut update_index = Measure::start("store::update_index");
|
||||
|
Reference in New Issue
Block a user