* Add `delete` subcommand to `ledger-tool bigtable` command
* feedback
(cherry picked from commit c71fab6cb3
)
Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
@ -1,5 +1,7 @@
|
|||||||
/// The `bigtable` subcommand
|
/// The `bigtable` subcommand
|
||||||
use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
|
use clap::{
|
||||||
|
value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
|
||||||
|
};
|
||||||
use solana_clap_utils::{
|
use solana_clap_utils::{
|
||||||
input_parsers::pubkey_of,
|
input_parsers::pubkey_of,
|
||||||
input_validators::{is_slot, is_valid_pubkey},
|
input_validators::{is_slot, is_valid_pubkey},
|
||||||
@ -41,6 +43,15 @@ async fn upload(
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_slots(slots: Vec<Slot>, dry_run: bool) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let read_only = dry_run;
|
||||||
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(read_only, None)
|
||||||
|
.await
|
||||||
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
|
solana_ledger::bigtable_delete::delete_confirmed_blocks(bigtable, slots, dry_run).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
|
||||||
match bigtable.get_first_available_block().await? {
|
match bigtable.get_first_available_block().await? {
|
||||||
@ -230,7 +241,7 @@ impl BigTableSubCommand for App<'_, '_> {
|
|||||||
Arg::with_name("starting_slot")
|
Arg::with_name("starting_slot")
|
||||||
.long("starting-slot")
|
.long("starting-slot")
|
||||||
.validator(is_slot)
|
.validator(is_slot)
|
||||||
.value_name("SLOT")
|
.value_name("START_SLOT")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.index(1)
|
.index(1)
|
||||||
.help(
|
.help(
|
||||||
@ -241,7 +252,7 @@ impl BigTableSubCommand for App<'_, '_> {
|
|||||||
Arg::with_name("ending_slot")
|
Arg::with_name("ending_slot")
|
||||||
.long("ending-slot")
|
.long("ending-slot")
|
||||||
.validator(is_slot)
|
.validator(is_slot)
|
||||||
.value_name("SLOT")
|
.value_name("END_SLOT")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.index(2)
|
.index(2)
|
||||||
.help("Stop uploading at this slot [default: last available slot]"),
|
.help("Stop uploading at this slot [default: last available slot]"),
|
||||||
@ -263,6 +274,28 @@ impl BigTableSubCommand for App<'_, '_> {
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("delete-slots")
|
||||||
|
.about("Delete ledger information from BigTable")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("slots")
|
||||||
|
.index(1)
|
||||||
|
.value_name("SLOTS")
|
||||||
|
.takes_value(true)
|
||||||
|
.multiple(true)
|
||||||
|
.required(true)
|
||||||
|
.help("Slots to delete"),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("force")
|
||||||
|
.long("force")
|
||||||
|
.takes_value(false)
|
||||||
|
.help(
|
||||||
|
"Deletions are only performed when the force flag is enabled. \
|
||||||
|
If force is not enabled, show stats about what ledger data \
|
||||||
|
will be deleted in a real deletion. "),
|
||||||
|
),
|
||||||
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
SubCommand::with_name("first-available-block")
|
SubCommand::with_name("first-available-block")
|
||||||
.about("Get the first available block in the storage"),
|
.about("Get the first available block in the storage"),
|
||||||
@ -415,6 +448,11 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||||||
force_reupload,
|
force_reupload,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
("delete-slots", Some(arg_matches)) => {
|
||||||
|
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
|
||||||
|
let dry_run = !value_t_or_exit!(arg_matches, "force", bool);
|
||||||
|
runtime.block_on(delete_slots(slots, dry_run))
|
||||||
|
}
|
||||||
("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()),
|
("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()),
|
||||||
("block", Some(arg_matches)) => {
|
("block", Some(arg_matches)) => {
|
||||||
let slot = value_t_or_exit!(arg_matches, "slot", Slot);
|
let slot = value_t_or_exit!(arg_matches, "slot", Slot);
|
||||||
|
53
ledger/src/bigtable_delete.rs
Normal file
53
ledger/src/bigtable_delete.rs
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
use {log::*, solana_measure::measure::Measure, solana_sdk::clock::Slot, std::result::Result};
|
||||||
|
|
||||||
|
// Attempt to delete this many blocks in parallel
|
||||||
|
const NUM_BLOCKS_TO_DELETE_IN_PARALLEL: usize = 32;
|
||||||
|
|
||||||
|
pub async fn delete_confirmed_blocks(
|
||||||
|
bigtable: solana_storage_bigtable::LedgerStorage,
|
||||||
|
blocks_to_delete: Vec<Slot>,
|
||||||
|
dry_run: bool,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let mut measure = Measure::start("entire delete");
|
||||||
|
|
||||||
|
if blocks_to_delete.is_empty() {
|
||||||
|
info!("No blocks to be deleted");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
info!("{} blocks to be deleted", blocks_to_delete.len());
|
||||||
|
|
||||||
|
let mut failures = 0;
|
||||||
|
for blocks in blocks_to_delete.chunks(NUM_BLOCKS_TO_DELETE_IN_PARALLEL) {
|
||||||
|
let mut measure_delete = Measure::start("Delete");
|
||||||
|
info!("Preparing the next {} blocks for deletion", blocks.len());
|
||||||
|
|
||||||
|
let deletion_futures = blocks
|
||||||
|
.iter()
|
||||||
|
.map(|block| bigtable.delete_confirmed_block(*block, dry_run));
|
||||||
|
|
||||||
|
for (block, result) in blocks
|
||||||
|
.iter()
|
||||||
|
.zip(futures::future::join_all(deletion_futures).await)
|
||||||
|
{
|
||||||
|
if result.is_err() {
|
||||||
|
error!(
|
||||||
|
"delete_confirmed_block({}) failed: {:?}",
|
||||||
|
block,
|
||||||
|
result.err()
|
||||||
|
);
|
||||||
|
failures += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
measure_delete.stop();
|
||||||
|
info!("{} for {} blocks", measure_delete, blocks.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
measure.stop();
|
||||||
|
info!("{}", measure);
|
||||||
|
if failures > 0 {
|
||||||
|
Err(format!("Incomplete deletion, {} operations failed", failures).into())
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@
|
|||||||
extern crate solana_bpf_loader_program;
|
extern crate solana_bpf_loader_program;
|
||||||
|
|
||||||
pub mod bank_forks_utils;
|
pub mod bank_forks_utils;
|
||||||
|
pub mod bigtable_delete;
|
||||||
pub mod bigtable_upload;
|
pub mod bigtable_upload;
|
||||||
pub mod block_error;
|
pub mod block_error;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -6,6 +6,7 @@ use {
|
|||||||
compression::{compress_best, decompress},
|
compression::{compress_best, decompress},
|
||||||
root_ca_certificate,
|
root_ca_certificate,
|
||||||
},
|
},
|
||||||
|
backoff::{future::retry, ExponentialBackoff},
|
||||||
log::*,
|
log::*,
|
||||||
std::time::{Duration, Instant},
|
std::time::{Duration, Instant},
|
||||||
thiserror::Error,
|
thiserror::Error,
|
||||||
@ -66,6 +67,9 @@ pub enum Error {
|
|||||||
#[error("Row write failed")]
|
#[error("Row write failed")]
|
||||||
RowWriteFailed,
|
RowWriteFailed,
|
||||||
|
|
||||||
|
#[error("Row delete failed")]
|
||||||
|
RowDeleteFailed,
|
||||||
|
|
||||||
#[error("Object not found: {0}")]
|
#[error("Object not found: {0}")]
|
||||||
ObjectNotFound(String),
|
ObjectNotFound(String),
|
||||||
|
|
||||||
@ -218,7 +222,6 @@ impl BigTableConnection {
|
|||||||
where
|
where
|
||||||
T: serde::ser::Serialize,
|
T: serde::ser::Serialize,
|
||||||
{
|
{
|
||||||
use backoff::{future::retry, ExponentialBackoff};
|
|
||||||
retry(ExponentialBackoff::default(), || async {
|
retry(ExponentialBackoff::default(), || async {
|
||||||
let mut client = self.client();
|
let mut client = self.client();
|
||||||
Ok(client.put_bincode_cells(table, cells).await?)
|
Ok(client.put_bincode_cells(table, cells).await?)
|
||||||
@ -226,6 +229,29 @@ impl BigTableConnection {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete_rows_with_retry(&self, table: &str, row_keys: &[RowKey]) -> Result<()> {
|
||||||
|
retry(ExponentialBackoff::default(), || async {
|
||||||
|
let mut client = self.client();
|
||||||
|
Ok(client.delete_rows(table, row_keys).await?)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_bincode_cells_with_retry<T>(
|
||||||
|
&self,
|
||||||
|
table: &str,
|
||||||
|
row_keys: &[RowKey],
|
||||||
|
) -> Result<Vec<(RowKey, Result<T>)>>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned,
|
||||||
|
{
|
||||||
|
retry(ExponentialBackoff::default(), || async {
|
||||||
|
let mut client = self.client();
|
||||||
|
Ok(client.get_bincode_cells(table, row_keys).await?)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn put_protobuf_cells_with_retry<T>(
|
pub async fn put_protobuf_cells_with_retry<T>(
|
||||||
&self,
|
&self,
|
||||||
table: &str,
|
table: &str,
|
||||||
@ -234,7 +260,6 @@ impl BigTableConnection {
|
|||||||
where
|
where
|
||||||
T: prost::Message,
|
T: prost::Message,
|
||||||
{
|
{
|
||||||
use backoff::{future::retry, ExponentialBackoff};
|
|
||||||
retry(ExponentialBackoff::default(), || async {
|
retry(ExponentialBackoff::default(), || async {
|
||||||
let mut client = self.client();
|
let mut client = self.client();
|
||||||
Ok(client.put_protobuf_cells(table, cells).await?)
|
Ok(client.put_protobuf_cells(table, cells).await?)
|
||||||
@ -444,6 +469,38 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||||||
self.decode_read_rows_response(response).await
|
self.decode_read_rows_response(response).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get latest data from multiple rows of `table`, if those rows exist.
|
||||||
|
pub async fn get_multi_row_data(
|
||||||
|
&mut self,
|
||||||
|
table_name: &str,
|
||||||
|
row_keys: &[RowKey],
|
||||||
|
) -> Result<Vec<(RowKey, RowData)>> {
|
||||||
|
self.refresh_access_token().await;
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.client
|
||||||
|
.read_rows(ReadRowsRequest {
|
||||||
|
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||||
|
rows_limit: 0, // return all keys
|
||||||
|
rows: Some(RowSet {
|
||||||
|
row_keys: row_keys
|
||||||
|
.iter()
|
||||||
|
.map(|k| k.as_bytes().to_vec())
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
row_ranges: vec![],
|
||||||
|
}),
|
||||||
|
filter: Some(RowFilter {
|
||||||
|
// Only return the latest version of each cell
|
||||||
|
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
|
||||||
|
}),
|
||||||
|
..ReadRowsRequest::default()
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
self.decode_read_rows_response(response).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Get latest data from a single row of `table`, if that row exists. Returns an error if that
|
/// Get latest data from a single row of `table`, if that row exists. Returns an error if that
|
||||||
/// row does not exist.
|
/// row does not exist.
|
||||||
///
|
///
|
||||||
@ -481,6 +538,47 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||||||
.ok_or(Error::RowNotFound)
|
.ok_or(Error::RowNotFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete one or more `table` rows
|
||||||
|
async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> {
|
||||||
|
self.refresh_access_token().await;
|
||||||
|
|
||||||
|
let mut entries = vec![];
|
||||||
|
for row_key in row_keys {
|
||||||
|
entries.push(mutate_rows_request::Entry {
|
||||||
|
row_key: row_key.as_bytes().to_vec(),
|
||||||
|
mutations: vec![Mutation {
|
||||||
|
mutation: Some(mutation::Mutation::DeleteFromRow(
|
||||||
|
mutation::DeleteFromRow {},
|
||||||
|
)),
|
||||||
|
}],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut response = self
|
||||||
|
.client
|
||||||
|
.mutate_rows(MutateRowsRequest {
|
||||||
|
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||||
|
entries,
|
||||||
|
..MutateRowsRequest::default()
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
while let Some(res) = response.message().await? {
|
||||||
|
for entry in res.entries {
|
||||||
|
if let Some(status) = entry.status {
|
||||||
|
if status.code != 0 {
|
||||||
|
eprintln!("delete_rows error {}: {}", status.code, status.message);
|
||||||
|
warn!("delete_rows error {}: {}", status.code, status.message);
|
||||||
|
return Err(Error::RowDeleteFailed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Store data for one or more `table` rows in the `family_name` Column family
|
/// Store data for one or more `table` rows in the `family_name` Column family
|
||||||
async fn put_row_data(
|
async fn put_row_data(
|
||||||
&mut self,
|
&mut self,
|
||||||
@ -543,6 +641,28 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||||||
deserialize_bincode_cell_data(&row_data, table, key.to_string())
|
deserialize_bincode_cell_data(&row_data, table, key.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_bincode_cells<T>(
|
||||||
|
&mut self,
|
||||||
|
table: &str,
|
||||||
|
keys: &[RowKey],
|
||||||
|
) -> Result<Vec<(RowKey, Result<T>)>>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned,
|
||||||
|
{
|
||||||
|
Ok(self
|
||||||
|
.get_multi_row_data(table, keys)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.map(|(key, row_data)| {
|
||||||
|
let key_str = key.to_string();
|
||||||
|
(
|
||||||
|
key,
|
||||||
|
deserialize_bincode_cell_data(&row_data, table, key_str),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_protobuf_or_bincode_cell<B, P>(
|
pub async fn get_protobuf_or_bincode_cell<B, P>(
|
||||||
&mut self,
|
&mut self,
|
||||||
table: &str,
|
table: &str,
|
||||||
|
@ -17,7 +17,10 @@ use {
|
|||||||
TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
|
TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
|
||||||
TransactionWithStatusMeta,
|
TransactionWithStatusMeta,
|
||||||
},
|
},
|
||||||
std::{collections::HashMap, convert::TryInto},
|
std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
convert::TryInto,
|
||||||
|
},
|
||||||
thiserror::Error,
|
thiserror::Error,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -254,7 +257,7 @@ impl From<Reward> for StoredConfirmedBlockReward {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// A serialized `TransactionInfo` is stored in the `tx` table
|
// A serialized `TransactionInfo` is stored in the `tx` table
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize, PartialEq)]
|
||||||
struct TransactionInfo {
|
struct TransactionInfo {
|
||||||
slot: Slot, // The slot that contains the block with this transaction in it
|
slot: Slot, // The slot that contains the block with this transaction in it
|
||||||
index: u32, // Where the transaction is located in the block
|
index: u32, // Where the transaction is located in the block
|
||||||
@ -394,6 +397,7 @@ impl LedgerStorage {
|
|||||||
let block = self.get_confirmed_block(slot).await?;
|
let block = self.get_confirmed_block(slot).await?;
|
||||||
match block.transactions.into_iter().nth(index as usize) {
|
match block.transactions.into_iter().nth(index as usize) {
|
||||||
None => {
|
None => {
|
||||||
|
// report this somewhere actionable?
|
||||||
warn!("Transaction info for {} is corrupt", signature);
|
warn!("Transaction info for {} is corrupt", signature);
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
@ -637,6 +641,110 @@ impl LedgerStorage {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete a confirmed block and associated meta data.
|
||||||
|
pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
|
||||||
|
let mut addresses: HashSet<&Pubkey> = HashSet::new();
|
||||||
|
let mut expected_tx_infos: HashMap<String, TransactionInfo> = HashMap::new();
|
||||||
|
let confirmed_block = self.get_confirmed_block(slot).await?;
|
||||||
|
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
|
||||||
|
let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
|
||||||
|
let signature = transaction.signatures[0];
|
||||||
|
let index = index as u32;
|
||||||
|
let err = meta.as_ref().and_then(|meta| meta.status.clone().err());
|
||||||
|
let memo = extract_and_fmt_memos(&transaction.message);
|
||||||
|
|
||||||
|
for address in &transaction.message.account_keys {
|
||||||
|
if !is_sysvar_id(address) {
|
||||||
|
addresses.insert(address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_tx_infos.insert(
|
||||||
|
signature.to_string(),
|
||||||
|
TransactionInfo {
|
||||||
|
slot,
|
||||||
|
index,
|
||||||
|
err,
|
||||||
|
memo,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let address_slot_rows: Vec<_> = addresses
|
||||||
|
.into_iter()
|
||||||
|
.map(|address| format!("{}/{}", address, slot_to_key(!slot)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let tx_deletion_rows = if !expected_tx_infos.is_empty() {
|
||||||
|
let signatures = expected_tx_infos
|
||||||
|
.iter()
|
||||||
|
.map(|(signature, _info)| signature)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let fetched_tx_infos = self
|
||||||
|
.connection
|
||||||
|
.get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
|
||||||
|
for (signature, expected_tx_info) in expected_tx_infos {
|
||||||
|
match fetched_tx_infos.get(&signature) {
|
||||||
|
Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
|
||||||
|
deletion_rows.push(signature);
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
warn!(
|
||||||
|
"skipped tx row {} because the bigtable entry did not match",
|
||||||
|
signature
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Some(Err(err)) => {
|
||||||
|
warn!(
|
||||||
|
"skipped tx row {} because the bigtable entry was corrupted: {:?}",
|
||||||
|
signature, err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
warn!("skipped tx row {} because it was not found", signature);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
deletion_rows
|
||||||
|
} else {
|
||||||
|
vec![]
|
||||||
|
};
|
||||||
|
|
||||||
|
if !dry_run {
|
||||||
|
if !address_slot_rows.is_empty() {
|
||||||
|
self.connection
|
||||||
|
.delete_rows_with_retry("tx-by-addr", &address_slot_rows)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !tx_deletion_rows.is_empty() {
|
||||||
|
self.connection
|
||||||
|
.delete_rows_with_retry("tx", &tx_deletion_rows)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.connection
|
||||||
|
.delete_rows_with_retry("blocks", &[slot.to_string()])
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows",
|
||||||
|
if dry_run { "[dry run] " } else { "" },
|
||||||
|
slot,
|
||||||
|
tx_deletion_rows.len(),
|
||||||
|
address_slot_rows.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
Reference in New Issue
Block a user