From 24b0fc89279e3d523fb2377d6b564dda74fe6001 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 29 Dec 2021 13:03:32 -0700 Subject: [PATCH] get_signatures_for_address does not correctly account for result sets that span local and Bigtable sources (backport #22115) (#22167) * get_signatures_for_address does not correctly account for result sets that span local and Bigtable sources (#22115) * get_signatures_for_address does not correctly account for result sets that span Blockstore and Bigtable. This causes Bigtable to return `RowNotFound` until the new tx is uploaded. Check that `before` exists in Bigtable, and if not, set it to `None` to return the full data set. References #21442 Closes #22110 * Differentiate between before sig not found and no newer signatures * Dedupe bigtable results to account for potential upload race Co-authored-by: Tyera Eulberg (cherry picked from commit bac6821e192435e620e7c81451cedff7aca7cc9f) # Conflicts: # ledger/src/blockstore.rs * Fix conflicts Co-authored-by: Omar Kilani Co-authored-by: Tyera Eulberg --- ledger/src/blockstore.rs | 93 ++++++++++++++++++++++++++++++---------- rpc/src/rpc.rs | 44 ++++++++++++++++--- 2 files changed, 109 insertions(+), 28 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 0642d7f7d7..038f91966f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -92,6 +92,12 @@ pub type CompletedSlotsSender = SyncSender>; pub type CompletedSlotsReceiver = Receiver>; type CompletedRanges = Vec<(u32, u32)>; +#[derive(Default)] +pub struct SignatureInfosForAddress { + pub infos: Vec, + pub found_before: bool, +} + #[derive(Clone, Copy)] pub enum PurgeType { Exact, @@ -2402,7 +2408,7 @@ impl Blockstore { before: Option, until: Option, limit: usize, - ) -> Result> { + ) -> Result { datapoint_info!( "blockstore-rpc-api", ( @@ -2426,7 +2432,7 @@ impl Blockstore { let transaction_status = self.get_transaction_status(before, &confirmed_unrooted_slots)?; match transaction_status { - None => return Ok(vec![]), + None => return Ok(SignatureInfosForAddress::default()), Some((slot, _)) => { let block = self.get_complete_block(slot, false).map_err(|err| { BlockstoreError::Io(IoError::new( @@ -2668,7 +2674,10 @@ impl Blockstore { ) ); - Ok(infos) + Ok(SignatureInfosForAddress { + infos, + found_before: true, // if `before` signature was not found, this method returned early + }) } pub fn read_rewards(&self, index: Slot) -> Result> { @@ -7484,7 +7493,7 @@ pub mod tests { let highest_confirmed_root = 8; // Fetch all rooted signatures for address 0 at once... - let all0 = blockstore + let sig_infos = blockstore .get_confirmed_signatures_for_address2( address0, highest_confirmed_root, @@ -7493,6 +7502,8 @@ pub mod tests { usize::MAX, ) .unwrap(); + assert!(sig_infos.found_before); + let all0 = sig_infos.infos; assert_eq!(all0.len(), 12); // Fetch all rooted signatures for address 1 at once... @@ -7504,12 +7515,13 @@ pub mod tests { None, usize::MAX, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(all1.len(), 12); // Fetch all signatures for address 0 individually for i in 0..all0.len() { - let results = blockstore + let sig_infos = blockstore .get_confirmed_signatures_for_address2( address0, highest_confirmed_root, @@ -7522,6 +7534,8 @@ pub mod tests { 1, ) .unwrap(); + assert!(sig_infos.found_before); + let results = sig_infos.infos; assert_eq!(results.len(), 1); assert_eq!(results[0], all0[i], "Unexpected result for {}", i); } @@ -7543,12 +7557,13 @@ pub mod tests { }, 10, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(results.len(), 1); assert_eq!(results[0], all0[i], "Unexpected result for {}", i); } - assert!(blockstore + let sig_infos = blockstore .get_confirmed_signatures_for_address2( address0, highest_confirmed_root, @@ -7556,8 +7571,9 @@ pub mod tests { None, 1, ) - .unwrap() - .is_empty()); + .unwrap(); + assert!(sig_infos.found_before); + assert!(sig_infos.infos.is_empty()); assert!(blockstore .get_confirmed_signatures_for_address2( @@ -7568,6 +7584,7 @@ pub mod tests { 2, ) .unwrap() + .infos .is_empty()); // Fetch all signatures for address 0, three at a time @@ -7585,7 +7602,8 @@ pub mod tests { None, 3, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(results.len(), 3); assert_eq!(results[0], all0[i]); assert_eq!(results[1], all0[i + 1]); @@ -7607,7 +7625,8 @@ pub mod tests { None, 2, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(results.len(), 2); assert_eq!(results[0].slot, results[1].slot); assert!(results[0].signature >= results[1].signature); @@ -7616,7 +7635,7 @@ pub mod tests { } // A search for address 0 with `before` and/or `until` signatures from address1 should also work - let results = blockstore + let sig_infos = blockstore .get_confirmed_signatures_for_address2( address0, highest_confirmed_root, @@ -7625,6 +7644,8 @@ pub mod tests { usize::MAX, ) .unwrap(); + assert!(sig_infos.found_before); + let results = sig_infos.infos; // The exact number of results returned is variable, based on the sort order of the // random signatures that are generated assert!(!results.is_empty()); @@ -7637,7 +7658,8 @@ pub mod tests { Some(all1[4].signature), usize::MAX, ) - .unwrap(); + .unwrap() + .infos; assert!(results2.len() < results.len()); // Duplicate all tests using confirmed signatures @@ -7652,7 +7674,8 @@ pub mod tests { None, usize::MAX, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(all0.len(), 14); // Fetch all signatures for address 1 at once... @@ -7664,7 +7687,8 @@ pub mod tests { None, usize::MAX, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(all1.len(), 14); // Fetch all signatures for address 0 individually @@ -7681,7 +7705,8 @@ pub mod tests { None, 1, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(results.len(), 1); assert_eq!(results[0], all0[i], "Unexpected result for {}", i); } @@ -7703,7 +7728,8 @@ pub mod tests { }, 10, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(results.len(), 1); assert_eq!(results[0], all0[i], "Unexpected result for {}", i); } @@ -7717,6 +7743,7 @@ pub mod tests { 1, ) .unwrap() + .infos .is_empty()); assert!(blockstore @@ -7728,6 +7755,7 @@ pub mod tests { 2, ) .unwrap() + .infos .is_empty()); // Fetch all signatures for address 0, three at a time @@ -7745,7 +7773,8 @@ pub mod tests { None, 3, ) - .unwrap(); + .unwrap() + .infos; if i < 12 { assert_eq!(results.len(), 3); assert_eq!(results[2], all0[i + 2]); @@ -7771,7 +7800,8 @@ pub mod tests { None, 2, ) - .unwrap(); + .unwrap() + .infos; assert_eq!(results.len(), 2); assert_eq!(results[0].slot, results[1].slot); assert!(results[0].signature >= results[1].signature); @@ -7788,7 +7818,8 @@ pub mod tests { None, usize::MAX, ) - .unwrap(); + .unwrap() + .infos; // The exact number of results returned is variable, based on the sort order of the // random signatures that are generated assert!(!results.is_empty()); @@ -7801,8 +7832,26 @@ pub mod tests { Some(all1[4].signature), usize::MAX, ) - .unwrap(); + .unwrap() + .infos; assert!(results2.len() < results.len()); + + // Remove signature + blockstore + .address_signatures_cf + .delete((0, address0, 2, all0[0].signature)) + .unwrap(); + let sig_infos = blockstore + .get_confirmed_signatures_for_address2( + address0, + highest_confirmed_root, + Some(all0[0].signature), + None, + usize::MAX, + ) + .unwrap(); + assert!(!sig_infos.found_before); + assert!(sig_infos.infos.is_empty()); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 3e06e94f4e..1b9711732c 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -34,7 +34,9 @@ use { solana_faucet::faucet::request_airdrop_transaction, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ - blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path, + blockstore::{Blockstore, SignatureInfosForAddress}, + blockstore_db::BlockstoreError, + get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, }, solana_metrics::inc_new_counter_info, @@ -1418,7 +1420,7 @@ impl JsonRpcRequestProcessor { pub async fn get_signatures_for_address( &self, address: Pubkey, - mut before: Option, + before: Option, until: Option, mut limit: usize, commitment: Option, @@ -1439,29 +1441,59 @@ impl JsonRpcRequestProcessor { highest_confirmed_root }; - let mut results = self + let SignatureInfosForAddress { + infos: mut results, + found_before, + } = self .blockstore .get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit) .map_err(|err| Error::invalid_params(format!("{}", err)))?; if results.len() < limit { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + let mut bigtable_before = before; if !results.is_empty() { limit -= results.len(); - before = results.last().map(|x| x.signature); + bigtable_before = results.last().map(|x| x.signature); + } + + // If the oldest address-signature found in Blockstore has not yet been + // uploaded to long-term storage, modify the storage query to return all latest + // signatures to prevent erroring on RowNotFound. This can race with upload. + if found_before + && bigtable_before.is_some() + && bigtable_ledger_storage + .get_confirmed_transaction(&bigtable_before.unwrap()) + .await + .ok() + .flatten() + .is_none() + { + bigtable_before = None; } let bigtable_results = bigtable_ledger_storage .get_confirmed_signatures_for_address( &address, - before.as_ref(), + bigtable_before.as_ref(), until.as_ref(), limit, ) .await; match bigtable_results { Ok(bigtable_results) => { - results.extend(bigtable_results.into_iter().map(|x| x.0)); + let results_set: HashSet<_> = + results.iter().map(|result| result.signature).collect(); + for (bigtable_result, _) in bigtable_results { + // In the upload race condition, latest address-signatures in + // long-term storage may include original `before` signature... + if before != Some(bigtable_result.signature) + // ...or earlier Blockstore signatures + && !results_set.contains(&bigtable_result.signature) + { + results.push(bigtable_result); + } + } } Err(err) => { warn!("{:?}", err);