* Add --enable-bigtable-ledger-upload flag (cherry picked from commitd8e2038dda
) * Relocate BigTable uploader to ledger/ crate (cherry picked from commit91a56caed2
) # Conflicts: # ledger/Cargo.toml * Cargo.lock (cherry picked from commit2b8a521562
) * Add BigTableUploadService (cherry picked from commitbc7731b969
) * Add BigTableUploadService (cherry picked from commitbafdcf24f5
) * Add exit flag for bigtable upload operations (cherry picked from commitd3611f74c8
) * Remove dead code (cherry picked from commitcd3c134b58
) * Request correct access (cherry picked from commit4ba43c29ce
) * Add LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY (cherry picked from commitb64fb295a1
) * Resolve merge conflicts Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -3848,6 +3848,8 @@ dependencies = [
|
|||||||
"dlopen_derive",
|
"dlopen_derive",
|
||||||
"ed25519-dalek",
|
"ed25519-dalek",
|
||||||
"fs_extra",
|
"fs_extra",
|
||||||
|
"futures 0.3.5",
|
||||||
|
"futures-util",
|
||||||
"itertools 0.9.0",
|
"itertools 0.9.0",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
@ -3873,10 +3875,12 @@ dependencies = [
|
|||||||
"solana-runtime",
|
"solana-runtime",
|
||||||
"solana-sdk 1.3.9",
|
"solana-sdk 1.3.9",
|
||||||
"solana-stake-program",
|
"solana-stake-program",
|
||||||
|
"solana-storage-bigtable",
|
||||||
"solana-transaction-status",
|
"solana-transaction-status",
|
||||||
"solana-vote-program",
|
"solana-vote-program",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
"tokio 0.2.22",
|
||||||
"trees",
|
"trees",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
93
core/src/bigtable_upload_service.rs
Normal file
93
core/src/bigtable_upload_service.rs
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
use solana_ledger::blockstore::Blockstore;
|
||||||
|
use solana_runtime::commitment::BlockCommitmentCache;
|
||||||
|
use std::{
|
||||||
|
sync::atomic::{AtomicBool, Ordering},
|
||||||
|
sync::{Arc, RwLock},
|
||||||
|
thread::{self, Builder, JoinHandle},
|
||||||
|
};
|
||||||
|
use tokio::runtime;
|
||||||
|
|
||||||
|
// Delay uploading the largest confirmed root for this many slots. This is done in an attempt to
|
||||||
|
// ensure that the `CacheBlockTimeService` has had enough time to add the block time for the root
|
||||||
|
// before it's uploaded to BigTable.
|
||||||
|
//
|
||||||
|
// A more direct connection between CacheBlockTimeService and BigTableUploadService would be
|
||||||
|
// preferable...
|
||||||
|
const LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY: usize = 100;
|
||||||
|
|
||||||
|
pub struct BigTableUploadService {
|
||||||
|
thread: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BigTableUploadService {
|
||||||
|
pub fn new(
|
||||||
|
runtime_handle: runtime::Handle,
|
||||||
|
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
|
||||||
|
blockstore: Arc<Blockstore>,
|
||||||
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) -> Self {
|
||||||
|
info!("Starting BigTable upload service");
|
||||||
|
let thread = Builder::new()
|
||||||
|
.name("bigtable-upload".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
Self::run(
|
||||||
|
runtime_handle,
|
||||||
|
bigtable_ledger_storage,
|
||||||
|
blockstore,
|
||||||
|
block_commitment_cache,
|
||||||
|
exit,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Self { thread }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run(
|
||||||
|
runtime: runtime::Handle,
|
||||||
|
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
|
||||||
|
blockstore: Arc<Blockstore>,
|
||||||
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) {
|
||||||
|
let mut start_slot = 0;
|
||||||
|
loop {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let end_slot = block_commitment_cache
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.highest_confirmed_root()
|
||||||
|
.saturating_sub(LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY as u64);
|
||||||
|
|
||||||
|
if end_slot <= start_slot {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
|
||||||
|
blockstore.clone(),
|
||||||
|
bigtable_ledger_storage.clone(),
|
||||||
|
start_slot,
|
||||||
|
Some(end_slot),
|
||||||
|
true,
|
||||||
|
exit.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => start_slot = end_slot,
|
||||||
|
Err(err) => {
|
||||||
|
warn!("bigtable: upload_confirmed_blocks: {}", err);
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread.join()
|
||||||
|
}
|
||||||
|
}
|
@ -9,6 +9,7 @@
|
|||||||
pub mod accounts_background_service;
|
pub mod accounts_background_service;
|
||||||
pub mod accounts_hash_verifier;
|
pub mod accounts_hash_verifier;
|
||||||
pub mod banking_stage;
|
pub mod banking_stage;
|
||||||
|
pub mod bigtable_upload_service;
|
||||||
pub mod broadcast_stage;
|
pub mod broadcast_stage;
|
||||||
pub mod cluster_info_vote_listener;
|
pub mod cluster_info_vote_listener;
|
||||||
pub mod commitment_service;
|
pub mod commitment_service;
|
||||||
|
@ -99,6 +99,7 @@ pub struct JsonRpcConfig {
|
|||||||
pub faucet_addr: Option<SocketAddr>,
|
pub faucet_addr: Option<SocketAddr>,
|
||||||
pub health_check_slot_distance: u64,
|
pub health_check_slot_distance: u64,
|
||||||
pub enable_bigtable_ledger_storage: bool,
|
pub enable_bigtable_ledger_storage: bool,
|
||||||
|
pub enable_bigtable_ledger_upload: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
//! The `rpc_service` module implements the Solana JSON RPC service.
|
//! The `rpc_service` module implements the Solana JSON RPC service.
|
||||||
|
|
||||||
use crate::{cluster_info::ClusterInfo, rpc::*, rpc_health::*, validator::ValidatorExit};
|
use crate::{
|
||||||
|
bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, rpc::*,
|
||||||
|
rpc_health::*, validator::ValidatorExit,
|
||||||
|
};
|
||||||
use jsonrpc_core::MetaIoHandler;
|
use jsonrpc_core::MetaIoHandler;
|
||||||
use jsonrpc_http_server::{
|
use jsonrpc_http_server::{
|
||||||
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
|
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
|
||||||
@ -260,20 +263,37 @@ impl JsonRpcService {
|
|||||||
.build()
|
.build()
|
||||||
.expect("Runtime");
|
.expect("Runtime");
|
||||||
|
|
||||||
let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage {
|
let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
|
||||||
runtime
|
|
||||||
.block_on(solana_storage_bigtable::LedgerStorage::new(false))
|
let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
|
||||||
.map(|x| {
|
if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
|
||||||
info!("BigTable ledger storage initialized");
|
runtime
|
||||||
Some(x)
|
.block_on(solana_storage_bigtable::LedgerStorage::new(
|
||||||
})
|
!config.enable_bigtable_ledger_upload,
|
||||||
.unwrap_or_else(|err| {
|
))
|
||||||
error!("Failed to initialize BigTable ledger storage: {:?}", err);
|
.map(|bigtable_ledger_storage| {
|
||||||
None
|
info!("BigTable ledger storage initialized");
|
||||||
})
|
|
||||||
} else {
|
let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new(
|
||||||
None
|
runtime.handle().clone(),
|
||||||
};
|
bigtable_ledger_storage.clone(),
|
||||||
|
blockstore.clone(),
|
||||||
|
block_commitment_cache.clone(),
|
||||||
|
exit_bigtable_ledger_upload_service.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
(
|
||||||
|
Some(bigtable_ledger_storage),
|
||||||
|
Some(bigtable_ledger_upload_service),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
error!("Failed to initialize BigTable ledger storage: {:?}", err);
|
||||||
|
(None, None)
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
(None, None)
|
||||||
|
};
|
||||||
|
|
||||||
let (request_processor, receiver) = JsonRpcRequestProcessor::new(
|
let (request_processor, receiver) = JsonRpcRequestProcessor::new(
|
||||||
config,
|
config,
|
||||||
@ -341,6 +361,7 @@ impl JsonRpcService {
|
|||||||
close_handle_sender.send(server.close_handle()).unwrap();
|
close_handle_sender.send(server.close_handle()).unwrap();
|
||||||
server.wait();
|
server.wait();
|
||||||
exit_send_transaction_service.store(true, Ordering::Relaxed);
|
exit_send_transaction_service.store(true, Ordering::Relaxed);
|
||||||
|
exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -1,23 +1,19 @@
|
|||||||
/// The `bigtable` subcommand
|
/// The `bigtable` subcommand
|
||||||
use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
|
use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
|
||||||
use log::*;
|
|
||||||
use solana_clap_utils::{
|
use solana_clap_utils::{
|
||||||
input_parsers::pubkey_of,
|
input_parsers::pubkey_of,
|
||||||
input_validators::{is_slot, is_valid_pubkey},
|
input_validators::{is_slot, is_valid_pubkey},
|
||||||
};
|
};
|
||||||
use solana_cli::display::println_transaction;
|
use solana_cli::display::println_transaction;
|
||||||
use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
|
use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
|
||||||
use solana_measure::measure::Measure;
|
|
||||||
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
|
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
|
||||||
use solana_transaction_status::UiTransactionEncoding;
|
use solana_transaction_status::UiTransactionEncoding;
|
||||||
use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
|
use std::{
|
||||||
use tokio::time::delay_for;
|
path::Path,
|
||||||
|
process::exit,
|
||||||
// Attempt to upload this many blocks in parallel
|
result::Result,
|
||||||
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
|
sync::{atomic::AtomicBool, Arc},
|
||||||
|
};
|
||||||
// Read up to this many blocks from blockstore before blocking on the upload process
|
|
||||||
const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
|
|
||||||
|
|
||||||
async fn upload(
|
async fn upload(
|
||||||
blockstore: Blockstore,
|
blockstore: Blockstore,
|
||||||
@ -25,194 +21,19 @@ async fn upload(
|
|||||||
ending_slot: Option<Slot>,
|
ending_slot: Option<Slot>,
|
||||||
allow_missing_metadata: bool,
|
allow_missing_metadata: bool,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let mut measure = Measure::start("entire upload");
|
|
||||||
|
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
info!("Loading ledger slots...");
|
solana_ledger::bigtable_upload::upload_confirmed_blocks(
|
||||||
let blockstore_slots: Vec<_> = blockstore
|
Arc::new(blockstore),
|
||||||
.slot_meta_iterator(starting_slot)
|
bigtable,
|
||||||
.map_err(|err| {
|
starting_slot,
|
||||||
format!(
|
ending_slot,
|
||||||
"Failed to load entries starting from slot {}: {:?}",
|
allow_missing_metadata,
|
||||||
starting_slot, err
|
Arc::new(AtomicBool::new(false)),
|
||||||
)
|
)
|
||||||
})?
|
.await
|
||||||
.filter_map(|(slot, _slot_meta)| {
|
|
||||||
if let Some(ending_slot) = &ending_slot {
|
|
||||||
if slot > *ending_slot {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(slot)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if blockstore_slots.is_empty() {
|
|
||||||
info!("Ledger has no slots in the specified range");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
info!(
|
|
||||||
"Found {} slots in the range ({}, {})",
|
|
||||||
blockstore_slots.len(),
|
|
||||||
blockstore_slots.first().unwrap(),
|
|
||||||
blockstore_slots.last().unwrap()
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
|
|
||||||
|
|
||||||
// Gather the blocks that are already present in bigtable, by slot
|
|
||||||
let bigtable_slots = {
|
|
||||||
let mut bigtable_slots = vec![];
|
|
||||||
let first_blockstore_slot = *blockstore_slots.first().unwrap();
|
|
||||||
let last_blockstore_slot = *blockstore_slots.last().unwrap();
|
|
||||||
info!(
|
|
||||||
"Loading list of bigtable blocks between slots {} and {}...",
|
|
||||||
first_blockstore_slot, last_blockstore_slot
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut start_slot = *blockstore_slots.first().unwrap();
|
|
||||||
while start_slot <= last_blockstore_slot {
|
|
||||||
let mut next_bigtable_slots = loop {
|
|
||||||
match bigtable.get_confirmed_blocks(start_slot, 1000).await {
|
|
||||||
Ok(slots) => break slots,
|
|
||||||
Err(err) => {
|
|
||||||
error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
|
|
||||||
// Consider exponential backoff...
|
|
||||||
delay_for(Duration::from_secs(2)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if next_bigtable_slots.is_empty() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
bigtable_slots.append(&mut next_bigtable_slots);
|
|
||||||
start_slot = bigtable_slots.last().unwrap() + 1;
|
|
||||||
}
|
|
||||||
bigtable_slots
|
|
||||||
.into_iter()
|
|
||||||
.filter(|slot| *slot <= last_blockstore_slot)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
};
|
|
||||||
|
|
||||||
// The blocks that still need to be uploaded is the difference between what's already in the
|
|
||||||
// bigtable and what's in blockstore...
|
|
||||||
let blocks_to_upload = {
|
|
||||||
let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
|
|
||||||
let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
|
|
||||||
|
|
||||||
let mut blocks_to_upload = blockstore_slots
|
|
||||||
.difference(&blockstore_slots_with_no_confirmed_block)
|
|
||||||
.cloned()
|
|
||||||
.collect::<HashSet<_>>()
|
|
||||||
.difference(&bigtable_slots)
|
|
||||||
.cloned()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
blocks_to_upload.sort();
|
|
||||||
blocks_to_upload
|
|
||||||
};
|
|
||||||
|
|
||||||
if blocks_to_upload.is_empty() {
|
|
||||||
info!("No blocks need to be uploaded to bigtable");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
info!(
|
|
||||||
"{} blocks to be uploaded to the bucket in the range ({}, {})",
|
|
||||||
blocks_to_upload.len(),
|
|
||||||
blocks_to_upload.first().unwrap(),
|
|
||||||
blocks_to_upload.last().unwrap()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
|
|
||||||
let (_loader_thread, receiver) = {
|
|
||||||
let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
|
|
||||||
(
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
let mut measure = Measure::start("block loader thread");
|
|
||||||
for (i, slot) in blocks_to_upload.iter().enumerate() {
|
|
||||||
let _ = match blockstore.get_confirmed_block(
|
|
||||||
*slot,
|
|
||||||
Some(solana_transaction_status::UiTransactionEncoding::Base64),
|
|
||||||
) {
|
|
||||||
Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
|
|
||||||
Err(err) => {
|
|
||||||
warn!(
|
|
||||||
"Failed to get load confirmed block from slot {}: {:?}",
|
|
||||||
slot, err
|
|
||||||
);
|
|
||||||
sender.send((*slot, None))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
|
|
||||||
info!(
|
|
||||||
"{}% of blocks processed ({}/{})",
|
|
||||||
i * 100 / blocks_to_upload.len(),
|
|
||||||
i,
|
|
||||||
blocks_to_upload.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
measure.stop();
|
|
||||||
info!("{} to load {} blocks", measure, blocks_to_upload.len());
|
|
||||||
}),
|
|
||||||
receiver,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut failures = 0;
|
|
||||||
use futures::stream::StreamExt;
|
|
||||||
|
|
||||||
let mut stream =
|
|
||||||
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
|
|
||||||
|
|
||||||
while let Some(blocks) = stream.next().await {
|
|
||||||
let mut measure_upload = Measure::start("Upload");
|
|
||||||
let mut num_blocks = blocks.len();
|
|
||||||
info!("Preparing the next {} blocks for upload", num_blocks);
|
|
||||||
|
|
||||||
let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
|
|
||||||
None => {
|
|
||||||
blockstore_slots_with_no_confirmed_block.insert(slot);
|
|
||||||
num_blocks -= 1;
|
|
||||||
None
|
|
||||||
}
|
|
||||||
Some(confirmed_block) => {
|
|
||||||
if confirmed_block
|
|
||||||
.transactions
|
|
||||||
.iter()
|
|
||||||
.any(|transaction| transaction.meta.is_none())
|
|
||||||
{
|
|
||||||
if allow_missing_metadata {
|
|
||||||
info!("Transaction metadata missing from slot {}", slot);
|
|
||||||
} else {
|
|
||||||
panic!("Transaction metadata missing from slot {}", slot);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(bigtable.upload_confirmed_block(slot, confirmed_block))
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for result in futures::future::join_all(uploads).await {
|
|
||||||
if result.is_err() {
|
|
||||||
error!("upload_confirmed_block() failed: {:?}", result.err());
|
|
||||||
failures += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
measure_upload.stop();
|
|
||||||
info!("{} for {} blocks", measure_upload, num_blocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
measure.stop();
|
|
||||||
info!("{}", measure);
|
|
||||||
if failures > 0 {
|
|
||||||
Err(format!("Incomplete upload, {} operations failed", failures).into())
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
@ -17,6 +17,8 @@ dlopen_derive = "0.1.4"
|
|||||||
dlopen = "0.1.8"
|
dlopen = "0.1.8"
|
||||||
ed25519-dalek = "1.0.0-pre.4"
|
ed25519-dalek = "1.0.0-pre.4"
|
||||||
fs_extra = "1.1.0"
|
fs_extra = "1.1.0"
|
||||||
|
futures = "0.3.5"
|
||||||
|
futures-util = "0.3.5"
|
||||||
itertools = "0.9.0"
|
itertools = "0.9.0"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
libc = "0.2.72"
|
libc = "0.2.72"
|
||||||
@ -40,9 +42,11 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.3.9" }
|
|||||||
solana-runtime = { path = "../runtime", version = "1.3.9" }
|
solana-runtime = { path = "../runtime", version = "1.3.9" }
|
||||||
solana-sdk = { path = "../sdk", version = "1.3.9" }
|
solana-sdk = { path = "../sdk", version = "1.3.9" }
|
||||||
solana-stake-program = { path = "../programs/stake", version = "1.3.9" }
|
solana-stake-program = { path = "../programs/stake", version = "1.3.9" }
|
||||||
|
solana-storage-bigtable = { path = "../storage-bigtable", version = "1.3.9" }
|
||||||
solana-vote-program = { path = "../programs/vote", version = "1.3.9" }
|
solana-vote-program = { path = "../programs/vote", version = "1.3.9" }
|
||||||
tempfile = "3.1.0"
|
tempfile = "3.1.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
|
tokio = { version = "0.2.22", features = ["full"] }
|
||||||
trees = "0.2.1"
|
trees = "0.2.1"
|
||||||
|
|
||||||
[dependencies.rocksdb]
|
[dependencies.rocksdb]
|
||||||
|
222
ledger/src/bigtable_upload.rs
Normal file
222
ledger/src/bigtable_upload.rs
Normal file
@ -0,0 +1,222 @@
|
|||||||
|
use crate::blockstore::Blockstore;
|
||||||
|
use log::*;
|
||||||
|
use solana_measure::measure::Measure;
|
||||||
|
use solana_sdk::clock::Slot;
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
result::Result,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use tokio::time::delay_for;
|
||||||
|
|
||||||
|
// Attempt to upload this many blocks in parallel
|
||||||
|
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
|
||||||
|
|
||||||
|
// Read up to this many blocks from blockstore before blocking on the upload process
|
||||||
|
const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
|
||||||
|
|
||||||
|
pub async fn upload_confirmed_blocks(
|
||||||
|
blockstore: Arc<Blockstore>,
|
||||||
|
bigtable: solana_storage_bigtable::LedgerStorage,
|
||||||
|
starting_slot: Slot,
|
||||||
|
ending_slot: Option<Slot>,
|
||||||
|
allow_missing_metadata: bool,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let mut measure = Measure::start("entire upload");
|
||||||
|
|
||||||
|
info!("Loading ledger slots starting at {}...", starting_slot);
|
||||||
|
let blockstore_slots: Vec<_> = blockstore
|
||||||
|
.slot_meta_iterator(starting_slot)
|
||||||
|
.map_err(|err| {
|
||||||
|
format!(
|
||||||
|
"Failed to load entries starting from slot {}: {:?}",
|
||||||
|
starting_slot, err
|
||||||
|
)
|
||||||
|
})?
|
||||||
|
.filter_map(|(slot, _slot_meta)| {
|
||||||
|
if let Some(ending_slot) = &ending_slot {
|
||||||
|
if slot > *ending_slot {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(slot)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if blockstore_slots.is_empty() {
|
||||||
|
return Err(format!(
|
||||||
|
"Ledger has no slots from {} to {:?}",
|
||||||
|
starting_slot, ending_slot
|
||||||
|
)
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Found {} slots in the range ({}, {})",
|
||||||
|
blockstore_slots.len(),
|
||||||
|
blockstore_slots.first().unwrap(),
|
||||||
|
blockstore_slots.last().unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Gather the blocks that are already present in bigtable, by slot
|
||||||
|
let bigtable_slots = {
|
||||||
|
let mut bigtable_slots = vec![];
|
||||||
|
let first_blockstore_slot = *blockstore_slots.first().unwrap();
|
||||||
|
let last_blockstore_slot = *blockstore_slots.last().unwrap();
|
||||||
|
info!(
|
||||||
|
"Loading list of bigtable blocks between slots {} and {}...",
|
||||||
|
first_blockstore_slot, last_blockstore_slot
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut start_slot = *blockstore_slots.first().unwrap();
|
||||||
|
while start_slot <= last_blockstore_slot {
|
||||||
|
let mut next_bigtable_slots = loop {
|
||||||
|
match bigtable.get_confirmed_blocks(start_slot, 1000).await {
|
||||||
|
Ok(slots) => break slots,
|
||||||
|
Err(err) => {
|
||||||
|
error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
|
||||||
|
// Consider exponential backoff...
|
||||||
|
delay_for(Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if next_bigtable_slots.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
bigtable_slots.append(&mut next_bigtable_slots);
|
||||||
|
start_slot = bigtable_slots.last().unwrap() + 1;
|
||||||
|
}
|
||||||
|
bigtable_slots
|
||||||
|
.into_iter()
|
||||||
|
.filter(|slot| *slot <= last_blockstore_slot)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
};
|
||||||
|
|
||||||
|
// The blocks that still need to be uploaded is the difference between what's already in the
|
||||||
|
// bigtable and what's in blockstore...
|
||||||
|
let blocks_to_upload = {
|
||||||
|
let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
|
||||||
|
let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
|
||||||
|
|
||||||
|
let mut blocks_to_upload = blockstore_slots
|
||||||
|
.difference(&bigtable_slots)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
blocks_to_upload.sort();
|
||||||
|
blocks_to_upload
|
||||||
|
};
|
||||||
|
|
||||||
|
if blocks_to_upload.is_empty() {
|
||||||
|
info!("No blocks need to be uploaded to bigtable");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
"{} blocks to be uploaded to the bucket in the range ({}, {})",
|
||||||
|
blocks_to_upload.len(),
|
||||||
|
blocks_to_upload.first().unwrap(),
|
||||||
|
blocks_to_upload.last().unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
|
||||||
|
let (_loader_thread, receiver) = {
|
||||||
|
let exit = exit.clone();
|
||||||
|
|
||||||
|
let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
|
||||||
|
(
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let mut measure = Measure::start("block loader thread");
|
||||||
|
for (i, slot) in blocks_to_upload.iter().enumerate() {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = match blockstore.get_confirmed_block(
|
||||||
|
*slot,
|
||||||
|
Some(solana_transaction_status::UiTransactionEncoding::Base64),
|
||||||
|
) {
|
||||||
|
Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
|
||||||
|
Err(err) => {
|
||||||
|
warn!(
|
||||||
|
"Failed to get load confirmed block from slot {}: {:?}",
|
||||||
|
slot, err
|
||||||
|
);
|
||||||
|
sender.send((*slot, None))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
|
||||||
|
info!(
|
||||||
|
"{}% of blocks processed ({}/{})",
|
||||||
|
i * 100 / blocks_to_upload.len(),
|
||||||
|
i,
|
||||||
|
blocks_to_upload.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
measure.stop();
|
||||||
|
info!("{} to load {} blocks", measure, blocks_to_upload.len());
|
||||||
|
}),
|
||||||
|
receiver,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut failures = 0;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
|
||||||
|
let mut stream =
|
||||||
|
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
|
||||||
|
|
||||||
|
while let Some(blocks) = stream.next().await {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut measure_upload = Measure::start("Upload");
|
||||||
|
let mut num_blocks = blocks.len();
|
||||||
|
info!("Preparing the next {} blocks for upload", num_blocks);
|
||||||
|
|
||||||
|
let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
|
||||||
|
None => {
|
||||||
|
num_blocks -= 1;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
Some(confirmed_block) => {
|
||||||
|
if confirmed_block
|
||||||
|
.transactions
|
||||||
|
.iter()
|
||||||
|
.any(|transaction| transaction.meta.is_none())
|
||||||
|
{
|
||||||
|
if allow_missing_metadata {
|
||||||
|
info!("Transaction metadata missing from slot {}", slot);
|
||||||
|
} else {
|
||||||
|
panic!("Transaction metadata missing from slot {}", slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(bigtable.upload_confirmed_block(slot, confirmed_block))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for result in futures::future::join_all(uploads).await {
|
||||||
|
if result.is_err() {
|
||||||
|
error!("upload_confirmed_block() failed: {:?}", result.err());
|
||||||
|
failures += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
measure_upload.stop();
|
||||||
|
info!("{} for {} blocks", measure_upload, num_blocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
measure.stop();
|
||||||
|
info!("{}", measure);
|
||||||
|
if failures > 0 {
|
||||||
|
Err(format!("Incomplete upload, {} operations failed", failures).into())
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,5 @@
|
|||||||
pub mod bank_forks_utils;
|
pub mod bank_forks_utils;
|
||||||
|
pub mod bigtable_upload;
|
||||||
pub mod block_error;
|
pub mod block_error;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
pub mod blockstore;
|
pub mod blockstore;
|
||||||
|
@ -659,6 +659,13 @@ pub fn main() {
|
|||||||
.help("Fetch historical transaction info from a BigTable instance \
|
.help("Fetch historical transaction info from a BigTable instance \
|
||||||
as a fallback to local ledger data"),
|
as a fallback to local ledger data"),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("enable_bigtable_ledger_upload")
|
||||||
|
.long("enable-bigtable-ledger-upload")
|
||||||
|
.requires("enable_rpc_transaction_history")
|
||||||
|
.takes_value(false)
|
||||||
|
.help("Upload new confirmed blocks into a BigTable instance"),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("health_check_slot_distance")
|
Arg::with_name("health_check_slot_distance")
|
||||||
.long("health-check-slot-distance")
|
.long("health-check-slot-distance")
|
||||||
@ -976,6 +983,7 @@ pub fn main() {
|
|||||||
enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
|
enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
|
||||||
enable_bigtable_ledger_storage: matches
|
enable_bigtable_ledger_storage: matches
|
||||||
.is_present("enable_rpc_bigtable_ledger_storage"),
|
.is_present("enable_rpc_bigtable_ledger_storage"),
|
||||||
|
enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),
|
||||||
identity_pubkey: identity_keypair.pubkey(),
|
identity_pubkey: identity_keypair.pubkey(),
|
||||||
faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
|
faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
|
||||||
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
|
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
|
||||||
|
Reference in New Issue
Block a user