Blockstore address signatures: handle slots that cross primary indexes, and refactor get_confirmed_signatures_for_address2 (#11497) (#11507)

* Freeze address-signature index in the middle of slot to show failure case

* Secondary filter on signature

* Use AddressSignatures iterator instead of manually decrementing slots

* Remove unused method

* Add metrics

* Add transaction-status-index doccumentation

(cherry picked from commit de5fb3ba0e)

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
mergify[bot]
2020-08-10 17:50:05 +00:00
committed by GitHub
parent 11476038cd
commit 1db1d173fc
3 changed files with 142 additions and 138 deletions

View File

@ -1671,6 +1671,10 @@ impl Blockstore {
.collect() .collect()
} }
/// Initializes the TransactionStatusIndex column family with two records, `0` and `1`,
/// which are used as the primary index for entries in the TransactionStatus and
/// AddressSignatures columns. At any given time, one primary index is active (ie. new records
/// are stored under this index), the other is frozen.
fn initialize_transaction_status_index(&self) -> Result<()> { fn initialize_transaction_status_index(&self) -> Result<()> {
self.transaction_status_index_cf self.transaction_status_index_cf
.put(0, &TransactionStatusIndexMeta::default())?; .put(0, &TransactionStatusIndexMeta::default())?;
@ -1687,6 +1691,8 @@ impl Blockstore {
) )
} }
/// Toggles the active primary index between `0` and `1`, and clears the stored max-slot of the
/// frozen index in preparation for pruning.
fn toggle_transaction_status_index( fn toggle_transaction_status_index(
&self, &self,
batch: &mut WriteBatch, batch: &mut WriteBatch,
@ -1902,34 +1908,10 @@ impl Blockstore {
} }
} }
} }
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures) Ok(signatures)
} }
fn get_lowest_slot_for_address(&self, address: Pubkey) -> Result<Option<Slot>> {
let mut lowest_slot = None;
for transaction_status_cf_primary_index in 0..=1 {
let mut index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
address,
0,
Signature::default(),
),
IteratorDirection::Forward,
))?;
if let Some(((i, key_address, slot, _), _)) = index_iterator.next() {
if i == transaction_status_cf_primary_index
&& key_address == address
&& slot < lowest_slot.unwrap_or(Slot::MAX)
{
lowest_slot = Some(slot);
}
}
}
Ok(lowest_slot)
}
pub fn get_confirmed_signatures_for_address( pub fn get_confirmed_signatures_for_address(
&self, &self,
pubkey: Pubkey, pubkey: Pubkey,
@ -1967,7 +1949,8 @@ impl Blockstore {
// Figure the `slot` to start listing signatures at, based on the ledger location of the // Figure the `slot` to start listing signatures at, based on the ledger location of the
// `before` signature if present. Also generate a HashSet of signatures that should // `before` signature if present. Also generate a HashSet of signatures that should
// be excluded from the results. // be excluded from the results.
let (mut slot, mut excluded_signatures) = match before { let mut get_before_slot_timer = Measure::start("get_before_slot_timer");
let (slot, mut excluded_signatures) = match before {
None => (highest_confirmed_root, None), None => (highest_confirmed_root, None),
Some(before) => { Some(before) => {
let transaction_status = self.get_transaction_status(before)?; let transaction_status = self.get_transaction_status(before)?;
@ -1999,9 +1982,10 @@ impl Blockstore {
.collect(); .collect();
// Sort signatures as a way to entire a stable ordering within a slot, as // Sort signatures as a way to entire a stable ordering within a slot, as
// `self.find_address_signatures()` is ordered by signatures ordered and // the AddressSignatures column is ordered by signatures within a slot,
// not by block ordering // not by block ordering
slot_signatures.sort(); slot_signatures.sort();
slot_signatures.reverse();
if let Some(pos) = slot_signatures.iter().position(|&x| x == before) { if let Some(pos) = slot_signatures.iter().position(|&x| x == before) {
slot_signatures.truncate(pos + 1); slot_signatures.truncate(pos + 1);
@ -2015,41 +1999,96 @@ impl Blockstore {
} }
} }
}; };
get_before_slot_timer.stop();
// Fetch the list of signatures that affect the given address // Fetch the list of signatures that affect the given address
let first_available_block = self.get_first_available_block()?; let first_available_block = self.get_first_available_block()?;
let first_address_slot = self.get_lowest_slot_for_address(address)?;
if first_address_slot.is_none() {
return Ok(vec![]);
}
let lower_bound = cmp::max(first_available_block, first_address_slot.unwrap());
let mut address_signatures = vec![]; let mut address_signatures = vec![];
loop {
if address_signatures.len() >= limit {
address_signatures.truncate(limit);
break;
}
let mut signatures = self.find_address_signatures(address, slot, slot)?; // Get signatures in `slot`
if let Some(excluded_signatures) = excluded_signatures.take() { let mut get_initial_slot_timer = Measure::start("get_initial_slot_timer");
address_signatures.extend( let mut signatures = self.find_address_signatures(address, slot, slot)?;
signatures signatures.reverse();
.into_iter() if let Some(excluded_signatures) = excluded_signatures.take() {
.filter(|(_, signature)| !excluded_signatures.contains(&signature)), address_signatures.extend(
) signatures
} else { .into_iter()
address_signatures.append(&mut signatures); .filter(|(_, signature)| !excluded_signatures.contains(&signature)),
} )
excluded_signatures = None; } else {
address_signatures.append(&mut signatures);
if slot == lower_bound {
break;
}
slot -= 1;
} }
get_initial_slot_timer.stop();
// Check the active_transaction_status_index to see if it contains slot. If so, start with
// that index, as it will contain higher slots
let starting_primary_index = *self.active_transaction_status_index.read().unwrap();
let next_primary_index = if starting_primary_index == 0 { 1 } else { 0 };
let next_max_slot = self
.transaction_status_index_cf
.get(next_primary_index)?
.unwrap()
.max_slot;
let mut starting_primary_index_iter_timer = Measure::start("starting_primary_index_iter");
if slot > next_max_slot {
let mut starting_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(starting_primary_index, address, slot, Signature::default()),
IteratorDirection::Reverse,
))?;
// Iterate through starting_iterator until limit is reached
while address_signatures.len() < limit {
if let Some(((i, key_address, slot, signature), _)) = starting_iterator.next() {
if slot == next_max_slot {
break;
}
if i == starting_primary_index
&& key_address == address
&& slot >= first_available_block
{
address_signatures.push((slot, signature));
continue;
}
}
break;
}
// Handle slots that cross primary indexes
let mut signatures =
self.find_address_signatures(address, next_max_slot, next_max_slot)?;
signatures.reverse();
address_signatures.append(&mut signatures);
}
starting_primary_index_iter_timer.stop();
// Iterate through next_iterator until limit is reached
let mut next_primary_index_iter_timer = Measure::start("next_primary_index_iter_timer");
let mut next_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(next_primary_index, address, slot, Signature::default()),
IteratorDirection::Reverse,
))?;
while address_signatures.len() < limit {
if let Some(((i, key_address, slot, signature), _)) = next_iterator.next() {
// Skip next_max_slot, which is already included
if slot == next_max_slot {
continue;
}
if i == next_primary_index
&& key_address == address
&& slot >= first_available_block
{
address_signatures.push((slot, signature));
continue;
}
}
break;
}
next_primary_index_iter_timer.stop();
address_signatures.truncate(limit); address_signatures.truncate(limit);
// Fill in the status information for each found transaction // Fill in the status information for each found transaction
let mut get_status_info_timer = Measure::start("get_status_info_timer");
let mut infos = vec![]; let mut infos = vec![];
for (slot, signature) in address_signatures.into_iter() { for (slot, signature) in address_signatures.into_iter() {
let transaction_status = self.get_transaction_status(signature)?; let transaction_status = self.get_transaction_status(signature)?;
@ -2064,6 +2103,36 @@ impl Blockstore {
memo: None, memo: None,
}); });
} }
get_status_info_timer.stop();
datapoint_info!(
"blockstore-get-conf-sigs-for-addr-2",
(
"get_before_slot_us",
get_before_slot_timer.as_us() as i64,
i64
),
(
"get_initial_slot_us",
get_initial_slot_timer.as_us() as i64,
i64
),
(
"starting_primary_index_iter_us",
starting_primary_index_iter_timer.as_us() as i64,
i64
),
(
"next_primary_index_iter_us",
next_primary_index_iter_timer.as_us() as i64,
i64
),
(
"get_status_info_us",
get_status_info_timer.as_us() as i64,
i64
)
);
Ok(infos) Ok(infos)
} }
@ -6249,82 +6318,6 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
fn test_get_lowest_slot_for_address() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let address = Pubkey::new_rand();
let address2 = Pubkey::new_rand();
let slot = 5;
// Add an additional to record to ensure that existent or lower slots in entries for
// other addresses do not affect return
blockstore
.address_signatures_cf
.put(
(0, address2, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
None
);
let slot = 200;
blockstore
.address_signatures_cf
.put(
(0, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(200)
);
blockstore
.address_signatures_cf
.put(
(1, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(200)
);
let slot = 300;
blockstore
.address_signatures_cf
.put(
(1, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(200)
);
let slot = 100;
blockstore
.address_signatures_cf
.put(
(1, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(100)
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test] #[test]
fn test_get_confirmed_signatures_for_address2() { fn test_get_confirmed_signatures_for_address2() {
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
@ -6351,14 +6344,18 @@ pub mod tests {
let address0 = Pubkey::new_rand(); let address0 = Pubkey::new_rand();
let address1 = Pubkey::new_rand(); let address1 = Pubkey::new_rand();
for slot in 2..=4 { for slot in 2..=7 {
let entries = make_slot_entries_with_transaction_addresses(&[ let entries = make_slot_entries_with_transaction_addresses(&[
address0, address1, address0, address1, address0, address1, address0, address1,
]); ]);
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0); let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.insert_shreds(shreds, None, false).unwrap();
for entry in &entries { for (i, entry) in entries.iter().enumerate() {
if slot == 4 && i == 2 {
// Purge to freeze index 0 and write address-signatures in new primary index
blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
}
for transaction in &entry.transactions { for transaction in &entry.transactions {
assert_eq!(transaction.signatures.len(), 1); assert_eq!(transaction.signatures.len(), 1);
blockstore blockstore
@ -6373,8 +6370,8 @@ pub mod tests {
} }
} }
} }
blockstore.set_roots(&[1, 2, 3, 4]).unwrap(); blockstore.set_roots(&[1, 2, 3, 4, 5, 6, 7]).unwrap();
let highest_confirmed_root = 4; let highest_confirmed_root = 7;
// Fetch all signatures for address 0 at once... // Fetch all signatures for address 0 at once...
let all0 = blockstore let all0 = blockstore
@ -6385,7 +6382,7 @@ pub mod tests {
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap();
assert_eq!(all0.len(), 6); assert_eq!(all0.len(), 12);
// Fetch all signatures for address 1 at once... // Fetch all signatures for address 1 at once...
let all1 = blockstore let all1 = blockstore
@ -6396,7 +6393,7 @@ pub mod tests {
usize::MAX, usize::MAX,
) )
.unwrap(); .unwrap();
assert_eq!(all1.len(), 6); assert_eq!(all1.len(), 12);
assert!(all0 != all1); assert!(all0 != all1);
@ -6449,7 +6446,7 @@ pub mod tests {
assert_eq!(results[2], all0[i + 2]); assert_eq!(results[2], all0[i + 2]);
} }
// Ensure that the signatures within a slot are ordered by signature // Ensure that the signatures within a slot are reverse ordered by signature
// (current limitation of the .get_confirmed_signatures_for_address2()) // (current limitation of the .get_confirmed_signatures_for_address2())
for i in (0..all1.len()).step_by(2) { for i in (0..all1.len()).step_by(2) {
let results = blockstore let results = blockstore
@ -6466,7 +6463,7 @@ pub mod tests {
.unwrap(); .unwrap();
assert_eq!(results.len(), 2); assert_eq!(results.len(), 2);
assert_eq!(results[0].slot, results[1].slot); assert_eq!(results[0].slot, results[1].slot);
assert!(results[0].signature <= results[1].signature); assert!(results[0].signature >= results[1].signature);
assert_eq!(results[0], all1[i]); assert_eq!(results[0], all1[i]);
assert_eq!(results[1], all1[i + 1]); assert_eq!(results[1], all1[i + 1]);
} }

View File

@ -235,6 +235,9 @@ impl Blockstore {
Ok(result) Ok(result)
} }
/// Purges special columns (using a non-Slot primary-index) exactly, by deserializing each slot
/// being purged and iterating through all transactions to determine the keys of individual
/// records. **This method is very slow.**
fn purge_special_columns_exact( fn purge_special_columns_exact(
&self, &self,
batch: &mut WriteBatch, batch: &mut WriteBatch,
@ -279,6 +282,8 @@ impl Blockstore {
Ok(()) Ok(())
} }
/// Purges special columns (using a non-Slot primary-index) by range. Purge occurs if frozen
/// primary index has a max-slot less than the highest slot being purged.
fn purge_special_columns_with_primary_index( fn purge_special_columns_with_primary_index(
&self, &self,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,

View File

@ -39,7 +39,9 @@ const CODE_SHRED_CF: &str = "code_shred";
const TRANSACTION_STATUS_CF: &str = "transaction_status"; const TRANSACTION_STATUS_CF: &str = "transaction_status";
/// Column family for Address Signatures /// Column family for Address Signatures
const ADDRESS_SIGNATURES_CF: &str = "address_signatures"; const ADDRESS_SIGNATURES_CF: &str = "address_signatures";
/// Column family for Transaction Status Index /// Column family for the Transaction Status Index.
/// This column family is used for tracking the active primary index for columns that for
/// query performance reasons should not be indexed by Slot.
const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index"; const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
/// Column family for Rewards /// Column family for Rewards
const REWARDS_CF: &str = "rewards"; const REWARDS_CF: &str = "rewards";