Add hook for getting vote transactions on replay (#11264)

* Add hook for getting vote transactions on replay

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin
2020-07-29 23:17:40 -07:00
committed by GitHub
parent a888f2f516
commit bf18524368
11 changed files with 426 additions and 228 deletions

View File

@@ -2,7 +2,7 @@ use crate::{
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions,
TransactionStatusSender,
ReplayVotesSender, TransactionStatusSender,
},
entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache,
@@ -36,6 +36,7 @@ pub fn load(
snapshot_config: Option<&SnapshotConfig>,
process_options: ProcessOptions,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> LoadResult {
if let Some(snapshot_config) = snapshot_config.as_ref() {
info!(
@@ -89,6 +90,7 @@ pub fn load(
&process_options,
&VerifyRecyclers::default(),
transaction_status_sender,
replay_votes_sender,
),
Some(deserialized_snapshot_hash),
);

View File

@@ -6,7 +6,7 @@ use crate::{
entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers},
leader_schedule_cache::LeaderScheduleCache,
};
use crossbeam_channel::Sender;
use crossbeam_channel::{Receiver, Sender};
use itertools::Itertools;
use log::*;
use rand::{seq::SliceRandom, thread_rng};
@@ -29,6 +29,7 @@ use solana_sdk::{
timing::duration_as_ms,
transaction::{Result, Transaction, TransactionError},
};
use solana_vote_program::{vote_state::Vote, vote_transaction};
use std::{
cell::RefCell,
collections::HashMap,
@@ -42,6 +43,10 @@ use thiserror::Error;
pub type BlockstoreProcessorResult =
result::Result<(BankForks, LeaderScheduleCache), BlockstoreProcessorError>;
pub type ReplayedVote = (Pubkey, Vote, Option<Hash>);
pub type ReplayVotesSender = Sender<ReplayedVote>;
pub type ReplayVotesReceiver = Receiver<ReplayedVote>;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_processor_{}", ix))
@@ -93,6 +98,7 @@ fn execute_batch(
batch: &TransactionBatch,
bank: &Arc<Bank>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> Result<()> {
let (
TransactionResults {
@@ -106,6 +112,19 @@ fn execute_batch(
transaction_status_sender.is_some(),
);
if let Some(replay_votes_sender) = replay_votes_sender {
for (transaction, (processing_result, _)) in
OrderedIterator::new(batch.transactions(), batch.iteration_order())
.zip(&processing_results)
{
if processing_result.is_ok() {
if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) {
let _ = replay_votes_sender.send(parsed_vote);
}
}
}
}
if let Some(sender) = transaction_status_sender {
send_transaction_status_batch(
bank.clone(),
@@ -126,6 +145,7 @@ fn execute_batches(
batches: &[TransactionBatch],
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
@@ -133,7 +153,7 @@ fn execute_batches(
batches
.into_par_iter()
.map_with(transaction_status_sender, |sender, batch| {
let result = execute_batch(batch, bank, sender.clone());
let result = execute_batch(batch, bank, sender.clone(), replay_votes_sender);
if let Some(entry_callback) = entry_callback {
entry_callback(bank);
}
@@ -156,8 +176,16 @@ pub fn process_entries(
entries: &[Entry],
randomize: bool,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> Result<()> {
process_entries_with_callback(bank, entries, randomize, None, transaction_status_sender)
process_entries_with_callback(
bank,
entries,
randomize,
None,
transaction_status_sender,
replay_votes_sender,
)
}
fn process_entries_with_callback(
@@ -166,6 +194,7 @@ fn process_entries_with_callback(
randomize: bool,
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> Result<()> {
// accumulator for entries that can be processed in parallel
let mut batches = vec![];
@@ -182,6 +211,7 @@ fn process_entries_with_callback(
&batches,
entry_callback,
transaction_status_sender.clone(),
replay_votes_sender,
)?;
batches.clear();
for hash in &tick_hashes {
@@ -237,12 +267,19 @@ fn process_entries_with_callback(
&batches,
entry_callback,
transaction_status_sender.clone(),
replay_votes_sender,
)?;
batches.clear();
}
}
}
execute_batches(bank, &batches, entry_callback, transaction_status_sender)?;
execute_batches(
bank,
&batches,
entry_callback,
transaction_status_sender,
replay_votes_sender,
)?;
for hash in tick_hashes {
bank.register_tick(&hash);
}
@@ -308,7 +345,15 @@ pub fn process_blockstore(
info!("processing ledger for slot 0...");
let recyclers = VerifyRecyclers::default();
process_bank_0(&bank0, blockstore, &opts, &recyclers)?;
process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None)
process_blockstore_from_root(
genesis_config,
blockstore,
bank0,
&opts,
&recyclers,
None,
None,
)
}
// Process blockstore from a known root bank
@@ -319,6 +364,7 @@ pub fn process_blockstore_from_root(
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> BlockstoreProcessorResult {
info!("processing ledger from slot {}...", bank.slot());
let allocated = thread_mem_usage::Allocatedp::default();
@@ -384,6 +430,7 @@ pub fn process_blockstore_from_root(
opts,
recyclers,
transaction_status_sender,
replay_votes_sender,
)?;
(initial_forks, leader_schedule_cache)
} else {
@@ -473,6 +520,7 @@ fn confirm_full_slot(
recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> result::Result<(), BlockstoreProcessorError> {
let mut timing = ConfirmationTiming::default();
let skip_verification = !opts.poh_verify;
@@ -483,6 +531,7 @@ fn confirm_full_slot(
progress,
skip_verification,
transaction_status_sender,
replay_votes_sender,
opts.entry_callback.as_ref(),
recyclers,
)?;
@@ -543,6 +592,7 @@ pub fn confirm_slot(
progress: &mut ConfirmationProgress,
skip_verification: bool,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers,
) -> result::Result<(), BlockstoreProcessorError> {
@@ -610,6 +660,7 @@ pub fn confirm_slot(
true,
entry_callback,
transaction_status_sender,
replay_votes_sender,
)
.map_err(BlockstoreProcessorError::from);
replay_elapsed.stop();
@@ -646,8 +697,16 @@ fn process_bank_0(
) -> result::Result<(), BlockstoreProcessorError> {
assert_eq!(bank0.slot(), 0);
let mut progress = ConfirmationProgress::new(bank0.last_blockhash());
confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress, None)
.expect("processing for bank 0 must succeed");
confirm_full_slot(
blockstore,
bank0,
opts,
recyclers,
&mut progress,
None,
None,
)
.expect("processing for bank 0 must succeed");
bank0.freeze();
Ok(())
}
@@ -720,6 +779,7 @@ fn load_frozen_forks(
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
let mut initial_forks = HashMap::new();
let mut last_status_report = Instant::now();
@@ -766,6 +826,7 @@ fn load_frozen_forks(
recyclers,
&mut progress,
transaction_status_sender.clone(),
replay_votes_sender,
)
.is_err()
{
@@ -816,10 +877,11 @@ fn process_single_slot(
recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress,
transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> result::Result<(), BlockstoreProcessorError> {
// Mark corrupt slots as dead so validators don't replay this slot and
// see DuplicateSignature errors later in ReplayStage
confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender).map_err(|err| {
confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_votes_sender).map_err(|err| {
let slot = bank.slot();
warn!("slot {} failed to verify: {}", slot, err);
if blockstore.is_primary_access() {
@@ -909,8 +971,12 @@ pub mod tests {
create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
},
};
use crossbeam_channel::unbounded;
use matches::assert_matches;
use rand::{thread_rng, Rng};
use solana_runtime::genesis_utils::{
create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
};
use solana_sdk::account::Account;
use solana_sdk::{
epoch_schedule::EpochSchedule,
@@ -921,7 +987,7 @@ pub mod tests {
system_transaction,
transaction::{Transaction, TransactionError},
};
use std::sync::RwLock;
use std::{collections::BTreeSet, sync::RwLock};
#[test]
fn test_process_blockstore_with_missing_hashes() {
@@ -1593,7 +1659,7 @@ pub mod tests {
);
// Now ensure the TX is accepted despite pointing to the ID of an empty entry.
process_entries(&bank, &slot_entries, true, None).unwrap();
process_entries(&bank, &slot_entries, true, None, None).unwrap();
assert_eq!(bank.process_transaction(&tx), Ok(()));
}
@@ -1798,7 +1864,7 @@ pub mod tests {
// ensure bank can process a tick
assert_eq!(bank.tick_height(), 0);
let tick = next_entry(&genesis_config.hash(), 1, vec![]);
assert_eq!(process_entries(&bank, &[tick], true, None), Ok(()));
assert_eq!(process_entries(&bank, &[tick], true, None, None), Ok(()));
assert_eq!(bank.tick_height(), 1);
}
@@ -1831,7 +1897,7 @@ pub mod tests {
);
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &[entry_1, entry_2], true, None),
process_entries(&bank, &[entry_1, entry_2], true, None, None),
Ok(())
);
assert_eq!(bank.get_balance(&keypair1.pubkey()), 2);
@@ -1891,7 +1957,8 @@ pub mod tests {
&bank,
&[entry_1_to_mint, entry_2_to_3_mint_to_1],
false,
None
None,
None,
),
Ok(())
);
@@ -1963,6 +2030,7 @@ pub mod tests {
&[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
false,
None,
None,
)
.is_err());
@@ -2074,6 +2142,7 @@ pub mod tests {
],
false,
None,
None,
)
.is_err());
@@ -2121,7 +2190,7 @@ pub mod tests {
system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash());
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &[entry_1, entry_2], true, None),
process_entries(&bank, &[entry_1, entry_2], true, None, None),
Ok(())
);
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
@@ -2181,7 +2250,7 @@ pub mod tests {
next_entry_mut(&mut hash, 0, transactions)
})
.collect();
assert_eq!(process_entries(&bank, &entries, true, None), Ok(()));
assert_eq!(process_entries(&bank, &entries, true, None, None), Ok(()));
}
#[test]
@@ -2241,7 +2310,7 @@ pub mod tests {
// Transfer lamports to each other
let entry = next_entry(&bank.last_blockhash(), 1, tx_vector);
assert_eq!(process_entries(&bank, &[entry], true, None), Ok(()));
assert_eq!(process_entries(&bank, &[entry], true, None, None), Ok(()));
bank.squash();
// Even number keypair should have balance of 2 * initial_lamports and
@@ -2299,7 +2368,7 @@ pub mod tests {
system_transaction::transfer(&keypair1, &keypair4.pubkey(), 1, bank.last_blockhash());
let entry_2 = next_entry(&tick.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &[entry_1, tick, entry_2.clone()], true, None),
process_entries(&bank, &[entry_1, tick, entry_2.clone()], true, None, None),
Ok(())
);
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
@@ -2310,7 +2379,7 @@ pub mod tests {
system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash());
let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &[entry_3], true, None),
process_entries(&bank, &[entry_3], true, None, None),
Err(TransactionError::AccountNotFound)
);
}
@@ -2390,7 +2459,7 @@ pub mod tests {
);
assert_eq!(
process_entries(&bank, &[entry_1_to_mint], false, None),
process_entries(&bank, &[entry_1_to_mint], false, None, None),
Err(TransactionError::AccountInUse)
);
@@ -2450,6 +2519,7 @@ pub mod tests {
&recyclers,
&mut ConfirmationProgress::new(bank0.last_blockhash()),
None,
None,
)
.unwrap();
bank1.squash();
@@ -2462,6 +2532,7 @@ pub mod tests {
&opts,
&recyclers,
None,
None,
)
.unwrap();
@@ -2543,7 +2614,7 @@ pub mod tests {
})
.collect();
info!("paying iteration {}", i);
process_entries(&bank, &entries, true, None).expect("paying failed");
process_entries(&bank, &entries, true, None, None).expect("paying failed");
let entries: Vec<_> = (0..NUM_TRANSFERS)
.step_by(NUM_TRANSFERS_PER_ENTRY)
@@ -2566,7 +2637,7 @@ pub mod tests {
.collect();
info!("refunding iteration {}", i);
process_entries(&bank, &entries, true, None).expect("refunding failed");
process_entries(&bank, &entries, true, None, None).expect("refunding failed");
// advance to next block
process_entries(
@@ -2576,6 +2647,7 @@ pub mod tests {
.collect::<Vec<_>>(),
true,
None,
None,
)
.expect("process ticks failed");
@@ -2618,7 +2690,7 @@ pub mod tests {
let entry = next_entry(&new_blockhash, 1, vec![tx]);
entries.push(entry);
process_entries_with_callback(&bank0, &entries, true, None, None).unwrap();
process_entries_with_callback(&bank0, &entries, true, None, None, None).unwrap();
assert_eq!(bank0.get_balance(&keypair.pubkey()), 1)
}
@@ -2699,4 +2771,81 @@ pub mod tests {
assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound);
assert_eq!(signature, account_not_found_sig);
}
#[test]
fn test_replay_vote_sender() {
let validator_keypairs: Vec<_> =
(0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect();
let GenesisConfigInfo {
genesis_config,
voting_keypair: _,
..
} = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![100; validator_keypairs.len()],
);
let bank0 = Arc::new(Bank::new(&genesis_config));
bank0.freeze();
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::new_rand(), 1));
// The new blockhash is going to be the hash of the last tick in the block
let bank_1_blockhash = bank1.last_blockhash();
// Create an transaction that references the new blockhash, should still
// be able to find the blockhash if we process transactions all in the same
// batch
let mut expected_successful_voter_pubkeys = BTreeSet::new();
let vote_txs: Vec<_> = validator_keypairs
.iter()
.enumerate()
.map(|(i, validator_keypairs)| {
if i % 3 == 0 {
// These votes are correct
expected_successful_voter_pubkeys
.insert(validator_keypairs.vote_keypair.pubkey());
vote_transaction::new_vote_transaction(
vec![0],
bank0.hash(),
bank_1_blockhash,
&validator_keypairs.node_keypair,
&validator_keypairs.vote_keypair,
&validator_keypairs.vote_keypair,
None,
)
} else if i % 3 == 1 {
// These have the wrong authorized voter
vote_transaction::new_vote_transaction(
vec![0],
bank0.hash(),
bank_1_blockhash,
&validator_keypairs.node_keypair,
&validator_keypairs.vote_keypair,
&Keypair::new(),
None,
)
} else {
// These have an invalid vote for non-existent bank 2
vote_transaction::new_vote_transaction(
vec![bank1.slot() + 1],
bank0.hash(),
bank_1_blockhash,
&validator_keypairs.node_keypair,
&validator_keypairs.vote_keypair,
&validator_keypairs.vote_keypair,
None,
)
}
})
.collect();
let entry = next_entry(&bank_1_blockhash, 1, vote_txs);
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let _ = process_entries(&bank1, &[entry], true, None, Some(&replay_votes_sender));
let successes: BTreeSet<Pubkey> = replay_votes_receiver
.try_iter()
.map(|(vote_pubkey, _, _)| vote_pubkey)
.collect();
assert_eq!(successes, expected_successful_voter_pubkeys);
}
}