Send votes from banking stage to vote listener (#11434) (#11454)

*  Send votes from banking stage to vote listener

Co-authored-by: Carl <carl@solana.com>
(cherry picked from commit 7e25130529)

Co-authored-by: carllin <wumu727@gmail.com>
This commit is contained in:
mergify[bot]
2020-08-07 19:45:40 +00:00
committed by GitHub
parent fa9aa0a1d7
commit f9d6fb48a4
23 changed files with 268 additions and 156 deletions

View File

@@ -281,7 +281,7 @@ impl Accounts {
OrderedIterator::new(txs, txs_iteration_order)
.zip(lock_results.into_iter())
.map(|etx| match etx {
(tx, (Ok(()), hash_age_kind)) => {
((_, tx), (Ok(()), hash_age_kind)) => {
let fee_calculator = match hash_age_kind.as_ref() {
Some(HashAgeKind::DurableNonce(_, account)) => {
nonce_utils::fee_calculator_of(account)
@@ -612,7 +612,7 @@ impl Accounts {
) -> Vec<Result<()>> {
use solana_sdk::sanitize::Sanitize;
let keys: Vec<Result<_>> = OrderedIterator::new(txs, txs_iteration_order)
.map(|tx| {
.map(|(_, tx)| {
tx.sanitize().map_err(TransactionError::from)?;
if Self::has_duplicates(&tx.message.account_keys) {
@@ -645,7 +645,7 @@ impl Accounts {
OrderedIterator::new(txs, txs_iteration_order)
.zip(results.iter())
.for_each(|(tx, result)| self.unlock_account(tx, result, &mut account_locks));
.for_each(|((_, tx), result)| self.unlock_account(tx, result, &mut account_locks));
}
/// Store the accounts into the DB
@@ -697,7 +697,7 @@ impl Accounts {
fix_recent_blockhashes_sysvar_delay: bool,
) -> Vec<(&'a Pubkey, &'a Account)> {
let mut accounts = Vec::with_capacity(loaded.len());
for (i, ((raccs, _hash_age_kind), tx)) in loaded
for (i, ((raccs, _hash_age_kind), (_, tx))) in loaded
.iter_mut()
.zip(OrderedIterator::new(txs, txs_iteration_order))
.enumerate()

View File

@@ -175,11 +175,18 @@ pub type TransactionProcessResult = (Result<()>, Option<HashAgeKind>);
pub struct TransactionResults {
pub fee_collection_results: Vec<Result<()>>,
pub processing_results: Vec<TransactionProcessResult>,
pub overwritten_vote_accounts: Vec<OverwrittenVoteAccount>,
}
pub struct TransactionBalancesSet {
pub pre_balances: TransactionBalances,
pub post_balances: TransactionBalances,
}
pub struct OverwrittenVoteAccount {
pub account: Account,
pub transaction_index: usize,
pub transaction_result_index: usize,
}
impl TransactionBalancesSet {
pub fn new(pre_balances: TransactionBalances, post_balances: TransactionBalances) -> Self {
assert_eq!(pre_balances.len(), post_balances.len());
@@ -1276,7 +1283,7 @@ impl Bank {
res: &[TransactionProcessResult],
) {
let mut status_cache = self.src.status_cache.write().unwrap();
for (i, tx) in OrderedIterator::new(txs, iteration_order).enumerate() {
for (i, (_, tx)) in OrderedIterator::new(txs, iteration_order).enumerate() {
let (res, _hash_age_kind) = &res[i];
if Self::can_commit(res) && !tx.signatures.is_empty() {
status_cache.insert(
@@ -1422,7 +1429,7 @@ impl Bank {
let hash_queue = self.blockhash_queue.read().unwrap();
OrderedIterator::new(txs, iteration_order)
.zip(lock_results.into_iter())
.map(|(tx, lock_res)| match lock_res {
.map(|((_, tx), lock_res)| match lock_res {
Ok(()) => {
let message = tx.message();
let hash_age = hash_queue.check_hash_age(&message.recent_blockhash, max_age);
@@ -1452,7 +1459,7 @@ impl Bank {
let rcache = self.src.status_cache.read().unwrap();
OrderedIterator::new(txs, iteration_order)
.zip(lock_results.into_iter())
.map(|(tx, lock_res)| {
.map(|((_, tx), lock_res)| {
if tx.signatures.is_empty() {
return lock_res;
}
@@ -1487,7 +1494,7 @@ impl Bank {
) -> Vec<TransactionProcessResult> {
OrderedIterator::new(txs, iteration_order)
.zip(lock_results.into_iter())
.map(|(tx, lock_res)| {
.map(|((_, tx), lock_res)| {
if lock_res.0.is_ok() {
if tx.message.instructions.len() == 1 {
let instruction = &tx.message.instructions[0];
@@ -1579,7 +1586,8 @@ impl Bank {
pub fn collect_balances(&self, batch: &TransactionBatch) -> TransactionBalances {
let mut balances: TransactionBalances = vec![];
for transaction in OrderedIterator::new(batch.transactions(), batch.iteration_order()) {
for (_, transaction) in OrderedIterator::new(batch.transactions(), batch.iteration_order())
{
let mut transaction_balances: Vec<u64> = vec![];
for account_key in transaction.message.account_keys.iter() {
transaction_balances.push(self.get_balance(account_key));
@@ -1728,7 +1736,7 @@ impl Bank {
let retryable_txs: Vec<_> =
OrderedIterator::new(batch.lock_results(), batch.iteration_order())
.enumerate()
.filter_map(|(index, res)| match res {
.filter_map(|(index, (_, res))| match res {
Err(TransactionError::AccountInUse) => {
error_counters.account_in_use += 1;
Some(index)
@@ -1758,7 +1766,7 @@ impl Bank {
let executed: Vec<TransactionProcessResult> = loaded_accounts
.iter_mut()
.zip(OrderedIterator::new(txs, batch.iteration_order()))
.map(|(accs, tx)| match accs {
.map(|(accs, (_, tx))| match accs {
(Err(e), hash_age_kind) => (Err(e.clone()), hash_age_kind.clone()),
(Ok((accounts, loaders, _rents)), hash_age_kind) => {
signature_count += u64::from(tx.message().header.num_required_signatures);
@@ -1837,7 +1845,7 @@ impl Bank {
let mut fees = 0;
let results = OrderedIterator::new(txs, iteration_order)
.zip(executed.iter())
.map(|(tx, (res, hash_age_kind))| {
.map(|((_, tx), (res, hash_age_kind))| {
let (fee_calculator, is_durable_nonce) = match hash_age_kind {
Some(HashAgeKind::DurableNonce(_, account)) => {
(nonce_utils::fee_calculator_of(account), true)
@@ -1921,7 +1929,8 @@ impl Bank {
);
self.collect_rent(executed, loaded_accounts);
self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts);
let overwritten_vote_accounts =
self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts);
// once committed there is no way to unroll
write_time.stop();
@@ -1929,9 +1938,11 @@ impl Bank {
self.update_transaction_statuses(txs, iteration_order, &executed);
let fee_collection_results =
self.filter_program_errors_and_collect_fee(txs, iteration_order, executed);
TransactionResults {
fee_collection_results,
processing_results: executed.to_vec(),
overwritten_vote_accounts,
}
}
@@ -2866,8 +2877,9 @@ impl Bank {
iteration_order: Option<&[usize]>,
res: &[TransactionProcessResult],
loaded: &[(Result<TransactionLoadResult>, Option<HashAgeKind>)],
) {
for (i, ((raccs, _load_hash_age_kind), tx)) in loaded
) -> Vec<OverwrittenVoteAccount> {
let mut overwritten_vote_accounts = vec![];
for (i, ((raccs, _load_hash_age_kind), (transaction_index, tx))) in loaded
.iter()
.zip(OrderedIterator::new(txs, iteration_order))
.enumerate()
@@ -2887,10 +2899,20 @@ impl Bank {
.filter(|(_key, account)| (Stakes::is_stake(account)))
{
if Stakes::is_stake(account) {
self.stakes.write().unwrap().store(pubkey, account);
if let Some(old_vote_account) =
self.stakes.write().unwrap().store(pubkey, account)
{
overwritten_vote_accounts.push(OverwrittenVoteAccount {
account: old_vote_account,
transaction_index,
transaction_result_index: i,
});
}
}
}
}
overwritten_vote_accounts
}
/// current stake delegations for this bank

View File

@@ -1,8 +1,10 @@
use crate::{
bank::Bank,
bank::{Bank, TransactionResults},
genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs},
vote_sender_types::ReplayVoteSender,
};
use solana_sdk::{pubkey::Pubkey, signature::Signer};
use solana_sdk::{pubkey::Pubkey, signature::Signer, transaction::Transaction};
use solana_vote_program::vote_transaction;
pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Bank, Vec<Pubkey>) {
// Create some voters at genesis
@@ -23,3 +25,28 @@ pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Ban
let bank = Bank::new(&genesis_config);
(bank, vote_pubkeys)
}
pub fn find_and_send_votes(
txs: &[Transaction],
tx_results: &TransactionResults,
vote_sender: Option<&ReplayVoteSender>,
) {
let TransactionResults {
processing_results,
overwritten_vote_accounts,
..
} = tx_results;
if let Some(vote_sender) = vote_sender {
for old_account in overwritten_vote_accounts {
assert!(processing_results[old_account.transaction_result_index]
.0
.is_ok());
let transaction = &txs[old_account.transaction_index];
if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) {
if parsed_vote.1.slots.last().is_some() {
let _ = vote_sender.send(parsed_vote);
}
}
}
}
}

View File

@@ -29,6 +29,7 @@ pub mod status_cache;
mod system_instruction_processor;
pub mod transaction_batch;
pub mod transaction_utils;
pub mod vote_sender_types;
extern crate solana_config_program;
extern crate solana_stake_program;

View File

@@ -105,20 +105,18 @@ impl Stakes {
&& account.data.len() >= std::mem::size_of::<StakeState>()
}
pub fn store(&mut self, pubkey: &Pubkey, account: &Account) {
pub fn store(&mut self, pubkey: &Pubkey, account: &Account) -> Option<Account> {
if solana_vote_program::check_id(&account.owner) {
if account.lamports == 0 {
self.vote_accounts.remove(pubkey);
} else {
let old = self.vote_accounts.get(pubkey);
let stake = old.map_or_else(
let old = self.vote_accounts.remove(pubkey);
if account.lamports != 0 {
let stake = old.as_ref().map_or_else(
|| self.calculate_stake(pubkey, self.epoch, Some(&self.stake_history)),
|v| v.0,
);
self.vote_accounts.insert(*pubkey, (stake, account.clone()));
}
old.map(|(_, account)| account)
} else if solana_stake_program::check_id(&account.owner) {
// old_stake is stake lamports and voter_pubkey from the pre-store() version
let old_stake = self.stake_delegations.get(pubkey).map(|delegation| {
@@ -160,6 +158,9 @@ impl Stakes {
} else if let Some(delegation) = delegation {
self.stake_delegations.insert(*pubkey, delegation);
}
None
} else {
None
}
}

View File

@@ -21,7 +21,7 @@ impl<'a, T> OrderedIterator<'a, T> {
}
impl<'a, T> Iterator for OrderedIterator<'a, T> {
type Item = &'a T;
type Item = (usize, &'a T);
fn next(&mut self) -> Option<Self::Item> {
if self.current >= self.vec.len() {
None
@@ -33,7 +33,7 @@ impl<'a, T> Iterator for OrderedIterator<'a, T> {
index = self.current;
}
self.current += 1;
Some(self.vec.index(index))
Some((index, self.vec.index(index)))
}
}
}
@@ -42,17 +42,22 @@ impl<'a, T> Iterator for OrderedIterator<'a, T> {
mod tests {
use super::*;
type IteratorResponse<'a> = Vec<(((usize, &'a usize), &'a usize), usize)>;
#[test]
fn test_ordered_iterator_custom_order() {
let vec: Vec<usize> = vec![1, 2, 3, 4];
let custom_order: Vec<usize> = vec![3, 1, 0, 2];
let custom_order_ = custom_order.clone();
let ordered_iterator = OrderedIterator::new(&vec, Some(&custom_order));
let expected_response: Vec<usize> = vec![4, 2, 1, 3];
let resp: Vec<(&usize, &usize)> = ordered_iterator
let resp: IteratorResponse = ordered_iterator
.zip(expected_response.iter())
.filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem)
.zip(custom_order_)
.filter(|(((index, actual_elem), expected_elem), expected_index)| {
*actual_elem == *expected_elem && index == expected_index
})
.collect();
assert_eq!(resp.len(), custom_order.len());
@@ -63,9 +68,12 @@ mod tests {
let vec: Vec<usize> = vec![1, 2, 3, 4];
let ordered_iterator = OrderedIterator::new(&vec, None);
let resp: Vec<(&usize, &usize)> = ordered_iterator
let resp: IteratorResponse = ordered_iterator
.zip(vec.iter())
.filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem)
.zip(0..=4)
.filter(|(((index, actual_elem), expected_elem), expected_index)| {
*actual_elem == *expected_elem && index == expected_index
})
.collect();
assert_eq!(resp.len(), vec.len());

View File

@@ -0,0 +1,7 @@
use crossbeam_channel::{Receiver, Sender};
use solana_sdk::{hash::Hash, pubkey::Pubkey};
use solana_vote_program::vote_state::Vote;
pub type ReplayedVote = (Pubkey, Vote, Option<Hash>);
pub type ReplayVoteSender = Sender<ReplayedVote>;
pub type ReplayVoteReceiver = Receiver<ReplayedVote>;