v1.9: Enforce tx metadata upload to bigtable (#23212)
* Enforce tx metadata upload with static types (#23028) * resolve conflicts * fix test
This commit is contained in:
@@ -25,7 +25,6 @@ pub async fn upload_confirmed_blocks(
|
||||
bigtable: solana_storage_bigtable::LedgerStorage,
|
||||
starting_slot: Slot,
|
||||
ending_slot: Option<Slot>,
|
||||
allow_missing_metadata: bool,
|
||||
force_reupload: bool,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
@@ -186,20 +185,7 @@ pub async fn upload_confirmed_blocks(
|
||||
num_blocks -= 1;
|
||||
None
|
||||
}
|
||||
Some(confirmed_block) => {
|
||||
if confirmed_block
|
||||
.transactions
|
||||
.iter()
|
||||
.any(|transaction| transaction.meta.is_none())
|
||||
{
|
||||
if allow_missing_metadata {
|
||||
info!("Transaction metadata missing from slot {}", slot);
|
||||
} else {
|
||||
panic!("Transaction metadata missing from slot {}", slot);
|
||||
}
|
||||
}
|
||||
Some(bigtable.upload_confirmed_block(slot, confirmed_block))
|
||||
}
|
||||
Some(confirmed_block) => Some(bigtable.upload_confirmed_block(slot, confirmed_block)),
|
||||
});
|
||||
|
||||
for result in futures::future::join_all(uploads).await {
|
||||
|
@@ -72,7 +72,6 @@ impl BigTableUploadService {
|
||||
bigtable_ledger_storage.clone(),
|
||||
start_slot,
|
||||
Some(end_slot),
|
||||
true,
|
||||
false,
|
||||
exit.clone(),
|
||||
));
|
||||
|
@@ -42,7 +42,7 @@ use {
|
||||
solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
|
||||
solana_transaction_status::{
|
||||
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
|
||||
TransactionStatusMeta, TransactionWithStatusMeta,
|
||||
TransactionStatusMeta, TransactionWithMetadata,
|
||||
},
|
||||
std::{
|
||||
borrow::Cow,
|
||||
@@ -1945,18 +1945,17 @@ impl Blockstore {
|
||||
&self,
|
||||
slot: Slot,
|
||||
iterator: impl Iterator<Item = VersionedTransaction>,
|
||||
) -> Result<Vec<TransactionWithStatusMeta>> {
|
||||
) -> Result<Vec<TransactionWithMetadata>> {
|
||||
iterator
|
||||
.map(|versioned_tx| {
|
||||
// TODO: add support for versioned transactions
|
||||
if let Some(transaction) = versioned_tx.into_legacy_transaction() {
|
||||
let signature = transaction.signatures[0];
|
||||
Ok(TransactionWithStatusMeta {
|
||||
Ok(TransactionWithMetadata {
|
||||
transaction,
|
||||
meta: self
|
||||
.read_transaction_status((signature, slot))
|
||||
.ok()
|
||||
.flatten(),
|
||||
.read_transaction_status((signature, slot))?
|
||||
.ok_or(BlockstoreError::MissingTransactionMetadata)?,
|
||||
})
|
||||
} else {
|
||||
Err(BlockstoreError::UnsupportedTransactionVersion)
|
||||
@@ -2243,7 +2242,7 @@ impl Blockstore {
|
||||
signature: Signature,
|
||||
confirmed_unrooted_slots: &[Slot],
|
||||
) -> Result<Option<ConfirmedTransaction>> {
|
||||
if let Some((slot, status)) =
|
||||
if let Some((slot, meta)) =
|
||||
self.get_transaction_status(signature, confirmed_unrooted_slots)?
|
||||
{
|
||||
let transaction = self
|
||||
@@ -2258,10 +2257,7 @@ impl Blockstore {
|
||||
let block_time = self.get_block_time(slot)?;
|
||||
Ok(Some(ConfirmedTransaction {
|
||||
slot,
|
||||
transaction: TransactionWithStatusMeta {
|
||||
transaction,
|
||||
meta: Some(status),
|
||||
},
|
||||
transaction: TransactionWithMetadata { transaction, meta },
|
||||
block_time,
|
||||
}))
|
||||
} else {
|
||||
@@ -6052,7 +6048,7 @@ pub mod tests {
|
||||
.put_meta_bytes(slot - 1, &serialize(&parent_meta).unwrap())
|
||||
.unwrap();
|
||||
|
||||
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
|
||||
let expected_transactions: Vec<TransactionWithMetadata> = entries
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(|entry| !entry.is_tick())
|
||||
@@ -6118,9 +6114,9 @@ pub mod tests {
|
||||
.transaction_status_cf
|
||||
.put_protobuf((0, signature, slot + 2), &status)
|
||||
.unwrap();
|
||||
TransactionWithStatusMeta {
|
||||
TransactionWithMetadata {
|
||||
transaction,
|
||||
meta: Some(TransactionStatusMeta {
|
||||
meta: TransactionStatusMeta {
|
||||
status: Ok(()),
|
||||
fee: 42,
|
||||
pre_balances,
|
||||
@@ -6130,21 +6126,22 @@ pub mod tests {
|
||||
pre_token_balances: Some(vec![]),
|
||||
post_token_balances: Some(vec![]),
|
||||
rewards: Some(vec![]),
|
||||
}),
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Even if marked as root, a slot that is empty of entries should return an error
|
||||
let confirmed_block_err = blockstore.get_rooted_block(slot - 1, true).unwrap_err();
|
||||
assert_matches!(confirmed_block_err, BlockstoreError::SlotUnavailable);
|
||||
assert_matches!(
|
||||
blockstore.get_rooted_block(slot - 1, true),
|
||||
Err(BlockstoreError::SlotUnavailable)
|
||||
);
|
||||
|
||||
// The previous_blockhash of `expected_block` is default because its parent slot is a root,
|
||||
// but empty of entries (eg. snapshot root slots). This now returns an error.
|
||||
let confirmed_block_err = blockstore.get_rooted_block(slot, true).unwrap_err();
|
||||
assert_matches!(
|
||||
confirmed_block_err,
|
||||
BlockstoreError::ParentEntriesUnavailable
|
||||
blockstore.get_rooted_block(slot, true),
|
||||
Err(BlockstoreError::ParentEntriesUnavailable)
|
||||
);
|
||||
|
||||
// Test if require_previous_blockhash is false
|
||||
@@ -6908,7 +6905,7 @@ pub mod tests {
|
||||
blockstore.insert_shreds(shreds, None, false).unwrap();
|
||||
blockstore.set_roots(vec![slot - 1, slot].iter()).unwrap();
|
||||
|
||||
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
|
||||
let expected_transactions: Vec<TransactionWithMetadata> = entries
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(|entry| !entry.is_tick())
|
||||
@@ -6949,9 +6946,9 @@ pub mod tests {
|
||||
.transaction_status_cf
|
||||
.put_protobuf((0, signature, slot), &status)
|
||||
.unwrap();
|
||||
TransactionWithStatusMeta {
|
||||
TransactionWithMetadata {
|
||||
transaction,
|
||||
meta: Some(TransactionStatusMeta {
|
||||
meta: TransactionStatusMeta {
|
||||
status: Ok(()),
|
||||
fee: 42,
|
||||
pre_balances,
|
||||
@@ -6961,7 +6958,7 @@ pub mod tests {
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
rewards,
|
||||
}),
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@@ -6990,7 +6987,7 @@ pub mod tests {
|
||||
|
||||
blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap();
|
||||
*blockstore.lowest_cleanup_slot.write().unwrap() = slot;
|
||||
for TransactionWithStatusMeta { transaction, .. } in expected_transactions {
|
||||
for TransactionWithMetadata { transaction, .. } in expected_transactions {
|
||||
let signature = transaction.signatures[0];
|
||||
assert_eq!(blockstore.get_rooted_transaction(signature).unwrap(), None,);
|
||||
assert_eq!(
|
||||
@@ -7012,7 +7009,7 @@ pub mod tests {
|
||||
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
|
||||
blockstore.insert_shreds(shreds, None, false).unwrap();
|
||||
|
||||
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
|
||||
let expected_transactions: Vec<TransactionWithMetadata> = entries
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(|entry| !entry.is_tick())
|
||||
@@ -7053,9 +7050,9 @@ pub mod tests {
|
||||
.transaction_status_cf
|
||||
.put_protobuf((0, signature, slot), &status)
|
||||
.unwrap();
|
||||
TransactionWithStatusMeta {
|
||||
TransactionWithMetadata {
|
||||
transaction,
|
||||
meta: Some(TransactionStatusMeta {
|
||||
meta: TransactionStatusMeta {
|
||||
status: Ok(()),
|
||||
fee: 42,
|
||||
pre_balances,
|
||||
@@ -7065,7 +7062,7 @@ pub mod tests {
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
rewards,
|
||||
}),
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@@ -7087,7 +7084,7 @@ pub mod tests {
|
||||
|
||||
blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap();
|
||||
*blockstore.lowest_cleanup_slot.write().unwrap() = slot;
|
||||
for TransactionWithStatusMeta { transaction, .. } in expected_transactions {
|
||||
for TransactionWithMetadata { transaction, .. } in expected_transactions {
|
||||
let signature = transaction.signatures[0];
|
||||
assert_eq!(
|
||||
blockstore
|
||||
@@ -7809,7 +7806,6 @@ pub mod tests {
|
||||
fn test_map_transactions_to_statuses() {
|
||||
let ledger_path = get_tmp_ledger_path_auto_delete!();
|
||||
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
|
||||
|
||||
let transaction_status_cf = &blockstore.transaction_status_cf;
|
||||
|
||||
let slot = 0;
|
||||
@@ -7841,6 +7837,16 @@ pub mod tests {
|
||||
.unwrap();
|
||||
transactions.push(transaction.into());
|
||||
}
|
||||
|
||||
let map_result =
|
||||
blockstore.map_transactions_to_statuses(slot, transactions.clone().into_iter());
|
||||
assert!(map_result.is_ok());
|
||||
let map = map_result.unwrap();
|
||||
assert_eq!(map.len(), 4);
|
||||
for (x, m) in map.iter().enumerate() {
|
||||
assert_eq!(m.meta.fee, x as u64);
|
||||
}
|
||||
|
||||
// Push transaction that will not have matching status, as a test case
|
||||
transactions.push(
|
||||
Transaction::new_with_compiled_instructions(
|
||||
@@ -7854,13 +7860,7 @@ pub mod tests {
|
||||
);
|
||||
|
||||
let map_result = blockstore.map_transactions_to_statuses(slot, transactions.into_iter());
|
||||
assert!(map_result.is_ok());
|
||||
let map = map_result.unwrap();
|
||||
assert_eq!(map.len(), 5);
|
||||
for (x, m) in map.iter().take(4).enumerate() {
|
||||
assert_eq!(m.meta.as_ref().unwrap().fee, x as u64);
|
||||
}
|
||||
assert_eq!(map[4].meta, None);
|
||||
assert_matches!(map_result, Err(BlockstoreError::MissingTransactionMetadata));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@@ -102,6 +102,7 @@ pub enum BlockstoreError {
|
||||
ParentEntriesUnavailable,
|
||||
SlotUnavailable,
|
||||
UnsupportedTransactionVersion,
|
||||
MissingTransactionMetadata,
|
||||
}
|
||||
pub type Result<T> = std::result::Result<T, BlockstoreError>;
|
||||
|
||||
|
Reference in New Issue
Block a user