get_signatures_for_address does not correctly account for result sets that span local and Bigtable sources (#22115)
* get_signatures_for_address does not correctly account for result sets that span Blockstore and Bigtable. This causes Bigtable to return `RowNotFound` until the new tx is uploaded. Check that `before` exists in Bigtable, and if not, set it to `None` to return the full data set. References #21442 Closes #22110 * Differentiate between before sig not found and no newer signatures * Dedupe bigtable results to account for potential upload race Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
		@@ -95,6 +95,12 @@ pub type CompletedSlotsSender = SyncSender<Vec<Slot>>;
 | 
			
		||||
pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;
 | 
			
		||||
type CompletedRanges = Vec<(u32, u32)>;
 | 
			
		||||
 | 
			
		||||
#[derive(Default)]
 | 
			
		||||
pub struct SignatureInfosForAddress {
 | 
			
		||||
    pub infos: Vec<ConfirmedTransactionStatusWithSignature>,
 | 
			
		||||
    pub found_before: bool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Copy)]
 | 
			
		||||
pub enum PurgeType {
 | 
			
		||||
    Exact,
 | 
			
		||||
@@ -2511,7 +2517,7 @@ impl Blockstore {
 | 
			
		||||
        before: Option<Signature>,
 | 
			
		||||
        until: Option<Signature>,
 | 
			
		||||
        limit: usize,
 | 
			
		||||
    ) -> Result<Vec<ConfirmedTransactionStatusWithSignature>> {
 | 
			
		||||
    ) -> Result<SignatureInfosForAddress> {
 | 
			
		||||
        datapoint_info!(
 | 
			
		||||
            "blockstore-rpc-api",
 | 
			
		||||
            (
 | 
			
		||||
@@ -2535,7 +2541,7 @@ impl Blockstore {
 | 
			
		||||
                let transaction_status =
 | 
			
		||||
                    self.get_transaction_status(before, &confirmed_unrooted_slots)?;
 | 
			
		||||
                match transaction_status {
 | 
			
		||||
                    None => return Ok(vec![]),
 | 
			
		||||
                    None => return Ok(SignatureInfosForAddress::default()),
 | 
			
		||||
                    Some((slot, _)) => {
 | 
			
		||||
                        let mut slot_signatures = self.get_sorted_block_signatures(slot)?;
 | 
			
		||||
                        if let Some(pos) = slot_signatures.iter().position(|&x| x == before) {
 | 
			
		||||
@@ -2727,7 +2733,10 @@ impl Blockstore {
 | 
			
		||||
            )
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        Ok(infos)
 | 
			
		||||
        Ok(SignatureInfosForAddress {
 | 
			
		||||
            infos,
 | 
			
		||||
            found_before: true, // if `before` signature was not found, this method returned early
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn read_rewards(&self, index: Slot) -> Result<Option<Rewards>> {
 | 
			
		||||
@@ -7619,7 +7628,7 @@ pub mod tests {
 | 
			
		||||
        let highest_confirmed_root = 8;
 | 
			
		||||
 | 
			
		||||
        // Fetch all rooted signatures for address 0 at once...
 | 
			
		||||
        let all0 = blockstore
 | 
			
		||||
        let sig_infos = blockstore
 | 
			
		||||
            .get_confirmed_signatures_for_address2(
 | 
			
		||||
                address0,
 | 
			
		||||
                highest_confirmed_root,
 | 
			
		||||
@@ -7628,6 +7637,8 @@ pub mod tests {
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        assert!(sig_infos.found_before);
 | 
			
		||||
        let all0 = sig_infos.infos;
 | 
			
		||||
        assert_eq!(all0.len(), 12);
 | 
			
		||||
 | 
			
		||||
        // Fetch all rooted signatures for address 1 at once...
 | 
			
		||||
@@ -7639,12 +7650,13 @@ pub mod tests {
 | 
			
		||||
                None,
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos;
 | 
			
		||||
        assert_eq!(all1.len(), 12);
 | 
			
		||||
 | 
			
		||||
        // Fetch all signatures for address 0 individually
 | 
			
		||||
        for i in 0..all0.len() {
 | 
			
		||||
            let results = blockstore
 | 
			
		||||
            let sig_infos = blockstore
 | 
			
		||||
                .get_confirmed_signatures_for_address2(
 | 
			
		||||
                    address0,
 | 
			
		||||
                    highest_confirmed_root,
 | 
			
		||||
@@ -7657,6 +7669,8 @@ pub mod tests {
 | 
			
		||||
                    1,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
            assert!(sig_infos.found_before);
 | 
			
		||||
            let results = sig_infos.infos;
 | 
			
		||||
            assert_eq!(results.len(), 1);
 | 
			
		||||
            assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
 | 
			
		||||
        }
 | 
			
		||||
@@ -7678,12 +7692,13 @@ pub mod tests {
 | 
			
		||||
                    },
 | 
			
		||||
                    10,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            assert_eq!(results.len(), 1);
 | 
			
		||||
            assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        assert!(blockstore
 | 
			
		||||
        let sig_infos = blockstore
 | 
			
		||||
            .get_confirmed_signatures_for_address2(
 | 
			
		||||
                address0,
 | 
			
		||||
                highest_confirmed_root,
 | 
			
		||||
@@ -7691,8 +7706,9 @@ pub mod tests {
 | 
			
		||||
                None,
 | 
			
		||||
                1,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .is_empty());
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        assert!(sig_infos.found_before);
 | 
			
		||||
        assert!(sig_infos.infos.is_empty());
 | 
			
		||||
 | 
			
		||||
        assert!(blockstore
 | 
			
		||||
            .get_confirmed_signatures_for_address2(
 | 
			
		||||
@@ -7703,6 +7719,7 @@ pub mod tests {
 | 
			
		||||
                2,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos
 | 
			
		||||
            .is_empty());
 | 
			
		||||
 | 
			
		||||
        // Fetch all signatures for address 0, three at a time
 | 
			
		||||
@@ -7720,7 +7737,8 @@ pub mod tests {
 | 
			
		||||
                    None,
 | 
			
		||||
                    3,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            assert_eq!(results.len(), 3);
 | 
			
		||||
            assert_eq!(results[0], all0[i]);
 | 
			
		||||
            assert_eq!(results[1], all0[i + 1]);
 | 
			
		||||
@@ -7742,7 +7760,8 @@ pub mod tests {
 | 
			
		||||
                    None,
 | 
			
		||||
                    2,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            assert_eq!(results.len(), 2);
 | 
			
		||||
            assert_eq!(results[0].slot, results[1].slot);
 | 
			
		||||
            assert!(results[0].signature >= results[1].signature);
 | 
			
		||||
@@ -7751,7 +7770,7 @@ pub mod tests {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // A search for address 0 with `before` and/or `until` signatures from address1 should also work
 | 
			
		||||
        let results = blockstore
 | 
			
		||||
        let sig_infos = blockstore
 | 
			
		||||
            .get_confirmed_signatures_for_address2(
 | 
			
		||||
                address0,
 | 
			
		||||
                highest_confirmed_root,
 | 
			
		||||
@@ -7760,6 +7779,8 @@ pub mod tests {
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        assert!(sig_infos.found_before);
 | 
			
		||||
        let results = sig_infos.infos;
 | 
			
		||||
        // The exact number of results returned is variable, based on the sort order of the
 | 
			
		||||
        // random signatures that are generated
 | 
			
		||||
        assert!(!results.is_empty());
 | 
			
		||||
@@ -7772,7 +7793,8 @@ pub mod tests {
 | 
			
		||||
                Some(all1[4].signature),
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos;
 | 
			
		||||
        assert!(results2.len() < results.len());
 | 
			
		||||
 | 
			
		||||
        // Duplicate all tests using confirmed signatures
 | 
			
		||||
@@ -7787,7 +7809,8 @@ pub mod tests {
 | 
			
		||||
                None,
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos;
 | 
			
		||||
        assert_eq!(all0.len(), 14);
 | 
			
		||||
 | 
			
		||||
        // Fetch all signatures for address 1 at once...
 | 
			
		||||
@@ -7799,7 +7822,8 @@ pub mod tests {
 | 
			
		||||
                None,
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos;
 | 
			
		||||
        assert_eq!(all1.len(), 14);
 | 
			
		||||
 | 
			
		||||
        // Fetch all signatures for address 0 individually
 | 
			
		||||
@@ -7816,7 +7840,8 @@ pub mod tests {
 | 
			
		||||
                    None,
 | 
			
		||||
                    1,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            assert_eq!(results.len(), 1);
 | 
			
		||||
            assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
 | 
			
		||||
        }
 | 
			
		||||
@@ -7838,7 +7863,8 @@ pub mod tests {
 | 
			
		||||
                    },
 | 
			
		||||
                    10,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            assert_eq!(results.len(), 1);
 | 
			
		||||
            assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
 | 
			
		||||
        }
 | 
			
		||||
@@ -7852,6 +7878,7 @@ pub mod tests {
 | 
			
		||||
                1,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos
 | 
			
		||||
            .is_empty());
 | 
			
		||||
 | 
			
		||||
        assert!(blockstore
 | 
			
		||||
@@ -7863,6 +7890,7 @@ pub mod tests {
 | 
			
		||||
                2,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos
 | 
			
		||||
            .is_empty());
 | 
			
		||||
 | 
			
		||||
        // Fetch all signatures for address 0, three at a time
 | 
			
		||||
@@ -7880,7 +7908,8 @@ pub mod tests {
 | 
			
		||||
                    None,
 | 
			
		||||
                    3,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            if i < 12 {
 | 
			
		||||
                assert_eq!(results.len(), 3);
 | 
			
		||||
                assert_eq!(results[2], all0[i + 2]);
 | 
			
		||||
@@ -7906,7 +7935,8 @@ pub mod tests {
 | 
			
		||||
                    None,
 | 
			
		||||
                    2,
 | 
			
		||||
                )
 | 
			
		||||
                .unwrap();
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .infos;
 | 
			
		||||
            assert_eq!(results.len(), 2);
 | 
			
		||||
            assert_eq!(results[0].slot, results[1].slot);
 | 
			
		||||
            assert!(results[0].signature >= results[1].signature);
 | 
			
		||||
@@ -7923,7 +7953,8 @@ pub mod tests {
 | 
			
		||||
                None,
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos;
 | 
			
		||||
        // The exact number of results returned is variable, based on the sort order of the
 | 
			
		||||
        // random signatures that are generated
 | 
			
		||||
        assert!(!results.is_empty());
 | 
			
		||||
@@ -7936,8 +7967,26 @@ pub mod tests {
 | 
			
		||||
                Some(all1[4].signature),
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .infos;
 | 
			
		||||
        assert!(results2.len() < results.len());
 | 
			
		||||
 | 
			
		||||
        // Remove signature
 | 
			
		||||
        blockstore
 | 
			
		||||
            .address_signatures_cf
 | 
			
		||||
            .delete((0, address0, 2, all0[0].signature))
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        let sig_infos = blockstore
 | 
			
		||||
            .get_confirmed_signatures_for_address2(
 | 
			
		||||
                address0,
 | 
			
		||||
                highest_confirmed_root,
 | 
			
		||||
                Some(all0[0].signature),
 | 
			
		||||
                None,
 | 
			
		||||
                usize::MAX,
 | 
			
		||||
            )
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        assert!(!sig_infos.found_before);
 | 
			
		||||
        assert!(sig_infos.infos.is_empty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
 
 | 
			
		||||
@@ -31,7 +31,9 @@ use {
 | 
			
		||||
    solana_faucet::faucet::request_airdrop_transaction,
 | 
			
		||||
    solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
 | 
			
		||||
    solana_ledger::{
 | 
			
		||||
        blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path,
 | 
			
		||||
        blockstore::{Blockstore, SignatureInfosForAddress},
 | 
			
		||||
        blockstore_db::BlockstoreError,
 | 
			
		||||
        get_tmp_ledger_path,
 | 
			
		||||
        leader_schedule_cache::LeaderScheduleCache,
 | 
			
		||||
    },
 | 
			
		||||
    solana_metrics::inc_new_counter_info,
 | 
			
		||||
@@ -1439,7 +1441,7 @@ impl JsonRpcRequestProcessor {
 | 
			
		||||
    pub async fn get_signatures_for_address(
 | 
			
		||||
        &self,
 | 
			
		||||
        address: Pubkey,
 | 
			
		||||
        mut before: Option<Signature>,
 | 
			
		||||
        before: Option<Signature>,
 | 
			
		||||
        until: Option<Signature>,
 | 
			
		||||
        mut limit: usize,
 | 
			
		||||
        commitment: Option<CommitmentConfig>,
 | 
			
		||||
@@ -1460,29 +1462,59 @@ impl JsonRpcRequestProcessor {
 | 
			
		||||
                highest_confirmed_root
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            let mut results = self
 | 
			
		||||
            let SignatureInfosForAddress {
 | 
			
		||||
                infos: mut results,
 | 
			
		||||
                found_before,
 | 
			
		||||
            } = self
 | 
			
		||||
                .blockstore
 | 
			
		||||
                .get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit)
 | 
			
		||||
                .map_err(|err| Error::invalid_params(format!("{}", err)))?;
 | 
			
		||||
 | 
			
		||||
            if results.len() < limit {
 | 
			
		||||
                if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
 | 
			
		||||
                    let mut bigtable_before = before;
 | 
			
		||||
                    if !results.is_empty() {
 | 
			
		||||
                        limit -= results.len();
 | 
			
		||||
                        before = results.last().map(|x| x.signature);
 | 
			
		||||
                        bigtable_before = results.last().map(|x| x.signature);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    // If the oldest address-signature found in Blockstore has not yet been
 | 
			
		||||
                    // uploaded to long-term storage, modify the storage query to return all latest
 | 
			
		||||
                    // signatures to prevent erroring on RowNotFound. This can race with upload.
 | 
			
		||||
                    if found_before
 | 
			
		||||
                        && bigtable_before.is_some()
 | 
			
		||||
                        && bigtable_ledger_storage
 | 
			
		||||
                            .get_confirmed_transaction(&bigtable_before.unwrap())
 | 
			
		||||
                            .await
 | 
			
		||||
                            .ok()
 | 
			
		||||
                            .flatten()
 | 
			
		||||
                            .is_none()
 | 
			
		||||
                    {
 | 
			
		||||
                        bigtable_before = None;
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    let bigtable_results = bigtable_ledger_storage
 | 
			
		||||
                        .get_confirmed_signatures_for_address(
 | 
			
		||||
                            &address,
 | 
			
		||||
                            before.as_ref(),
 | 
			
		||||
                            bigtable_before.as_ref(),
 | 
			
		||||
                            until.as_ref(),
 | 
			
		||||
                            limit,
 | 
			
		||||
                        )
 | 
			
		||||
                        .await;
 | 
			
		||||
                    match bigtable_results {
 | 
			
		||||
                        Ok(bigtable_results) => {
 | 
			
		||||
                            results.extend(bigtable_results.into_iter().map(|x| x.0));
 | 
			
		||||
                            let results_set: HashSet<_> =
 | 
			
		||||
                                results.iter().map(|result| result.signature).collect();
 | 
			
		||||
                            for (bigtable_result, _) in bigtable_results {
 | 
			
		||||
                                // In the upload race condition, latest address-signatures in
 | 
			
		||||
                                // long-term storage may include original `before` signature...
 | 
			
		||||
                                if before != Some(bigtable_result.signature)
 | 
			
		||||
                                    // ...or earlier Blockstore signatures
 | 
			
		||||
                                    && !results_set.contains(&bigtable_result.signature)
 | 
			
		||||
                                {
 | 
			
		||||
                                    results.push(bigtable_result);
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        Err(err) => {
 | 
			
		||||
                            warn!("{:?}", err);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user