diff --git a/Cargo.lock b/Cargo.lock index efbc4ff229..90d2a77df6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6135,6 +6135,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-send-transaction-service", + "solana-storage-bigtable", "solana-streamer", "solana-test-validator", "solana-version", diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs index e1061242af..5c2d8785b3 100644 --- a/ledger-tool/src/bigtable.rs +++ b/ledger-tool/src/bigtable.rs @@ -34,8 +34,9 @@ async fn upload( starting_slot: Slot, ending_slot: Option, force_reupload: bool, + config: solana_storage_bigtable::LedgerStorageConfig, ) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None) + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -50,17 +51,22 @@ async fn upload( .await } -async fn delete_slots(slots: Vec, dry_run: bool) -> Result<(), Box> { - let read_only = dry_run; - let bigtable = solana_storage_bigtable::LedgerStorage::new(read_only, None, None) +async fn delete_slots( + slots: Vec, + config: solana_storage_bigtable::LedgerStorageConfig, +) -> Result<(), Box> { + let dry_run = config.read_only; + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; solana_ledger::bigtable_delete::delete_confirmed_blocks(bigtable, slots, dry_run).await } -async fn first_available_block() -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None, None).await?; +async fn first_available_block( + config: solana_storage_bigtable::LedgerStorageConfig, +) -> Result<(), Box> { + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config).await?; match bigtable.get_first_available_block().await? { Some(block) => println!("{}", block), None => println!("No blocks available"), @@ -69,8 +75,12 @@ async fn first_available_block() -> Result<(), Box> { Ok(()) } -async fn block(slot: Slot, output_format: OutputFormat) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None) +async fn block( + slot: Slot, + output_format: OutputFormat, + config: solana_storage_bigtable::LedgerStorageConfig, +) -> Result<(), Box> { + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -101,8 +111,12 @@ async fn block(slot: Slot, output_format: OutputFormat) -> Result<(), Box Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None) +async fn blocks( + starting_slot: Slot, + limit: usize, + config: solana_storage_bigtable::LedgerStorageConfig, +) -> Result<(), Box> { + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -116,11 +130,10 @@ async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box Result<(), Box> { - assert!(!credential_path.is_empty()); - - let owned_bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None) + let owned_bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config) .await .map_err(|err| format!("failed to connect to owned bigtable: {:?}", err))?; let owned_bigtable_slots = owned_bigtable @@ -130,10 +143,9 @@ async fn compare_blocks( "owned bigtable {} blocks found ", owned_bigtable_slots.len() ); - let reference_bigtable = - solana_storage_bigtable::LedgerStorage::new(false, None, Some(credential_path)) - .await - .map_err(|err| format!("failed to connect to reference bigtable: {:?}", err))?; + let reference_bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(ref_config) + .await + .map_err(|err| format!("failed to connect to reference bigtable: {:?}", err))?; let reference_bigtable_slots = reference_bigtable .get_confirmed_blocks(starting_slot, limit) @@ -160,8 +172,9 @@ async fn confirm( signature: &Signature, verbose: bool, output_format: OutputFormat, + config: solana_storage_bigtable::LedgerStorageConfig, ) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None) + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -211,8 +224,9 @@ pub async fn transaction_history( verbose: bool, show_transactions: bool, query_chunk_size: usize, + config: solana_storage_bigtable::LedgerStorageConfig, ) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None, None).await?; + let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config).await?; let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None; while limit > 0 { @@ -308,6 +322,15 @@ impl BigTableSubCommand for App<'_, '_> { .about("Ledger data on a BigTable instance") .setting(AppSettings::InferSubcommands) .setting(AppSettings::SubcommandRequiredElseHelp) + .arg( + Arg::with_name("rpc_bigtable_instance_name") + .global(true) + .long("rpc-bigtable-instance-name") + .takes_value(true) + .value_name("INSTANCE_NAME") + .default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME) + .help("Name of the target Bigtable instance") + ) .subcommand( SubCommand::with_name("upload") .about("Upload the ledger to BigTable") @@ -417,7 +440,8 @@ impl BigTableSubCommand for App<'_, '_> { .required(true) .default_value("1000") .help("Maximum number of slots to check"), - ).arg( + ) + .arg( Arg::with_name("reference_credential") .long("reference-credential") .short("c") @@ -425,6 +449,14 @@ impl BigTableSubCommand for App<'_, '_> { .takes_value(true) .required(true) .help("File path for a credential to a reference bigtable"), + ) + .arg( + Arg::with_name("reference_instance_name") + .long("reference-instance-name") + .takes_value(true) + .value_name("INSTANCE_NAME") + .default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME) + .help("Name of the reference Bigtable instance to compare to") ), ) .subcommand( @@ -521,7 +553,28 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { let verbose = matches.is_present("verbose"); let output_format = OutputFormat::from_matches(matches, "output_format", verbose); - let future = match matches.subcommand() { + // this is kinda stupid, but there seems to be a bug in clap when a subcommand + // arg is marked both `global(true)` and `default_value("default_value")`. + // despite the "global", when the arg is specified on the subcommand, its value + // is not propagated down to the (sub)subcommand args, resulting in the default + // value when queried there. similarly, if the arg is specified on the + // (sub)subcommand, the value is not propagated back up to the subcommand args, + // again resulting in the default value. the arg having declared a + // `default_value()` obviates `is_present(...)` tests since they will always + // return true. so we consede and compare against the expected default. :/ + let (subcommand, sub_matches) = matches.subcommand(); + let on_command = matches + .value_of("rpc_bigtable_instance_name") + .map(|v| v != solana_storage_bigtable::DEFAULT_INSTANCE_NAME) + .unwrap_or(false); + let instance_name = if on_command { + value_t_or_exit!(matches, "rpc_bigtable_instance_name", String) + } else { + let sub_matches = sub_matches.as_ref().unwrap(); + value_t_or_exit!(sub_matches, "rpc_bigtable_instance_name", String) + }; + + let future = match (subcommand, sub_matches) { ("upload", Some(arg_matches)) => { let starting_slot = value_t!(arg_matches, "starting_slot", Slot).unwrap_or(0); let ending_slot = value_t!(arg_matches, "ending_slot", Slot).ok(); @@ -531,41 +584,79 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { AccessType::TryPrimaryThenSecondary, None, ); - + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: false, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; runtime.block_on(upload( blockstore, starting_slot, ending_slot, force_reupload, + config, )) } ("delete-slots", Some(arg_matches)) => { let slots = values_t_or_exit!(arg_matches, "slots", Slot); - let dry_run = !arg_matches.is_present("force"); - runtime.block_on(delete_slots(slots, dry_run)) + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: !arg_matches.is_present("force"), + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; + runtime.block_on(delete_slots(slots, config)) + } + ("first-available-block", Some(_arg_matches)) => { + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: true, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; + runtime.block_on(first_available_block(config)) } - ("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()), ("block", Some(arg_matches)) => { let slot = value_t_or_exit!(arg_matches, "slot", Slot); - runtime.block_on(block(slot, output_format)) + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: false, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; + runtime.block_on(block(slot, output_format, config)) } ("blocks", Some(arg_matches)) => { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); let limit = value_t_or_exit!(arg_matches, "limit", usize); + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: false, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; - runtime.block_on(blocks(starting_slot, limit)) + runtime.block_on(blocks(starting_slot, limit, config)) } ("compare-blocks", Some(arg_matches)) => { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); let limit = value_t_or_exit!(arg_matches, "limit", usize); - let reference_credential_filepath = - value_t_or_exit!(arg_matches, "reference_credential", String); + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: false, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; + let credential_path = Some(value_t_or_exit!( + arg_matches, + "reference_credential", + String + )); + let ref_instance_name = + value_t_or_exit!(arg_matches, "reference_instance_name", String); + let ref_config = solana_storage_bigtable::LedgerStorageConfig { + read_only: false, + credential_path, + instance_name: ref_instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; - runtime.block_on(compare_blocks( - starting_slot, - limit, - reference_credential_filepath, - )) + runtime.block_on(compare_blocks(starting_slot, limit, config, ref_config)) } ("confirm", Some(arg_matches)) => { let signature = arg_matches @@ -573,8 +664,13 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { .unwrap() .parse() .expect("Invalid signature"); + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: false, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; - runtime.block_on(confirm(&signature, verbose, output_format)) + runtime.block_on(confirm(&signature, verbose, output_format, config)) } ("transaction-history", Some(arg_matches)) => { let address = pubkey_of(arg_matches, "address").unwrap(); @@ -587,6 +683,11 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { .value_of("until") .map(|signature| signature.parse().expect("Invalid signature")); let show_transactions = arg_matches.is_present("show_transactions"); + let config = solana_storage_bigtable::LedgerStorageConfig { + read_only: true, + instance_name, + ..solana_storage_bigtable::LedgerStorageConfig::default() + }; runtime.block_on(transaction_history( &address, @@ -596,6 +697,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { verbose, show_transactions, query_chunk_size, + config, )) } _ => unreachable!(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 9d72a3a48c..ba93817702 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -146,13 +146,11 @@ pub struct JsonRpcConfig { pub enable_cpi_and_log_storage: bool, pub faucet_addr: Option, pub health_check_slot_distance: u64, - pub enable_bigtable_ledger_storage: bool, - pub enable_bigtable_ledger_upload: bool, + pub rpc_bigtable_config: Option, pub max_multiple_accounts: Option, pub account_indexes: AccountSecondaryIndexes, pub rpc_threads: usize, pub rpc_niceness_adj: i8, - pub rpc_bigtable_timeout: Option, pub full_api: bool, pub obsolete_v1_7_api: bool, pub rpc_scan_and_fix_roots: bool, @@ -167,6 +165,24 @@ impl JsonRpcConfig { } } +#[derive(Debug, Clone)] +pub struct RpcBigtableConfig { + pub enable_bigtable_ledger_upload: bool, + pub bigtable_instance_name: String, + pub timeout: Option, +} + +impl Default for RpcBigtableConfig { + fn default() -> Self { + let bigtable_instance_name = solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(); + Self { + enable_bigtable_ledger_upload: false, + bigtable_instance_name, + timeout: None, + } + } +} + #[derive(Clone)] pub struct JsonRpcRequestProcessor { bank_forks: Arc>, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 2e5048cc53..6656a3ba10 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -376,18 +376,26 @@ impl JsonRpcService { let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); let (bigtable_ledger_storage, _bigtable_ledger_upload_service) = - if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload { + if let Some(RpcBigtableConfig { + enable_bigtable_ledger_upload, + ref bigtable_instance_name, + timeout, + }) = config.rpc_bigtable_config + { + let bigtable_config = solana_storage_bigtable::LedgerStorageConfig { + read_only: !enable_bigtable_ledger_upload, + timeout, + credential_path: None, + instance_name: bigtable_instance_name.clone(), + }; runtime - .block_on(solana_storage_bigtable::LedgerStorage::new( - !config.enable_bigtable_ledger_upload, - config.rpc_bigtable_timeout, - None, + .block_on(solana_storage_bigtable::LedgerStorage::new_with_config( + bigtable_config, )) .map(|bigtable_ledger_storage| { info!("BigTable ledger storage initialized"); - let bigtable_ledger_upload_service = if config.enable_bigtable_ledger_upload - { + let bigtable_ledger_upload_service = if enable_bigtable_ledger_upload { Some(Arc::new(BigTableUploadService::new( runtime.clone(), bigtable_ledger_storage.clone(), diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 075d98044b..c41a4bdacf 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -361,6 +361,27 @@ impl From for TransactionByAddrInfo { } } +pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger"; + +#[derive(Debug)] +pub struct LedgerStorageConfig { + pub read_only: bool, + pub timeout: Option, + pub credential_path: Option, + pub instance_name: String, +} + +impl Default for LedgerStorageConfig { + fn default() -> Self { + Self { + read_only: true, + timeout: None, + credential_path: None, + instance_name: DEFAULT_INSTANCE_NAME.to_string(), + } + } +} + #[derive(Clone)] pub struct LedgerStorage { connection: bigtable::BigTableConnection, @@ -372,9 +393,29 @@ impl LedgerStorage { timeout: Option, credential_path: Option, ) -> Result { - let connection = - bigtable::BigTableConnection::new("solana-ledger", read_only, timeout, credential_path) - .await?; + Self::new_with_config(LedgerStorageConfig { + read_only, + timeout, + credential_path, + ..LedgerStorageConfig::default() + }) + .await + } + + pub async fn new_with_config(config: LedgerStorageConfig) -> Result { + let LedgerStorageConfig { + read_only, + timeout, + credential_path, + instance_name, + } = config; + let connection = bigtable::BigTableConnection::new( + instance_name.as_str(), + read_only, + timeout, + credential_path, + ) + .await?; Ok(Self { connection }) } diff --git a/validator/Cargo.toml b/validator/Cargo.toml index c52e5ac0a0..f5f944b07b 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -48,6 +48,7 @@ solana-rpc = { path = "../rpc", version = "=1.10.9" } solana-runtime = { path = "../runtime", version = "=1.10.9" } solana-sdk = { path = "../sdk", version = "=1.10.9" } solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.10.9" } +solana-storage-bigtable = { path = "../storage-bigtable", version = "=1.10.9" } solana-streamer = { path = "../streamer", version = "=1.10.9" } solana-test-validator = { path = "../test-validator", version = "=1.10.9" } solana-version = { path = "../version", version = "=1.10.9" } diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 3e5e1470fd..02d54901fa 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -12,7 +12,10 @@ use { solana_client::rpc_client::RpcClient, solana_core::tower_storage::FileTowerStorage, solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT}, - solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, + solana_rpc::{ + rpc::{JsonRpcConfig, RpcBigtableConfig}, + rpc_pubsub_service::PubSubConfig, + }, solana_sdk::{ account::AccountSharedData, clock::Slot, @@ -155,6 +158,23 @@ fn main() { .validator(solana_validator::port_validator) .help("Enable JSON RPC on this port, and the next port for the RPC websocket"), ) + .arg( + Arg::with_name("enable_rpc_bigtable_ledger_storage") + .long("enable-rpc-bigtable-ledger-storage") + .takes_value(false) + .hidden(true) + .help("Fetch historical transaction info from a BigTable instance \ + as a fallback to local ledger data"), + ) + .arg( + Arg::with_name("rpc_bigtable_instance") + .long("rpc-bigtable-instance") + .value_name("INSTANCE_NAME") + .takes_value(true) + .hidden(true) + .default_value("solana-ledger") + .help("Name of BigTable instance to target"), + ) .arg( Arg::with_name("rpc_pubsub_enable_vote_subscription") .long("rpc-pubsub-enable-vote-subscription") @@ -618,6 +638,16 @@ fn main() { None }; + let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage") { + Some(RpcBigtableConfig { + enable_bigtable_ledger_upload: false, + bigtable_instance_name: value_t_or_exit!(matches, "rpc_bigtable_instance", String), + timeout: None, + }) + } else { + None + }; + genesis .ledger_path(&ledger_path) .tower_storage(tower_storage) @@ -628,6 +658,7 @@ fn main() { .rpc_config(JsonRpcConfig { enable_rpc_transaction_history: true, enable_cpi_and_log_storage: true, + rpc_bigtable_config, faucet_addr, ..JsonRpcConfig::default_for_test() }) diff --git a/validator/src/main.rs b/validator/src/main.rs index cd26438471..f0da3babe0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -37,7 +37,10 @@ use { solana_perf::recycler::enable_recycler_warming, solana_poh::poh_service, solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig, - solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, + solana_rpc::{ + rpc::{JsonRpcConfig, RpcBigtableConfig}, + rpc_pubsub_service::PubSubConfig, + }, solana_runtime::{ accounts_db::{ AccountShrinkThreshold, AccountsDbConfig, DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE, @@ -1200,6 +1203,14 @@ pub fn main() { .default_value("30") .help("Number of seconds before timing out RPC requests backed by BigTable"), ) + .arg( + Arg::with_name("rpc_bigtable_instance_name") + .long("rpc-bigtable-instance-name") + .takes_value(true) + .value_name("INSTANCE_NAME") + .default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME) + .help("Name of the Bigtable instance to upload to") + ) .arg( Arg::with_name("rpc_pubsub_worker_threads") .long("rpc-pubsub-worker-threads") @@ -2268,6 +2279,20 @@ pub fn main() { warn!("--minimal-rpc-api is now the default behavior. This flag is deprecated and can be removed from the launch args") } + let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage") + || matches.is_present("enable_bigtable_ledger_upload") + { + Some(RpcBigtableConfig { + enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"), + bigtable_instance_name: value_t_or_exit!(matches, "rpc_bigtable_instance_name", String), + timeout: value_t!(matches, "rpc_bigtable_timeout", u64) + .ok() + .map(Duration::from_secs), + }) + } else { + None + }; + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -2283,9 +2308,7 @@ pub fn main() { rpc_config: JsonRpcConfig { enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"), enable_cpi_and_log_storage: matches.is_present("enable_cpi_and_log_storage"), - enable_bigtable_ledger_storage: matches - .is_present("enable_rpc_bigtable_ledger_storage"), - enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"), + rpc_bigtable_config, faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| { solana_net_utils::parse_host_port(address).expect("failed to parse faucet address") }), @@ -2303,9 +2326,6 @@ pub fn main() { ), rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize), rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8), - rpc_bigtable_timeout: value_t!(matches, "rpc_bigtable_timeout", u64) - .ok() - .map(Duration::from_secs), account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), },