Store address table lookups in blockstore and bigtable (#22402)

This commit is contained in:
Justin Starry
2022-01-14 15:24:41 +08:00
committed by GitHub
parent 4c577d7f8c
commit f804ccdece
28 changed files with 836 additions and 199 deletions

View File

@ -42,9 +42,9 @@ use {
},
solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
solana_transaction_status::{
ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
ConfirmedTransactionWithStatusMeta, Rewards, TransactionStatusMeta,
TransactionWithStatusMeta,
ConfirmedTransactionStatusWithSignature, Rewards, TransactionStatusMeta,
VersionedConfirmedBlock, VersionedConfirmedTransactionWithStatusMeta,
VersionedTransactionWithStatusMeta,
},
std::{
borrow::Cow,
@ -1923,7 +1923,7 @@ impl Blockstore {
&self,
slot: Slot,
require_previous_blockhash: bool,
) -> Result<ConfirmedBlock> {
) -> Result<VersionedConfirmedBlock> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_rooted_block".to_string(), String)
@ -1940,7 +1940,7 @@ impl Blockstore {
&self,
slot: Slot,
require_previous_blockhash: bool,
) -> Result<ConfirmedBlock> {
) -> Result<VersionedConfirmedBlock> {
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = match slot_meta_cf.get(slot)? {
Some(slot_meta) => slot_meta,
@ -1998,7 +1998,7 @@ impl Blockstore {
let block_time = self.blocktime_cf.get(slot)?;
let block_height = self.block_height_cf.get(slot)?;
let block = ConfirmedBlock {
let block = VersionedConfirmedBlock {
previous_blockhash: previous_blockhash.to_string(),
blockhash: blockhash.to_string(),
// If the slot is full it should have parent_slot populated
@ -2020,22 +2020,17 @@ impl Blockstore {
&self,
slot: Slot,
iterator: impl Iterator<Item = VersionedTransaction>,
) -> Result<Vec<TransactionWithStatusMeta>> {
) -> Result<Vec<VersionedTransactionWithStatusMeta>> {
iterator
.map(|versioned_tx| {
// TODO: add support for versioned transactions
if let Some(transaction) = versioned_tx.into_legacy_transaction() {
let signature = transaction.signatures[0];
Ok(TransactionWithStatusMeta {
transaction,
meta: self
.read_transaction_status((signature, slot))
.ok()
.flatten(),
})
} else {
Err(BlockstoreError::UnsupportedTransactionVersion)
}
.map(|transaction| {
let signature = transaction.signatures[0];
Ok(VersionedTransactionWithStatusMeta {
transaction,
meta: self
.read_transaction_status((signature, slot))
.ok()
.flatten(),
})
})
.collect()
}
@ -2287,7 +2282,7 @@ impl Blockstore {
pub fn get_rooted_transaction(
&self,
signature: Signature,
) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
) -> Result<Option<VersionedConfirmedTransactionWithStatusMeta>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_rooted_transaction".to_string(), String)
@ -2300,7 +2295,7 @@ impl Blockstore {
&self,
signature: Signature,
highest_confirmed_slot: Slot,
) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
) -> Result<Option<VersionedConfirmedTransactionWithStatusMeta>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_complete_transaction".to_string(), String)
@ -2317,7 +2312,7 @@ impl Blockstore {
&self,
signature: Signature,
confirmed_unrooted_slots: &[Slot],
) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
) -> Result<Option<VersionedConfirmedTransactionWithStatusMeta>> {
if let Some((slot, status)) =
self.get_transaction_status(signature, confirmed_unrooted_slots)?
{
@ -2325,15 +2320,10 @@ impl Blockstore {
.find_transaction_in_slot(slot, signature)?
.ok_or(BlockstoreError::TransactionStatusSlotMismatch)?; // Should not happen
// TODO: support retrieving versioned transactions
let transaction = transaction
.into_legacy_transaction()
.ok_or(BlockstoreError::UnsupportedTransactionVersion)?;
let block_time = self.get_block_time(slot)?;
Ok(Some(ConfirmedTransactionWithStatusMeta {
Ok(Some(VersionedConfirmedTransactionWithStatusMeta {
slot,
transaction: TransactionWithStatusMeta {
tx_with_meta: VersionedTransactionWithStatusMeta {
transaction,
meta: Some(status),
},
@ -4146,6 +4136,7 @@ pub mod tests {
solana_sdk::{
hash::{self, hash, Hash},
instruction::CompiledInstruction,
message::v0::LoadedAddresses,
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::Signature,
@ -6237,20 +6228,15 @@ pub mod tests {
.put_meta_bytes(slot - 1, &serialize(&parent_meta).unwrap())
.unwrap();
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
let expected_transactions: Vec<VersionedTransactionWithStatusMeta> = entries
.iter()
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|transaction| {
transaction
.into_legacy_transaction()
.expect("versioned transactions not supported")
})
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
for (i, _account_key) in transaction.message.account_keys.iter().enumerate() {
for i in 0..transaction.message.total_account_keys_len() {
pre_balances.push(i as u64 * 10);
post_balances.push(i as u64 * 11);
}
@ -6265,6 +6251,7 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}
.into();
blockstore
@ -6281,6 +6268,7 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}
.into();
blockstore
@ -6297,13 +6285,14 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}
.into();
blockstore
.transaction_status_cf
.put_protobuf((0, signature, slot + 2), &status)
.unwrap();
TransactionWithStatusMeta {
VersionedTransactionWithStatusMeta {
transaction,
meta: Some(TransactionStatusMeta {
status: Ok(()),
@ -6315,6 +6304,7 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}),
}
})
@ -6335,7 +6325,7 @@ pub mod tests {
// Test if require_previous_blockhash is false
let confirmed_block = blockstore.get_rooted_block(slot, false).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100);
let expected_block = ConfirmedBlock {
let expected_block = VersionedConfirmedBlock {
transactions: expected_transactions.clone(),
parent_slot: slot - 1,
blockhash: blockhash.to_string(),
@ -6349,7 +6339,7 @@ pub mod tests {
let confirmed_block = blockstore.get_rooted_block(slot + 1, true).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100);
let mut expected_block = ConfirmedBlock {
let mut expected_block = VersionedConfirmedBlock {
transactions: expected_transactions.clone(),
parent_slot: slot,
blockhash: blockhash.to_string(),
@ -6366,7 +6356,7 @@ pub mod tests {
let complete_block = blockstore.get_complete_block(slot + 2, true).unwrap();
assert_eq!(complete_block.transactions.len(), 100);
let mut expected_complete_block = ConfirmedBlock {
let mut expected_complete_block = VersionedConfirmedBlock {
transactions: expected_transactions,
parent_slot: slot + 1,
blockhash: blockhash.to_string(),
@ -6422,6 +6412,10 @@ pub mod tests {
let pre_token_balances_vec = vec![];
let post_token_balances_vec = vec![];
let rewards_vec = vec![];
let test_loaded_addresses = LoadedAddresses {
writable: vec![Pubkey::new_unique()],
readonly: vec![Pubkey::new_unique()],
};
// result not found
assert!(transaction_status_cf
@ -6440,6 +6434,7 @@ pub mod tests {
pre_token_balances: Some(pre_token_balances_vec.clone()),
post_token_balances: Some(post_token_balances_vec.clone()),
rewards: Some(rewards_vec.clone()),
loaded_addresses: test_loaded_addresses.clone(),
}
.into();
assert!(transaction_status_cf
@ -6457,6 +6452,7 @@ pub mod tests {
pre_token_balances,
post_token_balances,
rewards,
loaded_addresses,
} = transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((0, Signature::default(), 0))
.unwrap()
@ -6472,6 +6468,7 @@ pub mod tests {
assert_eq!(pre_token_balances.unwrap(), pre_token_balances_vec);
assert_eq!(post_token_balances.unwrap(), post_token_balances_vec);
assert_eq!(rewards.unwrap(), rewards_vec);
assert_eq!(loaded_addresses, test_loaded_addresses);
// insert value
let status = TransactionStatusMeta {
@ -6484,6 +6481,7 @@ pub mod tests {
pre_token_balances: Some(pre_token_balances_vec.clone()),
post_token_balances: Some(post_token_balances_vec.clone()),
rewards: Some(rewards_vec.clone()),
loaded_addresses: test_loaded_addresses.clone(),
}
.into();
assert!(transaction_status_cf
@ -6501,6 +6499,7 @@ pub mod tests {
pre_token_balances,
post_token_balances,
rewards,
loaded_addresses,
} = transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((
0,
@ -6522,6 +6521,7 @@ pub mod tests {
assert_eq!(pre_token_balances.unwrap(), pre_token_balances_vec);
assert_eq!(post_token_balances.unwrap(), post_token_balances_vec);
assert_eq!(rewards.unwrap(), rewards_vec);
assert_eq!(loaded_addresses, test_loaded_addresses);
}
#[test]
@ -6749,6 +6749,7 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}
.into();
@ -6943,6 +6944,7 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}
.into();
@ -7093,19 +7095,15 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.set_roots(vec![slot - 1, slot].iter()).unwrap();
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
let expected_transactions: Vec<VersionedTransactionWithStatusMeta> = entries
.iter()
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|tx| {
tx.into_legacy_transaction()
.expect("versioned transactions not supported")
})
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
for (i, _account_key) in transaction.message.account_keys.iter().enumerate() {
for i in 0..transaction.message.total_account_keys_len() {
pre_balances.push(i as u64 * 10);
post_balances.push(i as u64 * 11);
}
@ -7128,13 +7126,14 @@ pub mod tests {
pre_token_balances: pre_token_balances.clone(),
post_token_balances: post_token_balances.clone(),
rewards: rewards.clone(),
loaded_addresses: LoadedAddresses::default(),
}
.into();
blockstore
.transaction_status_cf
.put_protobuf((0, signature, slot), &status)
.unwrap();
TransactionWithStatusMeta {
VersionedTransactionWithStatusMeta {
transaction,
meta: Some(TransactionStatusMeta {
status: Ok(()),
@ -7146,18 +7145,19 @@ pub mod tests {
pre_token_balances,
post_token_balances,
rewards,
loaded_addresses: LoadedAddresses::default(),
}),
}
})
.collect();
for transaction in expected_transactions.clone() {
let signature = transaction.transaction.signatures[0];
for tx_with_meta in expected_transactions.clone() {
let signature = tx_with_meta.transaction.signatures[0];
assert_eq!(
blockstore.get_rooted_transaction(signature).unwrap(),
Some(ConfirmedTransactionWithStatusMeta {
Some(VersionedConfirmedTransactionWithStatusMeta {
slot,
transaction: transaction.clone(),
tx_with_meta: tx_with_meta.clone(),
block_time: None
})
);
@ -7165,9 +7165,9 @@ pub mod tests {
blockstore
.get_complete_transaction(signature, slot + 1)
.unwrap(),
Some(ConfirmedTransactionWithStatusMeta {
Some(VersionedConfirmedTransactionWithStatusMeta {
slot,
transaction,
tx_with_meta,
block_time: None
})
);
@ -7175,7 +7175,7 @@ pub mod tests {
blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap();
*blockstore.lowest_cleanup_slot.write().unwrap() = slot;
for TransactionWithStatusMeta { transaction, .. } in expected_transactions {
for VersionedTransactionWithStatusMeta { transaction, .. } in expected_transactions {
let signature = transaction.signatures[0];
assert_eq!(blockstore.get_rooted_transaction(signature).unwrap(), None,);
assert_eq!(
@ -7197,19 +7197,15 @@ pub mod tests {
let shreds = entries_to_test_shreds(&entries, slot, slot - 1, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
let expected_transactions: Vec<VersionedTransactionWithStatusMeta> = entries
.iter()
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|tx| {
tx.into_legacy_transaction()
.expect("versioned transactions not supported")
})
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
for (i, _account_key) in transaction.message.account_keys.iter().enumerate() {
for i in 0..transaction.message.total_account_keys_len() {
pre_balances.push(i as u64 * 10);
post_balances.push(i as u64 * 11);
}
@ -7232,13 +7228,14 @@ pub mod tests {
pre_token_balances: pre_token_balances.clone(),
post_token_balances: post_token_balances.clone(),
rewards: rewards.clone(),
loaded_addresses: LoadedAddresses::default(),
}
.into();
blockstore
.transaction_status_cf
.put_protobuf((0, signature, slot), &status)
.unwrap();
TransactionWithStatusMeta {
VersionedTransactionWithStatusMeta {
transaction,
meta: Some(TransactionStatusMeta {
status: Ok(()),
@ -7250,20 +7247,21 @@ pub mod tests {
pre_token_balances,
post_token_balances,
rewards,
loaded_addresses: LoadedAddresses::default(),
}),
}
})
.collect();
for transaction in expected_transactions.clone() {
let signature = transaction.transaction.signatures[0];
for tx_with_meta in expected_transactions.clone() {
let signature = tx_with_meta.transaction.signatures[0];
assert_eq!(
blockstore
.get_complete_transaction(signature, slot)
.unwrap(),
Some(ConfirmedTransactionWithStatusMeta {
Some(VersionedConfirmedTransactionWithStatusMeta {
slot,
transaction,
tx_with_meta,
block_time: None
})
);
@ -7272,7 +7270,7 @@ pub mod tests {
blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap();
*blockstore.lowest_cleanup_slot.write().unwrap() = slot;
for TransactionWithStatusMeta { transaction, .. } in expected_transactions {
for VersionedTransactionWithStatusMeta { transaction, .. } in expected_transactions {
let signature = transaction.signatures[0];
assert_eq!(
blockstore
@ -7560,16 +7558,13 @@ pub mod tests {
// Purge to freeze index 0 and write address-signatures in new primary index
blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
}
for tx in entry.transactions {
let transaction = tx
.into_legacy_transaction()
.expect("versioned transactions not supported");
for transaction in entry.transactions {
assert_eq!(transaction.signatures.len(), 1);
blockstore
.write_transaction_status(
slot,
transaction.signatures[0],
transaction.message.account_keys.iter().collect(),
transaction.message.static_account_keys_iter().collect(),
vec![],
TransactionStatusMeta::default(),
)
@ -7587,16 +7582,13 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
for entry in entries.into_iter() {
for tx in entry.transactions {
let transaction = tx
.into_legacy_transaction()
.expect("versioned transactions not supported");
for transaction in entry.transactions {
assert_eq!(transaction.signatures.len(), 1);
blockstore
.write_transaction_status(
slot,
transaction.signatures[0],
transaction.message.account_keys.iter().collect(),
transaction.message.static_account_keys_iter().collect(),
vec![],
TransactionStatusMeta::default(),
)
@ -8019,6 +8011,7 @@ pub mod tests {
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
loaded_addresses: LoadedAddresses::default(),
}
.into();
transaction_status_cf
@ -8570,8 +8563,9 @@ pub mod tests {
reward_type: Some(RewardType::Rent),
commission: None,
}]),
loaded_addresses: LoadedAddresses::default(),
};
let deprecated_status: StoredTransactionStatusMeta = status.clone().into();
let deprecated_status: StoredTransactionStatusMeta = status.clone().try_into().unwrap();
let protobuf_status: generated::TransactionStatusMeta = status.into();
for slot in 0..2 {

View File

@ -327,19 +327,24 @@ impl Blockstore {
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
for slot in from_slot..to_slot {
let slot_entries = self.get_any_valid_slot_entries(slot, 0);
for transaction in slot_entries
.iter()
.cloned()
.flat_map(|entry| entry.transactions)
{
let transactions = slot_entries
.into_iter()
.flat_map(|entry| entry.transactions);
for transaction in transactions {
if let Some(&signature) = transaction.signatures.get(0) {
batch.delete::<cf::TransactionStatus>((0, signature, slot))?;
batch.delete::<cf::TransactionStatus>((1, signature, slot))?;
// TODO: support purging dynamically loaded addresses from versioned transactions
for pubkey in transaction.message.into_static_account_keys() {
batch.delete::<cf::AddressSignatures>((0, pubkey, slot, signature))?;
batch.delete::<cf::AddressSignatures>((1, pubkey, slot, signature))?;
}
let meta = self.read_transaction_status((signature, slot))?;
let loaded_addresses =
meta.map(|meta| meta.loaded_addresses).unwrap_or_default();
for address in loaded_addresses.into_ordered_iter() {
batch.delete::<cf::AddressSignatures>((0, address, slot, signature))?;
batch.delete::<cf::AddressSignatures>((1, address, slot, signature))?;
}
}
}
}