From fb08b4151398b18f0ca8ed6e19caccc474799395 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sat, 27 Mar 2021 05:45:00 +0000 Subject: [PATCH] Rpc: enable getConfirmedBlock and getConfirmedTransaction to return confirmed (not yet finalized) data (bp #16142) (#16159) * Rpc: enable getConfirmedBlock and getConfirmedTransaction to return confirmed (not yet finalized) data (#16142) * Add Blockstore block and tx apis that allow unrooted responses * Add TransactionStatusMessage, and send on bank freeze; also refactor TransactionStatusSender * Track highest slot with tx-status writes complete * Rename and unpub fn * Add commitment to GetConfirmed input configs * Support confirmed blocks in getConfirmedBlock * Support confirmed txs in getConfirmedTransaction * Update sigs-for-addr2 comment * Enable confirmed block in cli * Enable confirmed transaction in cli * Review comments * Rename blockstore method (cherry picked from commit 433f1ead1ce53f31c9c9b233c4c15fdab8e87260) # Conflicts: # core/src/replay_stage.rs # core/src/rpc.rs # core/src/validator.rs * Fix conflicts Co-authored-by: Tyera Eulberg Co-authored-by: Tyera Eulberg --- cli/src/cli.rs | 15 +- cli/src/cluster_query.rs | 17 +- cli/src/stake.rs | 2 +- client/src/rpc_client.rs | 20 +- client/src/rpc_config.rs | 5 + core/src/banking_stage.rs | 12 +- core/src/replay_stage.rs | 12 +- core/src/rpc.rs | 143 ++++++---- core/src/rpc_service.rs | 5 +- core/src/transaction_status_service.rs | 190 +++++++------ core/src/validator.rs | 8 +- docs/src/developing/clients/jsonrpc-api.md | 2 + ledger/src/bigtable_upload.rs | 2 +- ledger/src/blockstore.rs | 301 +++++++++++++++++---- ledger/src/blockstore_db.rs | 1 + ledger/src/blockstore_processor.rs | 86 +++--- sdk/src/commitment_config.rs | 4 + transaction-status/src/lib.rs | 13 + 18 files changed, 592 insertions(+), 246 deletions(-) diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 4ce4d6e2a2..02955ae8e6 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -26,7 +26,10 @@ use solana_client::{ client_error::{ClientError, ClientErrorKind, Result as ClientResult}, nonce_utils, rpc_client::RpcClient, - rpc_config::{RpcLargestAccountsFilter, RpcSendTransactionConfig, RpcTransactionLogsFilter}, + rpc_config::{ + RpcConfirmedTransactionConfig, RpcLargestAccountsFilter, RpcSendTransactionConfig, + RpcTransactionLogsFilter, + }, rpc_response::RpcKeyedAccount, }; #[cfg(not(test))] @@ -1024,9 +1027,13 @@ fn process_confirm( let mut transaction = None; let mut get_transaction_error = None; if config.verbose { - match rpc_client - .get_confirmed_transaction(signature, UiTransactionEncoding::Base64) - { + match rpc_client.get_confirmed_transaction_with_config( + signature, + RpcConfirmedTransactionConfig { + encoding: Some(UiTransactionEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + }, + ) { Ok(confirmed_transaction) => { let decoded_transaction = confirmed_transaction .transaction diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index 278a244a32..c2c4e406aa 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -24,8 +24,9 @@ use solana_client::{ pubsub_client::PubsubClient, rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, rpc_config::{ - RpcAccountInfoConfig, RpcLargestAccountsConfig, RpcLargestAccountsFilter, - RpcProgramAccountsConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcAccountInfoConfig, RpcConfirmedBlockConfig, RpcLargestAccountsConfig, + RpcLargestAccountsFilter, RpcProgramAccountsConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, }, rpc_filter, rpc_response::SlotInfo, @@ -963,8 +964,16 @@ pub fn process_get_block( rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())? }; - let encoded_confirmed_block = - rpc_client.get_confirmed_block_with_encoding(slot, UiTransactionEncoding::Base64)?; + let encoded_confirmed_block = rpc_client + .get_confirmed_block_with_config( + slot, + RpcConfirmedBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + ..RpcConfirmedBlockConfig::default() + }, + )? + .into(); let cli_block = CliBlock { encoded_confirmed_block, slot, diff --git a/cli/src/stake.rs b/cli/src/stake.rs index 0aeecf9229..1540770950 100644 --- a/cli/src/stake.rs +++ b/cli/src/stake.rs @@ -1675,7 +1675,7 @@ pub(crate) fn fetch_epoch_rewards( .get(0) .ok_or_else(|| format!("Unable to fetch first confirmed block for epoch {}", epoch))?; - let first_confirmed_block = match rpc_client.get_configured_confirmed_block( + let first_confirmed_block = match rpc_client.get_confirmed_block_with_config( first_confirmed_block_in_epoch, RpcConfirmedBlockConfig::rewards_only(), ) { diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 17c4dcf496..956cefe7da 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -4,9 +4,10 @@ use crate::{ mock_sender::{MockSender, Mocks}, rpc_config::RpcAccountInfoConfig, rpc_config::{ - RpcConfirmedBlockConfig, RpcGetConfirmedSignaturesForAddress2Config, - RpcLargestAccountsConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig, - RpcSimulateTransactionConfig, RpcTokenAccountsFilter, + RpcConfirmedBlockConfig, RpcConfirmedTransactionConfig, + RpcGetConfirmedSignaturesForAddress2Config, RpcLargestAccountsConfig, + RpcProgramAccountsConfig, RpcSendTransactionConfig, RpcSimulateTransactionConfig, + RpcTokenAccountsFilter, }, rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter}, rpc_response::*, @@ -523,7 +524,7 @@ impl RpcClient { self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding])) } - pub fn get_configured_confirmed_block( + pub fn get_confirmed_block_with_config( &self, slot: Slot, config: RpcConfirmedBlockConfig, @@ -615,6 +616,17 @@ impl RpcClient { ) } + pub fn get_confirmed_transaction_with_config( + &self, + signature: &Signature, + config: RpcConfirmedTransactionConfig, + ) -> ClientResult { + self.send( + RpcRequest::GetConfirmedTransaction, + json!([signature.to_string(), config]), + ) + } + pub fn get_block_time(&self, slot: Slot) -> ClientResult { let request = RpcRequest::GetBlockTime; let response = self.sender.send(request, json!([slot])); diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index f2af1beaa2..a90603de13 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -135,6 +135,8 @@ pub struct RpcConfirmedBlockConfig { pub encoding: Option, pub transaction_details: Option, pub rewards: Option, + #[serde(flatten)] + pub commitment: Option, } impl EncodingConfig for RpcConfirmedBlockConfig { @@ -159,12 +161,15 @@ impl RpcConfirmedBlockConfig { #[serde(rename_all = "camelCase")] pub struct RpcConfirmedTransactionConfig { pub encoding: Option, + #[serde(flatten)] + pub commitment: Option, } impl EncodingConfig for RpcConfirmedTransactionConfig { fn new_with_encoding(encoding: &Option) -> Self { Self { encoding: *encoding, + ..Self::default() } } } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ac3f4aa11f..6665c5b26f 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -12,10 +12,8 @@ use itertools::Itertools; use lru::LruCache; use retain_mut::RetainMut; use solana_ledger::{ - blockstore::Blockstore, - blockstore_processor::{send_transaction_status_batch, TransactionStatusSender}, - entry::hash_transactions, - leader_schedule_cache::LeaderScheduleCache, + blockstore::Blockstore, blockstore_processor::TransactionStatusSender, + entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache, }; use solana_measure::{measure::Measure, thread_mem_usage}; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; @@ -765,7 +763,7 @@ impl BankingStage { if let Some(transaction_status_sender) = transaction_status_sender { let post_balances = bank.collect_balances(batch); let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals); - send_transaction_status_batch( + transaction_status_sender.send_transaction_status_batch( bank.clone(), batch.transactions(), batch.iteration_order_vec(), @@ -774,7 +772,6 @@ impl BankingStage { TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances), inner_instructions, transaction_logs, - transaction_status_sender, ); } } @@ -2341,6 +2338,7 @@ mod tests { let (transaction_status_sender, transaction_status_receiver) = unbounded(); let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, + Arc::new(AtomicU64::default()), blockstore.clone(), &Arc::new(AtomicBool::new(false)), ); @@ -2361,7 +2359,7 @@ mod tests { transaction_status_service.join().unwrap(); - let confirmed_block = blockstore.get_confirmed_block(bank.slot(), false).unwrap(); + let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap(); assert_eq!(confirmed_block.transactions.len(), 3); for TransactionWithStatusMeta { transaction, meta } in diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6cf646bd89..854c00b296 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1400,6 +1400,9 @@ impl ReplayStage { ); did_complete_bank = true; info!("bank frozen: {}", bank.slot()); + if let Some(transaction_status_sender) = transaction_status_sender.clone() { + transaction_status_sender.send_transaction_status_freeze_message(&bank); + } bank.freeze(); heaviest_subtree_fork_choice .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); @@ -2063,7 +2066,7 @@ pub(crate) mod tests { use std::{ fs::remove_dir_all, iter, - sync::{Arc, RwLock}, + sync::{atomic::AtomicU64, Arc, RwLock}, }; use trees::tr; @@ -2713,6 +2716,7 @@ pub(crate) mod tests { previous_slot: Slot, bank: Arc, blockstore: Arc, + max_complete_transaction_status_slot: Arc, ) -> Vec { let mint_keypair = keypairs[0]; let keypair1 = keypairs[1]; @@ -2746,12 +2750,13 @@ pub(crate) mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, + max_complete_transaction_status_slot, blockstore, &Arc::new(AtomicBool::new(false)), ); // Check that process_entries successfully writes can_commit transactions statuses, and - // that they are matched properly by get_confirmed_block + // that they are matched properly by get_rooted_block let _result = blockstore_processor::process_entries( &bank, &entries, @@ -2798,9 +2803,10 @@ pub(crate) mod tests { bank0.slot(), bank1, blockstore.clone(), + Arc::new(AtomicU64::default()), ); - let confirmed_block = blockstore.get_confirmed_block(slot, false).unwrap(); + let confirmed_block = blockstore.get_rooted_block(slot, false).unwrap(); assert_eq!(confirmed_block.transactions.len(), 3); for TransactionWithStatusMeta { transaction, meta } in diff --git a/core/src/rpc.rs b/core/src/rpc.rs index c8bb1afc76..e4fde021a5 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -84,7 +84,7 @@ use std::{ net::SocketAddr, str::FromStr, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, mpsc::{channel, Receiver, Sender}, Arc, Mutex, RwLock, }, @@ -100,7 +100,7 @@ fn new_response(bank: &Bank, value: T) -> RpcResponse { Response { context, value } } -pub fn is_confirmed_rooted( +fn is_finalized( block_commitment_cache: &BlockCommitmentCache, bank: &Bank, blockstore: &Blockstore, @@ -145,6 +145,7 @@ pub struct JsonRpcRequestProcessor { largest_accounts_cache: Arc>, max_slots: Arc, leader_schedule_cache: Arc, + max_complete_transaction_status_slot: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -230,6 +231,7 @@ impl JsonRpcRequestProcessor { largest_accounts_cache: Arc>, max_slots: Arc, leader_schedule_cache: Arc, + max_complete_transaction_status_slot: Arc, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -250,6 +252,7 @@ impl JsonRpcRequestProcessor { largest_accounts_cache, max_slots, leader_schedule_cache, + max_complete_transaction_status_slot, }, receiver, ) @@ -292,6 +295,7 @@ impl JsonRpcRequestProcessor { largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))), max_slots: Arc::new(MaxSlots::default()), leader_schedule_cache: Arc::new(LeaderScheduleCache::new_from_bank(bank)), + max_complete_transaction_status_slot: Arc::new(AtomicU64::default()), } } @@ -749,40 +753,58 @@ impl JsonRpcRequestProcessor { slot: Slot, config: Option>, ) -> Result> { - let config = config - .map(|config| config.convert_to_current()) - .unwrap_or_default(); - let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Json); - let transaction_details = config.transaction_details.unwrap_or_default(); - let show_rewards = config.rewards.unwrap_or(true); - if self.config.enable_rpc_transaction_history - && slot + if self.config.enable_rpc_transaction_history { + let config = config + .map(|config| config.convert_to_current()) + .unwrap_or_default(); + let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Json); + let transaction_details = config.transaction_details.unwrap_or_default(); + let show_rewards = config.rewards.unwrap_or(true); + let commitment = config.commitment.unwrap_or_default(); + check_is_at_least_confirmed(commitment)?; + + // Block is old enough to be finalized + if slot <= self .block_commitment_cache .read() .unwrap() .highest_confirmed_root() - { - let result = self.blockstore.get_confirmed_block(slot, true); - self.check_blockstore_root(&result, slot)?; - if result.is_err() { - if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { - let bigtable_result = self - .runtime_handle - .block_on(bigtable_ledger_storage.get_confirmed_block(slot)); - self.check_bigtable_result(&bigtable_result)?; - return Ok(bigtable_result.ok().map(|confirmed_block| { + { + let result = self.blockstore.get_rooted_block(slot, true); + self.check_blockstore_root(&result, slot)?; + if result.is_err() { + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + let bigtable_result = self + .runtime_handle + .block_on(bigtable_ledger_storage.get_confirmed_block(slot)); + self.check_bigtable_result(&bigtable_result)?; + return Ok(bigtable_result.ok().map(|confirmed_block| { + confirmed_block.configure(encoding, transaction_details, show_rewards) + })); + } + } + self.check_slot_cleaned_up(&result, slot)?; + return Ok(result.ok().map(|confirmed_block| { + confirmed_block.configure(encoding, transaction_details, show_rewards) + })); + } else if commitment.is_confirmed() { + // Check if block is confirmed + let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); + if confirmed_bank.status_cache_ancestors().contains(&slot) + && slot + <= self + .max_complete_transaction_status_slot + .load(Ordering::SeqCst) + { + let result = self.blockstore.get_complete_block(slot, true); + return Ok(result.ok().map(|confirmed_block| { confirmed_block.configure(encoding, transaction_details, show_rewards) })); } } - self.check_slot_cleaned_up(&result, slot)?; - Ok(result.ok().map(|confirmed_block| { - confirmed_block.configure(encoding, transaction_details, show_rewards) - })) - } else { - Err(RpcCustomError::BlockNotAvailable { slot }.into()) } + Err(RpcCustomError::BlockNotAvailable { slot }.into()) } pub fn get_confirmed_blocks( @@ -948,7 +970,7 @@ impl JsonRpcRequestProcessor { Some(status) } else if self.config.enable_rpc_transaction_history && search_transaction_history { self.blockstore - .get_transaction_status(signature) + .get_transaction_status(signature, true) .map_err(|_| Error::internal_error())? .filter(|(slot, _status_meta)| { slot <= &self @@ -998,7 +1020,7 @@ impl JsonRpcRequestProcessor { optimistically_confirmed_bank.get_signature_status_slot(&signature); let confirmations = if r_block_commitment_cache.root() >= slot - && is_confirmed_rooted(&r_block_commitment_cache, bank, &self.blockstore, slot) + && is_finalized(&r_block_commitment_cache, bank, &self.blockstore, slot) { None } else { @@ -1026,18 +1048,30 @@ impl JsonRpcRequestProcessor { &self, signature: Signature, config: Option>, - ) -> Option { + ) -> Result> { let config = config .map(|config| config.convert_to_current()) .unwrap_or_default(); let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Json); + let commitment = config.commitment.unwrap_or_default(); + check_is_at_least_confirmed(commitment)?; + if self.config.enable_rpc_transaction_history { match self .blockstore - .get_confirmed_transaction(signature) + .get_complete_transaction(signature) .unwrap_or(None) { Some(confirmed_transaction) => { + if commitment.is_confirmed() { + let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); + if confirmed_bank + .status_cache_ancestors() + .contains(&confirmed_transaction.slot) + { + return Ok(Some(confirmed_transaction.encode(encoding))); + } + } if confirmed_transaction.slot <= self .block_commitment_cache @@ -1045,21 +1079,21 @@ impl JsonRpcRequestProcessor { .unwrap() .highest_confirmed_root() { - return Some(confirmed_transaction.encode(encoding)); + return Ok(Some(confirmed_transaction.encode(encoding))); } } None => { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { - return self + return Ok(self .runtime_handle .block_on(bigtable_ledger_storage.get_confirmed_transaction(&signature)) .unwrap_or(None) - .map(|confirmed| confirmed.encode(encoding)); + .map(|confirmed| confirmed.encode(encoding))); } } } } - None + Ok(None) } pub fn get_confirmed_signatures_for_address( @@ -1588,6 +1622,15 @@ fn verify_token_account_filter( } } +fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<()> { + if !commitment.is_at_least_confirmed() { + return Err(Error::invalid_params( + "Method does not support commitment below `confirmed`", + )); + } + Ok(()) +} + fn check_slice_and_encoding(encoding: &UiAccountEncoding, data_slice_is_some: bool) -> Result<()> { match encoding { UiAccountEncoding::JsonParsed => { @@ -2914,7 +2957,7 @@ impl RpcSol for RpcSolImpl { signature_str ); let signature = verify_signature(&signature_str)?; - Ok(meta.get_confirmed_transaction(signature, config)) + meta.get_confirmed_transaction(signature, config) } fn get_confirmed_signatures_for_address( @@ -3250,11 +3293,13 @@ pub mod tests { let keypair2 = Keypair::new(); let keypair3 = Keypair::new(); bank.transfer(4, &alice, &keypair2.pubkey()).unwrap(); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); let confirmed_block_signatures = create_test_transactions_and_populate_blockstore( vec![&alice, &keypair1, &keypair2, &keypair3], 0, bank.clone(), blockstore.clone(), + max_complete_transaction_status_slot.clone(), ); let mut commitment_slot0 = BlockCommitment::default(); @@ -3365,6 +3410,7 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), max_slots, Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + max_complete_transaction_status_slot, ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -4825,6 +4871,7 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), + Arc::new(AtomicU64::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -5024,6 +5071,7 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), + Arc::new(AtomicU64::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!(request_processor.validator_exit(), false); @@ -5060,6 +5108,7 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), + Arc::new(AtomicU64::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!(request_processor.validator_exit(), true); @@ -5173,6 +5222,7 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), + Arc::new(AtomicU64::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!( @@ -5368,6 +5418,7 @@ pub mod tests { encoding: None, transaction_details: Some(TransactionDetails::Signatures), rewards: Some(false), + commitment: None, }) ); let res = io.handle_request_sync(&req, meta.clone()); @@ -5388,6 +5439,7 @@ pub mod tests { encoding: None, transaction_details: Some(TransactionDetails::None), rewards: Some(true), + commitment: None, }) ); let res = io.handle_request_sync(&req, meta); @@ -5792,7 +5844,7 @@ pub mod tests { } #[test] - fn test_is_confirmed_rooted() { + fn test_is_finalized() { let bank = Arc::new(Bank::default()); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); @@ -5820,25 +5872,15 @@ pub mod tests { }, ); - assert!(is_confirmed_rooted( - &block_commitment_cache, - &bank, - &blockstore, - 0 - )); - assert!(is_confirmed_rooted( - &block_commitment_cache, - &bank, - &blockstore, - 1 - )); - assert!(!is_confirmed_rooted( + assert!(is_finalized(&block_commitment_cache, &bank, &blockstore, 0)); + assert!(is_finalized(&block_commitment_cache, &bank, &blockstore, 1)); + assert!(!is_finalized( &block_commitment_cache, &bank, &blockstore, 2 )); - assert!(!is_confirmed_rooted( + assert!(!is_finalized( &block_commitment_cache, &bank, &blockstore, @@ -6463,6 +6505,7 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), + Arc::new(AtomicU64::default()), ); let mut io = MetaIoHandler::default(); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index a875f1a7b3..b8690ec26d 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -30,7 +30,7 @@ use std::{ collections::HashSet, net::SocketAddr, path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::{mpsc::channel, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; @@ -276,6 +276,7 @@ impl JsonRpcService { send_transaction_leader_forward_count: u64, max_slots: Arc, leader_schedule_cache: Arc, + current_transaction_status_slot: Arc, ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); @@ -354,6 +355,7 @@ impl JsonRpcService { largest_accounts_cache, max_slots, leader_schedule_cache, + current_transaction_status_slot, ); let leader_info = @@ -515,6 +517,7 @@ mod tests { 1, Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), + Arc::new(AtomicU64::default()), ); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); diff --git a/core/src/transaction_status_service.rs b/core/src/transaction_status_service.rs index 1b1f07b16e..1de4db0374 100644 --- a/core/src/transaction_status_service.rs +++ b/core/src/transaction_status_service.rs @@ -1,6 +1,9 @@ use crossbeam_channel::{Receiver, RecvTimeoutError}; use itertools::izip; -use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusBatch}; +use solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage}, +}; use solana_runtime::{ bank::{Bank, InnerInstructionsList, NonceRollbackInfo, TransactionLogMessages}, transaction_utils::OrderedIterator, @@ -8,7 +11,7 @@ use solana_runtime::{ use solana_transaction_status::{InnerInstructions, TransactionStatusMeta}; use std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, thread::{self, Builder, JoinHandle}, @@ -22,7 +25,8 @@ pub struct TransactionStatusService { impl TransactionStatusService { #[allow(clippy::new_ret_no_self)] pub fn new( - write_transaction_status_receiver: Receiver, + write_transaction_status_receiver: Receiver, + max_complete_transaction_status_slot: Arc, blockstore: Arc, exit: &Arc, ) -> Self { @@ -35,6 +39,7 @@ impl TransactionStatusService { } if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch( &write_transaction_status_receiver, + &max_complete_transaction_status_slot, &blockstore, ) { break; @@ -45,97 +50,104 @@ impl TransactionStatusService { } fn write_transaction_status_batch( - write_transaction_status_receiver: &Receiver, + write_transaction_status_receiver: &Receiver, + max_complete_transaction_status_slot: &Arc, blockstore: &Arc, ) -> Result<(), RecvTimeoutError> { - let TransactionStatusBatch { - bank, - transactions, - iteration_order, - statuses, - balances, - token_balances, - inner_instructions, - transaction_logs, - } = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?; + match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? { + TransactionStatusMessage::Batch(TransactionStatusBatch { + bank, + transactions, + iteration_order, + statuses, + balances, + token_balances, + inner_instructions, + transaction_logs, + }) => { + let slot = bank.slot(); + let inner_instructions_iter: Box< + dyn Iterator>, + > = if let Some(inner_instructions) = inner_instructions { + Box::new(inner_instructions.into_iter()) + } else { + Box::new(std::iter::repeat_with(|| None)) + }; + let transaction_logs_iter: Box> = + if let Some(transaction_logs) = transaction_logs { + Box::new(transaction_logs.into_iter()) + } else { + Box::new(std::iter::repeat_with(Vec::new)) + }; + for ( + (_, transaction), + (status, nonce_rollback), + pre_balances, + post_balances, + pre_token_balances, + post_token_balances, + inner_instructions, + log_messages, + ) in izip!( + OrderedIterator::new(&transactions, iteration_order.as_deref()), + statuses, + balances.pre_balances, + balances.post_balances, + token_balances.pre_token_balances, + token_balances.post_token_balances, + inner_instructions_iter, + transaction_logs_iter + ) { + if Bank::can_commit(&status) && !transaction.signatures.is_empty() { + let fee_calculator = nonce_rollback + .map(|nonce_rollback| nonce_rollback.fee_calculator()) + .unwrap_or_else(|| { + bank.get_fee_calculator(&transaction.message().recent_blockhash) + }) + .expect("FeeCalculator must exist"); + let fee = fee_calculator.calculate_fee(transaction.message()); + let (writable_keys, readonly_keys) = + transaction.message.get_account_keys_by_lock_type(); - let slot = bank.slot(); - let inner_instructions_iter: Box>> = - if let Some(inner_instructions) = inner_instructions { - Box::new(inner_instructions.into_iter()) - } else { - Box::new(std::iter::repeat_with(|| None)) - }; - let transaction_logs_iter: Box> = - if let Some(transaction_logs) = transaction_logs { - Box::new(transaction_logs.into_iter()) - } else { - Box::new(std::iter::repeat_with(Vec::new)) - }; - for ( - (_, transaction), - (status, nonce_rollback), - pre_balances, - post_balances, - pre_token_balances, - post_token_balances, - inner_instructions, - log_messages, - ) in izip!( - OrderedIterator::new(&transactions, iteration_order.as_deref()), - statuses, - balances.pre_balances, - balances.post_balances, - token_balances.pre_token_balances, - token_balances.post_token_balances, - inner_instructions_iter, - transaction_logs_iter - ) { - if Bank::can_commit(&status) && !transaction.signatures.is_empty() { - let fee_calculator = nonce_rollback - .map(|nonce_rollback| nonce_rollback.fee_calculator()) - .unwrap_or_else(|| { - bank.get_fee_calculator(&transaction.message().recent_blockhash) - }) - .expect("FeeCalculator must exist"); - let fee = fee_calculator.calculate_fee(transaction.message()); - let (writable_keys, readonly_keys) = - transaction.message.get_account_keys_by_lock_type(); + let inner_instructions = inner_instructions.map(|inner_instructions| { + inner_instructions + .into_iter() + .enumerate() + .map(|(index, instructions)| InnerInstructions { + index: index as u8, + instructions, + }) + .filter(|i| !i.instructions.is_empty()) + .collect() + }); - let inner_instructions = inner_instructions.map(|inner_instructions| { - inner_instructions - .into_iter() - .enumerate() - .map(|(index, instructions)| InnerInstructions { - index: index as u8, - instructions, - }) - .filter(|i| !i.instructions.is_empty()) - .collect() - }); + let log_messages = Some(log_messages); + let pre_token_balances = Some(pre_token_balances); + let post_token_balances = Some(post_token_balances); - let log_messages = Some(log_messages); - let pre_token_balances = Some(pre_token_balances); - let post_token_balances = Some(post_token_balances); - - blockstore - .write_transaction_status( - slot, - transaction.signatures[0], - writable_keys, - readonly_keys, - TransactionStatusMeta { - status, - fee, - pre_balances, - post_balances, - inner_instructions, - log_messages, - pre_token_balances, - post_token_balances, - }, - ) - .expect("Expect database write to succeed"); + blockstore + .write_transaction_status( + slot, + transaction.signatures[0], + writable_keys, + readonly_keys, + TransactionStatusMeta { + status, + fee, + pre_balances, + post_balances, + inner_instructions, + log_messages, + pre_token_balances, + post_token_balances, + }, + ) + .expect("Expect database write to succeed"); + } + } + } + TransactionStatusMessage::Freeze(slot) => { + max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst); } } Ok(()) diff --git a/core/src/validator.rs b/core/src/validator.rs index fcbd860fa7..398daa3827 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -69,7 +69,7 @@ use std::{ net::SocketAddr, ops::Deref, path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::mpsc::Receiver, sync::{Arc, Mutex, RwLock}, thread::sleep, @@ -206,6 +206,7 @@ impl ValidatorExit { struct TransactionHistoryServices { transaction_status_sender: Option, transaction_status_service: Option, + max_complete_transaction_status_slot: Arc, rewards_recorder_sender: Option, rewards_recorder_service: Option, cache_block_time_sender: Option, @@ -339,6 +340,7 @@ impl Validator { TransactionHistoryServices { transaction_status_sender, transaction_status_service, + max_complete_transaction_status_slot, rewards_recorder_sender, rewards_recorder_service, cache_block_time_sender, @@ -494,6 +496,7 @@ impl Validator { config.send_transaction_leader_forward_count, max_slots.clone(), leader_schedule_cache.clone(), + max_complete_transaction_status_slot, ), pubsub_service: PubSubService::new( config.pubsub_config.clone(), @@ -1186,6 +1189,7 @@ fn initialize_rpc_transaction_history_services( exit: &Arc, enable_cpi_and_log_storage: bool, ) -> TransactionHistoryServices { + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); let (transaction_status_sender, transaction_status_receiver) = unbounded(); let transaction_status_sender = Some(TransactionStatusSender { sender: transaction_status_sender, @@ -1193,6 +1197,7 @@ fn initialize_rpc_transaction_history_services( }); let transaction_status_service = Some(TransactionStatusService::new( transaction_status_receiver, + max_complete_transaction_status_slot.clone(), blockstore.clone(), exit, )); @@ -1215,6 +1220,7 @@ fn initialize_rpc_transaction_history_services( TransactionHistoryServices { transaction_status_sender, transaction_status_service, + max_complete_transaction_status_slot, rewards_recorder_sender, rewards_recorder_service, cache_block_time_sender, diff --git a/docs/src/developing/clients/jsonrpc-api.md b/docs/src/developing/clients/jsonrpc-api.md index 7a9d8b2b7a..841475d997 100644 --- a/docs/src/developing/clients/jsonrpc-api.md +++ b/docs/src/developing/clients/jsonrpc-api.md @@ -464,6 +464,7 @@ Returns identity and transaction information about a confirmed block in the ledg "jsonParsed" encoding attempts to use program-specific instruction parsers to return more human-readable and explicit data in the `transaction.message.instructions` list. If "jsonParsed" is requested but a parser cannot be found, the instruction falls back to regular JSON encoding (`accounts`, `data`, and `programIdIndex` fields). - (optional) `transactionDetails: ` - level of transaction detail to return, either "full", "signatures", or "none". If parameter not provided, the default detail level is "full". - (optional) `rewards: bool` - whether to populate the `rewards` array. If parameter not provided, the default includes rewards. + - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment); "processed" is not supported. If parameter not provided, the default is "finalized". #### Results: @@ -857,6 +858,7 @@ Returns transaction details for a confirmed transaction - `` - (optional) Configuration object containing the following optional fields: - (optional) `encoding: ` - encoding for each returned Transaction, either "json", "jsonParsed", "base58" (*slow*), "base64". If parameter not provided, the default encoding is "json". "jsonParsed" encoding attempts to use program-specific instruction parsers to return more human-readable and explicit data in the `transaction.message.instructions` list. If "jsonParsed" is requested but a parser cannot be found, the instruction falls back to regular JSON encoding (`accounts`, `data`, and `programIdIndex` fields). + - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment); "processed" is not supported. If parameter not provided, the default is "finalized". #### Results: diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index f5fb5519d7..fe31324f67 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -138,7 +138,7 @@ pub async fn upload_confirmed_blocks( break; } - let _ = match blockstore.get_confirmed_block(*slot, true) { + let _ = match blockstore.get_rooted_block(*slot, true) { Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))), Err(err) => { warn!( diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 01f6fcd6b4..383af1fb96 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1729,14 +1729,14 @@ impl Blockstore { Ok(root_iterator.next().unwrap_or_default()) } - pub fn get_confirmed_block( + pub fn get_rooted_block( &self, slot: Slot, require_previous_blockhash: bool, ) -> Result { datapoint_info!( "blockstore-rpc-api", - ("method", "get_confirmed_block".to_string(), String) + ("method", "get_rooted_block".to_string(), String) ); let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); // lowest_cleanup_slot is the last slot that was not cleaned up by @@ -1745,15 +1745,25 @@ impl Blockstore { return Err(BlockstoreError::SlotCleanedUp); } if self.is_root(slot) { - let slot_meta_cf = self.db.column::(); - let slot_meta = match slot_meta_cf.get(slot)? { - Some(slot_meta) => slot_meta, - None => { - info!("SlotMeta not found for rooted slot {}", slot); - return Err(BlockstoreError::SlotCleanedUp); - } - }; + return self.get_complete_block(slot, require_previous_blockhash); + } + Err(BlockstoreError::SlotNotRooted) + } + pub fn get_complete_block( + &self, + slot: Slot, + require_previous_blockhash: bool, + ) -> Result { + let slot_meta_cf = self.db.column::(); + let slot_meta = match slot_meta_cf.get(slot)? { + Some(slot_meta) => slot_meta, + None => { + info!("SlotMeta not found for slot {}", slot); + return Err(BlockstoreError::SlotUnavailable); + } + }; + if slot_meta.is_full() { let slot_entries = self.get_slot_entries(slot, 0)?; if !slot_entries.is_empty() { let slot_transaction_iterator = slot_entries @@ -1763,7 +1773,7 @@ impl Blockstore { .map(|transaction| { if let Err(err) = transaction.sanitize() { warn!( - "Blockstore::get_confirmed_block sanitize failed: {:?}, \ + "Blockstore::get_block sanitize failed: {:?}, \ slot: {:?}, \ {:?}", err, slot, transaction, @@ -1805,7 +1815,7 @@ impl Blockstore { return Ok(block); } } - Err(BlockstoreError::SlotNotRooted) + Err(BlockstoreError::SlotUnavailable) } fn map_transactions_to_statuses<'a>( @@ -1959,11 +1969,11 @@ impl Blockstore { Ok(()) } - // Returns a transaction status if it was processed in a root, as well as a loop counter for - // unit testing + // Returns a transaction status, as well as a loop counter for unit testing fn get_transaction_status_with_counter( &self, signature: Signature, + require_root: bool, ) -> Result<(Option<(Slot, TransactionStatusMeta)>, u64)> { let mut counter = 0; for transaction_status_cf_primary_index in 0..=1 { @@ -1976,29 +1986,31 @@ impl Blockstore { if i != transaction_status_cf_primary_index || sig != signature { break; } - if self.is_root(slot) { - let status = self - .transaction_status_cf - .get_protobuf_or_bincode::((i, sig, slot))? - .and_then(|status| status.try_into().ok()) - .map(|status| (slot, status)); - return Ok((status, counter)); + if require_root && !self.is_root(slot) || self.meta(slot)?.is_none() { + continue; } + let status = self + .transaction_status_cf + .get_protobuf_or_bincode::((i, sig, slot))? + .and_then(|status| status.try_into().ok()) + .map(|status| (slot, status)); + return Ok((status, counter)); } } Ok((None, counter)) } - /// Returns a transaction status if it was processed in a root + /// Returns a transaction status pub fn get_transaction_status( &self, signature: Signature, + require_root: bool, ) -> Result> { datapoint_info!( "blockstore-rpc-api", ("method", "get_transaction_status".to_string(), String) ); - self.get_transaction_status_with_counter(signature) + self.get_transaction_status_with_counter(signature, require_root) .map(|(status, _)| status) } @@ -2011,7 +2023,27 @@ impl Blockstore { "blockstore-rpc-api", ("method", "get_confirmed_transaction".to_string(), String) ); - if let Some((slot, status)) = self.get_transaction_status(signature)? { + self.get_transaction_with_status(signature, true) + } + + /// Returns a complete transaction + pub fn get_complete_transaction( + &self, + signature: Signature, + ) -> Result> { + datapoint_info!( + "blockstore-rpc-api", + ("method", "get_complete_transaction".to_string(), String) + ); + self.get_transaction_with_status(signature, false) + } + + fn get_transaction_with_status( + &self, + signature: Signature, + require_root: bool, + ) -> Result> { + if let Some((slot, status)) = self.get_transaction_status(signature, require_root)? { let transaction = self .find_transaction_in_slot(slot, signature)? .ok_or(BlockstoreError::TransactionStatusSlotMismatch)?; // Should not happen @@ -2053,8 +2085,8 @@ impl Blockstore { .find(|transaction| transaction.signatures[0] == signature)) } - // Returns all cached signatures for an address, ordered by slot that the transaction was - // processed in. Within each slot the transactions will be ordered by signature, and NOT by + // Returns all rooted signatures for an address, ordered by slot that the transaction was + // processed in. Within each slot the transactions will be ordered by signature, and NOT by // the order in which the transactions exist in the block fn find_address_signatures( &self, @@ -2129,12 +2161,12 @@ impl Blockstore { let (slot, mut before_excluded_signatures) = match before { None => (highest_confirmed_root, None), Some(before) => { - let transaction_status = self.get_transaction_status(before)?; + let transaction_status = self.get_transaction_status(before, true)?; match transaction_status { None => return Ok(vec![]), Some((slot, _)) => { let confirmed_block = - self.get_confirmed_block(slot, false).map_err(|err| { + self.get_rooted_block(slot, false).map_err(|err| { BlockstoreError::Io(IoError::new( ErrorKind::Other, format!("Unable to get confirmed block: {}", err), @@ -2180,12 +2212,12 @@ impl Blockstore { let (lowest_slot, until_excluded_signatures) = match until { None => (0, HashSet::new()), Some(until) => { - let transaction_status = self.get_transaction_status(until)?; + let transaction_status = self.get_transaction_status(until, true)?; match transaction_status { None => (0, HashSet::new()), Some((slot, _)) => { let confirmed_block = - self.get_confirmed_block(slot, false).map_err(|err| { + self.get_rooted_block(slot, false).map_err(|err| { BlockstoreError::Io(IoError::new( ErrorKind::Other, format!("Unable to get confirmed block: {}", err), @@ -2325,7 +2357,7 @@ impl Blockstore { let mut get_status_info_timer = Measure::start("get_status_info_timer"); let mut infos = vec![]; for (slot, signature) in address_signatures.into_iter() { - let transaction_status = self.get_transaction_status(signature)?; + let transaction_status = self.get_transaction_status(signature, true)?; let err = match transaction_status { None => None, Some((_slot, status)) => status.status.err(), @@ -5735,16 +5767,18 @@ pub mod tests { } #[test] - fn test_get_confirmed_block() { + fn test_get_rooted_block() { let slot = 10; let entries = make_slot_entries_with_transactions(100); let blockhash = get_last_hash(entries.iter()).unwrap(); let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0); let more_shreds = entries_to_test_shreds(entries.clone(), slot + 1, slot, true, 0); + let unrooted_shreds = entries_to_test_shreds(entries.clone(), slot + 2, slot + 1, true, 0); let ledger_path = get_tmp_ledger_path!(); let ledger = Blockstore::open(&ledger_path).unwrap(); ledger.insert_shreds(shreds, None, false).unwrap(); ledger.insert_shreds(more_shreds, None, false).unwrap(); + ledger.insert_shreds(unrooted_shreds, None, false).unwrap(); ledger.set_roots(&[slot - 1, slot, slot + 1]).unwrap(); let parent_meta = SlotMeta { @@ -5798,6 +5832,21 @@ pub mod tests { .transaction_status_cf .put_protobuf((0, signature, slot + 1), &status) .unwrap(); + let status = TransactionStatusMeta { + status: Ok(()), + fee: 42, + pre_balances: pre_balances.clone(), + post_balances: post_balances.clone(), + inner_instructions: Some(vec![]), + log_messages: Some(vec![]), + pre_token_balances: Some(vec![]), + post_token_balances: Some(vec![]), + } + .into(); + ledger + .transaction_status_cf + .put_protobuf((0, signature, slot + 2), &status) + .unwrap(); TransactionWithStatusMeta { transaction, meta: Some(TransactionStatusMeta { @@ -5815,19 +5864,19 @@ pub mod tests { .collect(); // Even if marked as root, a slot that is empty of entries should return an error - let confirmed_block_err = ledger.get_confirmed_block(slot - 1, true).unwrap_err(); - assert_matches!(confirmed_block_err, BlockstoreError::SlotNotRooted); + let confirmed_block_err = ledger.get_rooted_block(slot - 1, true).unwrap_err(); + assert_matches!(confirmed_block_err, BlockstoreError::SlotUnavailable); // The previous_blockhash of `expected_block` is default because its parent slot is a root, // but empty of entries (eg. snapshot root slots). This now returns an error. - let confirmed_block_err = ledger.get_confirmed_block(slot, true).unwrap_err(); + let confirmed_block_err = ledger.get_rooted_block(slot, true).unwrap_err(); assert_matches!( confirmed_block_err, BlockstoreError::ParentEntriesUnavailable ); // Test if require_previous_blockhash is false - let confirmed_block = ledger.get_confirmed_block(slot, false).unwrap(); + let confirmed_block = ledger.get_rooted_block(slot, false).unwrap(); assert_eq!(confirmed_block.transactions.len(), 100); let expected_block = ConfirmedBlock { transactions: expected_transactions.clone(), @@ -5839,11 +5888,11 @@ pub mod tests { }; assert_eq!(confirmed_block, expected_block); - let confirmed_block = ledger.get_confirmed_block(slot + 1, true).unwrap(); + let confirmed_block = ledger.get_rooted_block(slot + 1, true).unwrap(); assert_eq!(confirmed_block.transactions.len(), 100); let mut expected_block = ConfirmedBlock { - transactions: expected_transactions, + transactions: expected_transactions.clone(), parent_slot: slot, blockhash: blockhash.to_string(), previous_blockhash: blockhash.to_string(), @@ -5852,17 +5901,37 @@ pub mod tests { }; assert_eq!(confirmed_block, expected_block); - let not_root = ledger.get_confirmed_block(slot + 2, true).unwrap_err(); + let not_root = ledger.get_rooted_block(slot + 2, true).unwrap_err(); assert_matches!(not_root, BlockstoreError::SlotNotRooted); + let complete_block = ledger.get_complete_block(slot + 2, true).unwrap(); + assert_eq!(complete_block.transactions.len(), 100); + + let mut expected_complete_block = ConfirmedBlock { + transactions: expected_transactions, + parent_slot: slot + 1, + blockhash: blockhash.to_string(), + previous_blockhash: blockhash.to_string(), + rewards: vec![], + block_time: None, + }; + assert_eq!(complete_block, expected_complete_block); + // Test block_time returns, if available let timestamp = 1_576_183_541; ledger.blocktime_cf.put(slot + 1, ×tamp).unwrap(); expected_block.block_time = Some(timestamp); - let confirmed_block = ledger.get_confirmed_block(slot + 1, true).unwrap(); + let confirmed_block = ledger.get_rooted_block(slot + 1, true).unwrap(); assert_eq!(confirmed_block, expected_block); + let timestamp = 1_576_183_542; + ledger.blocktime_cf.put(slot + 2, ×tamp).unwrap(); + expected_complete_block.block_time = Some(timestamp); + + let complete_block = ledger.get_complete_block(slot + 2, true).unwrap(); + assert_eq!(complete_block, expected_complete_block); + drop(ledger); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); } @@ -6278,16 +6347,25 @@ pub mod tests { // Signature exists, root found in index 0 if let (Some((slot, _status)), counter) = blockstore - .get_transaction_status_with_counter(signature2) + .get_transaction_status_with_counter(signature2, true) .unwrap() { assert_eq!(slot, 2); assert_eq!(counter, 2); } + // Signature exists, root not required + if let (Some((slot, _status)), counter) = blockstore + .get_transaction_status_with_counter(signature2, false) + .unwrap() + { + assert_eq!(slot, 1); + assert_eq!(counter, 1); + } + // Signature exists, root found in index 1 if let (Some((slot, _status)), counter) = blockstore - .get_transaction_status_with_counter(signature4) + .get_transaction_status_with_counter(signature4, true) .unwrap() { assert_eq!(slot, 2); @@ -6296,28 +6374,55 @@ pub mod tests { // Signature exists, no root found let (status, counter) = blockstore - .get_transaction_status_with_counter(signature5) + .get_transaction_status_with_counter(signature5, true) .unwrap(); assert_eq!(status, None); assert_eq!(counter, 6); + // Signature exists, root not required + if let (Some((slot, _status)), counter) = blockstore + .get_transaction_status_with_counter(signature5, false) + .unwrap() + { + assert_eq!(slot, 0); + assert_eq!(counter, 1); + } + // Signature does not exist, smaller than existing entries let (status, counter) = blockstore - .get_transaction_status_with_counter(signature1) + .get_transaction_status_with_counter(signature1, true) + .unwrap(); + assert_eq!(status, None); + assert_eq!(counter, 2); + + let (status, counter) = blockstore + .get_transaction_status_with_counter(signature1, false) .unwrap(); assert_eq!(status, None); assert_eq!(counter, 2); // Signature does not exist, between existing entries let (status, counter) = blockstore - .get_transaction_status_with_counter(signature3) + .get_transaction_status_with_counter(signature3, true) + .unwrap(); + assert_eq!(status, None); + assert_eq!(counter, 2); + + let (status, counter) = blockstore + .get_transaction_status_with_counter(signature3, false) .unwrap(); assert_eq!(status, None); assert_eq!(counter, 2); // Signature does not exist, larger than existing entries let (status, counter) = blockstore - .get_transaction_status_with_counter(signature6) + .get_transaction_status_with_counter(signature6, true) + .unwrap(); + assert_eq!(status, None); + assert_eq!(counter, 2); + + let (status, counter) = blockstore + .get_transaction_status_with_counter(signature6, false) .unwrap(); assert_eq!(status, None); assert_eq!(counter, 2); @@ -6390,6 +6495,14 @@ pub mod tests { let signature = transaction.transaction.signatures[0]; assert_eq!( blockstore.get_confirmed_transaction(signature).unwrap(), + Some(ConfirmedTransaction { + slot, + transaction: transaction.clone(), + block_time: None + }) + ); + assert_eq!( + blockstore.get_complete_transaction(signature).unwrap(), Some(ConfirmedTransaction { slot, transaction, @@ -6406,6 +6519,102 @@ pub mod tests { blockstore.get_confirmed_transaction(signature).unwrap(), None, ); + assert_eq!( + blockstore.get_complete_transaction(signature).unwrap(), + None, + ); + } + } + + #[test] + fn test_get_complete_transaction() { + let slot = 2; + let entries = make_slot_entries_with_transactions(5); + let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + blockstore.insert_shreds(shreds, None, false).unwrap(); + // blockstore.set_roots(&[slot - 1, slot]).unwrap(); + + let expected_transactions: Vec = entries + .iter() + .cloned() + .filter(|entry| !entry.is_tick()) + .flat_map(|entry| entry.transactions) + .map(|transaction| { + let mut pre_balances: Vec = vec![]; + let mut post_balances: Vec = vec![]; + for (i, _account_key) in transaction.message.account_keys.iter().enumerate() { + pre_balances.push(i as u64 * 10); + post_balances.push(i as u64 * 11); + } + let inner_instructions = Some(vec![InnerInstructions { + index: 0, + instructions: vec![CompiledInstruction::new(1, &(), vec![0])], + }]); + let log_messages = Some(vec![String::from("Test message\n")]); + let pre_token_balances = Some(vec![]); + let post_token_balances = Some(vec![]); + let signature = transaction.signatures[0]; + let status = TransactionStatusMeta { + status: Ok(()), + fee: 42, + pre_balances: pre_balances.clone(), + post_balances: post_balances.clone(), + inner_instructions: inner_instructions.clone(), + log_messages: log_messages.clone(), + pre_token_balances: pre_token_balances.clone(), + post_token_balances: post_token_balances.clone(), + } + .into(); + blockstore + .transaction_status_cf + .put_protobuf((0, signature, slot), &status) + .unwrap(); + TransactionWithStatusMeta { + transaction, + meta: Some(TransactionStatusMeta { + status: Ok(()), + fee: 42, + pre_balances, + post_balances, + inner_instructions, + log_messages, + pre_token_balances, + post_token_balances, + }), + } + }) + .collect(); + + for transaction in expected_transactions.clone() { + let signature = transaction.transaction.signatures[0]; + assert_eq!( + blockstore.get_complete_transaction(signature).unwrap(), + Some(ConfirmedTransaction { + slot, + transaction, + block_time: None + }) + ); + assert_eq!( + blockstore.get_confirmed_transaction(signature).unwrap(), + None + ); + } + + blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap(); + *blockstore.lowest_cleanup_slot.write().unwrap() = slot; + for TransactionWithStatusMeta { transaction, .. } in expected_transactions { + let signature = transaction.signatures[0]; + assert_eq!( + blockstore.get_complete_transaction(signature).unwrap(), + None, + ); + assert_eq!( + blockstore.get_confirmed_transaction(signature).unwrap(), + None, + ); } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index ff014dac39..198dd40b9a 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -75,6 +75,7 @@ pub enum BlockstoreError { ProtobufEncodeError(#[from] prost::EncodeError), ProtobufDecodeError(#[from] prost::DecodeError), ParentEntriesUnavailable, + SlotUnavailable, } pub type Result = std::result::Result; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 4919dab035..6d253c77d2 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -146,7 +146,7 @@ fn execute_batch( let token_balances = TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances); - send_transaction_status_batch( + transaction_status_sender.send_transaction_status_batch( bank.clone(), batch.transactions(), batch.iteration_order_vec(), @@ -155,7 +155,6 @@ fn execute_batch( token_balances, inner_instructions, transaction_logs, - transaction_status_sender, ); } @@ -1092,6 +1091,11 @@ fn process_single_slot( Ok(()) } +pub enum TransactionStatusMessage { + Batch(TransactionStatusBatch), + Freeze(Slot), +} + pub struct TransactionStatusBatch { pub bank: Arc, pub transactions: Vec, @@ -1105,46 +1109,58 @@ pub struct TransactionStatusBatch { #[derive(Clone)] pub struct TransactionStatusSender { - pub sender: Sender, + pub sender: Sender, pub enable_cpi_and_log_storage: bool, } -pub fn send_transaction_status_batch( - bank: Arc, - transactions: &[Transaction], - iteration_order: Option>, - statuses: Vec, - balances: TransactionBalancesSet, - token_balances: TransactionTokenBalancesSet, - inner_instructions: Vec>, - transaction_logs: Vec, - transaction_status_sender: TransactionStatusSender, -) { - let slot = bank.slot(); - let (inner_instructions, transaction_logs) = - if !transaction_status_sender.enable_cpi_and_log_storage { +impl TransactionStatusSender { + pub fn send_transaction_status_batch( + &self, + bank: Arc, + transactions: &[Transaction], + iteration_order: Option>, + statuses: Vec, + balances: TransactionBalancesSet, + token_balances: TransactionTokenBalancesSet, + inner_instructions: Vec>, + transaction_logs: Vec, + ) { + let slot = bank.slot(); + let (inner_instructions, transaction_logs) = if !self.enable_cpi_and_log_storage { (None, None) } else { (Some(inner_instructions), Some(transaction_logs)) }; - if let Err(e) = transaction_status_sender - .sender - .send(TransactionStatusBatch { - bank, - transactions: transactions.to_vec(), - iteration_order, - statuses, - balances, - token_balances, - inner_instructions, - transaction_logs, - }) - { - trace!( - "Slot {} transaction_status send batch failed: {:?}", - slot, - e - ); + if let Err(e) = self + .sender + .send(TransactionStatusMessage::Batch(TransactionStatusBatch { + bank, + transactions: transactions.to_vec(), + iteration_order, + statuses, + balances, + token_balances, + inner_instructions, + transaction_logs, + })) + { + trace!( + "Slot {} transaction_status send batch failed: {:?}", + slot, + e + ); + } + } + + pub fn send_transaction_status_freeze_message(&self, bank: &Arc) { + let slot = bank.slot(); + if let Err(e) = self.sender.send(TransactionStatusMessage::Freeze(slot)) { + trace!( + "Slot {} transaction_status send freeze message failed: {:?}", + slot, + e + ); + } } } diff --git a/sdk/src/commitment_config.rs b/sdk/src/commitment_config.rs index 6fd40ff261..0551a068c0 100644 --- a/sdk/src/commitment_config.rs +++ b/sdk/src/commitment_config.rs @@ -108,6 +108,10 @@ impl CommitmentConfig { ) } + pub fn is_at_least_confirmed(&self) -> bool { + self.is_confirmed() || self.is_finalized() + } + pub fn use_deprecated_commitment(commitment: CommitmentConfig) -> Self { match commitment.commitment { CommitmentLevel::Finalized => CommitmentConfig::max(), diff --git a/transaction-status/src/lib.rs b/transaction-status/src/lib.rs index e5e2ac81b7..e855e45952 100644 --- a/transaction-status/src/lib.rs +++ b/transaction-status/src/lib.rs @@ -446,6 +446,19 @@ impl From for UiConfirmedBlock { } } +impl From for EncodedConfirmedBlock { + fn from(block: UiConfirmedBlock) -> Self { + Self { + previous_blockhash: block.previous_blockhash, + blockhash: block.blockhash, + parent_slot: block.parent_slot, + transactions: block.transactions.unwrap_or_default(), + rewards: block.rewards.unwrap_or_default(), + block_time: block.block_time, + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum TransactionDetails {