Add delete subcommand to ledger-tool bigtable (#19931)

* Add `delete` subcommand to `ledger-tool bigtable` command

* feedback
This commit is contained in:
Justin Starry
2021-09-16 18:37:45 -05:00
committed by GitHub
parent 92510a2831
commit c71fab6cb3
5 changed files with 327 additions and 7 deletions

View File

@@ -6,6 +6,7 @@ use {
compression::{compress_best, decompress},
root_ca_certificate,
},
backoff::{future::retry, ExponentialBackoff},
log::*,
std::time::{Duration, Instant},
thiserror::Error,
@@ -66,6 +67,9 @@ pub enum Error {
#[error("Row write failed")]
RowWriteFailed,
#[error("Row delete failed")]
RowDeleteFailed,
#[error("Object not found: {0}")]
ObjectNotFound(String),
@@ -218,7 +222,6 @@ impl BigTableConnection {
where
T: serde::ser::Serialize,
{
use backoff::{future::retry, ExponentialBackoff};
retry(ExponentialBackoff::default(), || async {
let mut client = self.client();
Ok(client.put_bincode_cells(table, cells).await?)
@@ -226,6 +229,29 @@ impl BigTableConnection {
.await
}
pub async fn delete_rows_with_retry(&self, table: &str, row_keys: &[RowKey]) -> Result<()> {
retry(ExponentialBackoff::default(), || async {
let mut client = self.client();
Ok(client.delete_rows(table, row_keys).await?)
})
.await
}
pub async fn get_bincode_cells_with_retry<T>(
&self,
table: &str,
row_keys: &[RowKey],
) -> Result<Vec<(RowKey, Result<T>)>>
where
T: serde::de::DeserializeOwned,
{
retry(ExponentialBackoff::default(), || async {
let mut client = self.client();
Ok(client.get_bincode_cells(table, row_keys).await?)
})
.await
}
pub async fn put_protobuf_cells_with_retry<T>(
&self,
table: &str,
@@ -234,7 +260,6 @@ impl BigTableConnection {
where
T: prost::Message,
{
use backoff::{future::retry, ExponentialBackoff};
retry(ExponentialBackoff::default(), || async {
let mut client = self.client();
Ok(client.put_protobuf_cells(table, cells).await?)
@@ -444,6 +469,38 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
self.decode_read_rows_response(response).await
}
/// Get latest data from multiple rows of `table`, if those rows exist.
pub async fn get_multi_row_data(
&mut self,
table_name: &str,
row_keys: &[RowKey],
) -> Result<Vec<(RowKey, RowData)>> {
self.refresh_access_token().await;
let response = self
.client
.read_rows(ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
rows_limit: 0, // return all keys
rows: Some(RowSet {
row_keys: row_keys
.iter()
.map(|k| k.as_bytes().to_vec())
.collect::<Vec<_>>(),
row_ranges: vec![],
}),
filter: Some(RowFilter {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
..ReadRowsRequest::default()
})
.await?
.into_inner();
self.decode_read_rows_response(response).await
}
/// Get latest data from a single row of `table`, if that row exists. Returns an error if that
/// row does not exist.
///
@@ -481,6 +538,47 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
.ok_or(Error::RowNotFound)
}
/// Delete one or more `table` rows
async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> {
self.refresh_access_token().await;
let mut entries = vec![];
for row_key in row_keys {
entries.push(mutate_rows_request::Entry {
row_key: row_key.as_bytes().to_vec(),
mutations: vec![Mutation {
mutation: Some(mutation::Mutation::DeleteFromRow(
mutation::DeleteFromRow {},
)),
}],
});
}
let mut response = self
.client
.mutate_rows(MutateRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
entries,
..MutateRowsRequest::default()
})
.await?
.into_inner();
while let Some(res) = response.message().await? {
for entry in res.entries {
if let Some(status) = entry.status {
if status.code != 0 {
eprintln!("delete_rows error {}: {}", status.code, status.message);
warn!("delete_rows error {}: {}", status.code, status.message);
return Err(Error::RowDeleteFailed);
}
}
}
}
Ok(())
}
/// Store data for one or more `table` rows in the `family_name` Column family
async fn put_row_data(
&mut self,
@@ -543,6 +641,28 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
deserialize_bincode_cell_data(&row_data, table, key.to_string())
}
pub async fn get_bincode_cells<T>(
&mut self,
table: &str,
keys: &[RowKey],
) -> Result<Vec<(RowKey, Result<T>)>>
where
T: serde::de::DeserializeOwned,
{
Ok(self
.get_multi_row_data(table, keys)
.await?
.into_iter()
.map(|(key, row_data)| {
let key_str = key.to_string();
(
key,
deserialize_bincode_cell_data(&row_data, table, key_str),
)
})
.collect())
}
pub async fn get_protobuf_or_bincode_cell<B, P>(
&mut self,
table: &str,

View File

@@ -17,7 +17,10 @@ use {
TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
TransactionWithStatusMeta,
},
std::{collections::HashMap, convert::TryInto},
std::{
collections::{HashMap, HashSet},
convert::TryInto,
},
thiserror::Error,
};
@@ -254,7 +257,7 @@ impl From<Reward> for StoredConfirmedBlockReward {
}
// A serialized `TransactionInfo` is stored in the `tx` table
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, PartialEq)]
struct TransactionInfo {
slot: Slot, // The slot that contains the block with this transaction in it
index: u32, // Where the transaction is located in the block
@@ -394,6 +397,7 @@ impl LedgerStorage {
let block = self.get_confirmed_block(slot).await?;
match block.transactions.into_iter().nth(index as usize) {
None => {
// report this somewhere actionable?
warn!("Transaction info for {} is corrupt", signature);
Ok(None)
}
@@ -637,6 +641,110 @@ impl LedgerStorage {
Ok(())
}
// Delete a confirmed block and associated meta data.
pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
let mut addresses: HashSet<&Pubkey> = HashSet::new();
let mut expected_tx_infos: HashMap<String, TransactionInfo> = HashMap::new();
let confirmed_block = self.get_confirmed_block(slot).await?;
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
let signature = transaction.signatures[0];
let index = index as u32;
let err = meta.as_ref().and_then(|meta| meta.status.clone().err());
let memo = extract_and_fmt_memos(&transaction.message);
for address in &transaction.message.account_keys {
if !is_sysvar_id(address) {
addresses.insert(address);
}
}
expected_tx_infos.insert(
signature.to_string(),
TransactionInfo {
slot,
index,
err,
memo,
},
);
}
let address_slot_rows: Vec<_> = addresses
.into_iter()
.map(|address| format!("{}/{}", address, slot_to_key(!slot)))
.collect();
let tx_deletion_rows = if !expected_tx_infos.is_empty() {
let signatures = expected_tx_infos
.iter()
.map(|(signature, _info)| signature)
.cloned()
.collect::<Vec<_>>();
let fetched_tx_infos = self
.connection
.get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
.await?
.into_iter()
.collect::<HashMap<_, _>>();
let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
for (signature, expected_tx_info) in expected_tx_infos {
match fetched_tx_infos.get(&signature) {
Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
deletion_rows.push(signature);
}
Some(Ok(_)) => {
warn!(
"skipped tx row {} because the bigtable entry did not match",
signature
);
}
Some(Err(err)) => {
warn!(
"skipped tx row {} because the bigtable entry was corrupted: {:?}",
signature, err
);
}
None => {
warn!("skipped tx row {} because it was not found", signature);
}
}
}
deletion_rows
} else {
vec![]
};
if !dry_run {
if !address_slot_rows.is_empty() {
self.connection
.delete_rows_with_retry("tx-by-addr", &address_slot_rows)
.await?;
}
if !tx_deletion_rows.is_empty() {
self.connection
.delete_rows_with_retry("tx", &tx_deletion_rows)
.await?;
}
self.connection
.delete_rows_with_retry("blocks", &[slot.to_string()])
.await?;
}
info!(
"{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows",
if dry_run { "[dry run] " } else { "" },
slot,
tx_deletion_rows.len(),
address_slot_rows.len()
);
Ok(())
}
}
#[cfg(test)]