Write transaction status and fee into persistent store (#7030)

* Pass blocktree into execute_batch, if persist_transaction_status

* Add validator arg to enable persistent transaction status store

* Pass blocktree into banking_stage, if persist_transaction_status

* Add validator params to bash scripts

* Expose actual transaction statuses outside Bank; add tests

* Fix benches

* Offload transaction status writes to a separate thread

* Enable persistent transaction status along with rpc service

* nudge

* Review comments
This commit is contained in:
Tyera Eulberg
2019-11-20 16:43:10 -07:00
committed by GitHub
parent ee6b11d36d
commit 97ca6858b7
14 changed files with 676 additions and 151 deletions

View File

@ -12,12 +12,14 @@ use crate::{
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use solana_ledger::{
blocktree::Blocktree, entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache,
blocktree::Blocktree,
blocktree_processor::{send_transaction_status_batch, TransactionStatusSender},
entry::hash_transactions,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn};
use solana_perf::cuda_runtime::PinnedVec;
use solana_perf::perf_libs;
use solana_perf::{cuda_runtime::PinnedVec, perf_libs};
use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch};
use solana_sdk::{
clock::{
@ -73,6 +75,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -80,6 +83,7 @@ impl BankingStage {
verified_receiver,
verified_vote_receiver,
Self::num_threads(),
transaction_status_sender,
)
}
@ -89,6 +93,7 @@ impl BankingStage {
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
@ -108,6 +113,7 @@ impl BankingStage {
let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone();
let mut recv_start = Instant::now();
let transaction_status_sender = transaction_status_sender.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
@ -121,6 +127,7 @@ impl BankingStage {
enable_forwarding,
i,
batch_limit,
transaction_status_sender.clone(),
);
})
.unwrap()
@ -155,6 +162,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &mut Vec<PacketsAndOffsets>,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<UnprocessedPackets> {
let mut unprocessed_packets = vec![];
let mut rebuffered_packets = 0;
@ -185,6 +193,7 @@ impl BankingStage {
&poh_recorder,
&msgs,
unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
);
new_tx_count += processed;
@ -277,6 +286,7 @@ impl BankingStage {
buffered_packets: &mut Vec<PacketsAndOffsets>,
enable_forwarding: bool,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<()> {
let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
let poh = poh_recorder.lock().unwrap();
@ -303,6 +313,7 @@ impl BankingStage {
poh_recorder,
buffered_packets,
batch_limit,
transaction_status_sender,
)?;
buffered_packets.append(&mut unprocessed);
Ok(())
@ -350,6 +361,7 @@ impl BankingStage {
enable_forwarding: bool,
id: u32,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = vec![];
@ -363,6 +375,7 @@ impl BankingStage {
&mut buffered_packets,
enable_forwarding,
batch_limit,
transaction_status_sender.clone(),
)
.unwrap_or_else(|_| buffered_packets.clear());
}
@ -385,6 +398,7 @@ impl BankingStage {
recv_timeout,
id,
batch_limit,
transaction_status_sender.clone(),
) {
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
@ -482,9 +496,10 @@ impl BankingStage {
}
fn process_and_record_transactions_locked(
bank: &Bank,
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Result<usize>, Vec<usize>) {
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
@ -512,14 +527,24 @@ impl BankingStage {
let num_to_commit = num_to_commit.unwrap();
if num_to_commit != 0 {
bank.commit_transactions(
txs,
None,
&mut loaded_accounts,
&results,
tx_count,
signature_count,
);
let transaction_statuses = bank
.commit_transactions(
txs,
None,
&mut loaded_accounts,
&results,
tx_count,
signature_count,
)
.processing_results;
if let Some(sender) = transaction_status_sender {
send_transaction_status_batch(
bank.clone(),
batch.transactions(),
transaction_statuses,
sender,
);
}
}
commit_time.stop();
@ -538,10 +563,11 @@ impl BankingStage {
}
pub fn process_and_record_transactions(
bank: &Bank,
bank: &Arc<Bank>,
txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Result<usize>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
@ -549,8 +575,12 @@ impl BankingStage {
let batch = bank.prepare_batch(txs, None);
lock_time.stop();
let (result, mut retryable_txs) =
Self::process_and_record_transactions_locked(bank, poh, &batch);
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
bank,
poh,
&batch,
transaction_status_sender,
);
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
let mut unlock_time = Measure::start("unlock_time");
@ -574,9 +604,10 @@ impl BankingStage {
/// Returns the number of transactions successfully processed by the bank, which may be less
/// than the total number if max PoH height was reached and the bank halted
fn process_transactions(
bank: &Bank,
bank: &Arc<Bank>,
transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (usize, Vec<usize>) {
let mut chunk_start = 0;
let mut unprocessed_txs = vec![];
@ -591,6 +622,7 @@ impl BankingStage {
&transactions[chunk_start..chunk_end],
poh,
chunk_start,
transaction_status_sender.clone(),
);
trace!("process_transactions result: {:?}", result);
@ -724,6 +756,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets,
packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (usize, usize, Vec<usize>) {
let (transactions, transaction_to_packet_indexes) =
Self::transactions_from_packets(msgs, &packet_indexes);
@ -736,7 +769,7 @@ impl BankingStage {
let tx_len = transactions.len();
let (processed, unprocessed_tx_indexes) =
Self::process_transactions(bank, &transactions, poh);
Self::process_transactions(bank, &transactions, poh, transaction_status_sender);
let unprocessed_tx_count = unprocessed_tx_indexes.len();
@ -815,6 +848,7 @@ impl BankingStage {
recv_timeout: Duration,
id: u32,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<UnprocessedPackets> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
@ -851,8 +885,13 @@ impl BankingStage {
}
let bank = bank.unwrap();
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_received_packets(&bank, &poh, &msgs, packet_indexes);
let (processed, verified_txs_len, unprocessed_indexes) = Self::process_received_packets(
&bank,
&poh,
&msgs,
packet_indexes,
transaction_status_sender.clone(),
);
new_tx_count += processed;
@ -969,20 +1008,30 @@ pub fn create_test_recorder(
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster_info::Node;
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use crate::packet::to_packets;
use crate::poh_recorder::WorkingBank;
use crate::{
cluster_info::Node,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
packet::to_packets,
poh_recorder::WorkingBank,
transaction_status_service::TransactionStatusService,
};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_ledger::entry::{Entry, EntrySlice};
use solana_ledger::get_tmp_ledger_path;
use solana_sdk::instruction::InstructionError;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use solana_sdk::transaction::TransactionError;
use std::sync::atomic::Ordering;
use std::thread::sleep;
use solana_ledger::{
blocktree::entries_to_test_shreds,
entry::{next_entry, Entry, EntrySlice},
get_tmp_ledger_path,
};
use solana_sdk::{
instruction::InstructionError,
signature::{Keypair, KeypairUtil},
system_transaction,
transaction::TransactionError,
};
use std::{
sync::{atomic::Ordering, mpsc::channel},
thread::sleep,
};
#[test]
fn test_banking_stage_shutdown1() {
@ -1004,6 +1053,7 @@ mod tests {
&poh_recorder,
verified_receiver,
vote_receiver,
None,
);
drop(verified_sender);
drop(vote_sender);
@ -1042,6 +1092,7 @@ mod tests {
&poh_recorder,
verified_receiver,
vote_receiver,
None,
);
trace!("sending bank");
drop(verified_sender);
@ -1103,6 +1154,7 @@ mod tests {
&poh_recorder,
verified_receiver,
vote_receiver,
None,
);
// fund another account so we can send 2 good transactions in a single batch.
@ -1244,6 +1296,7 @@ mod tests {
verified_receiver,
vote_receiver,
2,
None,
);
// wait for banking_stage to eat the packets
@ -1644,9 +1697,15 @@ mod tests {
poh_recorder.lock().unwrap().set_working_bank(working_bank);
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder, 0)
.0
.unwrap();
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
0,
None,
)
.0
.unwrap();
poh_recorder.lock().unwrap().tick();
let mut done = false;
@ -1678,7 +1737,8 @@ mod tests {
&bank,
&transactions,
&poh_recorder,
0
0,
None,
)
.0,
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
@ -1735,6 +1795,7 @@ mod tests {
&transactions,
&poh_recorder,
0,
None,
);
assert!(result.is_ok());
@ -1819,7 +1880,7 @@ mod tests {
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (processed_transactions_count, mut retryable_txs) =
BankingStage::process_transactions(&bank, &transactions, &poh_recorder);
BankingStage::process_transactions(&bank, &transactions, &poh_recorder, None);
assert_eq!(processed_transactions_count, 0,);
@ -1830,4 +1891,101 @@ mod tests {
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_write_persist_transaction_status() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let bank = Arc::new(Bank::new(&genesis_config));
let pubkey = Pubkey::new_rand();
let pubkey1 = Pubkey::new_rand();
let keypair1 = Keypair::new();
let success_tx =
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash());
let success_signature = success_tx.signatures[0];
let entry_1 = next_entry(&genesis_config.hash(), 1, vec![success_tx.clone()]);
let ix_error_tx =
system_transaction::transfer(&keypair1, &pubkey1, 10, genesis_config.hash());
let ix_error_signature = ix_error_tx.signatures[0];
let entry_2 = next_entry(&entry_1.hash, 1, vec![ix_error_tx.clone()]);
let fail_tx =
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash());
let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx.clone()]);
let entries = vec![entry_1, entry_2, entry_3];
let transactions = vec![success_tx, ix_error_tx, fail_tx];
bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap();
let working_bank = WorkingBank {
bank: bank.clone(),
min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1,
};
let ledger_path = get_tmp_ledger_path!();
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let blocktree = Arc::new(blocktree);
let (poh_recorder, _entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Some((4, 4)),
bank.ticks_per_slot(),
&pubkey,
&blocktree,
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let shreds = entries_to_test_shreds(entries.clone(), bank.slot(), 0, true, 0);
blocktree.insert_shreds(shreds, None, false).unwrap();
blocktree.set_roots(&[bank.slot()]).unwrap();
let (transaction_status_sender, transaction_status_receiver) = channel();
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
blocktree.clone(),
&Arc::new(AtomicBool::new(false)),
);
let _ = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
0,
Some(transaction_status_sender),
);
transaction_status_service.join().unwrap();
let confirmed_block = blocktree.get_confirmed_block(bank.slot()).unwrap();
assert_eq!(confirmed_block.transactions.len(), 3);
for (transaction, result) in confirmed_block.transactions.into_iter() {
if transaction.signatures[0] == success_signature {
assert_eq!(result.unwrap().status, Ok(()));
} else if transaction.signatures[0] == ix_error_signature {
assert_eq!(
result.unwrap().status,
Err(TransactionError::InstructionError(
0,
InstructionError::CustomError(1)
))
);
} else {
assert_eq!(result, None);
}
}
}
Blocktree::destroy(&ledger_path).unwrap();
}
}

View File

@ -55,6 +55,7 @@ pub mod snapshot_packager_service;
pub mod storage_stage;
pub mod streamer;
pub mod tpu;
pub mod transaction_status_service;
pub mod tvu;
pub mod validator;
pub mod weighted_shuffle;

View File

@ -1,20 +1,20 @@
//! The `replay_stage` replays transactions broadcast by the leader.
use crate::cluster_info::ClusterInfo;
use crate::commitment::{
AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData,
use crate::{
cluster_info::ClusterInfo,
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
consensus::{StakeLockout, Tower},
poh_recorder::PohRecorder,
result::{Error, Result},
rpc_subscriptions::RpcSubscriptions,
thread_mem_usage,
};
use crate::consensus::{StakeLockout, Tower};
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::thread_mem_usage;
use jemalloc_ctl::thread::allocatedp;
use solana_ledger::{
bank_forks::BankForks,
block_error::BlockError,
blocktree::{Blocktree, BlocktreeError},
blocktree_processor,
blocktree_processor::{self, TransactionStatusSender},
entry::{Entry, EntrySlice},
leader_schedule_cache::LeaderScheduleCache,
snapshot_package::SnapshotPackageSender,
@ -182,6 +182,7 @@ impl ReplayStage {
slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
snapshot_package_sender: Option<SnapshotPackageSender>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Self, Receiver<Vec<Arc<Bank>>>)
where
T: 'static + KeypairUtil + Send + Sync,
@ -245,6 +246,7 @@ impl ReplayStage {
&my_pubkey,
&mut progress,
&slot_full_senders,
transaction_status_sender.clone(),
);
datapoint_debug!(
"replay_stage-memory",
@ -493,6 +495,7 @@ impl ReplayStage {
bank: &Arc<Bank>,
blocktree: &Blocktree,
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Result<()>, usize) {
let mut tx_count = 0;
let now = Instant::now();
@ -514,7 +517,14 @@ impl ReplayStage {
slot_full,
);
tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
Self::replay_entries_into_bank(bank, bank_progress, entries, num_shreds, slot_full)
Self::replay_entries_into_bank(
bank,
bank_progress,
entries,
num_shreds,
slot_full,
transaction_status_sender,
)
});
if Self::is_replay_result_fatal(&replay_result) {
@ -663,6 +673,7 @@ impl ReplayStage {
my_pubkey: &Pubkey,
progress: &mut HashMap<u64, ForkProgress>,
slot_full_senders: &[Sender<(u64, Pubkey)>],
transaction_status_sender: Option<TransactionStatusSender>,
) -> bool {
let mut did_complete_bank = false;
let mut tx_count = 0;
@ -685,8 +696,12 @@ impl ReplayStage {
.entry(bank.slot())
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
if bank.collector_id() != my_pubkey {
let (replay_result, replay_tx_count) =
Self::replay_blocktree_into_bank(&bank, &blocktree, bank_progress);
let (replay_result, replay_tx_count) = Self::replay_blocktree_into_bank(
&bank,
&blocktree,
bank_progress,
transaction_status_sender.clone(),
);
tx_count += replay_tx_count;
if Self::is_replay_result_fatal(&replay_result) {
trace!("replay_result_fatal slot {}", bank_slot);
@ -950,6 +965,7 @@ impl ReplayStage {
entries: Vec<Entry>,
num_shreds: usize,
slot_full: bool,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<()> {
let result = Self::verify_and_process_entries(
&bank,
@ -957,6 +973,7 @@ impl ReplayStage {
slot_full,
bank_progress.num_shreds,
bank_progress,
transaction_status_sender,
);
bank_progress.num_shreds += num_shreds;
bank_progress.num_entries += entries.len();
@ -1008,6 +1025,7 @@ impl ReplayStage {
slot_full: bool,
shred_index: usize,
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<()> {
let last_entry = &bank_progress.last_entry;
let tick_hash_count = &mut bank_progress.tick_hash_count;
@ -1042,7 +1060,8 @@ impl ReplayStage {
let mut entry_state = entries.start_verify(last_entry);
let mut replay_elapsed = Measure::start("replay_elapsed");
let res = blocktree_processor::process_entries(bank, entries, true);
let res =
blocktree_processor::process_entries(bank, entries, true, transaction_status_sender);
replay_elapsed.stop();
bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
@ -1125,29 +1144,38 @@ impl ReplayStage {
#[cfg(test)]
mod test {
use super::*;
use crate::commitment::BlockCommitment;
use crate::genesis_utils::{create_genesis_config, create_genesis_config_with_leader};
use crate::replay_stage::ReplayStage;
use solana_ledger::blocktree::make_slot_entries;
use solana_ledger::entry;
use solana_ledger::shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
use crate::{
commitment::BlockCommitment,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
replay_stage::ReplayStage,
transaction_status_service::TransactionStatusService,
};
use solana_ledger::{
blocktree::make_slot_entries,
blocktree::{entries_to_test_shreds, BlocktreeError},
create_new_tmp_ledger,
entry::{self, next_entry},
get_tmp_ledger_path,
shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
},
};
use solana_runtime::genesis_utils::GenesisConfigInfo;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use solana_sdk::transaction::TransactionError;
use solana_sdk::{
hash::{hash, Hash},
instruction::InstructionError,
packet::PACKET_DATA_SIZE,
signature::{Keypair, KeypairUtil},
system_transaction,
transaction::TransactionError,
};
use solana_vote_program::vote_state::VoteState;
use std::fs::remove_dir_all;
use std::iter::FromIterator;
use std::sync::{Arc, RwLock};
use std::{
fs::remove_dir_all,
iter::FromIterator,
sync::{Arc, RwLock},
};
#[test]
fn test_child_slots_of_same_parent() {
@ -1429,8 +1457,12 @@ mod test {
.or_insert_with(|| ForkProgress::new(0, last_blockhash));
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
blocktree.insert_shreds(shreds, None, false).unwrap();
let (res, _tx_count) =
ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut bank0_progress);
let (res, _tx_count) = ReplayStage::replay_blocktree_into_bank(
&bank0,
&blocktree,
&mut bank0_progress,
None,
);
// Check that the erroring bank was marked as dead in the progress map
assert!(progress
@ -1675,4 +1707,90 @@ mod test {
assert!(res.1.is_some());
assert_eq!(res.1.unwrap().slot(), 11);
}
#[test]
fn test_write_persist_transaction_status() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(1000);
let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config);
{
let blocktree = Blocktree::open(&ledger_path)
.expect("Expected to successfully open database ledger");
let blocktree = Arc::new(blocktree);
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
let keypair3 = Keypair::new();
let bank0 = Arc::new(Bank::new(&genesis_config));
bank0
.transfer(4, &mint_keypair, &keypair2.pubkey())
.unwrap();
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
let slot = bank1.slot();
// Generate transactions for processing
// Successful transaction
let success_tx =
system_transaction::transfer(&mint_keypair, &keypair1.pubkey(), 2, blockhash);
let success_signature = success_tx.signatures[0];
let entry_1 = next_entry(&blockhash, 1, vec![success_tx]);
// Failed transaction, InstructionError
let ix_error_tx =
system_transaction::transfer(&keypair2, &keypair3.pubkey(), 10, blockhash);
let ix_error_signature = ix_error_tx.signatures[0];
let entry_2 = next_entry(&entry_1.hash, 1, vec![ix_error_tx]);
// Failed transaction
let fail_tx =
system_transaction::transfer(&mint_keypair, &keypair2.pubkey(), 2, Hash::default());
let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx]);
let entries = vec![entry_1, entry_2, entry_3];
let shreds = entries_to_test_shreds(entries.clone(), slot, bank0.slot(), true, 0);
blocktree.insert_shreds(shreds, None, false).unwrap();
blocktree.set_roots(&[slot]).unwrap();
let (transaction_status_sender, transaction_status_receiver) = channel();
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
blocktree.clone(),
&Arc::new(AtomicBool::new(false)),
);
// Check that process_entries successfully writes can_commit transactions statuses, and
// that they are matched properly by get_confirmed_block
let _result = blocktree_processor::process_entries(
&bank1,
&entries,
true,
Some(transaction_status_sender),
);
transaction_status_service.join().unwrap();
let confirmed_block = blocktree.get_confirmed_block(slot).unwrap();
assert_eq!(confirmed_block.transactions.len(), 3);
for (transaction, result) in confirmed_block.transactions.into_iter() {
if transaction.signatures[0] == success_signature {
assert_eq!(result.unwrap().status, Ok(()));
} else if transaction.signatures[0] == ix_error_signature {
assert_eq!(
result.unwrap().status,
Err(TransactionError::InstructionError(
0,
InstructionError::CustomError(1)
))
);
} else {
assert_eq!(result, None);
}
}
}
Blocktree::destroy(&ledger_path).unwrap();
}
}

View File

@ -1,21 +1,27 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! multi-stage transaction processing pipeline in software.
use crate::banking_stage::BankingStage;
use crate::broadcast_stage::{BroadcastStage, BroadcastStageType};
use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::fetch_stage::FetchStage;
use crate::poh_recorder::{PohRecorder, WorkingBankEntry};
use crate::sigverify::TransactionSigVerifier;
use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage};
use crate::{
banking_stage::BankingStage,
broadcast_stage::{BroadcastStage, BroadcastStageType},
cluster_info::ClusterInfo,
cluster_info_vote_listener::ClusterInfoVoteListener,
fetch_stage::FetchStage,
poh_recorder::{PohRecorder, WorkingBankEntry},
sigverify::TransactionSigVerifier,
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
};
use crossbeam_channel::unbounded;
use solana_ledger::blocktree::Blocktree;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusSender};
use std::{
net::UdpSocket,
sync::{
atomic::AtomicBool,
mpsc::{channel, Receiver},
Arc, Mutex, RwLock,
},
thread,
};
pub struct Tpu {
fetch_stage: FetchStage,
@ -35,6 +41,7 @@ impl Tpu {
tpu_forwards_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
sigverify_disabled: bool,
transaction_status_sender: Option<TransactionStatusSender>,
blocktree: &Arc<Blocktree>,
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
@ -72,6 +79,7 @@ impl Tpu {
poh_recorder,
verified_receiver,
verified_vote_receiver,
transaction_status_sender,
);
let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -0,0 +1,79 @@
use crate::result::{Error, Result};
use solana_client::rpc_request::RpcTransactionStatus;
use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch};
use solana_runtime::bank::Bank;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, RecvTimeoutError},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub struct TransactionStatusService {
thread_hdl: JoinHandle<()>,
}
impl TransactionStatusService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
write_transaction_status_receiver: Receiver<TransactionStatusBatch>,
blocktree: Arc<Blocktree>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-transaction-status-writer".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::write_transaction_status_batch(
&write_transaction_status_receiver,
&blocktree,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from write_transaction_statuses: {:?}", e),
}
}
})
.unwrap();
Self { thread_hdl }
}
fn write_transaction_status_batch(
write_transaction_status_receiver: &Receiver<TransactionStatusBatch>,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
let TransactionStatusBatch {
bank,
transactions,
statuses,
} = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot();
for (transaction, status) in transactions.iter().zip(statuses) {
if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
let fee_calculator = bank
.get_fee_calculator(&transaction.message().recent_blockhash)
.expect("FeeCalculator must exist");
let fee = fee_calculator.calculate_fee(transaction.message());
blocktree
.write_transaction_status(
(slot, transaction.signatures[0]),
&RpcTransactionStatus { status, fee },
)
.expect("Expect database write to succeed");
}
}
Ok(())
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -1,32 +1,43 @@
//! The `tvu` module implements the Transaction Validation Unit, a multi-stage transaction
//! validation pipeline in software.
use crate::blockstream_service::BlockstreamService;
use crate::cluster_info::ClusterInfo;
use crate::commitment::BlockCommitmentCache;
use crate::ledger_cleanup_service::LedgerCleanupService;
use crate::partition_cfg::PartitionCfg;
use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::sigverify_shreds::ShredSigVerifier;
use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage};
use crate::snapshot_packager_service::SnapshotPackagerService;
use crate::storage_stage::{StorageStage, StorageState};
use crate::{
blockstream_service::BlockstreamService,
cluster_info::ClusterInfo,
commitment::BlockCommitmentCache,
ledger_cleanup_service::LedgerCleanupService,
partition_cfg::PartitionCfg,
poh_recorder::PohRecorder,
replay_stage::ReplayStage,
retransmit_stage::RetransmitStage,
rpc_subscriptions::RpcSubscriptions,
shred_fetch_stage::ShredFetchStage,
sigverify_shreds::ShredSigVerifier,
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
snapshot_packager_service::SnapshotPackagerService,
storage_stage::{StorageStage, StorageState},
};
use crossbeam_channel::unbounded;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use solana_ledger::{
bank_forks::BankForks,
blocktree::{Blocktree, CompletedSlotsReceiver},
blocktree_processor::TransactionStatusSender,
};
use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, KeypairUtil},
};
use std::{
net::UdpSocket,
path::PathBuf,
sync::{
atomic::AtomicBool,
mpsc::{channel, Receiver},
Arc, Mutex, RwLock,
},
thread,
};
pub struct Tvu {
fetch_stage: ShredFetchStage,
@ -75,6 +86,7 @@ impl Tvu {
sigverify_disabled: bool,
cfg: Option<PartitionCfg>,
shred_version: u16,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Self
where
T: 'static + KeypairUtil + Sync + Send,
@ -165,6 +177,7 @@ impl Tvu {
vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
snapshot_package_sender,
block_commitment_cache,
transaction_status_sender,
);
let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket {
@ -297,6 +310,7 @@ pub mod tests {
false,
None,
0,
None,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -16,6 +16,7 @@ use crate::{
sigverify,
storage_stage::StorageState,
tpu::Tpu,
transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu},
};
use solana_ledger::{
@ -43,7 +44,7 @@ use std::{
path::{Path, PathBuf},
process,
sync::atomic::{AtomicBool, Ordering},
sync::mpsc::Receiver,
sync::mpsc::{channel, Receiver},
sync::{Arc, Mutex, RwLock},
thread::Result,
};
@ -54,6 +55,7 @@ pub struct ValidatorConfig {
pub dev_halt_at_slot: Option<Slot>,
pub expected_genesis_hash: Option<Hash>,
pub voting_disabled: bool,
pub transaction_status_service_disabled: bool,
pub blockstream_unix_socket: Option<PathBuf>,
pub storage_slots_per_turn: u64,
pub account_paths: Option<String>,
@ -71,6 +73,7 @@ impl Default for ValidatorConfig {
dev_halt_at_slot: None,
expected_genesis_hash: None,
voting_disabled: false,
transaction_status_service_disabled: false,
blockstream_unix_socket: None,
storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
max_ledger_slots: None,
@ -105,6 +108,7 @@ pub struct Validator {
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>,
transaction_status_service: Option<TransactionStatusService>,
gossip_service: GossipService,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_service: PohService,
@ -238,6 +242,21 @@ impl Validator {
))
};
let (transaction_status_sender, transaction_status_service) =
if rpc_service.is_some() && !config.transaction_status_service_disabled {
let (transaction_status_sender, transaction_status_receiver) = channel();
(
Some(transaction_status_sender),
Some(TransactionStatusService::new(
transaction_status_receiver,
blocktree.clone(),
&exit,
)),
)
} else {
(None, None)
};
info!(
"Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}",
bank.epoch(),
@ -350,6 +369,7 @@ impl Validator {
config.dev_sigverify_disabled,
config.partition_cfg.clone(),
shred_version,
transaction_status_sender.clone(),
);
if config.dev_sigverify_disabled {
@ -364,6 +384,7 @@ impl Validator {
node.sockets.tpu_forwards,
node.sockets.broadcast,
config.dev_sigverify_disabled,
transaction_status_sender,
&blocktree,
&config.broadcast_stage_type,
&exit,
@ -376,6 +397,7 @@ impl Validator {
gossip_service,
rpc_service,
rpc_pubsub_service,
transaction_status_service,
tpu,
tvu,
poh_service,
@ -426,6 +448,9 @@ impl Validator {
if let Some(rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.join()?;
}
if let Some(transaction_status_service) = self.transaction_status_service {
transaction_status_service.join()?;
}
self.gossip_service.join()?;
self.tpu.join()?;
@ -529,6 +554,8 @@ pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) {
let leader_voting_keypair = Arc::new(voting_keypair);
let storage_keypair = Arc::new(Keypair::new());
let mut config = ValidatorConfig::default();
config.transaction_status_service_disabled = true;
let node = Validator::new(
node,
&node_keypair,
@ -538,7 +565,7 @@ pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) {
&storage_keypair,
None,
true,
&ValidatorConfig::default(),
&config,
);
discover_cluster(&contact_info.gossip, 1).expect("Node startup failed");
(node, contact_info, mint_keypair, ledger_path)
@ -565,6 +592,8 @@ mod tests {
let voting_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
let mut config = ValidatorConfig::default();
config.transaction_status_service_disabled = true;
let validator = Validator::new(
validator_node,
&Arc::new(validator_keypair),
@ -574,7 +603,7 @@ mod tests {
&storage_keypair,
Some(&leader_node.info),
true,
&ValidatorConfig::default(),
&config,
);
validator.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap();
@ -597,6 +626,8 @@ mod tests {
ledger_paths.push(validator_ledger_path.clone());
let voting_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
let mut config = ValidatorConfig::default();
config.transaction_status_service_disabled = true;
Validator::new(
validator_node,
&Arc::new(validator_keypair),
@ -606,7 +637,7 @@ mod tests {
&storage_keypair,
Some(&leader_node.info),
true,
&ValidatorConfig::default(),
&config,
)
})
.collect();