Add BigTableUploadService
This commit is contained in:
		
				
					committed by
					
						![mergify[bot]](/avatar/e3df20cd7a67969c41a65f03bea54961?size=40) mergify[bot]
						mergify[bot]
					
				
			
			
				
	
			
			
			
						parent
						
							82d9624736
						
					
				
				
					commit
					e3753186af
				
			
							
								
								
									
										83
									
								
								core/src/bigtable_upload_service.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								core/src/bigtable_upload_service.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,83 @@ | ||||
| use solana_ledger::blockstore::Blockstore; | ||||
| use solana_runtime::commitment::BlockCommitmentCache; | ||||
| use std::{ | ||||
|     sync::atomic::{AtomicBool, Ordering}, | ||||
|     sync::{Arc, RwLock}, | ||||
|     thread::{self, Builder, JoinHandle}, | ||||
| }; | ||||
| use tokio::runtime; | ||||
|  | ||||
| pub struct BigTableUploadService { | ||||
|     thread: JoinHandle<()>, | ||||
| } | ||||
|  | ||||
| impl BigTableUploadService { | ||||
|     pub fn new( | ||||
|         runtime_handle: runtime::Handle, | ||||
|         bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, | ||||
|         blockstore: Arc<Blockstore>, | ||||
|         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, | ||||
|         exit: Arc<AtomicBool>, | ||||
|     ) -> Self { | ||||
|         info!("Starting BigTable upload service"); | ||||
|         let thread = Builder::new() | ||||
|             .name("bigtable-upload".to_string()) | ||||
|             .spawn(move || { | ||||
|                 Self::run( | ||||
|                     runtime_handle, | ||||
|                     bigtable_ledger_storage, | ||||
|                     blockstore, | ||||
|                     block_commitment_cache, | ||||
|                     exit, | ||||
|                 ) | ||||
|             }) | ||||
|             .unwrap(); | ||||
|  | ||||
|         Self { thread } | ||||
|     } | ||||
|  | ||||
|     fn run( | ||||
|         runtime: runtime::Handle, | ||||
|         bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, | ||||
|         blockstore: Arc<Blockstore>, | ||||
|         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, | ||||
|         exit: Arc<AtomicBool>, | ||||
|     ) { | ||||
|         let mut starting_slot = 0; | ||||
|         loop { | ||||
|             if exit.load(Ordering::Relaxed) { | ||||
|                 break; | ||||
|             } | ||||
|  | ||||
|             let max_confirmed_root = block_commitment_cache | ||||
|                 .read() | ||||
|                 .unwrap() | ||||
|                 .highest_confirmed_root(); | ||||
|  | ||||
|             if max_confirmed_root == starting_slot { | ||||
|                 std::thread::sleep(std::time::Duration::from_secs(1)); | ||||
|                 continue; | ||||
|             } | ||||
|  | ||||
|             let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks( | ||||
|                 blockstore.clone(), | ||||
|                 bigtable_ledger_storage.clone(), | ||||
|                 starting_slot, | ||||
|                 Some(max_confirmed_root), | ||||
|                 true, | ||||
|             )); | ||||
|  | ||||
|             match result { | ||||
|                 Ok(()) => starting_slot = max_confirmed_root, | ||||
|                 Err(err) => { | ||||
|                     warn!("bigtable: upload_confirmed_blocks: {}", err); | ||||
|                     std::thread::sleep(std::time::Duration::from_secs(2)); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn join(self) -> thread::Result<()> { | ||||
|         self.thread.join() | ||||
|     } | ||||
| } | ||||
| @@ -8,6 +8,7 @@ | ||||
| pub mod accounts_background_service; | ||||
| pub mod accounts_hash_verifier; | ||||
| pub mod banking_stage; | ||||
| pub mod bigtable_upload_service; | ||||
| pub mod broadcast_stage; | ||||
| pub mod cluster_info_vote_listener; | ||||
| pub mod commitment; | ||||
|   | ||||
| @@ -1,9 +1,10 @@ | ||||
| //! The `rpc_service` module implements the Solana JSON RPC service. | ||||
|  | ||||
| use crate::{ | ||||
|     cluster_info::ClusterInfo, commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*, | ||||
|     rpc_health::*, send_transaction_service::LeaderInfo, | ||||
|     send_transaction_service::SendTransactionService, validator::ValidatorExit, | ||||
|     bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, | ||||
|     commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*, rpc_health::*, | ||||
|     send_transaction_service::LeaderInfo, send_transaction_service::SendTransactionService, | ||||
|     validator::ValidatorExit, | ||||
| }; | ||||
| use jsonrpc_core::MetaIoHandler; | ||||
| use jsonrpc_http_server::{ | ||||
| @@ -272,22 +273,36 @@ impl JsonRpcService { | ||||
|             .build() | ||||
|             .expect("Runtime"); | ||||
|  | ||||
|         let bigtable_ledger_storage = | ||||
|         let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); | ||||
|  | ||||
|         let (bigtable_ledger_storage, _bigtable_ledger_upload_service) = | ||||
|             if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload { | ||||
|                 runtime | ||||
|                     .block_on(solana_storage_bigtable::LedgerStorage::new( | ||||
|                         config.enable_bigtable_ledger_upload, | ||||
|                     )) | ||||
|                     .map(|x| { | ||||
|                     .map(|bigtable_ledger_storage| { | ||||
|                         info!("BigTable ledger storage initialized"); | ||||
|                         Some(x) | ||||
|  | ||||
|                         let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new( | ||||
|                             runtime.handle().clone(), | ||||
|                             bigtable_ledger_storage.clone(), | ||||
|                             blockstore.clone(), | ||||
|                             block_commitment_cache.clone(), | ||||
|                             exit_bigtable_ledger_upload_service.clone(), | ||||
|                         )); | ||||
|  | ||||
|                         ( | ||||
|                             Some(bigtable_ledger_storage), | ||||
|                             Some(bigtable_ledger_upload_service), | ||||
|                         ) | ||||
|                     }) | ||||
|                     .unwrap_or_else(|err| { | ||||
|                         error!("Failed to initialize BigTable ledger storage: {:?}", err); | ||||
|                         None | ||||
|                         (None, None) | ||||
|                     }) | ||||
|             } else { | ||||
|                 None | ||||
|                 (None, None) | ||||
|             }; | ||||
|  | ||||
|         let request_processor = JsonRpcRequestProcessor::new( | ||||
| @@ -349,6 +364,7 @@ impl JsonRpcService { | ||||
|                 close_handle_sender.send(server.close_handle()).unwrap(); | ||||
|                 server.wait(); | ||||
|                 exit_send_transaction_service.store(true, Ordering::Relaxed); | ||||
|                 exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed); | ||||
|             }) | ||||
|             .unwrap(); | ||||
|  | ||||
|   | ||||
| @@ -53,6 +53,7 @@ tar = "0.4.28" | ||||
| thiserror = "1.0" | ||||
| tempfile = "3.1.0" | ||||
| lazy_static = "1.4.0" | ||||
| tokio = { version = "0.2.22", features = ["full"] } | ||||
| trees = "0.2.1" | ||||
|  | ||||
| [dependencies.rocksdb] | ||||
|   | ||||
| @@ -20,7 +20,7 @@ pub async fn upload_confirmed_blocks( | ||||
| ) -> Result<(), Box<dyn std::error::Error>> { | ||||
|     let mut measure = Measure::start("entire upload"); | ||||
|  | ||||
|     info!("Loading ledger slots..."); | ||||
|     info!("Loading ledger slots starting at {}...", starting_slot); | ||||
|     let blockstore_slots: Vec<_> = blockstore | ||||
|         .slot_meta_iterator(starting_slot) | ||||
|         .map_err(|err| { | ||||
| @@ -40,8 +40,11 @@ pub async fn upload_confirmed_blocks( | ||||
|         .collect(); | ||||
|  | ||||
|     if blockstore_slots.is_empty() { | ||||
|         info!("Ledger has no slots in the specified range"); | ||||
|         return Ok(()); | ||||
|         return Err(format!( | ||||
|             "Ledger has no slots from {} to {:?}", | ||||
|             starting_slot, ending_slot | ||||
|         ) | ||||
|         .into()); | ||||
|     } | ||||
|  | ||||
|     info!( | ||||
| @@ -136,7 +139,7 @@ pub async fn upload_confirmed_blocks( | ||||
|                         } | ||||
|                     }; | ||||
|  | ||||
|                     if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 { | ||||
|                     if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 { | ||||
|                         info!( | ||||
|                             "{}% of blocks processed ({}/{})", | ||||
|                             i * 100 / blocks_to_upload.len(), | ||||
|   | ||||
		Reference in New Issue
	
	Block a user