Store versioned transactions in the ledger, disabled by default (#19139)

* Add support for versioned transactions, but disable by default

* merge conflicts

* trent's feedback

* bump Cargo.lock

* Fix transaction error encoding

* Rename legacy_transaction method

* cargo clippy

* Clean up casts, int arithmetic, and unused methods

* Check for duplicates in sanitized message conversion

* fix clippy

* fix new test

* Fix bpf conditional compilation for message module
This commit is contained in:
Justin Starry
2021-08-17 15:17:56 -07:00
committed by GitHub
parent 098e2b2de3
commit c50b01cb60
47 changed files with 2373 additions and 1049 deletions

View File

@ -35,7 +35,7 @@ use {
sanitize::Sanitize,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
transaction::VersionedTransaction,
},
solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
solana_transaction_status::{
@ -1890,9 +1890,12 @@ impl Blockstore {
if slot_meta.is_full() {
let slot_entries = self.get_slot_entries(slot, 0)?;
if !slot_entries.is_empty() {
let blockhash = slot_entries
.last()
.map(|entry| entry.hash)
.unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot));
let slot_transaction_iterator = slot_entries
.iter()
.cloned()
.into_iter()
.flat_map(|entry| entry.transactions)
.map(|transaction| {
if let Err(err) = transaction.sanitize() {
@ -1917,9 +1920,6 @@ impl Blockstore {
Hash::default()
};
let blockhash = get_last_hash(slot_entries.iter())
.unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot));
let rewards = self
.rewards_cf
.get_protobuf_or_bincode::<StoredExtendedRewards>(slot)?
@ -1937,7 +1937,7 @@ impl Blockstore {
blockhash: blockhash.to_string(),
parent_slot: slot_meta.parent_slot,
transactions: self
.map_transactions_to_statuses(slot, slot_transaction_iterator),
.map_transactions_to_statuses(slot, slot_transaction_iterator)?,
rewards,
block_time,
block_height,
@ -1948,20 +1948,25 @@ impl Blockstore {
Err(BlockstoreError::SlotUnavailable)
}
pub fn map_transactions_to_statuses<'a>(
pub fn map_transactions_to_statuses(
&self,
slot: Slot,
iterator: impl Iterator<Item = Transaction> + 'a,
) -> Vec<TransactionWithStatusMeta> {
iterator: impl Iterator<Item = VersionedTransaction>,
) -> Result<Vec<TransactionWithStatusMeta>> {
iterator
.map(|transaction| {
let signature = transaction.signatures[0];
TransactionWithStatusMeta {
transaction,
meta: self
.read_transaction_status((signature, slot))
.ok()
.flatten(),
.map(|versioned_tx| {
// TODO: add support for versioned transactions
if let Some(transaction) = versioned_tx.into_legacy_transaction() {
let signature = transaction.signatures[0];
Ok(TransactionWithStatusMeta {
transaction,
meta: self
.read_transaction_status((signature, slot))
.ok()
.flatten(),
})
} else {
Err(BlockstoreError::UnsupportedTransactionVersion)
}
})
.collect()
@ -2243,6 +2248,12 @@ impl Blockstore {
let transaction = self
.find_transaction_in_slot(slot, signature)?
.ok_or(BlockstoreError::TransactionStatusSlotMismatch)?; // Should not happen
// TODO: support retrieving versioned transactions
let transaction = transaction
.into_legacy_transaction()
.ok_or(BlockstoreError::UnsupportedTransactionVersion)?;
let block_time = self.get_block_time(slot)?;
Ok(Some(ConfirmedTransaction {
slot,
@ -2261,7 +2272,7 @@ impl Blockstore {
&self,
slot: Slot,
signature: Signature,
) -> Result<Option<Transaction>> {
) -> Result<Option<VersionedTransaction>> {
let slot_entries = self.get_slot_entries(slot, 0)?;
Ok(slot_entries
.iter()
@ -3995,7 +4006,7 @@ pub mod tests {
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::Signature,
transaction::TransactionError,
transaction::{Transaction, TransactionError},
};
use solana_storage_proto::convert::generated;
use solana_transaction_status::{InnerInstructions, Reward, Rewards, TransactionTokenBalance};
@ -6173,6 +6184,11 @@ pub mod tests {
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|transaction| {
transaction
.into_legacy_transaction()
.expect("versioned transactions not supported")
})
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
@ -7043,6 +7059,10 @@ pub mod tests {
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|tx| {
tx.into_legacy_transaction()
.expect("versioned transactions not supported")
})
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
@ -7142,6 +7162,10 @@ pub mod tests {
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|tx| {
tx.into_legacy_transaction()
.expect("versioned transactions not supported")
})
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
@ -7496,12 +7520,15 @@ pub mod tests {
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
for (i, entry) in entries.iter().enumerate() {
for (i, entry) in entries.into_iter().enumerate() {
if slot == 4 && i == 2 {
// Purge to freeze index 0 and write address-signatures in new primary index
blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
}
for transaction in &entry.transactions {
for tx in entry.transactions {
let transaction = tx
.into_legacy_transaction()
.expect("versioned transactions not supported");
assert_eq!(transaction.signatures.len(), 1);
blockstore
.write_transaction_status(
@ -7524,8 +7551,11 @@ pub mod tests {
let shreds = entries_to_test_shreds(entries.clone(), slot, 8, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
for entry in entries.iter() {
for transaction in &entry.transactions {
for entry in entries.into_iter() {
for tx in entry.transactions {
let transaction = tx
.into_legacy_transaction()
.expect("versioned transactions not supported");
assert_eq!(transaction.signatures.len(), 1);
blockstore
.write_transaction_status(
@ -7895,7 +7925,7 @@ pub mod tests {
let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
let slot = 0;
let mut transactions: Vec<Transaction> = vec![];
let mut transactions: Vec<VersionedTransaction> = vec![];
for x in 0..4 {
let transaction = Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
@ -7921,18 +7951,24 @@ pub mod tests {
transaction_status_cf
.put_protobuf((0, transaction.signatures[0], slot), &status)
.unwrap();
transactions.push(transaction);
transactions.push(transaction.into());
}
// Push transaction that will not have matching status, as a test case
transactions.push(Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[solana_sdk::pubkey::new_rand()],
Hash::default(),
vec![solana_sdk::pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
));
transactions.push(
Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[solana_sdk::pubkey::new_rand()],
Hash::default(),
vec![solana_sdk::pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
)
.into(),
);
let map = blockstore.map_transactions_to_statuses(slot, transactions.into_iter());
let map_result =
blockstore.map_transactions_to_statuses(slot, transactions.into_iter());
assert!(map_result.is_ok());
let map = map_result.unwrap();
assert_eq!(map.len(), 5);
for (x, m) in map.iter().take(4).enumerate() {
assert_eq!(m.meta.as_ref().unwrap().fee, x as u64);

View File

@ -336,7 +336,8 @@ impl Blockstore {
if let Some(&signature) = transaction.signatures.get(0) {
batch.delete::<cf::TransactionStatus>((0, signature, slot))?;
batch.delete::<cf::TransactionStatus>((1, signature, slot))?;
for pubkey in transaction.message.account_keys {
// TODO: support purging mapped addresses from versioned transactions
for pubkey in transaction.message.unmapped_keys() {
batch.delete::<cf::AddressSignatures>((0, pubkey, slot, signature))?;
batch.delete::<cf::AddressSignatures>((1, pubkey, slot, signature))?;
}
@ -398,6 +399,7 @@ pub mod tests {
use solana_sdk::{
hash::{hash, Hash},
message::Message,
transaction::Transaction,
};
// check that all columns are either empty or start at `min_slot`

View File

@ -99,6 +99,7 @@ pub enum BlockstoreError {
ProtobufDecodeError(#[from] prost::DecodeError),
ParentEntriesUnavailable,
SlotUnavailable,
UnsupportedTransactionVersion,
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

View File

@ -10,7 +10,7 @@ use log::*;
use rand::{seq::SliceRandom, thread_rng};
use rayon::{prelude::*, ThreadPool};
use solana_entry::entry::{
create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus, VerifyRecyclers,
self, create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus, VerifyRecyclers,
};
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_error, inc_new_counter_debug};
@ -38,7 +38,7 @@ use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signature},
timing,
transaction::{Result, Transaction, TransactionError},
transaction::{Result, SanitizedTransaction, TransactionError, VersionedTransaction},
};
use solana_transaction_status::token_balances::{
collect_token_balances, TransactionTokenBalancesSet,
@ -47,7 +47,6 @@ use solana_transaction_status::token_balances::{
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
convert::TryFrom,
path::PathBuf,
result,
sync::{Arc, RwLock},
@ -109,10 +108,13 @@ fn get_first_error(
fee_collection_results: Vec<Result<()>>,
) -> Option<(Result<()>, Signature)> {
let mut first_err = None;
for (result, transaction) in fee_collection_results.iter().zip(batch.transactions_iter()) {
for (result, transaction) in fee_collection_results
.iter()
.zip(batch.sanitized_transactions())
{
if let Err(ref err) = result {
if first_err.is_none() {
first_err = Some((result.clone(), transaction.signatures[0]));
first_err = Some((result.clone(), *transaction.signature()));
}
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
@ -210,7 +212,7 @@ fn execute_batch(
} = tx_results;
if let Some(transaction_status_sender) = transaction_status_sender {
let txs = batch.transactions_iter().cloned().collect();
let transactions = batch.sanitized_transactions().to_vec();
let post_token_balances = if record_token_balances {
collect_token_balances(bank, batch, &mut mint_decimals)
} else {
@ -222,7 +224,7 @@ fn execute_batch(
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
transactions,
execution_results,
balances,
token_balances,
@ -286,19 +288,23 @@ fn execute_batches(
/// 4. Update the leader scheduler, goto 1
pub fn process_entries(
bank: &Arc<Bank>,
entries: &mut [Entry],
entries: Vec<Entry>,
randomize: bool,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> {
let verify_transaction = {
let bank = bank.clone();
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
bank.verify_transaction(versioned_tx, false)
}
};
let mut timings = ExecuteTimings::default();
let mut entry_types: Vec<_> = entries
.iter()
.map(EntryType::try_from)
.collect::<Result<_>>()?;
let mut entries = entry::verify_transactions(entries, Arc::new(verify_transaction))?;
let result = process_entries_with_callback(
bank,
&mut entry_types,
&mut entries,
randomize,
None,
transaction_status_sender,
@ -886,12 +892,14 @@ pub fn confirm_slot(
None
};
let verify_transaction = {
let bank = bank.clone();
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
bank.verify_transaction(versioned_tx, skip_verification)
}
};
let check_start = Instant::now();
let mut entries = entries.verify_and_hash_transactions(
skip_verification,
bank.libsecp256k1_0_5_upgrade_enabled(),
bank.verify_tx_signatures_len_enabled(),
)?;
let mut entries = entry::verify_transactions(entries, Arc::new(verify_transaction))?;
let transaction_duration_us = timing::duration_as_us(&check_start.elapsed());
let mut replay_elapsed = Measure::start("replay_elapsed");
@ -1290,7 +1298,7 @@ pub enum TransactionStatusMessage {
pub struct TransactionStatusBatch {
pub bank: Arc<Bank>,
pub transactions: Vec<Transaction>,
pub transactions: Vec<SanitizedTransaction>,
pub statuses: Vec<TransactionExecutionResult>,
pub balances: TransactionBalancesSet,
pub token_balances: TransactionTokenBalancesSet,
@ -1309,7 +1317,7 @@ impl TransactionStatusSender {
pub fn send_transaction_status_batch(
&self,
bank: Arc<Bank>,
transactions: Vec<Transaction>,
transactions: Vec<SanitizedTransaction>,
statuses: Vec<TransactionExecutionResult>,
balances: TransactionBalancesSet,
token_balances: TransactionTokenBalancesSet,
@ -2097,8 +2105,7 @@ pub mod tests {
} = create_genesis_config(2);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let keypair = Keypair::new();
let mut slot_entries =
create_ticks(genesis_config.ticks_per_slot, 1, genesis_config.hash());
let slot_entries = create_ticks(genesis_config.ticks_per_slot, 1, genesis_config.hash());
let tx = system_transaction::transfer(
&mint_keypair,
&keypair.pubkey(),
@ -2113,7 +2120,7 @@ pub mod tests {
);
// Now ensure the TX is accepted despite pointing to the ID of an empty entry.
process_entries(&bank, &mut slot_entries, true, None, None).unwrap();
process_entries(&bank, slot_entries, true, None, None).unwrap();
assert_eq!(bank.process_transaction(&tx), Ok(()));
}
@ -2323,10 +2330,7 @@ pub mod tests {
// ensure bank can process a tick
assert_eq!(bank.tick_height(), 0);
let tick = next_entry(&genesis_config.hash(), 1, vec![]);
assert_eq!(
process_entries(&bank, &mut [tick], true, None, None),
Ok(())
);
assert_eq!(process_entries(&bank, vec![tick], true, None, None), Ok(()));
assert_eq!(bank.tick_height(), 1);
}
@ -2359,7 +2363,7 @@ pub mod tests {
);
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &mut [entry_1, entry_2], true, None, None),
process_entries(&bank, vec![entry_1, entry_2], true, None, None),
Ok(())
);
assert_eq!(bank.get_balance(&keypair1.pubkey()), 2);
@ -2417,7 +2421,7 @@ pub mod tests {
assert_eq!(
process_entries(
&bank,
&mut [entry_1_to_mint, entry_2_to_3_mint_to_1],
vec![entry_1_to_mint, entry_2_to_3_mint_to_1],
false,
None,
None,
@ -2489,7 +2493,7 @@ pub mod tests {
assert!(process_entries(
&bank,
&mut [entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
false,
None,
None,
@ -2501,15 +2505,15 @@ pub mod tests {
assert_eq!(bank.get_balance(&keypair2.pubkey()), 4);
// Check all accounts are unlocked
let txs1 = &entry_1_to_mint.transactions[..];
let txs2 = &entry_2_to_3_mint_to_1.transactions[..];
let batch1 = bank.prepare_batch(txs1.iter()).unwrap();
let txs1 = entry_1_to_mint.transactions;
let txs2 = entry_2_to_3_mint_to_1.transactions;
let batch1 = bank.prepare_entry_batch(txs1).unwrap();
for result in batch1.lock_results() {
assert!(result.is_ok());
}
// txs1 and txs2 have accounts that conflict, so we must drop txs1 first
drop(batch1);
let batch2 = bank.prepare_batch(txs2.iter()).unwrap();
let batch2 = bank.prepare_entry_batch(txs2).unwrap();
for result in batch2.lock_results() {
assert!(result.is_ok());
}
@ -2597,7 +2601,7 @@ pub mod tests {
assert!(process_entries(
&bank,
&mut [
vec![
entry_1_to_mint,
entry_2_to_3_and_1_to_mint,
entry_conflict_itself,
@ -2652,7 +2656,7 @@ pub mod tests {
system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash());
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &mut [entry_1, entry_2], true, None, None),
process_entries(&bank, vec![entry_1, entry_2], true, None, None),
Ok(())
);
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
@ -2686,7 +2690,7 @@ pub mod tests {
let present_account = AccountSharedData::new(1, 10, &Pubkey::default());
bank.store_account(&present_account_key.pubkey(), &present_account);
let mut entries: Vec<_> = (0..NUM_TRANSFERS)
let entries: Vec<_> = (0..NUM_TRANSFERS)
.step_by(NUM_TRANSFERS_PER_ENTRY)
.map(|i| {
let mut transactions = (0..NUM_TRANSFERS_PER_ENTRY)
@ -2712,10 +2716,7 @@ pub mod tests {
next_entry_mut(&mut hash, 0, transactions)
})
.collect();
assert_eq!(
process_entries(&bank, &mut entries, true, None, None),
Ok(())
);
assert_eq!(process_entries(&bank, entries, true, None, None), Ok(()));
}
#[test]
@ -2776,7 +2777,7 @@ pub mod tests {
// Transfer lamports to each other
let entry = next_entry(&bank.last_blockhash(), 1, tx_vector);
assert_eq!(
process_entries(&bank, &mut [entry], true, None, None),
process_entries(&bank, vec![entry], true, None, None),
Ok(())
);
bank.squash();
@ -2838,7 +2839,7 @@ pub mod tests {
assert_eq!(
process_entries(
&bank,
&mut [entry_1, tick, entry_2.clone()],
vec![entry_1, tick, entry_2.clone()],
true,
None,
None
@ -2853,7 +2854,7 @@ pub mod tests {
system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash());
let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &mut [entry_3], true, None, None),
process_entries(&bank, vec![entry_3], true, None, None),
Err(TransactionError::AccountNotFound)
);
}
@ -2933,7 +2934,7 @@ pub mod tests {
);
assert_eq!(
process_entries(&bank, &mut [entry_1_to_mint], false, None, None),
process_entries(&bank, vec![entry_1_to_mint], false, None, None),
Err(TransactionError::AccountInUse)
);
@ -3094,7 +3095,7 @@ pub mod tests {
let mut hash = bank.last_blockhash();
let mut root: Option<Arc<Bank>> = None;
loop {
let mut entries: Vec<_> = (0..NUM_TRANSFERS)
let entries: Vec<_> = (0..NUM_TRANSFERS)
.step_by(NUM_TRANSFERS_PER_ENTRY)
.map(|i| {
next_entry_mut(&mut hash, 0, {
@ -3122,9 +3123,9 @@ pub mod tests {
})
.collect();
info!("paying iteration {}", i);
process_entries(&bank, &mut entries, true, None, None).expect("paying failed");
process_entries(&bank, entries, true, None, None).expect("paying failed");
let mut entries: Vec<_> = (0..NUM_TRANSFERS)
let entries: Vec<_> = (0..NUM_TRANSFERS)
.step_by(NUM_TRANSFERS_PER_ENTRY)
.map(|i| {
next_entry_mut(
@ -3145,12 +3146,12 @@ pub mod tests {
.collect();
info!("refunding iteration {}", i);
process_entries(&bank, &mut entries, true, None, None).expect("refunding failed");
process_entries(&bank, entries, true, None, None).expect("refunding failed");
// advance to next block
process_entries(
&bank,
&mut (0..bank.ticks_per_slot())
(0..bank.ticks_per_slot())
.map(|_| next_entry_mut(&mut hash, 1, vec![]))
.collect::<Vec<_>>(),
true,
@ -3198,7 +3199,7 @@ pub mod tests {
let entry = next_entry(&new_blockhash, 1, vec![tx]);
entries.push(entry);
process_entries(&bank0, &mut entries, true, None, None).unwrap();
process_entries(&bank0, entries, true, None, None).unwrap();
assert_eq!(bank0.get_balance(&keypair.pubkey()), 1)
}
@ -3272,8 +3273,8 @@ pub mod tests {
42,
Hash::default(),
);
let transactions = [account_not_found_tx, invalid_blockhash_tx];
let batch = bank.prepare_batch(transactions.iter()).unwrap();
let txs = vec![account_not_found_tx, invalid_blockhash_tx];
let batch = bank.prepare_batch(txs).unwrap();
let (
TransactionResults {
fee_collection_results,
@ -3368,7 +3369,7 @@ pub mod tests {
.collect();
let entry = next_entry(&bank_1_blockhash, 1, vote_txs);
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let _ = process_entries(&bank1, &mut [entry], true, None, Some(&replay_vote_sender));
let _ = process_entries(&bank1, vec![entry], true, None, Some(&replay_vote_sender));
let successes: BTreeSet<Pubkey> = replay_vote_receiver
.try_iter()
.map(|(vote_pubkey, _, _)| vote_pubkey)