From 6a74bc1297794fbd15bc3c5ed0de82ecb5459639 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Sat, 5 Sep 2020 17:09:32 +0000
Subject: [PATCH] validator: Add `--enable-bigtable-ledger-upload` flag (bp #12040) (#12057)

* Add --enable-bigtable-ledger-upload flag

(cherry picked from commit d8e2038dda12db533bccaafafa6dfc77d0bb73a5)

* Relocate BigTable uploader to ledger/ crate

(cherry picked from commit 91a56caed213fd00b3151b749ac597146bccc148)

# Conflicts:
#	ledger/Cargo.toml

* Cargo.lock

(cherry picked from commit 2b8a521562e52ace22a846a4486541a28990bc78)

* Add BigTableUploadService

(cherry picked from commit bc7731b96985169b10076e85bb431bb3844e2209)

* Add BigTableUploadService

(cherry picked from commit bafdcf24f55cb53d64daf6e160d2e41b297f5d11)

* Add exit flag for bigtable upload operations

(cherry picked from commit d3611f74c8401cabfd17bc487f8bc2d86933fa64)

* Remove dead code

(cherry picked from commit cd3c134b58ba996ffc882d675a2ec6a50e203eb6)

* Request correct access

(cherry picked from commit 4ba43c29ce29d558465765139dae527494972323)

* Add LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY

(cherry picked from commit b64fb295a15c870df330b0db7f96dffa15b79a15)

* Resolve merge conflicts

Co-authored-by: Michael Vines
---
 Cargo.lock                          |   4 +
 core/src/bigtable_upload_service.rs |  93 ++++++++++++
 core/src/lib.rs                     |   1 +
 core/src/rpc.rs                     |   1 +
 core/src/rpc_service.rs             |  51 +++++--
 ledger-tool/src/bigtable.rs         | 209 ++------------------------
 ledger/Cargo.toml                   |   4 +
 ledger/src/bigtable_upload.rs       | 222 ++++++++++++++++++++++++++++
 ledger/src/lib.rs                   |   1 +
 validator/src/main.rs               |   8 +
 10 files changed, 385 insertions(+), 209 deletions(-)
 create mode 100644 core/src/bigtable_upload_service.rs
 create mode 100644 ledger/src/bigtable_upload.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9c10f604c7..ead8e1480d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3848,6 +3848,8 @@ dependencies = [
  "dlopen_derive",
  "ed25519-dalek",
  "fs_extra",
+ "futures 0.3.5",
+ "futures-util",
  "itertools 0.9.0",
  "lazy_static",
  "libc",
@@ -3873,10 +3875,12 @@ dependencies = [
  "solana-runtime",
  "solana-sdk 1.3.9",
  "solana-stake-program",
+ "solana-storage-bigtable",
  "solana-transaction-status",
  "solana-vote-program",
  "tempfile",
  "thiserror",
+ "tokio 0.2.22",
  "trees",
 ]
diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs
new file mode 100644
index 0000000000..23db772af0
--- /dev/null
+++ b/core/src/bigtable_upload_service.rs
@@ -0,0 +1,93 @@
+use solana_ledger::blockstore::Blockstore;
+use solana_runtime::commitment::BlockCommitmentCache;
+use std::{
+    sync::atomic::{AtomicBool, Ordering},
+    sync::{Arc, RwLock},
+    thread::{self, Builder, JoinHandle},
+};
+use tokio::runtime;
+
+// Delay uploading the largest confirmed root for this many slots. This is done in an attempt to
+// ensure that the `CacheBlockTimeService` has had enough time to add the block time for the root
+// before it's uploaded to BigTable.
+//
+// A more direct connection between CacheBlockTimeService and BigTableUploadService would be
+// preferable...
+const LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY: usize = 100;
+
+pub struct BigTableUploadService {
+    thread: JoinHandle<()>,
+}
+
+impl BigTableUploadService {
+    pub fn new(
+        runtime_handle: runtime::Handle,
+        bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
+        blockstore: Arc<Blockstore>,
+        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+        exit: Arc<AtomicBool>,
+    ) -> Self {
+        info!("Starting BigTable upload service");
+        let thread = Builder::new()
+            .name("bigtable-upload".to_string())
+            .spawn(move || {
+                Self::run(
+                    runtime_handle,
+                    bigtable_ledger_storage,
+                    blockstore,
+                    block_commitment_cache,
+                    exit,
+                )
+            })
+            .unwrap();
+
+        Self { thread }
+    }
+
+    fn run(
+        runtime: runtime::Handle,
+        bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
+        blockstore: Arc<Blockstore>,
+        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+        exit: Arc<AtomicBool>,
+    ) {
+        let mut start_slot = 0;
+        loop {
+            if exit.load(Ordering::Relaxed) {
+                break;
+            }
+
+            let end_slot = block_commitment_cache
+                .read()
+                .unwrap()
+                .highest_confirmed_root()
+                .saturating_sub(LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY as u64);
+
+            if end_slot <= start_slot {
+                std::thread::sleep(std::time::Duration::from_secs(1));
+                continue;
+            }
+
+            let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
+                blockstore.clone(),
+                bigtable_ledger_storage.clone(),
+                start_slot,
+                Some(end_slot),
+                true,
+                exit.clone(),
+            ));
+
+            match result {
+                Ok(()) => start_slot = end_slot,
+                Err(err) => {
+                    warn!("bigtable: upload_confirmed_blocks: {}", err);
+                    std::thread::sleep(std::time::Duration::from_secs(2));
+                }
+            }
+        }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.thread.join()
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 7862791286..e5986881a2 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -9,6 +9,7 @@
 pub mod accounts_background_service;
 pub mod accounts_hash_verifier;
 pub mod banking_stage;
+pub mod bigtable_upload_service;
 pub mod broadcast_stage;
 pub mod cluster_info_vote_listener;
 pub mod commitment_service;
diff --git a/core/src/rpc.rs b/core/src/rpc.rs
index 9769f8ce14..843e6d9676 100644
--- a/core/src/rpc.rs
+++ b/core/src/rpc.rs
@@ -99,6 +99,7 @@ pub struct JsonRpcConfig {
     pub faucet_addr: Option<SocketAddr>,
     pub health_check_slot_distance: u64,
     pub enable_bigtable_ledger_storage: bool,
+    pub enable_bigtable_ledger_upload: bool,
 }

 #[derive(Clone)]
diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs
index 629416ae01..f8ef0c6067 100644
--- a/core/src/rpc_service.rs
+++ b/core/src/rpc_service.rs
@@ -1,6 +1,9 @@
 //! The `rpc_service` module implements the Solana JSON RPC service.

-use crate::{cluster_info::ClusterInfo, rpc::*, rpc_health::*, validator::ValidatorExit};
+use crate::{
+    bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, rpc::*,
+    rpc_health::*, validator::ValidatorExit,
+};
 use jsonrpc_core::MetaIoHandler;
 use jsonrpc_http_server::{
     hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
@@ -260,20 +263,37 @@ impl JsonRpcService {
             .build()
             .expect("Runtime");

-        let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage {
-            runtime
-                .block_on(solana_storage_bigtable::LedgerStorage::new(false))
-                .map(|x| {
-                    info!("BigTable ledger storage initialized");
-                    Some(x)
-                })
-                .unwrap_or_else(|err| {
-                    error!("Failed to initialize BigTable ledger storage: {:?}", err);
-                    None
-                })
-        } else {
-            None
-        };
+        let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
+
+        let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
+            if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
+                runtime
+                    .block_on(solana_storage_bigtable::LedgerStorage::new(
+                        !config.enable_bigtable_ledger_upload,
+                    ))
+                    .map(|bigtable_ledger_storage| {
+                        info!("BigTable ledger storage initialized");
+
+                        let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new(
+                            runtime.handle().clone(),
+                            bigtable_ledger_storage.clone(),
+                            blockstore.clone(),
+                            block_commitment_cache.clone(),
+                            exit_bigtable_ledger_upload_service.clone(),
+                        ));
+
+                        (
+                            Some(bigtable_ledger_storage),
+                            Some(bigtable_ledger_upload_service),
+                        )
+                    })
+                    .unwrap_or_else(|err| {
+                        error!("Failed to initialize BigTable ledger storage: {:?}", err);
+                        (None, None)
+                    })
+            } else {
+                (None, None)
+            };

         let (request_processor, receiver) = JsonRpcRequestProcessor::new(
             config,
@@ -341,6 +361,7 @@ impl JsonRpcService {
                 close_handle_sender.send(server.close_handle()).unwrap();
                 server.wait();
                 exit_send_transaction_service.store(true, Ordering::Relaxed);
+                exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
             })
             .unwrap();

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 3f137f1d3e..eff4e2b65b 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -1,23 +1,19 @@
 /// The `bigtable` subcommand
 use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
-use log::*;
 use solana_clap_utils::{
     input_parsers::pubkey_of,
     input_validators::{is_slot, is_valid_pubkey},
 };
 use solana_cli::display::println_transaction;
 use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
-use solana_measure::measure::Measure;
 use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
-use tokio::time::delay_for;
-
-// Attempt to upload this many blocks in parallel
-const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
-
-// Read up to this many blocks from blockstore before blocking on the upload process
-const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+use std::{
+    path::Path,
+    process::exit,
+    result::Result,
+    sync::{atomic::AtomicBool, Arc},
+};

 async fn upload(
     blockstore: Blockstore,
     starting_slot: Slot,
     ending_slot: Option<Slot>,
     allow_missing_metadata: bool,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let mut measure = Measure::start("entire upload");
-
     let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
         .await
         .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
format!("Failed to connect to storage: {:?}", err))?; - info!("Loading ledger slots..."); - let blockstore_slots: Vec<_> = blockstore - .slot_meta_iterator(starting_slot) - .map_err(|err| { - format!( - "Failed to load entries starting from slot {}: {:?}", - starting_slot, err - ) - })? - .filter_map(|(slot, _slot_meta)| { - if let Some(ending_slot) = &ending_slot { - if slot > *ending_slot { - return None; - } - } - Some(slot) - }) - .collect(); - - if blockstore_slots.is_empty() { - info!("Ledger has no slots in the specified range"); - return Ok(()); - } - info!( - "Found {} slots in the range ({}, {})", - blockstore_slots.len(), - blockstore_slots.first().unwrap(), - blockstore_slots.last().unwrap() - ); - - let mut blockstore_slots_with_no_confirmed_block = HashSet::new(); - - // Gather the blocks that are already present in bigtable, by slot - let bigtable_slots = { - let mut bigtable_slots = vec![]; - let first_blockstore_slot = *blockstore_slots.first().unwrap(); - let last_blockstore_slot = *blockstore_slots.last().unwrap(); - info!( - "Loading list of bigtable blocks between slots {} and {}...", - first_blockstore_slot, last_blockstore_slot - ); - - let mut start_slot = *blockstore_slots.first().unwrap(); - while start_slot <= last_blockstore_slot { - let mut next_bigtable_slots = loop { - match bigtable.get_confirmed_blocks(start_slot, 1000).await { - Ok(slots) => break slots, - Err(err) => { - error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err); - // Consider exponential backoff... - delay_for(Duration::from_secs(2)).await; - } - } - }; - if next_bigtable_slots.is_empty() { - break; - } - bigtable_slots.append(&mut next_bigtable_slots); - start_slot = bigtable_slots.last().unwrap() + 1; - } - bigtable_slots - .into_iter() - .filter(|slot| *slot <= last_blockstore_slot) - .collect::>() - }; - - // The blocks that still need to be uploaded is the difference between what's already in the - // bigtable and what's in blockstore... 
-    let blocks_to_upload = {
-        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
-        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
-
-        let mut blocks_to_upload = blockstore_slots
-            .difference(&blockstore_slots_with_no_confirmed_block)
-            .cloned()
-            .collect::<HashSet<_>>()
-            .difference(&bigtable_slots)
-            .cloned()
-            .collect::<Vec<_>>();
-        blocks_to_upload.sort();
-        blocks_to_upload
-    };
-
-    if blocks_to_upload.is_empty() {
-        info!("No blocks need to be uploaded to bigtable");
-        return Ok(());
-    }
-    info!(
-        "{} blocks to be uploaded to the bucket in the range ({}, {})",
-        blocks_to_upload.len(),
-        blocks_to_upload.first().unwrap(),
-        blocks_to_upload.last().unwrap()
-    );
-
-    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
-    let (_loader_thread, receiver) = {
-        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
-        (
-            std::thread::spawn(move || {
-                let mut measure = Measure::start("block loader thread");
-                for (i, slot) in blocks_to_upload.iter().enumerate() {
-                    let _ = match blockstore.get_confirmed_block(
-                        *slot,
-                        Some(solana_transaction_status::UiTransactionEncoding::Base64),
-                    ) {
-                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
-                        Err(err) => {
-                            warn!(
-                                "Failed to get load confirmed block from slot {}: {:?}",
-                                slot, err
-                            );
-                            sender.send((*slot, None))
-                        }
-                    };
-
-                    if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
-                        info!(
-                            "{}% of blocks processed ({}/{})",
-                            i * 100 / blocks_to_upload.len(),
-                            i,
-                            blocks_to_upload.len()
-                        );
-                    }
-                }
-                measure.stop();
-                info!("{} to load {} blocks", measure, blocks_to_upload.len());
-            }),
-            receiver,
-        )
-    };
-
-    let mut failures = 0;
-    use futures::stream::StreamExt;
-
-    let mut stream =
-        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
-
-    while let Some(blocks) = stream.next().await {
-        let mut measure_upload = Measure::start("Upload");
-        let mut num_blocks = blocks.len();
-        info!("Preparing the next {} blocks for upload", num_blocks);
-
-        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
-            None => {
-                blockstore_slots_with_no_confirmed_block.insert(slot);
-                num_blocks -= 1;
-                None
-            }
-            Some(confirmed_block) => {
-                if confirmed_block
-                    .transactions
-                    .iter()
-                    .any(|transaction| transaction.meta.is_none())
-                {
-                    if allow_missing_metadata {
-                        info!("Transaction metadata missing from slot {}", slot);
-                    } else {
-                        panic!("Transaction metadata missing from slot {}", slot);
-                    }
-                }
-                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
-            }
-        });
-
-        for result in futures::future::join_all(uploads).await {
-            if result.is_err() {
-                error!("upload_confirmed_block() failed: {:?}", result.err());
-                failures += 1;
-            }
-        }
-
-        measure_upload.stop();
-        info!("{} for {} blocks", measure_upload, num_blocks);
-    }
-
-    measure.stop();
-    info!("{}", measure);
-    if failures > 0 {
-        Err(format!("Incomplete upload, {} operations failed", failures).into())
-    } else {
-        Ok(())
-    }
+    solana_ledger::bigtable_upload::upload_confirmed_blocks(
+        Arc::new(blockstore),
+        bigtable,
+        starting_slot,
+        ending_slot,
+        allow_missing_metadata,
+        Arc::new(AtomicBool::new(false)),
+    )
+    .await
 }

 async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml
index f4ebfa0938..529b418c86 100644
--- a/ledger/Cargo.toml
+++ b/ledger/Cargo.toml
@@ -17,6 +17,8 @@ dlopen_derive = "0.1.4"
 dlopen = "0.1.8"
 ed25519-dalek = "1.0.0-pre.4"
 fs_extra = "1.1.0"
+futures = "0.3.5"
"0.3.5" +futures-util = "0.3.5" itertools = "0.9.0" lazy_static = "1.4.0" libc = "0.2.72" @@ -40,9 +42,11 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.3.9" } solana-runtime = { path = "../runtime", version = "1.3.9" } solana-sdk = { path = "../sdk", version = "1.3.9" } solana-stake-program = { path = "../programs/stake", version = "1.3.9" } +solana-storage-bigtable = { path = "../storage-bigtable", version = "1.3.9" } solana-vote-program = { path = "../programs/vote", version = "1.3.9" } tempfile = "3.1.0" thiserror = "1.0" +tokio = { version = "0.2.22", features = ["full"] } trees = "0.2.1" [dependencies.rocksdb] diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs new file mode 100644 index 0000000000..9d4120ef59 --- /dev/null +++ b/ledger/src/bigtable_upload.rs @@ -0,0 +1,222 @@ +use crate::blockstore::Blockstore; +use log::*; +use solana_measure::measure::Measure; +use solana_sdk::clock::Slot; +use std::{ + collections::HashSet, + result::Result, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::time::delay_for; + +// Attempt to upload this many blocks in parallel +const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32; + +// Read up to this many blocks from blockstore before blocking on the upload process +const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2; + +pub async fn upload_confirmed_blocks( + blockstore: Arc, + bigtable: solana_storage_bigtable::LedgerStorage, + starting_slot: Slot, + ending_slot: Option, + allow_missing_metadata: bool, + exit: Arc, +) -> Result<(), Box> { + let mut measure = Measure::start("entire upload"); + + info!("Loading ledger slots starting at {}...", starting_slot); + let blockstore_slots: Vec<_> = blockstore + .slot_meta_iterator(starting_slot) + .map_err(|err| { + format!( + "Failed to load entries starting from slot {}: {:?}", + starting_slot, err + ) + })? + .filter_map(|(slot, _slot_meta)| { + if let Some(ending_slot) = &ending_slot { + if slot > *ending_slot { + return None; + } + } + Some(slot) + }) + .collect(); + + if blockstore_slots.is_empty() { + return Err(format!( + "Ledger has no slots from {} to {:?}", + starting_slot, ending_slot + ) + .into()); + } + + info!( + "Found {} slots in the range ({}, {})", + blockstore_slots.len(), + blockstore_slots.first().unwrap(), + blockstore_slots.last().unwrap() + ); + + // Gather the blocks that are already present in bigtable, by slot + let bigtable_slots = { + let mut bigtable_slots = vec![]; + let first_blockstore_slot = *blockstore_slots.first().unwrap(); + let last_blockstore_slot = *blockstore_slots.last().unwrap(); + info!( + "Loading list of bigtable blocks between slots {} and {}...", + first_blockstore_slot, last_blockstore_slot + ); + + let mut start_slot = *blockstore_slots.first().unwrap(); + while start_slot <= last_blockstore_slot { + let mut next_bigtable_slots = loop { + match bigtable.get_confirmed_blocks(start_slot, 1000).await { + Ok(slots) => break slots, + Err(err) => { + error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err); + // Consider exponential backoff... 
+                        delay_for(Duration::from_secs(2)).await;
+                    }
+                }
+            };
+            if next_bigtable_slots.is_empty() {
+                break;
+            }
+            bigtable_slots.append(&mut next_bigtable_slots);
+            start_slot = bigtable_slots.last().unwrap() + 1;
+        }
+        bigtable_slots
+            .into_iter()
+            .filter(|slot| *slot <= last_blockstore_slot)
+            .collect::<Vec<_>>()
+    };
+
+    // The blocks that still need to be uploaded is the difference between what's already in the
+    // bigtable and what's in blockstore...
+    let blocks_to_upload = {
+        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
+        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
+
+        let mut blocks_to_upload = blockstore_slots
+            .difference(&bigtable_slots)
+            .cloned()
+            .collect::<Vec<_>>();
+        blocks_to_upload.sort();
+        blocks_to_upload
+    };
+
+    if blocks_to_upload.is_empty() {
+        info!("No blocks need to be uploaded to bigtable");
+        return Ok(());
+    }
+    info!(
+        "{} blocks to be uploaded to the bucket in the range ({}, {})",
+        blocks_to_upload.len(),
+        blocks_to_upload.first().unwrap(),
+        blocks_to_upload.last().unwrap()
+    );
+
+    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
+    let (_loader_thread, receiver) = {
+        let exit = exit.clone();
+
+        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
+        (
+            std::thread::spawn(move || {
+                let mut measure = Measure::start("block loader thread");
+                for (i, slot) in blocks_to_upload.iter().enumerate() {
+                    if exit.load(Ordering::Relaxed) {
+                        break;
+                    }
+
+                    let _ = match blockstore.get_confirmed_block(
+                        *slot,
+                        Some(solana_transaction_status::UiTransactionEncoding::Base64),
+                    ) {
+                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
+                        Err(err) => {
+                            warn!(
+                                "Failed to get load confirmed block from slot {}: {:?}",
+                                slot, err
+                            );
+                            sender.send((*slot, None))
+                        }
+                    };
+
+                    if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
+                        info!(
+                            "{}% of blocks processed ({}/{})",
+                            i * 100 / blocks_to_upload.len(),
+                            i,
+                            blocks_to_upload.len()
+                        );
+                    }
+                }
+                measure.stop();
+                info!("{} to load {} blocks", measure, blocks_to_upload.len());
+            }),
+            receiver,
+        )
+    };
+
+    let mut failures = 0;
+    use futures::stream::StreamExt;
+
+    let mut stream =
+        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
+
+    while let Some(blocks) = stream.next().await {
+        if exit.load(Ordering::Relaxed) {
+            break;
+        }
+
+        let mut measure_upload = Measure::start("Upload");
+        let mut num_blocks = blocks.len();
+        info!("Preparing the next {} blocks for upload", num_blocks);
+
+        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
+            None => {
+                num_blocks -= 1;
+                None
+            }
+            Some(confirmed_block) => {
+                if confirmed_block
+                    .transactions
+                    .iter()
+                    .any(|transaction| transaction.meta.is_none())
+                {
+                    if allow_missing_metadata {
+                        info!("Transaction metadata missing from slot {}", slot);
+                    } else {
+                        panic!("Transaction metadata missing from slot {}", slot);
+                    }
+                }
+                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
+            }
+        });
+
+        for result in futures::future::join_all(uploads).await {
+            if result.is_err() {
+                error!("upload_confirmed_block() failed: {:?}", result.err());
+                failures += 1;
+            }
+        }
+
+        measure_upload.stop();
+        info!("{} for {} blocks", measure_upload, num_blocks);
+    }
+
+    measure.stop();
+    info!("{}", measure);
+    if failures > 0 {
+        Err(format!("Incomplete upload, {} operations failed", failures).into())
+    } else {
+        Ok(())
+    }
+}
diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs
index b034f8d3c3..0178e9b6f4 100644
--- a/ledger/src/lib.rs
+++ b/ledger/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod bank_forks_utils;
+pub mod bigtable_upload;
 pub mod block_error;
 #[macro_use]
 pub mod blockstore;
diff --git a/validator/src/main.rs b/validator/src/main.rs
index deaa899cff..2b07bc2cb6 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -659,6 +659,13 @@
                 .help("Fetch historical transaction info from a BigTable instance \
                        as a fallback to local ledger data"),
         )
+        .arg(
+            Arg::with_name("enable_bigtable_ledger_upload")
+                .long("enable-bigtable-ledger-upload")
+                .requires("enable_rpc_transaction_history")
+                .takes_value(false)
+                .help("Upload new confirmed blocks into a BigTable instance"),
+        )
         .arg(
             Arg::with_name("health_check_slot_distance")
                 .long("health-check-slot-distance")
@@ -976,6 +983,7 @@ pub fn main() {
         enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
         enable_bigtable_ledger_storage: matches
             .is_present("enable_rpc_bigtable_ledger_storage"),
+        enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),
         identity_pubkey: identity_keypair.pubkey(),
         faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
             solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
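
With the patch applied, enabling uploads is a matter of validator flags. A minimal sketch of an invocation that exercises the new upload path; the ledger path is a placeholder and the credential setup is an assumption on my part, not something specified by this patch:

    # --enable-bigtable-ledger-upload requires --enable-rpc-transaction-history
    # (enforced by the clap `.requires(...)` above). BigTable credentials are
    # assumed to be supplied via GOOGLE_APPLICATION_CREDENTIALS.
    solana-validator \
        --ledger /path/to/ledger \
        --enable-rpc-transaction-history \
        --enable-bigtable-ledger-upload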