Rpc: Add until parameter for getConfirmedSignaturesForAddress2 (#11644)

* Refactor bigtable apis to accept start and end keys

* Make helper fn to deserialize cell data

* Refactor get_confirmed_signatures_for_address to use get_row_data range

* Add until param to get_confirmed_signatures_for_address

* Add until param to blockstore api

* Plumb until through client/cli

* Simplify client params
This commit is contained in:
Tyera Eulberg
2020-08-15 10:42:17 -06:00
committed by GitHub
parent b10f874f49
commit 6c5b8f324a
9 changed files with 282 additions and 78 deletions

View File

@@ -29,6 +29,7 @@ pub type RowKey = String;
pub type CellName = String;
pub type CellValue = Vec<u8>;
pub type RowData = Vec<(CellName, CellValue)>;
pub type RowDataSlice<'a> = &'a [(CellName, CellValue)];
#[derive(Debug, Error)]
pub enum Error {
@@ -287,10 +288,14 @@ impl BigTable {
///
/// If `start_at` is provided, the row key listing will start with key.
/// Otherwise the listing will start from the start of the table.
///
/// If `end_at` is provided, the row key listing will end at the key. Otherwise it will
/// continue until the `limit` is reached or the end of the table, whichever comes first.
pub async fn get_row_keys(
&mut self,
table_name: &str,
start_at: Option<RowKey>,
end_at: Option<RowKey>,
rows_limit: i64,
) -> Result<Vec<RowKey>> {
self.refresh_access_token().await;
@@ -301,16 +306,13 @@ impl BigTable {
rows_limit,
rows: Some(RowSet {
row_keys: vec![],
row_ranges: if let Some(row_key) = start_at {
vec![RowRange {
start_key: Some(row_range::StartKey::StartKeyClosed(
row_key.into_bytes(),
)),
end_key: None,
}]
} else {
vec![]
},
row_ranges: vec![RowRange {
start_key: start_at.map(|row_key| {
row_range::StartKey::StartKeyClosed(row_key.into_bytes())
}),
end_key: end_at
.map(|row_key| row_range::EndKey::EndKeyClosed(row_key.into_bytes())),
}],
}),
filter: Some(RowFilter {
filter: Some(row_filter::Filter::Chain(row_filter::Chain {
@@ -339,21 +341,39 @@ impl BigTable {
Ok(rows.into_iter().map(|r| r.0).collect())
}
/// Get latest data from `limit` rows of `table`, starting inclusively at the `row_key` row.
/// Get latest data from `table`.
///
/// All column families are accepted, and only the latest version of each column cell will be
/// returned.
pub async fn get_row_data(&mut self, table_name: &str, row_key: RowKey) -> Result<RowData> {
///
/// If `start_at` is provided, the row key listing will start with key.
/// Otherwise the listing will start from the start of the table.
///
/// If `end_at` is provided, the row key listing will end at the key. Otherwise it will
/// continue until the `limit` is reached or the end of the table, whichever comes first.
pub async fn get_row_data(
&mut self,
table_name: &str,
start_at: Option<RowKey>,
end_at: Option<RowKey>,
rows_limit: i64,
) -> Result<Vec<(RowKey, RowData)>> {
self.refresh_access_token().await;
let response = self
.client
.read_rows(ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
rows_limit: 1,
rows_limit,
rows: Some(RowSet {
row_keys: vec![row_key.into_bytes()],
row_ranges: vec![],
row_keys: vec![],
row_ranges: vec![RowRange {
start_key: start_at.map(|row_key| {
row_range::StartKey::StartKeyClosed(row_key.into_bytes())
}),
end_key: end_at
.map(|row_key| row_range::EndKey::EndKeyClosed(row_key.into_bytes())),
}],
}),
filter: Some(RowFilter {
// Only return the latest version of each cell
@@ -364,11 +384,7 @@ impl BigTable {
.await?
.into_inner();
let rows = Self::decode_read_rows_response(response).await?;
rows.into_iter()
.next()
.map(|r| r.1)
.ok_or_else(|| Error::RowNotFound)
Self::decode_read_rows_response(response).await
}
/// Store data for one or more `table` rows in the `family_name` Column family
@@ -429,19 +445,10 @@ impl BigTable {
where
T: serde::de::DeserializeOwned,
{
let row_data = self.get_row_data(table, key.clone()).await?;
let row_data = self.get_row_data(table, Some(key.clone()), None, 1).await?;
let (row_key, data) = &row_data[0];
let value = row_data
.into_iter()
.find(|(name, _)| name == "bin")
.ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
.1;
let data = decompress(&value)?;
bincode::deserialize(&data).map_err(|err| {
warn!("Failed to deserialize {}/{}: {}", table, key, err);
Error::ObjectCorrupt(format!("{}/{}", table, key))
})
deserialize_cell_data(data, table, row_key.to_string())
}
pub async fn put_bincode_cells<T>(
@@ -464,3 +471,24 @@ impl BigTable {
Ok(bytes_written)
}
}
pub(crate) fn deserialize_cell_data<T>(
row_data: RowDataSlice,
table: &str,
key: RowKey,
) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
let value = &row_data
.iter()
.find(|(name, _)| name == "bin")
.ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
.1;
let data = decompress(&value)?;
bincode::deserialize(&data).map_err(|err| {
warn!("Failed to deserialize {}/{}: {}", table, key, err);
Error::ObjectCorrupt(format!("{}/{}", table, key))
})
}

View File

@@ -276,7 +276,7 @@ impl LedgerStorage {
/// Return the available slot that contains a block
pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
let mut bigtable = self.connection.client();
let blocks = bigtable.get_row_keys("blocks", None, 1).await?;
let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
if blocks.is_empty() {
return Ok(None);
}
@@ -290,7 +290,7 @@ impl LedgerStorage {
pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
let mut bigtable = self.connection.client();
let blocks = bigtable
.get_row_keys("blocks", Some(slot_to_key(start_slot)), limit as i64)
.get_row_keys("blocks", Some(slot_to_key(start_slot)), None, limit as i64)
.await?;
Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
}
@@ -365,13 +365,14 @@ impl LedgerStorage {
&self,
address: &Pubkey,
before_signature: Option<&Signature>,
until_signature: Option<&Signature>,
limit: usize,
) -> Result<Vec<ConfirmedTransactionStatusWithSignature>> {
let mut bigtable = self.connection.client();
let address_prefix = format!("{}/", address);
// Figure out where to start listing from based on `before_signature`
let (first_slot, mut before_transaction_index) = match before_signature {
let (first_slot, before_transaction_index) = match before_signature {
None => (Slot::MAX, 0),
Some(before_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
@@ -382,6 +383,18 @@ impl LedgerStorage {
}
};
// Figure out where to end listing from based on `until_signature`
let (last_slot, until_transaction_index) = match until_signature {
None => (0, u32::MAX),
Some(until_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", until_signature.to_string())
.await?;
(slot, index)
}
};
let mut infos = vec![];
let starting_slot_tx_by_addr_infos = bigtable
@@ -391,50 +404,46 @@ impl LedgerStorage {
)
.await?;
// Return the next tx-by-addr keys of amount `limit` plus extra to account for the largest
// Return the next tx-by-addr data of amount `limit` plus extra to account for the largest
// number that might be flitered out
let tx_by_addr_info_keys = bigtable
.get_row_keys(
let tx_by_addr_data = bigtable
.get_row_data(
"tx-by-addr",
Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))),
Some(format!("{}{}", address_prefix, slot_to_key(!last_slot))),
limit as i64 + starting_slot_tx_by_addr_infos.len() as i64,
)
.await?;
// Read each tx-by-addr object until `limit` signatures have been found
'outer: for key in tx_by_addr_info_keys {
trace!("key is {}: slot is {}", key, &key[address_prefix.len()..]);
if !key.starts_with(&address_prefix) {
break 'outer;
}
let slot = !key_to_slot(&key[address_prefix.len()..]).ok_or_else(|| {
'outer: for (row_key, data) in tx_by_addr_data {
let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
bigtable::Error::ObjectCorrupt(format!(
"Failed to convert key to slot: tx-by-addr/{}",
key
row_key
))
})?;
let tx_by_addr_infos = bigtable
.get_bincode_cell::<Vec<TransactionByAddrInfo>>("tx-by-addr", key)
.await?;
for tx_by_addr_info in tx_by_addr_infos
.into_iter()
.filter(|tx_by_addr_info| tx_by_addr_info.index < before_transaction_index)
{
let cell_data: Vec<TransactionByAddrInfo> =
bigtable::deserialize_cell_data(&data, "tx-by-addr", row_key)?;
for tx_by_addr_info in cell_data.into_iter() {
// Filter out records before `before_transaction_index`
if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
continue;
}
// Filter out records after `until_transaction_index`
if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
continue;
}
infos.push(ConfirmedTransactionStatusWithSignature {
signature: tx_by_addr_info.signature,
slot,
err: tx_by_addr_info.err,
memo: tx_by_addr_info.memo,
});
// Respect limit
if infos.len() >= limit {
break 'outer;
}
}
before_transaction_index = u32::MAX;
}
Ok(infos)
}