Files
solana/core/src/bigtable_upload_service.rs

85 lines
2.5 KiB
Rust
Raw Normal View History

2020-09-04 18:49:48 +00:00
use crate::commitment::BlockCommitmentCache;
2020-09-03 19:39:05 -07:00
use solana_ledger::blockstore::Blockstore;
use std::{
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
/// Background service that uploads confirmed blocks to BigTable.
///
/// Constructing the service with `new` spawns a dedicated thread named
/// `bigtable-upload`; the service value only holds that thread's join handle.
pub struct BigTableUploadService {
/// Handle of the spawned upload thread, consumed by `join`.
thread: JoinHandle<()>,
}
impl BigTableUploadService {
pub fn new(
runtime_handle: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) -> Self {
info!("Starting BigTable upload service");
let thread = Builder::new()
.name("bigtable-upload".to_string())
.spawn(move || {
Self::run(
runtime_handle,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
exit,
)
})
.unwrap();
Self { thread }
}
fn run(
runtime: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let mut starting_slot = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
2020-09-04 18:49:48 +00:00
let largest_confirmed_root = block_commitment_cache
2020-09-03 19:39:05 -07:00
.read()
.unwrap()
2020-09-04 18:49:48 +00:00
.largest_confirmed_root();
2020-09-03 19:39:05 -07:00
2020-09-04 18:49:48 +00:00
if largest_confirmed_root == starting_slot {
2020-09-03 19:39:05 -07:00
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
blockstore.clone(),
bigtable_ledger_storage.clone(),
starting_slot,
2020-09-04 18:49:48 +00:00
Some(largest_confirmed_root),
2020-09-03 19:39:05 -07:00
true,
exit.clone(),
2020-09-03 19:39:05 -07:00
));
match result {
2020-09-04 18:49:48 +00:00
Ok(()) => starting_slot = largest_confirmed_root,
2020-09-03 19:39:05 -07:00
Err(err) => {
warn!("bigtable: upload_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
}
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}