Fix parent record locks usage in child banks (#4159)

* Introduce record locks on txs that will be recorded

* Add tests for LockedAccountsResults

* Fix broken bench

* Exit process_entries on detecting conflicting locks within same entry
This commit is contained in:
carllin
2019-05-07 15:51:35 -07:00
committed by GitHub
parent 55e3b7d380
commit 69eeb7cf08
6 changed files with 361 additions and 113 deletions

View File

@@ -17,6 +17,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Result;
use solana_sdk::transaction::{Transaction, TransactionError};
use std::borrow::Borrow;
use std::env;
use std::fs::remove_dir_all;
use std::iter::once;
@@ -24,16 +25,28 @@ use std::ops::Neg;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
const ACCOUNTSDB_DIR: &str = "accountsdb";
const NUM_ACCOUNT_DIRS: usize = 4;
const WAIT_FOR_PARENT_MS: u64 = 5;
type AccountLocks = (
// Locks for the current bank
Arc<Mutex<HashSet<Pubkey>>>,
// Any unreleased locks from all parent/grandparent banks. We use Arc<Mutex> to
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum AccountLockType {
AccountLock,
RecordLock,
}
type AccountLocks = Mutex<HashSet<Pubkey>>;
// Locks for accounts that are currently being recorded + committed
type RecordLocks = (
// Record Locks for the current bank
Arc<AccountLocks>,
// Any unreleased record locks from all parent/grandparent banks. We use Arc<Mutex> to
// avoid copies when calling new_from_parent().
Vec<Arc<Mutex<HashSet<Pubkey>>>>,
Vec<Arc<AccountLocks>>,
);
/// This structure handles synchronization for db
@@ -43,7 +56,10 @@ pub struct Accounts {
pub accounts_db: Arc<AccountsDB>,
/// set of accounts which are currently in the pipeline
account_locks: Mutex<AccountLocks>,
account_locks: AccountLocks,
/// set of accounts which are about to record + commit
record_locks: Mutex<RecordLocks>,
/// List of persistent stores
paths: String,
@@ -103,7 +119,8 @@ impl Accounts {
let accounts_db = Arc::new(AccountsDB::new(&paths));
Accounts {
accounts_db,
account_locks: Mutex::new((Arc::new(Mutex::new(HashSet::new())), vec![])),
account_locks: Mutex::new(HashSet::new()),
record_locks: Mutex::new((Arc::new(Mutex::new(HashSet::new())), vec![])),
paths,
own_paths,
}
@@ -111,29 +128,30 @@ impl Accounts {
pub fn new_from_parent(parent: &Accounts) -> Self {
let accounts_db = parent.accounts_db.clone();
let parent_locks: Vec<_> = {
let (ref parent_locks, ref mut grandparent_locks) =
*parent.account_locks.lock().unwrap();
let parent_record_locks: Vec<_> = {
let (ref record_locks, ref mut grandparent_record_locks) =
*parent.record_locks.lock().unwrap();
// Copy all unreleased parent locks and the much more unlikely (but still possible)
// grandparent account locks into the new child. Note that by the time this function
// is called, no further transactions will be recorded on the parent bank, so even if
// banking threads grab account locks on this parent bank, none of those results will
// be committed.
// Note that when creating a child bank, we only care about the locks that are held for
// accounts that are in txs that are currently recording + committing, because other
// incoming txs on this bank that are not yet recording will not make it to bank commit.
//
// Thus:
// 1) The child doesn't need to care about potential "future" account locks on its parent
// bank that the parent does not currently hold.
// 2) The parent doesn't need to retain any of the locks other than the ones it owns so
// 2) The child only needs the currently held "record locks" from the parent.
// 3) The parent doesn't need to retain any of the locks other than the ones it owns so
// that unlock() can be called later (the grandparent locks can be given to the child).
once(parent_locks.clone())
.chain(grandparent_locks.drain(..))
once(record_locks.clone())
.chain(grandparent_record_locks.drain(..))
.filter(|a| !a.lock().unwrap().is_empty())
.collect()
};
Accounts {
accounts_db,
account_locks: Mutex::new((Arc::new(Mutex::new(HashSet::new())), parent_locks)),
account_locks: Mutex::new(HashSet::new()),
record_locks: Mutex::new((Arc::new(Mutex::new(HashSet::new())), parent_record_locks)),
paths: parent.paths.clone(),
own_paths: parent.own_paths,
}
@@ -330,12 +348,11 @@ impl Accounts {
}
fn lock_account(
(fork_locks, parent_locks): &mut AccountLocks,
(fork_locks, parent_locks): (&mut HashSet<Pubkey>, &mut Vec<Arc<AccountLocks>>),
keys: &[Pubkey],
error_counters: &mut ErrorCounters,
) -> Result<()> {
// Copy all the accounts
let mut fork_locks = fork_locks.lock().unwrap();
for k in keys {
let is_locked = {
if fork_locks.contains(k) {
@@ -344,16 +361,25 @@ impl Accounts {
// Check parent locks. As soon as a set of parent locks is empty,
// we can remove it from the list b/c that means the parent has
// released the locks.
let mut is_locked = false;
parent_locks.retain(|p| {
let p = p.lock().unwrap();
if p.contains(k) {
is_locked = true;
loop {
{
// If a parent bank holds a record lock for this account, then loop
// until that lock is released
let p = p.lock().unwrap();
if !p.contains(k) {
break;
}
}
// If a parent is currently recording for this key, then drop lock and wait
sleep(Duration::from_millis(WAIT_FOR_PARENT_MS));
}
let p = p.lock().unwrap();
!p.is_empty()
});
is_locked
false
}
};
if is_locked {
@@ -368,6 +394,17 @@ impl Accounts {
Ok(())
}
fn lock_record_account(record_locks: &AccountLocks, keys: &[Pubkey]) {
let mut fork_locks = record_locks.lock().unwrap();
for k in keys {
// The fork locks should always be a subset of the account locks, so
// the account locks should prevent record locks from ever touching the
// same accounts
assert!(!fork_locks.contains(k));
fork_locks.insert(*k);
}
}
fn unlock_account(tx: &Transaction, result: &Result<()>, locks: &mut HashSet<Pubkey>) {
match result {
Err(TransactionError::AccountInUse) => (),
@@ -379,6 +416,15 @@ impl Accounts {
}
}
fn unlock_record_account<I>(tx: &I, locks: &mut HashSet<Pubkey>)
where
I: Borrow<Transaction>,
{
for k in &tx.borrow().message().account_keys {
locks.remove(k);
}
}
fn hash_account(stored_account: &StoredAccount) -> Hash {
let mut hasher = Hasher::default();
hasher.hash(&serialize(&stored_account.balance).unwrap());
@@ -414,15 +460,18 @@ impl Accounts {
/// This function will prevent multiple threads from modifying the same account state at the
/// same time
#[must_use]
pub fn lock_accounts(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let mut account_locks = self.account_locks.lock().unwrap();
pub fn lock_accounts<I>(&self, txs: &[I]) -> Vec<Result<()>>
where
I: Borrow<Transaction>,
{
let (_, ref mut parent_record_locks) = *self.record_locks.lock().unwrap();
let mut error_counters = ErrorCounters::default();
let rv = txs
.iter()
.map(|tx| {
Self::lock_account(
&mut account_locks,
&tx.message().account_keys,
(&mut self.account_locks.lock().unwrap(), parent_record_locks),
&tx.borrow().message().account_keys,
&mut error_counters,
)
})
@@ -436,13 +485,36 @@ impl Accounts {
rv
}
pub fn lock_record_accounts<I>(&self, txs: &[I])
where
I: Borrow<Transaction>,
{
let record_locks = self.record_locks.lock().unwrap();
for tx in txs {
Self::lock_record_account(&record_locks.0, &tx.borrow().message().account_keys);
}
}
/// Once accounts are unlocked, new transactions that modify that state can enter the pipeline
pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) {
let (ref my_locks, _) = *self.account_locks.lock().unwrap();
pub fn unlock_accounts<I>(&self, txs: &[I], results: &[Result<()>])
where
I: Borrow<Transaction>,
{
let my_locks = &mut self.account_locks.lock().unwrap();
debug!("bank unlock accounts");
txs.iter().zip(results.iter()).for_each(|(tx, result)| {
Self::unlock_account(tx, result, &mut my_locks.lock().unwrap())
});
txs.iter()
.zip(results.iter())
.for_each(|(tx, result)| Self::unlock_account(tx.borrow(), result, my_locks));
}
pub fn unlock_record_accounts<I>(&self, txs: &[I])
where
I: Borrow<Transaction>,
{
let (ref my_record_locks, _) = *self.record_locks.lock().unwrap();
for tx in txs {
Self::unlock_record_account(tx, &mut my_record_locks.lock().unwrap())
}
}
pub fn has_accounts(&self, fork: Fork) -> bool {
@@ -504,6 +576,8 @@ mod tests {
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Transaction;
use std::thread::{sleep, Builder};
use std::time::{Duration, Instant};
fn load_accounts_with_fee(
tx: Transaction,
@@ -957,69 +1031,83 @@ mod tests {
}
#[test]
fn test_parent_locked_accounts() {
fn test_parent_locked_record_accounts() {
let mut parent = Accounts::new(None);
let locked_pubkey = Keypair::new().pubkey();
let mut locked_accounts = HashSet::new();
locked_accounts.insert(locked_pubkey);
parent.account_locks = Mutex::new((Arc::new(Mutex::new(locked_accounts.clone())), vec![]));
parent.record_locks = Mutex::new((Arc::new(Mutex::new(locked_accounts.clone())), vec![]));
let parent = Arc::new(parent);
let child = Accounts::new_from_parent(&parent);
// Make sure child contains the parent's locked accounts
// Make sure child record locks contains the parent's locked record accounts
{
let (_, ref mut parent_account_locks) = *child.account_locks.lock().unwrap();
assert_eq!(parent_account_locks.len(), 1);
assert_eq!(locked_accounts, *parent_account_locks[0].lock().unwrap());
let (_, ref parent_record_locks) = *child.record_locks.lock().unwrap();
assert_eq!(parent_record_locks.len(), 1);
assert_eq!(locked_accounts, *parent_record_locks[0].lock().unwrap());
}
// Make sure locking on same account in the child fails
let parent_ = parent.clone();
let parent_thread = Builder::new()
.name("exit".to_string())
.spawn(move || {
sleep(Duration::from_secs(2));
// Unlock the accounts in the parent
{
let (ref parent_record_locks, _) = *parent_.record_locks.lock().unwrap();
parent_record_locks.lock().unwrap().clear();
}
})
.unwrap();
// Function will block until the parent_thread unlocks the parent's record lock
let now = Instant::now();
assert_eq!(
Accounts::lock_account(
&mut child.account_locks.lock().unwrap(),
(
&mut child.account_locks.lock().unwrap(),
&mut child.record_locks.lock().unwrap().1
),
&vec![locked_pubkey],
&mut ErrorCounters::default()
),
Err(TransactionError::AccountInUse)
Ok(())
);
// Make sure that the function blocked
assert!(now.elapsed().as_secs() > 1);
parent_thread.join().unwrap();
// Unlock the accounts in the parent
{
let (ref parent_accounts, _) = *parent.account_locks.lock().unwrap();
parent_accounts.lock().unwrap().clear();
// Check the lock was successfully obtained
let child_account_locks = &mut child.account_locks.lock().unwrap();
let parent_record_locks = child.record_locks.lock().unwrap();
assert_eq!(child_account_locks.len(), 1);
// Make sure child removed the parent's record locks after the parent had
// released all its locks
assert!(parent_record_locks.1.is_empty());
// After all the checks pass, clear the account we just locked from the
// set of locks
child_account_locks.clear();
}
// Make sure child removes the parent locked_accounts after the parent has
// released all its locks
assert!(Accounts::lock_account(
&mut child.account_locks.lock().unwrap(),
&vec![locked_pubkey],
&mut ErrorCounters::default()
)
.is_ok());
// Make sure calling new_from_parent() on the child bank also cleans up the copy of old locked
// parent accounts, in case the child doesn't call lock_account() after a parent has
// released their account locks
{
let child_account_locks = child.account_locks.lock().unwrap();
assert_eq!(child_account_locks.0.lock().unwrap().len(), 1);
assert!(child_account_locks.1.is_empty());
// Clear the account we just locked from our locks
child_account_locks.0.lock().unwrap().clear();
// Mock an empty set of parent record accounts in the child bank
let (_, ref mut parent_record_locks) = *child.record_locks.lock().unwrap();
parent_record_locks.push(Arc::new(Mutex::new(HashSet::new())));
}
// Make sure new_from_parent() also cleans up old locked parent accounts, in
// case the child doesn't call lock_account() after a parent has released their
// account locks
{
// Mock an empty parent locked_accounts HashSet
let (_, ref mut parent_account_locks) = *child.account_locks.lock().unwrap();
parent_account_locks.push(Arc::new(Mutex::new(HashSet::new())));
}
// Call new_from_parent, make sure the empty parent locked_accounts is purged
let child2 = Accounts::new_from_parent(&child);
{
let (_, ref mut parent_account_locks) = *child.account_locks.lock().unwrap();
assert!(parent_account_locks.is_empty());
let (_, ref mut parent_account_locks2) = *child2.account_locks.lock().unwrap();
assert!(parent_account_locks2.is_empty());
let (_, ref mut parent_record_locks) = *child.record_locks.lock().unwrap();
assert!(parent_record_locks.is_empty());
let (_, ref mut parent_record_locks2) = *child2.record_locks.lock().unwrap();
assert!(parent_record_locks2.is_empty());
}
}
}