Rpc: enable getConfirmedBlock and getConfirmedTransaction to return confirmed (not yet finalized) data (#16142)
* Add Blockstore block and tx apis that allow unrooted responses * Add TransactionStatusMessage, and send on bank freeze; also refactor TransactionStatusSender * Track highest slot with tx-status writes complete * Rename and unpub fn * Add commitment to GetConfirmed input configs * Support confirmed blocks in getConfirmedBlock * Support confirmed txs in getConfirmedTransaction * Update sigs-for-addr2 comment * Enable confirmed block in cli * Enable confirmed transaction in cli * Review comments * Rename blockstore method
This commit is contained in:
@ -12,10 +12,8 @@ use itertools::Itertools;
|
||||
use lru::LruCache;
|
||||
use retain_mut::RetainMut;
|
||||
use solana_ledger::{
|
||||
blockstore::Blockstore,
|
||||
blockstore_processor::{send_transaction_status_batch, TransactionStatusSender},
|
||||
entry::hash_transactions,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
|
||||
entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache,
|
||||
};
|
||||
use solana_measure::{measure::Measure, thread_mem_usage};
|
||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
|
||||
@ -768,7 +766,7 @@ impl BankingStage {
|
||||
if let Some(transaction_status_sender) = transaction_status_sender {
|
||||
let post_balances = bank.collect_balances(batch);
|
||||
let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals);
|
||||
send_transaction_status_batch(
|
||||
transaction_status_sender.send_transaction_status_batch(
|
||||
bank.clone(),
|
||||
batch.transactions(),
|
||||
batch.iteration_order_vec(),
|
||||
@ -777,7 +775,6 @@ impl BankingStage {
|
||||
TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances),
|
||||
inner_instructions,
|
||||
transaction_logs,
|
||||
transaction_status_sender,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -2406,6 +2403,7 @@ mod tests {
|
||||
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||
let transaction_status_service = TransactionStatusService::new(
|
||||
transaction_status_receiver,
|
||||
Arc::new(AtomicU64::default()),
|
||||
blockstore.clone(),
|
||||
&Arc::new(AtomicBool::new(false)),
|
||||
);
|
||||
@ -2426,7 +2424,7 @@ mod tests {
|
||||
|
||||
transaction_status_service.join().unwrap();
|
||||
|
||||
let confirmed_block = blockstore.get_confirmed_block(bank.slot(), false).unwrap();
|
||||
let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
|
||||
assert_eq!(confirmed_block.transactions.len(), 3);
|
||||
|
||||
for TransactionWithStatusMeta { transaction, meta } in
|
||||
|
@ -1587,6 +1587,9 @@ impl ReplayStage {
|
||||
);
|
||||
did_complete_bank = true;
|
||||
info!("bank frozen: {}", bank.slot());
|
||||
if let Some(transaction_status_sender) = transaction_status_sender.clone() {
|
||||
transaction_status_sender.send_transaction_status_freeze_message(&bank);
|
||||
}
|
||||
bank.freeze();
|
||||
heaviest_subtree_fork_choice
|
||||
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
|
||||
@ -2369,7 +2372,7 @@ pub(crate) mod tests {
|
||||
use std::{
|
||||
fs::remove_dir_all,
|
||||
iter,
|
||||
sync::{Arc, RwLock},
|
||||
sync::{atomic::AtomicU64, Arc, RwLock},
|
||||
};
|
||||
use trees::tr;
|
||||
|
||||
@ -3073,6 +3076,7 @@ pub(crate) mod tests {
|
||||
previous_slot: Slot,
|
||||
bank: Arc<Bank>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
||||
) -> Vec<Signature> {
|
||||
let mint_keypair = keypairs[0];
|
||||
let keypair1 = keypairs[1];
|
||||
@ -3106,12 +3110,13 @@ pub(crate) mod tests {
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let transaction_status_service = TransactionStatusService::new(
|
||||
transaction_status_receiver,
|
||||
max_complete_transaction_status_slot,
|
||||
blockstore,
|
||||
&Arc::new(AtomicBool::new(false)),
|
||||
);
|
||||
|
||||
// Check that process_entries successfully writes can_commit transactions statuses, and
|
||||
// that they are matched properly by get_confirmed_block
|
||||
// that they are matched properly by get_rooted_block
|
||||
let _result = blockstore_processor::process_entries(
|
||||
&bank,
|
||||
&entries,
|
||||
@ -3158,9 +3163,10 @@ pub(crate) mod tests {
|
||||
bank0.slot(),
|
||||
bank1,
|
||||
blockstore.clone(),
|
||||
Arc::new(AtomicU64::default()),
|
||||
);
|
||||
|
||||
let confirmed_block = blockstore.get_confirmed_block(slot, false).unwrap();
|
||||
let confirmed_block = blockstore.get_rooted_block(slot, false).unwrap();
|
||||
assert_eq!(confirmed_block.transactions.len(), 3);
|
||||
|
||||
for TransactionWithStatusMeta { transaction, meta } in
|
||||
|
141
core/src/rpc.rs
141
core/src/rpc.rs
@ -84,7 +84,7 @@ use std::{
|
||||
net::SocketAddr,
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
@ -100,7 +100,7 @@ fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
|
||||
Response { context, value }
|
||||
}
|
||||
|
||||
pub fn is_confirmed_rooted(
|
||||
fn is_finalized(
|
||||
block_commitment_cache: &BlockCommitmentCache,
|
||||
bank: &Bank,
|
||||
blockstore: &Blockstore,
|
||||
@ -144,6 +144,7 @@ pub struct JsonRpcRequestProcessor {
|
||||
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
|
||||
max_slots: Arc<MaxSlots>,
|
||||
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
||||
}
|
||||
impl Metadata for JsonRpcRequestProcessor {}
|
||||
|
||||
@ -229,6 +230,7 @@ impl JsonRpcRequestProcessor {
|
||||
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
|
||||
max_slots: Arc<MaxSlots>,
|
||||
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
||||
) -> (Self, Receiver<TransactionInfo>) {
|
||||
let (sender, receiver) = channel();
|
||||
(
|
||||
@ -249,6 +251,7 @@ impl JsonRpcRequestProcessor {
|
||||
largest_accounts_cache,
|
||||
max_slots,
|
||||
leader_schedule_cache,
|
||||
max_complete_transaction_status_slot,
|
||||
},
|
||||
receiver,
|
||||
)
|
||||
@ -291,6 +294,7 @@ impl JsonRpcRequestProcessor {
|
||||
largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))),
|
||||
max_slots: Arc::new(MaxSlots::default()),
|
||||
leader_schedule_cache: Arc::new(LeaderScheduleCache::new_from_bank(bank)),
|
||||
max_complete_transaction_status_slot: Arc::new(AtomicU64::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@ -729,40 +733,58 @@ impl JsonRpcRequestProcessor {
|
||||
slot: Slot,
|
||||
config: Option<RpcEncodingConfigWrapper<RpcConfirmedBlockConfig>>,
|
||||
) -> Result<Option<UiConfirmedBlock>> {
|
||||
let config = config
|
||||
.map(|config| config.convert_to_current())
|
||||
.unwrap_or_default();
|
||||
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Json);
|
||||
let transaction_details = config.transaction_details.unwrap_or_default();
|
||||
let show_rewards = config.rewards.unwrap_or(true);
|
||||
if self.config.enable_rpc_transaction_history
|
||||
&& slot
|
||||
if self.config.enable_rpc_transaction_history {
|
||||
let config = config
|
||||
.map(|config| config.convert_to_current())
|
||||
.unwrap_or_default();
|
||||
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Json);
|
||||
let transaction_details = config.transaction_details.unwrap_or_default();
|
||||
let show_rewards = config.rewards.unwrap_or(true);
|
||||
let commitment = config.commitment.unwrap_or_default();
|
||||
check_is_at_least_confirmed(commitment)?;
|
||||
|
||||
// Block is old enough to be finalized
|
||||
if slot
|
||||
<= self
|
||||
.block_commitment_cache
|
||||
.read()
|
||||
.unwrap()
|
||||
.highest_confirmed_root()
|
||||
{
|
||||
let result = self.blockstore.get_confirmed_block(slot, true);
|
||||
self.check_blockstore_root(&result, slot)?;
|
||||
if result.is_err() {
|
||||
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
|
||||
let bigtable_result = self
|
||||
.runtime
|
||||
.block_on(bigtable_ledger_storage.get_confirmed_block(slot));
|
||||
self.check_bigtable_result(&bigtable_result)?;
|
||||
return Ok(bigtable_result.ok().map(|confirmed_block| {
|
||||
{
|
||||
let result = self.blockstore.get_rooted_block(slot, true);
|
||||
self.check_blockstore_root(&result, slot)?;
|
||||
if result.is_err() {
|
||||
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
|
||||
let bigtable_result = self
|
||||
.runtime
|
||||
.block_on(bigtable_ledger_storage.get_confirmed_block(slot));
|
||||
self.check_bigtable_result(&bigtable_result)?;
|
||||
return Ok(bigtable_result.ok().map(|confirmed_block| {
|
||||
confirmed_block.configure(encoding, transaction_details, show_rewards)
|
||||
}));
|
||||
}
|
||||
}
|
||||
self.check_slot_cleaned_up(&result, slot)?;
|
||||
return Ok(result.ok().map(|confirmed_block| {
|
||||
confirmed_block.configure(encoding, transaction_details, show_rewards)
|
||||
}));
|
||||
} else if commitment.is_confirmed() {
|
||||
// Check if block is confirmed
|
||||
let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed()));
|
||||
if confirmed_bank.status_cache_ancestors().contains(&slot)
|
||||
&& slot
|
||||
<= self
|
||||
.max_complete_transaction_status_slot
|
||||
.load(Ordering::SeqCst)
|
||||
{
|
||||
let result = self.blockstore.get_complete_block(slot, true);
|
||||
return Ok(result.ok().map(|confirmed_block| {
|
||||
confirmed_block.configure(encoding, transaction_details, show_rewards)
|
||||
}));
|
||||
}
|
||||
}
|
||||
self.check_slot_cleaned_up(&result, slot)?;
|
||||
Ok(result.ok().map(|confirmed_block| {
|
||||
confirmed_block.configure(encoding, transaction_details, show_rewards)
|
||||
}))
|
||||
} else {
|
||||
Err(RpcCustomError::BlockNotAvailable { slot }.into())
|
||||
}
|
||||
Err(RpcCustomError::BlockNotAvailable { slot }.into())
|
||||
}
|
||||
|
||||
pub fn get_confirmed_blocks(
|
||||
@ -928,7 +950,7 @@ impl JsonRpcRequestProcessor {
|
||||
Some(status)
|
||||
} else if self.config.enable_rpc_transaction_history && search_transaction_history {
|
||||
self.blockstore
|
||||
.get_transaction_status(signature)
|
||||
.get_transaction_status(signature, true)
|
||||
.map_err(|_| Error::internal_error())?
|
||||
.filter(|(slot, _status_meta)| {
|
||||
slot <= &self
|
||||
@ -978,7 +1000,7 @@ impl JsonRpcRequestProcessor {
|
||||
optimistically_confirmed_bank.get_signature_status_slot(&signature);
|
||||
|
||||
let confirmations = if r_block_commitment_cache.root() >= slot
|
||||
&& is_confirmed_rooted(&r_block_commitment_cache, bank, &self.blockstore, slot)
|
||||
&& is_finalized(&r_block_commitment_cache, bank, &self.blockstore, slot)
|
||||
{
|
||||
None
|
||||
} else {
|
||||
@ -1006,18 +1028,30 @@ impl JsonRpcRequestProcessor {
|
||||
&self,
|
||||
signature: Signature,
|
||||
config: Option<RpcEncodingConfigWrapper<RpcConfirmedTransactionConfig>>,
|
||||
) -> Option<EncodedConfirmedTransaction> {
|
||||
) -> Result<Option<EncodedConfirmedTransaction>> {
|
||||
let config = config
|
||||
.map(|config| config.convert_to_current())
|
||||
.unwrap_or_default();
|
||||
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Json);
|
||||
let commitment = config.commitment.unwrap_or_default();
|
||||
check_is_at_least_confirmed(commitment)?;
|
||||
|
||||
if self.config.enable_rpc_transaction_history {
|
||||
match self
|
||||
.blockstore
|
||||
.get_confirmed_transaction(signature)
|
||||
.get_complete_transaction(signature)
|
||||
.unwrap_or(None)
|
||||
{
|
||||
Some(confirmed_transaction) => {
|
||||
if commitment.is_confirmed() {
|
||||
let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed()));
|
||||
if confirmed_bank
|
||||
.status_cache_ancestors()
|
||||
.contains(&confirmed_transaction.slot)
|
||||
{
|
||||
return Ok(Some(confirmed_transaction.encode(encoding)));
|
||||
}
|
||||
}
|
||||
if confirmed_transaction.slot
|
||||
<= self
|
||||
.block_commitment_cache
|
||||
@ -1025,21 +1059,21 @@ impl JsonRpcRequestProcessor {
|
||||
.unwrap()
|
||||
.highest_confirmed_root()
|
||||
{
|
||||
return Some(confirmed_transaction.encode(encoding));
|
||||
return Ok(Some(confirmed_transaction.encode(encoding)));
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
|
||||
return self
|
||||
return Ok(self
|
||||
.runtime
|
||||
.block_on(bigtable_ledger_storage.get_confirmed_transaction(&signature))
|
||||
.unwrap_or(None)
|
||||
.map(|confirmed| confirmed.encode(encoding));
|
||||
.map(|confirmed| confirmed.encode(encoding)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn get_confirmed_signatures_for_address(
|
||||
@ -1568,6 +1602,15 @@ fn verify_token_account_filter(
|
||||
}
|
||||
}
|
||||
|
||||
fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<()> {
|
||||
if !commitment.is_at_least_confirmed() {
|
||||
return Err(Error::invalid_params(
|
||||
"Method does not support commitment below `confirmed`",
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_slice_and_encoding(encoding: &UiAccountEncoding, data_slice_is_some: bool) -> Result<()> {
|
||||
match encoding {
|
||||
UiAccountEncoding::JsonParsed => {
|
||||
@ -2917,7 +2960,7 @@ pub mod rpc_full {
|
||||
signature_str
|
||||
);
|
||||
let signature = verify_signature(&signature_str)?;
|
||||
Ok(meta.get_confirmed_transaction(signature, config))
|
||||
meta.get_confirmed_transaction(signature, config)
|
||||
}
|
||||
|
||||
fn get_confirmed_signatures_for_address(
|
||||
@ -3253,11 +3296,13 @@ pub mod tests {
|
||||
let keypair2 = Keypair::new();
|
||||
let keypair3 = Keypair::new();
|
||||
bank.transfer(4, &alice, &keypair2.pubkey()).unwrap();
|
||||
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
|
||||
let confirmed_block_signatures = create_test_transactions_and_populate_blockstore(
|
||||
vec![&alice, &keypair1, &keypair2, &keypair3],
|
||||
0,
|
||||
bank.clone(),
|
||||
blockstore.clone(),
|
||||
max_complete_transaction_status_slot.clone(),
|
||||
);
|
||||
|
||||
let mut commitment_slot0 = BlockCommitment::default();
|
||||
@ -3371,6 +3416,7 @@ pub mod tests {
|
||||
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
|
||||
max_slots,
|
||||
Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
max_complete_transaction_status_slot,
|
||||
);
|
||||
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
|
||||
|
||||
@ -4837,6 +4883,7 @@ pub mod tests {
|
||||
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
|
||||
Arc::new(MaxSlots::default()),
|
||||
Arc::new(LeaderScheduleCache::default()),
|
||||
Arc::new(AtomicU64::default()),
|
||||
);
|
||||
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
|
||||
|
||||
@ -5113,6 +5160,7 @@ pub mod tests {
|
||||
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
|
||||
Arc::new(MaxSlots::default()),
|
||||
Arc::new(LeaderScheduleCache::default()),
|
||||
Arc::new(AtomicU64::default()),
|
||||
);
|
||||
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
|
||||
assert_eq!(
|
||||
@ -5308,6 +5356,7 @@ pub mod tests {
|
||||
encoding: None,
|
||||
transaction_details: Some(TransactionDetails::Signatures),
|
||||
rewards: Some(false),
|
||||
commitment: None,
|
||||
})
|
||||
);
|
||||
let res = io.handle_request_sync(&req, meta.clone());
|
||||
@ -5328,6 +5377,7 @@ pub mod tests {
|
||||
encoding: None,
|
||||
transaction_details: Some(TransactionDetails::None),
|
||||
rewards: Some(true),
|
||||
commitment: None,
|
||||
})
|
||||
);
|
||||
let res = io.handle_request_sync(&req, meta);
|
||||
@ -5732,7 +5782,7 @@ pub mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_confirmed_rooted() {
|
||||
fn test_is_finalized() {
|
||||
let bank = Arc::new(Bank::default());
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
@ -5760,25 +5810,15 @@ pub mod tests {
|
||||
},
|
||||
);
|
||||
|
||||
assert!(is_confirmed_rooted(
|
||||
&block_commitment_cache,
|
||||
&bank,
|
||||
&blockstore,
|
||||
0
|
||||
));
|
||||
assert!(is_confirmed_rooted(
|
||||
&block_commitment_cache,
|
||||
&bank,
|
||||
&blockstore,
|
||||
1
|
||||
));
|
||||
assert!(!is_confirmed_rooted(
|
||||
assert!(is_finalized(&block_commitment_cache, &bank, &blockstore, 0));
|
||||
assert!(is_finalized(&block_commitment_cache, &bank, &blockstore, 1));
|
||||
assert!(!is_finalized(
|
||||
&block_commitment_cache,
|
||||
&bank,
|
||||
&blockstore,
|
||||
2
|
||||
));
|
||||
assert!(!is_confirmed_rooted(
|
||||
assert!(!is_finalized(
|
||||
&block_commitment_cache,
|
||||
&bank,
|
||||
&blockstore,
|
||||
@ -6403,6 +6443,7 @@ pub mod tests {
|
||||
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
|
||||
Arc::new(MaxSlots::default()),
|
||||
Arc::new(LeaderScheduleCache::default()),
|
||||
Arc::new(AtomicU64::default()),
|
||||
);
|
||||
|
||||
let mut io = MetaIoHandler::default();
|
||||
|
@ -30,7 +30,7 @@ use std::{
|
||||
collections::HashSet,
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
sync::{mpsc::channel, Arc, Mutex, RwLock},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
};
|
||||
@ -276,6 +276,7 @@ impl JsonRpcService {
|
||||
send_transaction_leader_forward_count: u64,
|
||||
max_slots: Arc<MaxSlots>,
|
||||
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
current_transaction_status_slot: Arc<AtomicU64>,
|
||||
) -> Self {
|
||||
info!("rpc bound to {:?}", rpc_addr);
|
||||
info!("rpc configuration: {:?}", config);
|
||||
@ -356,6 +357,7 @@ impl JsonRpcService {
|
||||
largest_accounts_cache,
|
||||
max_slots,
|
||||
leader_schedule_cache,
|
||||
current_transaction_status_slot,
|
||||
);
|
||||
|
||||
let leader_info =
|
||||
@ -521,6 +523,7 @@ mod tests {
|
||||
1,
|
||||
Arc::new(MaxSlots::default()),
|
||||
Arc::new(LeaderScheduleCache::default()),
|
||||
Arc::new(AtomicU64::default()),
|
||||
);
|
||||
let thread = rpc_service.thread_hdl.thread();
|
||||
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
|
||||
|
@ -1,6 +1,9 @@
|
||||
use crossbeam_channel::{Receiver, RecvTimeoutError};
|
||||
use itertools::izip;
|
||||
use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusBatch};
|
||||
use solana_ledger::{
|
||||
blockstore::Blockstore,
|
||||
blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage},
|
||||
};
|
||||
use solana_runtime::{
|
||||
bank::{Bank, InnerInstructionsList, NonceRollbackInfo, TransactionLogMessages},
|
||||
transaction_utils::OrderedIterator,
|
||||
@ -8,7 +11,7 @@ use solana_runtime::{
|
||||
use solana_transaction_status::{InnerInstructions, TransactionStatusMeta};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
@ -22,7 +25,8 @@ pub struct TransactionStatusService {
|
||||
impl TransactionStatusService {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
write_transaction_status_receiver: Receiver<TransactionStatusBatch>,
|
||||
write_transaction_status_receiver: Receiver<TransactionStatusMessage>,
|
||||
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
@ -35,6 +39,7 @@ impl TransactionStatusService {
|
||||
}
|
||||
if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch(
|
||||
&write_transaction_status_receiver,
|
||||
&max_complete_transaction_status_slot,
|
||||
&blockstore,
|
||||
) {
|
||||
break;
|
||||
@ -45,97 +50,104 @@ impl TransactionStatusService {
|
||||
}
|
||||
|
||||
fn write_transaction_status_batch(
|
||||
write_transaction_status_receiver: &Receiver<TransactionStatusBatch>,
|
||||
write_transaction_status_receiver: &Receiver<TransactionStatusMessage>,
|
||||
max_complete_transaction_status_slot: &Arc<AtomicU64>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
) -> Result<(), RecvTimeoutError> {
|
||||
let TransactionStatusBatch {
|
||||
bank,
|
||||
transactions,
|
||||
iteration_order,
|
||||
statuses,
|
||||
balances,
|
||||
token_balances,
|
||||
inner_instructions,
|
||||
transaction_logs,
|
||||
} = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? {
|
||||
TransactionStatusMessage::Batch(TransactionStatusBatch {
|
||||
bank,
|
||||
transactions,
|
||||
iteration_order,
|
||||
statuses,
|
||||
balances,
|
||||
token_balances,
|
||||
inner_instructions,
|
||||
transaction_logs,
|
||||
}) => {
|
||||
let slot = bank.slot();
|
||||
let inner_instructions_iter: Box<
|
||||
dyn Iterator<Item = Option<InnerInstructionsList>>,
|
||||
> = if let Some(inner_instructions) = inner_instructions {
|
||||
Box::new(inner_instructions.into_iter())
|
||||
} else {
|
||||
Box::new(std::iter::repeat_with(|| None))
|
||||
};
|
||||
let transaction_logs_iter: Box<dyn Iterator<Item = TransactionLogMessages>> =
|
||||
if let Some(transaction_logs) = transaction_logs {
|
||||
Box::new(transaction_logs.into_iter())
|
||||
} else {
|
||||
Box::new(std::iter::repeat_with(Vec::new))
|
||||
};
|
||||
for (
|
||||
(_, transaction),
|
||||
(status, nonce_rollback),
|
||||
pre_balances,
|
||||
post_balances,
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
inner_instructions,
|
||||
log_messages,
|
||||
) in izip!(
|
||||
OrderedIterator::new(&transactions, iteration_order.as_deref()),
|
||||
statuses,
|
||||
balances.pre_balances,
|
||||
balances.post_balances,
|
||||
token_balances.pre_token_balances,
|
||||
token_balances.post_token_balances,
|
||||
inner_instructions_iter,
|
||||
transaction_logs_iter
|
||||
) {
|
||||
if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
|
||||
let fee_calculator = nonce_rollback
|
||||
.map(|nonce_rollback| nonce_rollback.fee_calculator())
|
||||
.unwrap_or_else(|| {
|
||||
bank.get_fee_calculator(&transaction.message().recent_blockhash)
|
||||
})
|
||||
.expect("FeeCalculator must exist");
|
||||
let fee = fee_calculator.calculate_fee(transaction.message());
|
||||
let (writable_keys, readonly_keys) =
|
||||
transaction.message.get_account_keys_by_lock_type();
|
||||
|
||||
let slot = bank.slot();
|
||||
let inner_instructions_iter: Box<dyn Iterator<Item = Option<InnerInstructionsList>>> =
|
||||
if let Some(inner_instructions) = inner_instructions {
|
||||
Box::new(inner_instructions.into_iter())
|
||||
} else {
|
||||
Box::new(std::iter::repeat_with(|| None))
|
||||
};
|
||||
let transaction_logs_iter: Box<dyn Iterator<Item = TransactionLogMessages>> =
|
||||
if let Some(transaction_logs) = transaction_logs {
|
||||
Box::new(transaction_logs.into_iter())
|
||||
} else {
|
||||
Box::new(std::iter::repeat_with(Vec::new))
|
||||
};
|
||||
for (
|
||||
(_, transaction),
|
||||
(status, nonce_rollback),
|
||||
pre_balances,
|
||||
post_balances,
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
inner_instructions,
|
||||
log_messages,
|
||||
) in izip!(
|
||||
OrderedIterator::new(&transactions, iteration_order.as_deref()),
|
||||
statuses,
|
||||
balances.pre_balances,
|
||||
balances.post_balances,
|
||||
token_balances.pre_token_balances,
|
||||
token_balances.post_token_balances,
|
||||
inner_instructions_iter,
|
||||
transaction_logs_iter
|
||||
) {
|
||||
if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
|
||||
let fee_calculator = nonce_rollback
|
||||
.map(|nonce_rollback| nonce_rollback.fee_calculator())
|
||||
.unwrap_or_else(|| {
|
||||
bank.get_fee_calculator(&transaction.message().recent_blockhash)
|
||||
})
|
||||
.expect("FeeCalculator must exist");
|
||||
let fee = fee_calculator.calculate_fee(transaction.message());
|
||||
let (writable_keys, readonly_keys) =
|
||||
transaction.message.get_account_keys_by_lock_type();
|
||||
let inner_instructions = inner_instructions.map(|inner_instructions| {
|
||||
inner_instructions
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(index, instructions)| InnerInstructions {
|
||||
index: index as u8,
|
||||
instructions,
|
||||
})
|
||||
.filter(|i| !i.instructions.is_empty())
|
||||
.collect()
|
||||
});
|
||||
|
||||
let inner_instructions = inner_instructions.map(|inner_instructions| {
|
||||
inner_instructions
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(index, instructions)| InnerInstructions {
|
||||
index: index as u8,
|
||||
instructions,
|
||||
})
|
||||
.filter(|i| !i.instructions.is_empty())
|
||||
.collect()
|
||||
});
|
||||
let log_messages = Some(log_messages);
|
||||
let pre_token_balances = Some(pre_token_balances);
|
||||
let post_token_balances = Some(post_token_balances);
|
||||
|
||||
let log_messages = Some(log_messages);
|
||||
let pre_token_balances = Some(pre_token_balances);
|
||||
let post_token_balances = Some(post_token_balances);
|
||||
|
||||
blockstore
|
||||
.write_transaction_status(
|
||||
slot,
|
||||
transaction.signatures[0],
|
||||
writable_keys,
|
||||
readonly_keys,
|
||||
TransactionStatusMeta {
|
||||
status,
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
log_messages,
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
},
|
||||
)
|
||||
.expect("Expect database write to succeed");
|
||||
blockstore
|
||||
.write_transaction_status(
|
||||
slot,
|
||||
transaction.signatures[0],
|
||||
writable_keys,
|
||||
readonly_keys,
|
||||
TransactionStatusMeta {
|
||||
status,
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
log_messages,
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
},
|
||||
)
|
||||
.expect("Expect database write to succeed");
|
||||
}
|
||||
}
|
||||
}
|
||||
TransactionStatusMessage::Freeze(slot) => {
|
||||
max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
@ -70,7 +70,7 @@ use std::{
|
||||
net::SocketAddr,
|
||||
ops::Deref,
|
||||
path::{Path, PathBuf},
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
sync::mpsc::Receiver,
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
thread::sleep,
|
||||
@ -249,6 +249,7 @@ impl fmt::Debug for ValidatorExit {
|
||||
struct TransactionHistoryServices {
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
transaction_status_service: Option<TransactionStatusService>,
|
||||
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
||||
rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||
rewards_recorder_service: Option<RewardsRecorderService>,
|
||||
cache_block_time_sender: Option<CacheBlockTimeSender>,
|
||||
@ -384,6 +385,7 @@ impl Validator {
|
||||
TransactionHistoryServices {
|
||||
transaction_status_sender,
|
||||
transaction_status_service,
|
||||
max_complete_transaction_status_slot,
|
||||
rewards_recorder_sender,
|
||||
rewards_recorder_service,
|
||||
cache_block_time_sender,
|
||||
@ -545,6 +547,7 @@ impl Validator {
|
||||
config.send_transaction_leader_forward_count,
|
||||
max_slots.clone(),
|
||||
leader_schedule_cache.clone(),
|
||||
max_complete_transaction_status_slot,
|
||||
)),
|
||||
if config.rpc_config.minimal_api {
|
||||
None
|
||||
@ -1264,6 +1267,7 @@ fn initialize_rpc_transaction_history_services(
|
||||
exit: &Arc<AtomicBool>,
|
||||
enable_cpi_and_log_storage: bool,
|
||||
) -> TransactionHistoryServices {
|
||||
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
|
||||
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||
let transaction_status_sender = Some(TransactionStatusSender {
|
||||
sender: transaction_status_sender,
|
||||
@ -1271,6 +1275,7 @@ fn initialize_rpc_transaction_history_services(
|
||||
});
|
||||
let transaction_status_service = Some(TransactionStatusService::new(
|
||||
transaction_status_receiver,
|
||||
max_complete_transaction_status_slot.clone(),
|
||||
blockstore.clone(),
|
||||
exit,
|
||||
));
|
||||
@ -1293,6 +1298,7 @@ fn initialize_rpc_transaction_history_services(
|
||||
TransactionHistoryServices {
|
||||
transaction_status_sender,
|
||||
transaction_status_service,
|
||||
max_complete_transaction_status_slot,
|
||||
rewards_recorder_sender,
|
||||
rewards_recorder_service,
|
||||
cache_block_time_sender,
|
||||
|
Reference in New Issue
Block a user