Add support for monitoring system account balances (#9345)

automerge
This commit is contained in:
Michael Vines
2020-04-06 21:41:53 -07:00
committed by GitHub
parent 33a68ec9c3
commit 03978ac5a5
2 changed files with 289 additions and 173 deletions

View File

@@ -2,7 +2,10 @@ use log::*;
use serde::{Deserialize, Serialize};
use solana_client::{client_error::Result as ClientResult, rpc_client::RpcClient};
use solana_metrics::{datapoint_error, datapoint_info};
use solana_sdk::{clock::Slot, program_utils::limited_deserialize, transaction::Transaction};
use solana_sdk::{
clock::Slot, program_utils::limited_deserialize, pubkey::Pubkey, signature::Signature,
transaction::Transaction,
};
use solana_stake_program::{stake_instruction::StakeInstruction, stake_state::Lockup};
use solana_transaction_status::{ConfirmedBlock, RpcTransactionStatusMeta, TransactionEncoding};
use std::{collections::HashMap, thread::sleep, time::Duration};
@@ -11,41 +14,64 @@ pub type PubkeyString = String;
pub type SignatureString = String;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub enum StakeAccountOperation {
pub enum AccountOperation {
Initialize,
Withdraw,
SplitSource,
SplitDestination,
SystemAccountEnroll,
FailedToMaintainMinimumBalance,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct StakeAccountTransactionInfo {
pub op: StakeAccountOperation,
pub struct AccountTransactionInfo {
pub op: AccountOperation,
pub slot: Slot, // Slot the transaction completed in
pub signature: SignatureString, // Transaction signature
}
#[derive(Serialize, Deserialize, Debug)]
pub struct StakeAccountInfo {
pub struct AccountInfo {
pub compliant_since: Option<Slot>, // The slot when the account was first in compliance
pub lamports: u64, // Account balance
pub transactions: Vec<StakeAccountTransactionInfo>, // Transactions affecting the account
pub transactions: Vec<AccountTransactionInfo>, // Transactions affecting the account
}
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct StakeAccountsInfo {
pub struct AccountsInfo {
pub slot: Slot, // Latest processed slot
pub account_info: HashMap<PubkeyString, StakeAccountInfo>,
pub account_info: HashMap<PubkeyString, AccountInfo>,
}
impl AccountsInfo {
// System accounts must be explicitly enrolled
pub fn enroll_system_account(&mut self, account_address: &Pubkey, slot: Slot, lamports: u64) {
self.account_info.insert(
account_address.to_string(),
AccountInfo {
compliant_since: Some(slot),
lamports,
transactions: vec![AccountTransactionInfo {
op: AccountOperation::SystemAccountEnroll,
slot,
signature: Signature::default().to_string(),
}],
},
);
}
}
fn process_transaction(
slot: Slot,
transaction: &Transaction,
meta: &RpcTransactionStatusMeta,
stake_accounts: &mut HashMap<PubkeyString, StakeAccountInfo>,
accounts: &mut HashMap<PubkeyString, AccountInfo>,
) {
let mut last_instruction = true;
let message = &transaction.message;
let signature = transaction.signatures[0].to_string();
// Look for stake operations
for instruction in message.instructions.iter().rev() {
let program_pubkey = message.account_keys[instruction.program_id_index as usize];
if program_pubkey != solana_stake_program::id() {
@@ -78,8 +104,6 @@ fn process_transaction(
)
),
Ok(stake_instruction) => {
let signature = transaction.signatures[0].to_string();
match stake_instruction {
StakeInstruction::Initialize(_authorized, lockup) => {
// The initialized stake account is at instruction account 0
@@ -90,19 +114,19 @@ fn process_transaction(
// The amount staked is the stake account's post balance
let lamports = meta.post_balances[stake_account_index];
stake_accounts.insert(
accounts.insert(
stake_pubkey,
StakeAccountInfo {
AccountInfo {
compliant_since: if lockup != Lockup::default() {
None // Initialize with a lockup or custodian is non-compliant
} else {
Some(slot)
},
lamports,
transactions: vec![StakeAccountTransactionInfo {
op: StakeAccountOperation::Initialize,
transactions: vec![AccountTransactionInfo {
op: AccountOperation::Initialize,
slot,
signature,
signature: signature.clone(),
}],
},
);
@@ -122,29 +146,29 @@ fn process_transaction(
let split_stake_pubkey =
message.account_keys[split_stake_account_index].to_string();
if let Some(mut source_stake_account_info) =
stake_accounts.get_mut(&source_stake_pubkey)
if let Some(mut source_account_info) =
accounts.get_mut(&source_stake_pubkey)
{
if source_stake_account_info.compliant_since.is_some() {
source_stake_account_info.transactions.push(
StakeAccountTransactionInfo {
op: StakeAccountOperation::SplitSource,
if source_account_info.compliant_since.is_some() {
source_account_info
.transactions
.push(AccountTransactionInfo {
op: AccountOperation::SplitSource,
slot,
signature: signature.clone(),
},
);
source_stake_account_info.lamports -= lamports;
});
source_account_info.lamports -= lamports;
let split_stake_account_info = StakeAccountInfo {
compliant_since: source_stake_account_info.compliant_since,
let split_account_info = AccountInfo {
compliant_since: source_account_info.compliant_since,
lamports,
transactions: vec![StakeAccountTransactionInfo {
op: StakeAccountOperation::SplitDestination,
transactions: vec![AccountTransactionInfo {
op: AccountOperation::SplitDestination,
slot,
signature,
signature: signature.clone(),
}],
};
stake_accounts.insert(split_stake_pubkey, split_stake_account_info);
accounts.insert(split_stake_pubkey, split_account_info);
}
}
}
@@ -154,17 +178,14 @@ fn process_transaction(
let stake_account_index = instruction.accounts[0] as usize;
let stake_pubkey = message.account_keys[stake_account_index].to_string();
if let Some(mut stake_account_info) = stake_accounts.get_mut(&stake_pubkey)
{
if stake_account_info.compliant_since.is_some() {
stake_account_info.compliant_since = None;
stake_account_info
.transactions
.push(StakeAccountTransactionInfo {
op: StakeAccountOperation::Withdraw,
slot,
signature,
});
if let Some(mut account_info) = accounts.get_mut(&stake_pubkey) {
if account_info.compliant_since.is_some() {
account_info.compliant_since = None;
account_info.transactions.push(AccountTransactionInfo {
op: AccountOperation::Withdraw,
slot,
signature: signature.clone(),
});
}
}
}
@@ -177,12 +198,27 @@ fn process_transaction(
}
}
}
// Ensure the balances of all monitored accounts remain in compliance
for (index, account_pubkey) in message.account_keys.iter().enumerate() {
if let Some(mut account_info) = accounts.get_mut(&account_pubkey.to_string()) {
let post_balance = meta.post_balances[index];
if account_info.compliant_since.is_some() && post_balance < account_info.lamports {
account_info.compliant_since = None;
account_info.transactions.push(AccountTransactionInfo {
op: AccountOperation::FailedToMaintainMinimumBalance,
slot,
signature: signature.clone(),
});
}
}
}
}
fn process_confirmed_block(
slot: Slot,
confirmed_block: ConfirmedBlock,
stake_accounts: &mut HashMap<PubkeyString, StakeAccountInfo>,
accounts: &mut HashMap<PubkeyString, AccountInfo>,
) {
for rpc_transaction in confirmed_block.transactions {
match rpc_transaction.meta {
@@ -197,7 +233,7 @@ fn process_confirmed_block(
if meta.err.is_none() {
if let Some(transaction) = rpc_transaction.transaction.decode() {
if transaction.verify().is_ok() {
process_transaction(slot, &transaction, &meta, stake_accounts);
process_transaction(slot, &transaction, &meta, accounts);
} else {
datapoint_error!(
"stake-monitor-failure",
@@ -233,14 +269,10 @@ fn load_blocks(
Ok(blocks)
}
pub fn process_slots(
rpc_client: &RpcClient,
stake_accounts_info: &mut StakeAccountsInfo,
batch_size: u64,
) {
let end_slot = stake_accounts_info.slot + batch_size;
pub fn process_slots(rpc_client: &RpcClient, accounts_info: &mut AccountsInfo, batch_size: u64) {
let end_slot = accounts_info.slot + batch_size;
loop {
let start_slot = stake_accounts_info.slot + 1;
let start_slot = accounts_info.slot + 1;
info!("start_slot:{} - end_slot:{}", start_slot, end_slot);
if start_slot >= end_slot {
break;
@@ -253,11 +285,8 @@ pub fn process_slots(
0
});
if stake_accounts_info.slot >= latest_available_slot {
info!(
"Waiting for a slot greater than {}...",
stake_accounts_info.slot
);
if accounts_info.slot >= latest_available_slot {
info!("Waiting for a slot greater than {}...", accounts_info.slot);
sleep(Duration::from_secs(5));
continue;
}
@@ -267,17 +296,14 @@ pub fn process_slots(
info!("Loaded {} blocks", blocks.len());
if blocks.is_empty() && end_slot < latest_available_slot {
stake_accounts_info.slot = end_slot;
accounts_info.slot = end_slot;
} else {
for (slot, block) in blocks.into_iter() {
process_confirmed_block(slot, block, &mut stake_accounts_info.account_info);
stake_accounts_info.slot = slot;
process_confirmed_block(slot, block, &mut accounts_info.account_info);
accounts_info.slot = slot;
}
}
datapoint_info!(
"stake-monitor-slot",
("slot", stake_accounts_info.slot, i64)
);
datapoint_info!("stake-monitor-slot", ("slot", accounts_info.slot, i64));
}
Err(err) => {
datapoint_error!(
@@ -317,6 +343,8 @@ mod test {
#[serial]
fn test_record() {
solana_logger::setup();
let mut accounts_info = AccountsInfo::default();
let one_sol = sol_to_lamports(1.0);
let cluster = LocalCluster::new(&ClusterConfig {
operating_mode: OperatingMode::Stable,
@@ -482,117 +510,163 @@ mod test {
)
.unwrap();
// System transfer 1
let system1_keypair = Keypair::new();
// Fund system1
let fund_system1_signature = rpc_client
.send_transaction(&system_transaction::transfer(
&payer,
&system1_keypair.pubkey(),
2 * one_sol,
blockhash,
))
.unwrap();
rpc_client
.poll_for_signature_with_commitment(&fund_system1_signature, CommitmentConfig::recent())
.unwrap();
accounts_info.enroll_system_account(
&system1_keypair.pubkey(),
rpc_client
.get_slot_with_commitment(CommitmentConfig::recent())
.unwrap(),
2 * one_sol,
);
// Withdraw 1 sol from system 1 to make it non-compliant
rpc_client
.send_transaction(&system_transaction::transfer(
&system1_keypair,
&payer.pubkey(),
one_sol,
blockhash,
))
.unwrap();
// Process all the transactions
let mut stake_accounts_info = StakeAccountsInfo::default();
let current_slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::recent())
.unwrap();
process_slots(&rpc_client, &mut stake_accounts_info, current_slot + 1);
process_slots(&rpc_client, &mut accounts_info, current_slot + 1);
//
// Check that `stake_accounts_info` was populated with the expected results
// Check that `accounts_info` was populated with the expected results
//
info!("Check the data recorded for stake1");
let stake_account_info = stake_accounts_info
let account_info = accounts_info
.account_info
.get(&stake1_keypair.pubkey().to_string())
.unwrap();
assert!(stake_account_info.compliant_since.is_some());
assert_eq!(stake_account_info.lamports, one_sol);
assert_eq!(stake_account_info.transactions.len(), 1);
assert!(account_info.compliant_since.is_some());
assert_eq!(account_info.lamports, one_sol);
assert_eq!(account_info.transactions.len(), 1);
assert_eq!(
stake_account_info.transactions[0].op,
StakeAccountOperation::Initialize
account_info.transactions[0].op,
AccountOperation::Initialize
);
assert_eq!(
stake_account_info.transactions[0].signature,
account_info.transactions[0].signature,
stake1_signature.to_string()
);
info!("Check the data recorded for stake2");
let stake_account_info = stake_accounts_info
let account_info = accounts_info
.account_info
.get(&stake2_keypair.pubkey().to_string())
.unwrap();
assert!(stake_account_info.compliant_since.is_none());
assert_eq!(stake_account_info.lamports, one_sol);
assert_eq!(stake_account_info.transactions.len(), 1);
assert!(account_info.compliant_since.is_none());
assert_eq!(account_info.lamports, one_sol);
assert_eq!(account_info.transactions.len(), 1);
assert_eq!(
stake_account_info.transactions[0].op,
StakeAccountOperation::Initialize
account_info.transactions[0].op,
AccountOperation::Initialize
);
assert_eq!(
stake_account_info.transactions[0].signature,
account_info.transactions[0].signature,
stake2_signature.to_string()
);
info!("Check the data recorded for stake3");
let stake_account_info = stake_accounts_info
let account_info = accounts_info
.account_info
.get(&stake3_keypair.pubkey().to_string())
.unwrap();
assert!(stake_account_info.compliant_since.is_none());
assert_eq!(stake_account_info.lamports, one_sol);
assert_eq!(stake_account_info.transactions.len(), 2);
assert!(account_info.compliant_since.is_none());
assert_eq!(account_info.lamports, one_sol);
assert_eq!(account_info.transactions.len(), 2);
assert_eq!(
stake_account_info.transactions[0].op,
StakeAccountOperation::Initialize
account_info.transactions[0].op,
AccountOperation::Initialize
);
assert_eq!(
stake_account_info.transactions[0].signature,
account_info.transactions[0].signature,
stake3_initialize_signature.to_string()
);
assert_eq!(account_info.transactions[1].op, AccountOperation::Withdraw,);
assert_eq!(
stake_account_info.transactions[1].op,
StakeAccountOperation::Withdraw,
);
assert_eq!(
stake_account_info.transactions[1].signature,
account_info.transactions[1].signature,
stake3_withdraw_signature.to_string()
);
info!("Check the data recorded for stake4");
let stake_account_info = stake_accounts_info
let account_info = accounts_info
.account_info
.get(&stake4_keypair.pubkey().to_string())
.unwrap();
assert!(stake_account_info.compliant_since.is_some());
assert_eq!(stake_account_info.lamports, one_sol);
assert_eq!(stake_account_info.transactions.len(), 2);
assert!(account_info.compliant_since.is_some());
assert_eq!(account_info.lamports, one_sol);
assert_eq!(account_info.transactions.len(), 2);
assert_eq!(
stake_account_info.transactions[0].op,
StakeAccountOperation::Initialize
account_info.transactions[0].op,
AccountOperation::Initialize
);
assert_eq!(
stake_account_info.transactions[0].signature,
account_info.transactions[0].signature,
stake4_initialize_signature.to_string()
);
assert_eq!(
stake_account_info.transactions[1].op,
StakeAccountOperation::SplitSource,
account_info.transactions[1].op,
AccountOperation::SplitSource,
);
assert_eq!(
stake_account_info.transactions[1].signature,
account_info.transactions[1].signature,
stake45_split_signature.to_string()
);
info!("Check the data recorded for stake5");
let stake_account_info = stake_accounts_info
let account_info = accounts_info
.account_info
.get(&stake5_keypair.pubkey().to_string())
.unwrap();
error!("stake_account_info 5: {:?}", stake_account_info);
assert!(stake_account_info.compliant_since.is_some());
assert_eq!(stake_account_info.lamports, one_sol);
assert_eq!(stake_account_info.transactions.len(), 1);
assert!(account_info.compliant_since.is_some());
assert_eq!(account_info.lamports, one_sol);
assert_eq!(account_info.transactions.len(), 1);
assert_eq!(
stake_account_info.transactions[0].op,
StakeAccountOperation::SplitDestination,
account_info.transactions[0].op,
AccountOperation::SplitDestination,
);
assert_eq!(
stake_account_info.transactions[0].signature,
account_info.transactions[0].signature,
stake45_split_signature.to_string()
);
info!("Check the data recorded for system1");
let account_info = accounts_info
.account_info
.get(&system1_keypair.pubkey().to_string())
.unwrap();
error!("account_info system 1: {:?}", account_info);
assert!(account_info.compliant_since.is_none());
assert_eq!(account_info.lamports, 2 * one_sol);
assert_eq!(account_info.transactions.len(), 2);
assert_eq!(
account_info.transactions[0].op,
AccountOperation::SystemAccountEnroll,
);
assert_eq!(
account_info.transactions[1].op,
AccountOperation::FailedToMaintainMinimumBalance,
);
}
}