get_signatures_for_address does not correctly account for result sets that span local and Bigtable sources (#22115) (#22168)

* get_signatures_for_address does not correctly account for result sets that span Blockstore and Bigtable.

This causes Bigtable to return `RowNotFound` until the new tx is uploaded.

Check that `before` exists in Bigtable, and if not, set it to `None` to return the full data set.

References #21442
Closes #22110

* Differentiate between before sig not found and no newer signatures

* Dedupe bigtable results to account for potential upload race

Co-authored-by: Tyera Eulberg <tyera@solana.com>
(cherry picked from commit bac6821e192435e620e7c81451cedff7aca7cc9f)

Co-authored-by: Omar Kilani <omar.kilani@gmail.com>
This commit is contained in:
mergify[bot] 2021-12-29 19:52:36 +00:00 committed by GitHub
parent 9fb67f9b07
commit 74b98c2dd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 28 deletions

View File

@ -95,6 +95,12 @@ pub type CompletedSlotsSender = SyncSender<Vec<Slot>>;
pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>; pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;
type CompletedRanges = Vec<(u32, u32)>; type CompletedRanges = Vec<(u32, u32)>;
#[derive(Default)]
pub struct SignatureInfosForAddress {
pub infos: Vec<ConfirmedTransactionStatusWithSignature>,
pub found_before: bool,
}
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub enum PurgeType { pub enum PurgeType {
Exact, Exact,
@ -2416,7 +2422,7 @@ impl Blockstore {
before: Option<Signature>, before: Option<Signature>,
until: Option<Signature>, until: Option<Signature>,
limit: usize, limit: usize,
) -> Result<Vec<ConfirmedTransactionStatusWithSignature>> { ) -> Result<SignatureInfosForAddress> {
datapoint_info!( datapoint_info!(
"blockstore-rpc-api", "blockstore-rpc-api",
( (
@ -2440,7 +2446,7 @@ impl Blockstore {
let transaction_status = let transaction_status =
self.get_transaction_status(before, &confirmed_unrooted_slots)?; self.get_transaction_status(before, &confirmed_unrooted_slots)?;
match transaction_status { match transaction_status {
None => return Ok(vec![]), None => return Ok(SignatureInfosForAddress::default()),
Some((slot, _)) => { Some((slot, _)) => {
let mut slot_signatures = self.get_sorted_block_signatures(slot)?; let mut slot_signatures = self.get_sorted_block_signatures(slot)?;
if let Some(pos) = slot_signatures.iter().position(|&x| x == before) { if let Some(pos) = slot_signatures.iter().position(|&x| x == before) {
@ -2632,7 +2638,10 @@ impl Blockstore {
) )
); );
Ok(infos) Ok(SignatureInfosForAddress {
infos,
found_before: true, // if `before` signature was not found, this method returned early
})
} }
pub fn read_rewards(&self, index: Slot) -> Result<Option<Rewards>> { pub fn read_rewards(&self, index: Slot) -> Result<Option<Rewards>> {
@ -7418,7 +7427,7 @@ pub mod tests {
let highest_confirmed_root = 8; let highest_confirmed_root = 8;
// Fetch all rooted signatures for address 0 at once... // Fetch all rooted signatures for address 0 at once...
let all0 = blockstore let sig_infos = blockstore
.get_confirmed_signatures_for_address2( .get_confirmed_signatures_for_address2(
address0, address0,
highest_confirmed_root, highest_confirmed_root,
@ -7427,6 +7436,8 @@ pub mod tests {
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap();
assert!(sig_infos.found_before);
let all0 = sig_infos.infos;
assert_eq!(all0.len(), 12); assert_eq!(all0.len(), 12);
// Fetch all rooted signatures for address 1 at once... // Fetch all rooted signatures for address 1 at once...
@ -7438,12 +7449,13 @@ pub mod tests {
None, None,
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(all1.len(), 12); assert_eq!(all1.len(), 12);
// Fetch all signatures for address 0 individually // Fetch all signatures for address 0 individually
for i in 0..all0.len() { for i in 0..all0.len() {
let results = blockstore let sig_infos = blockstore
.get_confirmed_signatures_for_address2( .get_confirmed_signatures_for_address2(
address0, address0,
highest_confirmed_root, highest_confirmed_root,
@ -7456,6 +7468,8 @@ pub mod tests {
1, 1,
) )
.unwrap(); .unwrap();
assert!(sig_infos.found_before);
let results = sig_infos.infos;
assert_eq!(results.len(), 1); assert_eq!(results.len(), 1);
assert_eq!(results[0], all0[i], "Unexpected result for {}", i); assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
} }
@ -7477,12 +7491,13 @@ pub mod tests {
}, },
10, 10,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(results.len(), 1); assert_eq!(results.len(), 1);
assert_eq!(results[0], all0[i], "Unexpected result for {}", i); assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
} }
assert!(blockstore let sig_infos = blockstore
.get_confirmed_signatures_for_address2( .get_confirmed_signatures_for_address2(
address0, address0,
highest_confirmed_root, highest_confirmed_root,
@ -7490,8 +7505,9 @@ pub mod tests {
None, None,
1, 1,
) )
.unwrap() .unwrap();
.is_empty()); assert!(sig_infos.found_before);
assert!(sig_infos.infos.is_empty());
assert!(blockstore assert!(blockstore
.get_confirmed_signatures_for_address2( .get_confirmed_signatures_for_address2(
@ -7502,6 +7518,7 @@ pub mod tests {
2, 2,
) )
.unwrap() .unwrap()
.infos
.is_empty()); .is_empty());
// Fetch all signatures for address 0, three at a time // Fetch all signatures for address 0, three at a time
@ -7519,7 +7536,8 @@ pub mod tests {
None, None,
3, 3,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(results.len(), 3); assert_eq!(results.len(), 3);
assert_eq!(results[0], all0[i]); assert_eq!(results[0], all0[i]);
assert_eq!(results[1], all0[i + 1]); assert_eq!(results[1], all0[i + 1]);
@ -7541,7 +7559,8 @@ pub mod tests {
None, None,
2, 2,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(results.len(), 2); assert_eq!(results.len(), 2);
assert_eq!(results[0].slot, results[1].slot); assert_eq!(results[0].slot, results[1].slot);
assert!(results[0].signature >= results[1].signature); assert!(results[0].signature >= results[1].signature);
@ -7550,7 +7569,7 @@ pub mod tests {
} }
// A search for address 0 with `before` and/or `until` signatures from address1 should also work // A search for address 0 with `before` and/or `until` signatures from address1 should also work
let results = blockstore let sig_infos = blockstore
.get_confirmed_signatures_for_address2( .get_confirmed_signatures_for_address2(
address0, address0,
highest_confirmed_root, highest_confirmed_root,
@ -7559,6 +7578,8 @@ pub mod tests {
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap();
assert!(sig_infos.found_before);
let results = sig_infos.infos;
// The exact number of results returned is variable, based on the sort order of the // The exact number of results returned is variable, based on the sort order of the
// random signatures that are generated // random signatures that are generated
assert!(!results.is_empty()); assert!(!results.is_empty());
@ -7571,7 +7592,8 @@ pub mod tests {
Some(all1[4].signature), Some(all1[4].signature),
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap()
.infos;
assert!(results2.len() < results.len()); assert!(results2.len() < results.len());
// Duplicate all tests using confirmed signatures // Duplicate all tests using confirmed signatures
@ -7586,7 +7608,8 @@ pub mod tests {
None, None,
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(all0.len(), 14); assert_eq!(all0.len(), 14);
// Fetch all signatures for address 1 at once... // Fetch all signatures for address 1 at once...
@ -7598,7 +7621,8 @@ pub mod tests {
None, None,
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(all1.len(), 14); assert_eq!(all1.len(), 14);
// Fetch all signatures for address 0 individually // Fetch all signatures for address 0 individually
@ -7615,7 +7639,8 @@ pub mod tests {
None, None,
1, 1,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(results.len(), 1); assert_eq!(results.len(), 1);
assert_eq!(results[0], all0[i], "Unexpected result for {}", i); assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
} }
@ -7637,7 +7662,8 @@ pub mod tests {
}, },
10, 10,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(results.len(), 1); assert_eq!(results.len(), 1);
assert_eq!(results[0], all0[i], "Unexpected result for {}", i); assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
} }
@ -7651,6 +7677,7 @@ pub mod tests {
1, 1,
) )
.unwrap() .unwrap()
.infos
.is_empty()); .is_empty());
assert!(blockstore assert!(blockstore
@ -7662,6 +7689,7 @@ pub mod tests {
2, 2,
) )
.unwrap() .unwrap()
.infos
.is_empty()); .is_empty());
// Fetch all signatures for address 0, three at a time // Fetch all signatures for address 0, three at a time
@ -7679,7 +7707,8 @@ pub mod tests {
None, None,
3, 3,
) )
.unwrap(); .unwrap()
.infos;
if i < 12 { if i < 12 {
assert_eq!(results.len(), 3); assert_eq!(results.len(), 3);
assert_eq!(results[2], all0[i + 2]); assert_eq!(results[2], all0[i + 2]);
@ -7705,7 +7734,8 @@ pub mod tests {
None, None,
2, 2,
) )
.unwrap(); .unwrap()
.infos;
assert_eq!(results.len(), 2); assert_eq!(results.len(), 2);
assert_eq!(results[0].slot, results[1].slot); assert_eq!(results[0].slot, results[1].slot);
assert!(results[0].signature >= results[1].signature); assert!(results[0].signature >= results[1].signature);
@ -7722,7 +7752,8 @@ pub mod tests {
None, None,
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap()
.infos;
// The exact number of results returned is variable, based on the sort order of the // The exact number of results returned is variable, based on the sort order of the
// random signatures that are generated // random signatures that are generated
assert!(!results.is_empty()); assert!(!results.is_empty());
@ -7735,8 +7766,26 @@ pub mod tests {
Some(all1[4].signature), Some(all1[4].signature),
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap()
.infos;
assert!(results2.len() < results.len()); assert!(results2.len() < results.len());
// Remove signature
blockstore
.address_signatures_cf
.delete((0, address0, 2, all0[0].signature))
.unwrap();
let sig_infos = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
Some(all0[0].signature),
None,
usize::MAX,
)
.unwrap();
assert!(!sig_infos.found_before);
assert!(sig_infos.infos.is_empty());
} }
#[test] #[test]

View File

@ -31,7 +31,9 @@ use {
solana_faucet::faucet::request_airdrop_transaction, solana_faucet::faucet::request_airdrop_transaction,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{ solana_ledger::{
blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path, blockstore::{Blockstore, SignatureInfosForAddress},
blockstore_db::BlockstoreError,
get_tmp_ledger_path,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
}, },
solana_metrics::inc_new_counter_info, solana_metrics::inc_new_counter_info,
@ -1439,7 +1441,7 @@ impl JsonRpcRequestProcessor {
pub async fn get_signatures_for_address( pub async fn get_signatures_for_address(
&self, &self,
address: Pubkey, address: Pubkey,
mut before: Option<Signature>, before: Option<Signature>,
until: Option<Signature>, until: Option<Signature>,
mut limit: usize, mut limit: usize,
commitment: Option<CommitmentConfig>, commitment: Option<CommitmentConfig>,
@ -1460,29 +1462,59 @@ impl JsonRpcRequestProcessor {
highest_confirmed_root highest_confirmed_root
}; };
let mut results = self let SignatureInfosForAddress {
infos: mut results,
found_before,
} = self
.blockstore .blockstore
.get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit) .get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit)
.map_err(|err| Error::invalid_params(format!("{}", err)))?; .map_err(|err| Error::invalid_params(format!("{}", err)))?;
if results.len() < limit { if results.len() < limit {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let mut bigtable_before = before;
if !results.is_empty() { if !results.is_empty() {
limit -= results.len(); limit -= results.len();
before = results.last().map(|x| x.signature); bigtable_before = results.last().map(|x| x.signature);
}
// If the oldest address-signature found in Blockstore has not yet been
// uploaded to long-term storage, modify the storage query to return all latest
// signatures to prevent erroring on RowNotFound. This can race with upload.
if found_before
&& bigtable_before.is_some()
&& bigtable_ledger_storage
.get_confirmed_transaction(&bigtable_before.unwrap())
.await
.ok()
.flatten()
.is_none()
{
bigtable_before = None;
} }
let bigtable_results = bigtable_ledger_storage let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address( .get_confirmed_signatures_for_address(
&address, &address,
before.as_ref(), bigtable_before.as_ref(),
until.as_ref(), until.as_ref(),
limit, limit,
) )
.await; .await;
match bigtable_results { match bigtable_results {
Ok(bigtable_results) => { Ok(bigtable_results) => {
results.extend(bigtable_results.into_iter().map(|x| x.0)); let results_set: HashSet<_> =
results.iter().map(|result| result.signature).collect();
for (bigtable_result, _) in bigtable_results {
// In the upload race condition, latest address-signatures in
// long-term storage may include original `before` signature...
if before != Some(bigtable_result.signature)
// ...or earlier Blockstore signatures
&& !results_set.contains(&bigtable_result.signature)
{
results.push(bigtable_result);
}
}
} }
Err(err) => { Err(err) => {
warn!("{:?}", err); warn!("{:?}", err);