From e3753186af1e25efb04b3da1ef71705920739e82 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 3 Sep 2020 19:39:05 -0700 Subject: [PATCH] Add BigTableUploadService --- core/src/bigtable_upload_service.rs | 83 +++++++++++++++++++++++++++++ core/src/lib.rs | 1 + core/src/rpc_service.rs | 32 ++++++++--- ledger/Cargo.toml | 1 + ledger/src/bigtable_upload.rs | 11 ++-- 5 files changed, 116 insertions(+), 12 deletions(-) create mode 100644 core/src/bigtable_upload_service.rs diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs new file mode 100644 index 0000000000..1d423b1486 --- /dev/null +++ b/core/src/bigtable_upload_service.rs @@ -0,0 +1,83 @@ +use solana_ledger::blockstore::Blockstore; +use solana_runtime::commitment::BlockCommitmentCache; +use std::{ + sync::atomic::{AtomicBool, Ordering}, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, +}; +use tokio::runtime; + +pub struct BigTableUploadService { + thread: JoinHandle<()>, +} + +impl BigTableUploadService { + pub fn new( + runtime_handle: runtime::Handle, + bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, + blockstore: Arc, + block_commitment_cache: Arc>, + exit: Arc, + ) -> Self { + info!("Starting BigTable upload service"); + let thread = Builder::new() + .name("bigtable-upload".to_string()) + .spawn(move || { + Self::run( + runtime_handle, + bigtable_ledger_storage, + blockstore, + block_commitment_cache, + exit, + ) + }) + .unwrap(); + + Self { thread } + } + + fn run( + runtime: runtime::Handle, + bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, + blockstore: Arc, + block_commitment_cache: Arc>, + exit: Arc, + ) { + let mut starting_slot = 0; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let max_confirmed_root = block_commitment_cache + .read() + .unwrap() + .highest_confirmed_root(); + + if max_confirmed_root == starting_slot { + std::thread::sleep(std::time::Duration::from_secs(1)); + continue; + } + + let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks( + blockstore.clone(), + bigtable_ledger_storage.clone(), + starting_slot, + Some(max_confirmed_root), + true, + )); + + match result { + Ok(()) => starting_slot = max_confirmed_root, + Err(err) => { + warn!("bigtable: upload_confirmed_blocks: {}", err); + std::thread::sleep(std::time::Duration::from_secs(2)); + } + } + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index a7dde58607..f02ed951db 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -8,6 +8,7 @@ pub mod accounts_background_service; pub mod accounts_hash_verifier; pub mod banking_stage; +pub mod bigtable_upload_service; pub mod broadcast_stage; pub mod cluster_info_vote_listener; pub mod commitment; diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index c3fa270793..351c399f69 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,9 +1,10 @@ //! The `rpc_service` module implements the Solana JSON RPC service. use crate::{ - cluster_info::ClusterInfo, commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*, - rpc_health::*, send_transaction_service::LeaderInfo, - send_transaction_service::SendTransactionService, validator::ValidatorExit, + bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, + commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*, rpc_health::*, + send_transaction_service::LeaderInfo, send_transaction_service::SendTransactionService, + validator::ValidatorExit, }; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{ @@ -272,22 +273,36 @@ impl JsonRpcService { .build() .expect("Runtime"); - let bigtable_ledger_storage = + let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); + + let (bigtable_ledger_storage, _bigtable_ledger_upload_service) = if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload { runtime .block_on(solana_storage_bigtable::LedgerStorage::new( config.enable_bigtable_ledger_upload, )) - .map(|x| { + .map(|bigtable_ledger_storage| { info!("BigTable ledger storage initialized"); - Some(x) + + let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new( + runtime.handle().clone(), + bigtable_ledger_storage.clone(), + blockstore.clone(), + block_commitment_cache.clone(), + exit_bigtable_ledger_upload_service.clone(), + )); + + ( + Some(bigtable_ledger_storage), + Some(bigtable_ledger_upload_service), + ) }) .unwrap_or_else(|err| { error!("Failed to initialize BigTable ledger storage: {:?}", err); - None + (None, None) }) } else { - None + (None, None) }; let request_processor = JsonRpcRequestProcessor::new( @@ -349,6 +364,7 @@ impl JsonRpcService { close_handle_sender.send(server.close_handle()).unwrap(); server.wait(); exit_send_transaction_service.store(true, Ordering::Relaxed); + exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed); }) .unwrap(); diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 8ff86465d9..f8b687c473 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -53,6 +53,7 @@ tar = "0.4.28" thiserror = "1.0" tempfile = "3.1.0" lazy_static = "1.4.0" +tokio = { version = "0.2.22", features = ["full"] } trees = "0.2.1" [dependencies.rocksdb] diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index fe2356ca49..fe347a1e98 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -20,7 +20,7 @@ pub async fn upload_confirmed_blocks( ) -> Result<(), Box> { let mut measure = Measure::start("entire upload"); - info!("Loading ledger slots..."); + info!("Loading ledger slots starting at {}...", starting_slot); let blockstore_slots: Vec<_> = blockstore .slot_meta_iterator(starting_slot) .map_err(|err| { @@ -40,8 +40,11 @@ pub async fn upload_confirmed_blocks( .collect(); if blockstore_slots.is_empty() { - info!("Ledger has no slots in the specified range"); - return Ok(()); + return Err(format!( + "Ledger has no slots from {} to {:?}", + starting_slot, ending_slot + ) + .into()); } info!( @@ -136,7 +139,7 @@ pub async fn upload_confirmed_blocks( } }; - if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 { + if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 { info!( "{}% of blocks processed ({}/{})", i * 100 / blocks_to_upload.len(),