From 3b3822846be73f4f1d288f2c7121b15fc8a11c80 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Fri, 17 Sep 2021 04:25:36 +0000
Subject: [PATCH] Add `delete` subcommand to `ledger-tool bigtable` (#19931)
 (#19962)

* Add `delete` subcommand to `ledger-tool bigtable` command

* feedback

(cherry picked from commit c71fab6cb3392cca6fd8d23347dc77559c9e3097)

Co-authored-by: Justin Starry
---
 ledger-tool/src/bigtable.rs      |  44 ++++++++++-
 ledger/src/bigtable_delete.rs    |  53 +++++++++++++
 ledger/src/lib.rs                |   1 +
 storage-bigtable/src/bigtable.rs | 124 ++++++++++++++++++++++++++++++-
 storage-bigtable/src/lib.rs      | 112 +++++++++++++++++++++++++++-
 5 files changed, 327 insertions(+), 7 deletions(-)
 create mode 100644 ledger/src/bigtable_delete.rs

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 38c3874043..2e112abba0 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -1,5 +1,7 @@
 /// The `bigtable` subcommand
-use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
+use clap::{
+    value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
+};
 use solana_clap_utils::{
     input_parsers::pubkey_of,
     input_validators::{is_slot, is_valid_pubkey},
@@ -41,6 +43,15 @@ async fn upload(
         .await
 }
 
+async fn delete_slots(slots: Vec<Slot>, dry_run: bool) -> Result<(), Box<dyn std::error::Error>> {
+    let read_only = dry_run;
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(read_only, None)
+        .await
+        .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
+
+    solana_ledger::bigtable_delete::delete_confirmed_blocks(bigtable, slots, dry_run).await
+}
+
 async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
     let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
     match bigtable.get_first_available_block().await? {
@@ -230,7 +241,7 @@ impl BigTableSubCommand for App<'_, '_> {
                     Arg::with_name("starting_slot")
                         .long("starting-slot")
                         .validator(is_slot)
-                        .value_name("SLOT")
+                        .value_name("START_SLOT")
                         .takes_value(true)
                         .index(1)
                         .help(
@@ -241,7 +252,7 @@ impl BigTableSubCommand for App<'_, '_> {
                     Arg::with_name("ending_slot")
                         .long("ending-slot")
                         .validator(is_slot)
-                        .value_name("SLOT")
+                        .value_name("END_SLOT")
                         .takes_value(true)
                         .index(2)
                         .help("Stop uploading at this slot [default: last available slot]"),
@@ -263,6 +274,28 @@ impl BigTableSubCommand for App<'_, '_> {
                         ),
                 ),
             )
+            .subcommand(
+                SubCommand::with_name("delete-slots")
+                    .about("Delete ledger information from BigTable")
+                    .arg(
+                        Arg::with_name("slots")
+                            .index(1)
+                            .value_name("SLOTS")
+                            .takes_value(true)
+                            .multiple(true)
+                            .required(true)
+                            .help("Slots to delete"),
+                    )
+                    .arg(
+                        Arg::with_name("force")
+                            .long("force")
+                            .takes_value(false)
+                            .help(
+                                "Deletions are only performed when the force flag is enabled. \
+                                 If force is not enabled, show stats about what ledger data \
+                                 will be deleted in a real deletion."),
+                    ),
+            )
             .subcommand(
                 SubCommand::with_name("first-available-block")
                     .about("Get the first available block in the storage"),
             )
@@ -415,6 +448,11 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
                 force_reupload,
             ))
         }
+        ("delete-slots", Some(arg_matches)) => {
+            let slots = values_t_or_exit!(arg_matches, "slots", Slot);
+            let dry_run = !arg_matches.is_present("force");
+            runtime.block_on(delete_slots(slots, dry_run))
+        }
         ("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()),
         ("block", Some(arg_matches)) => {
            let slot = value_t_or_exit!(arg_matches, "slot", Slot);
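Reviewer note: with the CLI wiring above, the subcommand is a dry run by default and
only mutates Bigtable when --force is passed. Invocation would look roughly like the
following (shape inferred from the arg definitions above; the binary name and slot
values are illustrative):

    # Dry run: connects read-only and reports what would be deleted
    solana-ledger-tool bigtable delete-slots 80000000 80000001

    # Real deletion: requires write access to the Bigtable instance
    solana-ledger-tool bigtable delete-slots 80000000 80000001 --force
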
"), + ), + ) .subcommand( SubCommand::with_name("first-available-block") .about("Get the first available block in the storage"), @@ -415,6 +448,11 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { force_reupload, )) } + ("delete-slots", Some(arg_matches)) => { + let slots = values_t_or_exit!(arg_matches, "slots", Slot); + let dry_run = !value_t_or_exit!(arg_matches, "force", bool); + runtime.block_on(delete_slots(slots, dry_run)) + } ("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()), ("block", Some(arg_matches)) => { let slot = value_t_or_exit!(arg_matches, "slot", Slot); diff --git a/ledger/src/bigtable_delete.rs b/ledger/src/bigtable_delete.rs new file mode 100644 index 0000000000..dc3ede3583 --- /dev/null +++ b/ledger/src/bigtable_delete.rs @@ -0,0 +1,53 @@ +use {log::*, solana_measure::measure::Measure, solana_sdk::clock::Slot, std::result::Result}; + +// Attempt to delete this many blocks in parallel +const NUM_BLOCKS_TO_DELETE_IN_PARALLEL: usize = 32; + +pub async fn delete_confirmed_blocks( + bigtable: solana_storage_bigtable::LedgerStorage, + blocks_to_delete: Vec, + dry_run: bool, +) -> Result<(), Box> { + let mut measure = Measure::start("entire delete"); + + if blocks_to_delete.is_empty() { + info!("No blocks to be deleted"); + return Ok(()); + } + info!("{} blocks to be deleted", blocks_to_delete.len()); + + let mut failures = 0; + for blocks in blocks_to_delete.chunks(NUM_BLOCKS_TO_DELETE_IN_PARALLEL) { + let mut measure_delete = Measure::start("Delete"); + info!("Preparing the next {} blocks for deletion", blocks.len()); + + let deletion_futures = blocks + .iter() + .map(|block| bigtable.delete_confirmed_block(*block, dry_run)); + + for (block, result) in blocks + .iter() + .zip(futures::future::join_all(deletion_futures).await) + { + if result.is_err() { + error!( + "delete_confirmed_block({}) failed: {:?}", + block, + result.err() + ); + failures += 1; + } + } + + measure_delete.stop(); + info!("{} for {} blocks", measure_delete, blocks.len()); + } + + measure.stop(); + info!("{}", measure); + if failures > 0 { + Err(format!("Incomplete deletion, {} operations failed", failures).into()) + } else { + Ok(()) + } +} diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 603f585da3..89779bb695 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -4,6 +4,7 @@ extern crate solana_bpf_loader_program; pub mod bank_forks_utils; +pub mod bigtable_delete; pub mod bigtable_upload; pub mod block_error; #[macro_use] diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index eb619b9a91..bb482275f7 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -6,6 +6,7 @@ use { compression::{compress_best, decompress}, root_ca_certificate, }, + backoff::{future::retry, ExponentialBackoff}, log::*, std::time::{Duration, Instant}, thiserror::Error, @@ -66,6 +67,9 @@ pub enum Error { #[error("Row write failed")] RowWriteFailed, + #[error("Row delete failed")] + RowDeleteFailed, + #[error("Object not found: {0}")] ObjectNotFound(String), @@ -218,7 +222,6 @@ impl BigTableConnection { where T: serde::ser::Serialize, { - use backoff::{future::retry, ExponentialBackoff}; retry(ExponentialBackoff::default(), || async { let mut client = self.client(); Ok(client.put_bincode_cells(table, cells).await?) 
diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs
index eb619b9a91..bb482275f7 100644
--- a/storage-bigtable/src/bigtable.rs
+++ b/storage-bigtable/src/bigtable.rs
@@ -6,6 +6,7 @@ use {
         compression::{compress_best, decompress},
         root_ca_certificate,
     },
+    backoff::{future::retry, ExponentialBackoff},
     log::*,
     std::time::{Duration, Instant},
     thiserror::Error,
@@ -66,6 +67,9 @@ pub enum Error {
     #[error("Row write failed")]
     RowWriteFailed,
 
+    #[error("Row delete failed")]
+    RowDeleteFailed,
+
     #[error("Object not found: {0}")]
     ObjectNotFound(String),
 
@@ -218,7 +222,6 @@ impl BigTableConnection {
     where
         T: serde::ser::Serialize,
     {
-        use backoff::{future::retry, ExponentialBackoff};
         retry(ExponentialBackoff::default(), || async {
             let mut client = self.client();
             Ok(client.put_bincode_cells(table, cells).await?)
@@ -226,6 +229,29 @@ impl BigTableConnection {
         .await
     }
 
+    pub async fn delete_rows_with_retry(&self, table: &str, row_keys: &[RowKey]) -> Result<()> {
+        retry(ExponentialBackoff::default(), || async {
+            let mut client = self.client();
+            Ok(client.delete_rows(table, row_keys).await?)
+        })
+        .await
+    }
+
+    pub async fn get_bincode_cells_with_retry<T>(
+        &self,
+        table: &str,
+        row_keys: &[RowKey],
+    ) -> Result<Vec<(RowKey, Result<T>)>>
+    where
+        T: serde::de::DeserializeOwned,
+    {
+        retry(ExponentialBackoff::default(), || async {
+            let mut client = self.client();
+            Ok(client.get_bincode_cells(table, row_keys).await?)
+        })
+        .await
+    }
+
     pub async fn put_protobuf_cells_with_retry(
         &self,
         table: &str,
@@ -234,7 +260,6 @@ impl BigTableConnection {
     where
         T: prost::Message,
     {
-        use backoff::{future::retry, ExponentialBackoff};
         retry(ExponentialBackoff::default(), || async {
             let mut client = self.client();
             Ok(client.put_protobuf_cells(table, cells).await?)
@@ -444,6 +469,38 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
         self.decode_read_rows_response(response).await
     }
 
+    /// Get latest data from multiple rows of `table`, if those rows exist.
+    pub async fn get_multi_row_data(
+        &mut self,
+        table_name: &str,
+        row_keys: &[RowKey],
+    ) -> Result<Vec<(RowKey, RowData)>> {
+        self.refresh_access_token().await;
+
+        let response = self
+            .client
+            .read_rows(ReadRowsRequest {
+                table_name: format!("{}{}", self.table_prefix, table_name),
+                rows_limit: 0, // return all keys
+                rows: Some(RowSet {
+                    row_keys: row_keys
+                        .iter()
+                        .map(|k| k.as_bytes().to_vec())
+                        .collect::<Vec<_>>(),
+                    row_ranges: vec![],
+                }),
+                filter: Some(RowFilter {
+                    // Only return the latest version of each cell
+                    filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
+                }),
+                ..ReadRowsRequest::default()
+            })
+            .await?
+            .into_inner();
+
+        self.decode_read_rows_response(response).await
+    }
+
     /// Get latest data from a single row of `table`, if that row exists. Returns an error if that
     /// row does not exist.
     ///
@@ -481,6 +538,47 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
             .ok_or(Error::RowNotFound)
     }
 
+    /// Delete one or more `table` rows
+    async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> {
+        self.refresh_access_token().await;
+
+        let mut entries = vec![];
+        for row_key in row_keys {
+            entries.push(mutate_rows_request::Entry {
+                row_key: row_key.as_bytes().to_vec(),
+                mutations: vec![Mutation {
+                    mutation: Some(mutation::Mutation::DeleteFromRow(
+                        mutation::DeleteFromRow {},
+                    )),
+                }],
+            });
+        }
+
+        let mut response = self
+            .client
+            .mutate_rows(MutateRowsRequest {
+                table_name: format!("{}{}", self.table_prefix, table_name),
+                entries,
+                ..MutateRowsRequest::default()
+            })
+            .await?
+            .into_inner();
+
+        while let Some(res) = response.message().await? {
+            for entry in res.entries {
+                if let Some(status) = entry.status {
+                    if status.code != 0 {
+                        eprintln!("delete_rows error {}: {}", status.code, status.message);
+                        warn!("delete_rows error {}: {}", status.code, status.message);
+                        return Err(Error::RowDeleteFailed);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     /// Store data for one or more `table` rows in the `family_name` Column family
     async fn put_row_data(
         &mut self,
@@ -543,6 +641,28 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
         deserialize_bincode_cell_data(&row_data, table, key.to_string())
     }
 
+    pub async fn get_bincode_cells<T>(
+        &mut self,
+        table: &str,
+        keys: &[RowKey],
+    ) -> Result<Vec<(RowKey, Result<T>)>>
+    where
+        T: serde::de::DeserializeOwned,
+    {
+        Ok(self
+            .get_multi_row_data(table, keys)
+            .await?
+            .into_iter()
+            .map(|(key, row_data)| {
+                let key_str = key.to_string();
+                (
+                    key,
+                    deserialize_bincode_cell_data(&row_data, table, key_str),
+                )
+            })
+            .collect())
+    }
+
     pub async fn get_protobuf_or_bincode_cell<B, P>(
         &mut self,
         table: &str,
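Reviewer note: delete_rows issues a single MutateRows RPC carrying one DeleteFromRow
mutation per row key, and since MutateRows can partially succeed, the per-entry status
loop is what turns any non-zero entry status into Error::RowDeleteFailed. The new
*_with_retry wrappers reuse the backoff import hoisted to the top of the file; a
minimal sketch of that retry shape (flaky_op is hypothetical; assumes the backoff
crate with its async/tokio feature):

    use backoff::{future::retry, ExponentialBackoff};

    async fn flaky_op() -> Result<u32, backoff::Error<std::io::Error>> {
        // Stand-in for client.delete_rows(...); returning a transient
        // backoff::Error here would trigger another attempt.
        Ok(42)
    }

    #[tokio::main]
    async fn main() -> Result<(), std::io::Error> {
        // Retries with exponential backoff until success or until the
        // default backoff policy gives up.
        let value = retry(ExponentialBackoff::default(), || async { flaky_op().await }).await?;
        println!("{}", value);
        Ok(())
    }
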
diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs
index bf7e966a42..e9ddb88632 100644
--- a/storage-bigtable/src/lib.rs
+++ b/storage-bigtable/src/lib.rs
@@ -17,7 +17,10 @@ use {
         TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
         TransactionWithStatusMeta,
     },
-    std::{collections::HashMap, convert::TryInto},
+    std::{
+        collections::{HashMap, HashSet},
+        convert::TryInto,
+    },
     thiserror::Error,
 };
 
@@ -254,7 +257,7 @@ impl From<Reward> for StoredConfirmedBlockReward {
 }
 
 // A serialized `TransactionInfo` is stored in the `tx` table
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, PartialEq)]
 struct TransactionInfo {
     slot: Slot, // The slot that contains the block with this transaction in it
     index: u32, // Where the transaction is located in the block
@@ -394,6 +397,7 @@ impl LedgerStorage {
         let block = self.get_confirmed_block(slot).await?;
         match block.transactions.into_iter().nth(index as usize) {
             None => {
+                // report this somewhere actionable?
                 warn!("Transaction info for {} is corrupt", signature);
                 Ok(None)
             }
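Reviewer note: the PartialEq derive on TransactionInfo above exists so that
delete_confirmed_block (next hunk) can compare a freshly recomputed TransactionInfo
against the one fetched from Bigtable and skip any tx row that does not match. The
tx-by-addr rows it deletes are keyed by address plus the bitwise-inverted slot, so
newer slots sort first; a small sketch of that key derivation, assuming slot_to_key
is the 16-digit zero-padded hex encoding used elsewhere in this crate:

    // Assumed to mirror storage-bigtable's slot_to_key encoding
    fn slot_to_key(slot: u64) -> String {
        format!("{:016x}", slot)
    }

    fn main() {
        let address = "Vote111111111111111111111111111111111111111";
        let slot: u64 = 98_765;
        // `!slot` inverts the bits so that higher slots produce
        // lexicographically smaller keys
        let row_key = format!("{}/{}", address, slot_to_key(!slot));
        println!("{}", row_key); // Vote.../fffffffffffe7e32
    }
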
@@ -637,6 +641,110 @@ impl LedgerStorage {
 
         Ok(())
     }
+
+    // Delete a confirmed block and associated metadata.
+    pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
+        let mut addresses: HashSet<&Pubkey> = HashSet::new();
+        let mut expected_tx_infos: HashMap<String, TransactionInfo> = HashMap::new();
+        let confirmed_block = self.get_confirmed_block(slot).await?;
+        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
+            let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
+            let signature = transaction.signatures[0];
+            let index = index as u32;
+            let err = meta.as_ref().and_then(|meta| meta.status.clone().err());
+            let memo = extract_and_fmt_memos(&transaction.message);
+
+            for address in &transaction.message.account_keys {
+                if !is_sysvar_id(address) {
+                    addresses.insert(address);
+                }
+            }
+
+            expected_tx_infos.insert(
+                signature.to_string(),
+                TransactionInfo {
+                    slot,
+                    index,
+                    err,
+                    memo,
+                },
+            );
+        }
+
+        let address_slot_rows: Vec<_> = addresses
+            .into_iter()
+            .map(|address| format!("{}/{}", address, slot_to_key(!slot)))
+            .collect();
+
+        let tx_deletion_rows = if !expected_tx_infos.is_empty() {
+            let signatures = expected_tx_infos
+                .iter()
+                .map(|(signature, _info)| signature)
+                .cloned()
+                .collect::<Vec<_>>();
+            let fetched_tx_infos = self
+                .connection
+                .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
+                .await?
+                .into_iter()
+                .collect::<HashMap<_, _>>();
+
+            let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
+            for (signature, expected_tx_info) in expected_tx_infos {
+                match fetched_tx_infos.get(&signature) {
+                    Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
+                        deletion_rows.push(signature);
+                    }
+                    Some(Ok(_)) => {
+                        warn!(
+                            "skipped tx row {} because the bigtable entry did not match",
+                            signature
+                        );
+                    }
+                    Some(Err(err)) => {
+                        warn!(
+                            "skipped tx row {} because the bigtable entry was corrupted: {:?}",
+                            signature, err
+                        );
+                    }
+                    None => {
+                        warn!("skipped tx row {} because it was not found", signature);
+                    }
+                }
+            }
+            deletion_rows
+        } else {
+            vec![]
+        };
+
+        if !dry_run {
+            if !address_slot_rows.is_empty() {
+                self.connection
+                    .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
+                    .await?;
+            }
+
+            if !tx_deletion_rows.is_empty() {
+                self.connection
+                    .delete_rows_with_retry("tx", &tx_deletion_rows)
+                    .await?;
+            }
+
+            self.connection
+                .delete_rows_with_retry("blocks", &[slot.to_string()])
+                .await?;
+        }
+
+        info!(
+            "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows",
+            if dry_run { "[dry run] " } else { "" },
+            slot,
+            tx_deletion_rows.len(),
+            address_slot_rows.len()
+        );
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
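Reviewer note: for exercising the new deletion path directly from Rust rather than
through ledger-tool, a minimal sketch (assumes Bigtable credentials are available in
the environment, as LedgerStorage::new requires; the slot number is illustrative):

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // read_only = false: deletion needs write access
        let storage = solana_storage_bigtable::LedgerStorage::new(false, None).await?;
        // dry_run = true: fetches and verifies the rows but only logs
        // what a real deletion would remove
        storage.delete_confirmed_block(80_000_000, true).await?;
        Ok(())
    }
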