Expose TransactionStatusService to the other blockstore_processor path (bp #11070) (#11074)

* Expose tss to the other blockstore_processor path (#11070)

(cherry picked from commit 9a80e31bae)

# Conflicts:
#	core/src/validator.rs
#	ledger/src/blockstore_processor.rs

* Fix conflicts

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
mergify[bot]
2020-07-15 05:50:45 +00:00
committed by GitHub
parent d1c0f4b4f1
commit d4bbb7f516
4 changed files with 100 additions and 50 deletions

View File

@ -9,7 +9,7 @@ use crate::{
gossip_service::{discover_cluster, GossipService},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::PohService,
rewards_recorder_service::RewardsRecorderService,
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
rpc::JsonRpcConfig,
rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService,
@ -28,7 +28,8 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
blockstore_processor, create_new_tmp_ledger,
blockstore_processor::{self, TransactionStatusSender},
create_new_tmp_ledger,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
leader_schedule::FixedSchedule,
leader_schedule_cache::LeaderScheduleCache,
@ -129,6 +130,14 @@ impl ValidatorExit {
}
}
#[derive(Default)]
struct TransactionHistoryServices {
transaction_status_sender: Option<TransactionStatusSender>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
rewards_recorder_service: Option<RewardsRecorderService>,
}
pub struct Validator {
pub id: Pubkey,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
@ -198,6 +207,12 @@ impl Validator {
}
info!("creating bank...");
let mut validator_exit = ValidatorExit::default();
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
let (
genesis_config,
bank_forks,
@ -206,10 +221,15 @@ impl Validator {
completed_slots_receiver,
leader_schedule_cache,
snapshot_hash,
) = new_banks_from_blockstore(config, ledger_path, poh_verify);
TransactionHistoryServices {
transaction_status_sender,
transaction_status_service,
rewards_recorder_sender,
rewards_recorder_service,
},
) = new_banks_from_blockstore(config, ledger_path, poh_verify, &exit);
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let exit = Arc::new(AtomicBool::new(false));
let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks));
@ -221,11 +241,6 @@ impl Validator {
}
}
let mut validator_exit = ValidatorExit::default();
let exit_ = exit.clone();
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
node.info.wallclock = timestamp();
node.info.shred_version = compute_shred_version(
&genesis_config.hash(),
@ -244,7 +259,6 @@ impl Validator {
}
let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone()));
let blockstore = Arc::new(blockstore);
let block_commitment_cache = Arc::new(RwLock::new(
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
));
@ -287,36 +301,6 @@ impl Validator {
)
});
let (transaction_status_sender, transaction_status_service) =
if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history {
let (transaction_status_sender, transaction_status_receiver) = unbounded();
(
Some(transaction_status_sender),
Some(TransactionStatusService::new(
transaction_status_receiver,
blockstore.clone(),
&exit,
)),
)
} else {
(None, None)
};
let (rewards_recorder_sender, rewards_recorder_service) =
if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history {
let (rewards_recorder_sender, rewards_receiver) = unbounded();
(
Some(rewards_recorder_sender),
Some(RewardsRecorderService::new(
rewards_receiver,
blockstore.clone(),
&exit,
)),
)
} else {
(None, None)
};
info!(
"Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}",
bank.epoch(),
@ -575,14 +559,16 @@ fn new_banks_from_blockstore(
config: &ValidatorConfig,
blockstore_path: &Path,
poh_verify: bool,
exit: &Arc<AtomicBool>,
) -> (
GenesisConfig,
BankForks,
Blockstore,
Arc<Blockstore>,
Receiver<bool>,
CompletedSlotsReceiver,
LeaderScheduleCache,
Option<(Slot, Hash)>,
TransactionHistoryServices,
) {
let genesis_config =
open_genesis_config(blockstore_path, config.max_genesis_archive_unpacked_size);
@ -620,12 +606,23 @@ fn new_banks_from_blockstore(
..blockstore_processor::ProcessOptions::default()
};
let blockstore = Arc::new(blockstore);
let transaction_history_services =
if config.rpc_ports.is_some() && config.rpc_config.enable_rpc_transaction_history {
initialize_rpc_transaction_history_services(blockstore.clone(), exit)
} else {
TransactionHistoryServices::default()
};
let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.snapshot_config.as_ref(),
process_options,
transaction_history_services
.transaction_status_sender
.clone(),
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
@ -645,6 +642,7 @@ fn new_banks_from_blockstore(
completed_slots_receiver,
leader_schedule_cache,
snapshot_hash,
transaction_history_services,
)
}
@ -708,6 +706,33 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi
drop(blockstore);
}
fn initialize_rpc_transaction_history_services(
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> TransactionHistoryServices {
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let transaction_status_sender = Some(transaction_status_sender);
let transaction_status_service = Some(TransactionStatusService::new(
transaction_status_receiver,
blockstore.clone(),
exit,
));
let (rewards_recorder_sender, rewards_receiver) = unbounded();
let rewards_recorder_sender = Some(rewards_recorder_sender);
let rewards_recorder_service = Some(RewardsRecorderService::new(
rewards_receiver,
blockstore,
exit,
));
TransactionHistoryServices {
transaction_status_sender,
transaction_status_service,
rewards_recorder_sender,
rewards_recorder_service,
}
}
// Return true on error, indicating the validator should exit.
fn wait_for_supermajority(
config: &ValidatorConfig,

View File

@ -604,6 +604,7 @@ fn load_bank_forks(
account_paths,
snapshot_config.as_ref(),
process_options,
None,
)
}

View File

@ -3,6 +3,7 @@ use crate::{
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions,
TransactionStatusSender,
},
entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache,
@ -32,6 +33,7 @@ pub fn load(
account_paths: Vec<PathBuf>,
snapshot_config: Option<&SnapshotConfig>,
process_options: ProcessOptions,
transaction_status_sender: Option<TransactionStatusSender>,
) -> LoadResult {
if let Some(snapshot_config) = snapshot_config.as_ref() {
info!(
@ -84,6 +86,7 @@ pub fn load(
Arc::new(deserialized_bank),
&process_options,
&VerifyRecyclers::default(),
transaction_status_sender,
),
Some(deserialized_snapshot_hash),
);

View File

@ -294,7 +294,7 @@ pub fn process_blockstore(
info!("processing ledger for slot 0...");
let recyclers = VerifyRecyclers::default();
process_bank_0(&bank0, blockstore, &opts, &recyclers)?;
process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers)
process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None)
}
// Process blockstore from a known root bank
@ -304,6 +304,7 @@ pub fn process_blockstore_from_root(
bank: Arc<Bank>,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<TransactionStatusSender>,
) -> BlockstoreProcessorResult {
info!("processing ledger from slot {}...", bank.slot());
let allocated = thread_mem_usage::Allocatedp::default();
@ -368,6 +369,7 @@ pub fn process_blockstore_from_root(
&mut root,
opts,
recyclers,
transaction_status_sender,
)?;
(initial_forks, leader_schedule_cache)
} else {
@ -456,6 +458,7 @@ fn confirm_full_slot(
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress,
transaction_status_sender: Option<TransactionStatusSender>,
) -> result::Result<(), BlockstoreProcessorError> {
let mut timing = ConfirmationTiming::default();
let skip_verification = !opts.poh_verify;
@ -465,7 +468,7 @@ fn confirm_full_slot(
&mut timing,
progress,
skip_verification,
None,
transaction_status_sender,
opts.entry_callback.as_ref(),
recyclers,
)?;
@ -629,8 +632,8 @@ fn process_bank_0(
) -> result::Result<(), BlockstoreProcessorError> {
assert_eq!(bank0.slot(), 0);
let mut progress = ConfirmationProgress::new(bank0.last_blockhash());
confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress)
.expect("processing for bank 0 must succceed");
confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress, None)
.expect("processing for bank 0 must succeed");
bank0.freeze();
Ok(())
}
@ -702,6 +705,7 @@ fn load_frozen_forks(
root: &mut Slot,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<TransactionStatusSender>,
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
let mut initial_forks = HashMap::new();
let mut last_status_report = Instant::now();
@ -741,7 +745,16 @@ fn load_frozen_forks(
let initial_allocation = allocated.get();
let mut progress = ConfirmationProgress::new(last_entry_hash);
if process_single_slot(blockstore, &bank, opts, recyclers, &mut progress).is_err() {
if process_single_slot(
blockstore,
&bank,
opts,
recyclers,
&mut progress,
transaction_status_sender.clone(),
)
.is_err()
{
continue;
}
txs += progress.num_txs;
@ -788,10 +801,11 @@ fn process_single_slot(
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress,
transaction_status_sender: Option<TransactionStatusSender>,
) -> result::Result<(), BlockstoreProcessorError> {
// Mark corrupt slots as dead so validators don't replay this slot and
// see DuplicateSignature errors later in ReplayStage
confirm_full_slot(blockstore, bank, opts, recyclers, progress).map_err(|err| {
confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender).map_err(|err| {
let slot = bank.slot();
warn!("slot {} failed to verify: {}", slot, err);
if blockstore.is_primary_access() {
@ -2418,14 +2432,21 @@ pub mod tests {
&opts,
&recyclers,
&mut ConfirmationProgress::new(bank0.last_blockhash()),
None,
)
.unwrap();
bank1.squash();
// Test process_blockstore_from_root() from slot 1 onwards
let (bank_forks, _leader_schedule) =
process_blockstore_from_root(&genesis_config, &blockstore, bank1, &opts, &recyclers)
.unwrap();
let (bank_forks, _leader_schedule) = process_blockstore_from_root(
&genesis_config,
&blockstore,
bank1,
&opts,
&recyclers,
None,
)
.unwrap();
assert_eq!(frozen_bank_slots(&bank_forks), vec![5, 6]);
assert_eq!(bank_forks.working_bank().slot(), 6);