* Update various terms to "known" (#21257)
* Update snapshot bootstrap terminology
* Update only known rpc terminology
* Update replica-node
(cherry picked from commit ae3777cadb
)
# Conflicts:
# replica-node/src/replica_util.rs
# validator/src/bootstrap.rs
* Fix conflicts
* Remove dangling file
Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
@ -1,340 +0,0 @@
|
||||
use {
|
||||
crate::accountsdb_repl_service::AccountsDbReplService,
|
||||
crossbeam_channel::unbounded,
|
||||
log::*,
|
||||
solana_download_utils::download_snapshot_archive,
|
||||
solana_genesis_utils::download_then_check_genesis_hash,
|
||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||
solana_ledger::{
|
||||
blockstore::Blockstore, blockstore_db::AccessType, blockstore_processor,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
},
|
||||
solana_replica_lib::accountsdb_repl_client::AccountsDbReplClientServiceConfig,
|
||||
solana_rpc::{
|
||||
max_slots::MaxSlots,
|
||||
optimistically_confirmed_bank_tracker::{
|
||||
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
|
||||
},
|
||||
rpc::JsonRpcConfig,
|
||||
rpc_pubsub_service::{PubSubConfig, PubSubService},
|
||||
rpc_service::JsonRpcService,
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
},
|
||||
solana_runtime::{
|
||||
accounts_index::AccountSecondaryIndexes, bank_forks::BankForks,
|
||||
commitment::BlockCommitmentCache, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
|
||||
snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, snapshot_utils,
|
||||
},
|
||||
solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash},
|
||||
solana_streamer::socket::SocketAddrSpace,
|
||||
std::{
|
||||
fs,
|
||||
net::SocketAddr,
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64},
|
||||
Arc, RwLock,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
pub struct ReplicaNodeConfig {
|
||||
pub rpc_peer_addr: SocketAddr,
|
||||
pub accountsdb_repl_peer_addr: Option<SocketAddr>,
|
||||
pub rpc_addr: SocketAddr,
|
||||
pub rpc_pubsub_addr: SocketAddr,
|
||||
pub ledger_path: PathBuf,
|
||||
pub snapshot_archives_dir: PathBuf,
|
||||
pub bank_snapshots_dir: PathBuf,
|
||||
pub account_paths: Vec<PathBuf>,
|
||||
pub snapshot_info: (Slot, Hash),
|
||||
pub cluster_info: Arc<ClusterInfo>,
|
||||
pub rpc_config: JsonRpcConfig,
|
||||
pub snapshot_config: Option<SnapshotConfig>,
|
||||
pub pubsub_config: PubSubConfig,
|
||||
pub account_indexes: AccountSecondaryIndexes,
|
||||
pub accounts_db_caching_enabled: bool,
|
||||
pub replica_exit: Arc<RwLock<Exit>>,
|
||||
pub socket_addr_space: SocketAddrSpace,
|
||||
}
|
||||
|
||||
pub struct ReplicaNode {
|
||||
json_rpc_service: Option<JsonRpcService>,
|
||||
pubsub_service: Option<PubSubService>,
|
||||
optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
|
||||
accountsdb_repl_service: Option<AccountsDbReplService>,
|
||||
}
|
||||
|
||||
// Struct maintaining information about banks
|
||||
struct ReplicaBankInfo {
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
||||
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
}
|
||||
|
||||
// Initialize the replica by downloading snapshot from the peer, initialize
|
||||
// the BankForks, OptimisticallyConfirmedBank, LeaderScheduleCache and
|
||||
// BlockCommitmentCache and return the info wrapped as ReplicaBankInfo.
|
||||
fn initialize_from_snapshot(
|
||||
replica_config: &ReplicaNodeConfig,
|
||||
snapshot_config: &SnapshotConfig,
|
||||
genesis_config: &GenesisConfig,
|
||||
) -> ReplicaBankInfo {
|
||||
info!(
|
||||
"Downloading snapshot from the peer into {:?}",
|
||||
replica_config.snapshot_archives_dir
|
||||
);
|
||||
|
||||
download_snapshot_archive(
|
||||
&replica_config.rpc_peer_addr,
|
||||
&replica_config.snapshot_archives_dir,
|
||||
replica_config.snapshot_info,
|
||||
SnapshotType::FullSnapshot,
|
||||
snapshot_config.maximum_full_snapshot_archives_to_retain,
|
||||
snapshot_config.maximum_incremental_snapshot_archives_to_retain,
|
||||
false,
|
||||
&mut None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
fs::create_dir_all(&snapshot_config.bank_snapshots_dir)
|
||||
.expect("Couldn't create bank snapshot directory");
|
||||
|
||||
let archive_info = snapshot_utils::get_highest_full_snapshot_archive_info(
|
||||
&replica_config.snapshot_archives_dir,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let process_options = blockstore_processor::ProcessOptions {
|
||||
account_indexes: replica_config.account_indexes.clone(),
|
||||
accounts_db_caching_enabled: replica_config.accounts_db_caching_enabled,
|
||||
..blockstore_processor::ProcessOptions::default()
|
||||
};
|
||||
|
||||
info!(
|
||||
"Build bank from snapshot archive: {:?}",
|
||||
&snapshot_config.bank_snapshots_dir
|
||||
);
|
||||
let (bank0, _) = snapshot_utils::bank_from_snapshot_archives(
|
||||
&replica_config.account_paths,
|
||||
&[],
|
||||
&snapshot_config.bank_snapshots_dir,
|
||||
&archive_info,
|
||||
None,
|
||||
genesis_config,
|
||||
process_options.debug_keys.clone(),
|
||||
None,
|
||||
process_options.account_indexes.clone(),
|
||||
process_options.accounts_db_caching_enabled,
|
||||
process_options.limit_load_slot_count_from_snapshot,
|
||||
process_options.shrink_ratio,
|
||||
process_options.accounts_db_test_hash_calculation,
|
||||
false,
|
||||
process_options.verify_index,
|
||||
process_options.accounts_db_config,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let bank0_slot = bank0.slot();
|
||||
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
|
||||
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
|
||||
|
||||
let optimistically_confirmed_bank =
|
||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||
|
||||
let mut block_commitment_cache = BlockCommitmentCache::default();
|
||||
block_commitment_cache.initialize_slots(bank0_slot);
|
||||
let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
|
||||
|
||||
ReplicaBankInfo {
|
||||
bank_forks,
|
||||
optimistically_confirmed_bank,
|
||||
leader_schedule_cache,
|
||||
block_commitment_cache,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_client_rpc_services(
|
||||
replica_config: &ReplicaNodeConfig,
|
||||
genesis_config: &GenesisConfig,
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
bank_info: &ReplicaBankInfo,
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) -> (
|
||||
Option<JsonRpcService>,
|
||||
Option<PubSubService>,
|
||||
Option<OptimisticallyConfirmedBankTracker>,
|
||||
) {
|
||||
let ReplicaBankInfo {
|
||||
bank_forks,
|
||||
optimistically_confirmed_bank,
|
||||
leader_schedule_cache,
|
||||
block_commitment_cache,
|
||||
} = bank_info;
|
||||
let blockstore = Arc::new(
|
||||
Blockstore::open_with_access_type(
|
||||
&replica_config.ledger_path,
|
||||
AccessType::PrimaryOnly,
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0));
|
||||
|
||||
let max_slots = Arc::new(MaxSlots::default());
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
||||
&exit,
|
||||
bank_forks.clone(),
|
||||
block_commitment_cache.clone(),
|
||||
optimistically_confirmed_bank.clone(),
|
||||
));
|
||||
|
||||
let rpc_override_health_check = Arc::new(AtomicBool::new(false));
|
||||
if ContactInfo::is_valid_address(&replica_config.rpc_addr, socket_addr_space) {
|
||||
assert!(ContactInfo::is_valid_address(
|
||||
&replica_config.rpc_pubsub_addr,
|
||||
socket_addr_space
|
||||
));
|
||||
} else {
|
||||
assert!(!ContactInfo::is_valid_address(
|
||||
&replica_config.rpc_pubsub_addr,
|
||||
socket_addr_space
|
||||
));
|
||||
}
|
||||
|
||||
let (trigger, pubsub_service) = PubSubService::new(
|
||||
replica_config.pubsub_config.clone(),
|
||||
&subscriptions,
|
||||
replica_config.rpc_pubsub_addr,
|
||||
);
|
||||
replica_config
|
||||
.replica_exit
|
||||
.write()
|
||||
.unwrap()
|
||||
.register_exit(Box::new(move || trigger.cancel()));
|
||||
|
||||
let (_bank_notification_sender, bank_notification_receiver) = unbounded();
|
||||
(
|
||||
Some(JsonRpcService::new(
|
||||
replica_config.rpc_addr,
|
||||
replica_config.rpc_config.clone(),
|
||||
replica_config.snapshot_config.clone(),
|
||||
bank_forks.clone(),
|
||||
block_commitment_cache.clone(),
|
||||
blockstore,
|
||||
cluster_info,
|
||||
None,
|
||||
genesis_config.hash(),
|
||||
&replica_config.ledger_path,
|
||||
replica_config.replica_exit.clone(),
|
||||
None,
|
||||
rpc_override_health_check,
|
||||
optimistically_confirmed_bank.clone(),
|
||||
0,
|
||||
0,
|
||||
max_slots,
|
||||
leader_schedule_cache.clone(),
|
||||
max_complete_transaction_status_slot,
|
||||
)),
|
||||
Some(pubsub_service),
|
||||
Some(OptimisticallyConfirmedBankTracker::new(
|
||||
bank_notification_receiver,
|
||||
&exit,
|
||||
bank_forks.clone(),
|
||||
optimistically_confirmed_bank.clone(),
|
||||
subscriptions,
|
||||
None,
|
||||
)),
|
||||
)
|
||||
}
|
||||
|
||||
impl ReplicaNode {
|
||||
pub fn new(replica_config: ReplicaNodeConfig) -> Self {
|
||||
let genesis_config = download_then_check_genesis_hash(
|
||||
&replica_config.rpc_peer_addr,
|
||||
&replica_config.ledger_path,
|
||||
None,
|
||||
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
|
||||
false,
|
||||
true,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let snapshot_config = SnapshotConfig {
|
||||
full_snapshot_archive_interval_slots: Slot::MAX,
|
||||
incremental_snapshot_archive_interval_slots: Slot::MAX,
|
||||
snapshot_archives_dir: replica_config.snapshot_archives_dir.clone(),
|
||||
bank_snapshots_dir: replica_config.bank_snapshots_dir.clone(),
|
||||
..SnapshotConfig::default()
|
||||
};
|
||||
|
||||
let bank_info =
|
||||
initialize_from_snapshot(&replica_config, &snapshot_config, &genesis_config);
|
||||
|
||||
let (json_rpc_service, pubsub_service, optimistically_confirmed_bank_tracker) =
|
||||
start_client_rpc_services(
|
||||
&replica_config,
|
||||
&genesis_config,
|
||||
replica_config.cluster_info.clone(),
|
||||
&bank_info,
|
||||
&replica_config.socket_addr_space,
|
||||
);
|
||||
|
||||
let accountsdb_repl_client_config = AccountsDbReplClientServiceConfig {
|
||||
worker_threads: 1,
|
||||
replica_server_addr: replica_config.accountsdb_repl_peer_addr.unwrap(),
|
||||
};
|
||||
|
||||
let last_replicated_slot = bank_info.bank_forks.read().unwrap().root_bank().slot();
|
||||
info!(
|
||||
"Starting AccountsDbReplService from slot {:?}",
|
||||
last_replicated_slot
|
||||
);
|
||||
let accountsdb_repl_service = Some(
|
||||
AccountsDbReplService::new(last_replicated_slot, accountsdb_repl_client_config)
|
||||
.expect("Failed to start AccountsDb replication service"),
|
||||
);
|
||||
|
||||
info!(
|
||||
"Started AccountsDbReplService from slot {:?}",
|
||||
last_replicated_slot
|
||||
);
|
||||
|
||||
ReplicaNode {
|
||||
json_rpc_service,
|
||||
pubsub_service,
|
||||
optimistically_confirmed_bank_tracker,
|
||||
accountsdb_repl_service,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn join(self) {
|
||||
if let Some(json_rpc_service) = self.json_rpc_service {
|
||||
json_rpc_service.join().expect("rpc_service");
|
||||
}
|
||||
|
||||
if let Some(pubsub_service) = self.pubsub_service {
|
||||
pubsub_service.join().expect("pubsub_service");
|
||||
}
|
||||
|
||||
if let Some(optimistically_confirmed_bank_tracker) =
|
||||
self.optimistically_confirmed_bank_tracker
|
||||
{
|
||||
optimistically_confirmed_bank_tracker
|
||||
.join()
|
||||
.expect("optimistically_confirmed_bank_tracker");
|
||||
}
|
||||
if let Some(accountsdb_repl_service) = self.accountsdb_repl_service {
|
||||
accountsdb_repl_service
|
||||
.join()
|
||||
.expect("accountsdb_repl_service");
|
||||
}
|
||||
}
|
||||
}
|
@ -414,7 +414,7 @@ fn get_rpc_node(
|
||||
validator_config: &ValidatorConfig,
|
||||
blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
|
||||
snapshot_not_required: bool,
|
||||
no_untrusted_rpc: bool,
|
||||
only_known_rpc: bool,
|
||||
snapshot_output_dir: &Path,
|
||||
) -> Option<(ContactInfo, Option<(Slot, Hash)>)> {
|
||||
let mut blacklist_timeout = Instant::now();
|
||||
@ -503,7 +503,7 @@ fn get_rpc_node(
|
||||
let mut eligible_rpc_peers = vec![];
|
||||
|
||||
for rpc_peer in rpc_peers.iter() {
|
||||
if no_untrusted_rpc
|
||||
if only_known_rpc
|
||||
&& !is_known_validator(&rpc_peer.id, &validator_config.known_validators)
|
||||
{
|
||||
continue;
|
||||
@ -744,7 +744,7 @@ fn verify_reachable_ports(
|
||||
struct RpcBootstrapConfig {
|
||||
no_genesis_fetch: bool,
|
||||
no_snapshot_fetch: bool,
|
||||
no_untrusted_rpc: bool,
|
||||
only_known_rpc: bool,
|
||||
max_genesis_archive_unpacked_size: u64,
|
||||
no_check_vote_account: bool,
|
||||
}
|
||||
@ -754,7 +754,7 @@ impl Default for RpcBootstrapConfig {
|
||||
Self {
|
||||
no_genesis_fetch: true,
|
||||
no_snapshot_fetch: true,
|
||||
no_untrusted_rpc: true,
|
||||
only_known_rpc: true,
|
||||
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
|
||||
no_check_vote_account: true,
|
||||
}
|
||||
@ -826,7 +826,7 @@ fn rpc_bootstrap(
|
||||
validator_config,
|
||||
&mut blacklisted_rpc_nodes,
|
||||
bootstrap_config.no_snapshot_fetch,
|
||||
bootstrap_config.no_untrusted_rpc,
|
||||
bootstrap_config.only_known_rpc,
|
||||
snapshot_output_dir,
|
||||
);
|
||||
if rpc_node_details.is_none() {
|
||||
@ -946,7 +946,7 @@ fn rpc_bootstrap(
|
||||
if let Some(ref known_validators) = validator_config.known_validators {
|
||||
if known_validators.contains(&rpc_contact_info.id)
|
||||
&& known_validators.len() == 1
|
||||
&& bootstrap_config.no_untrusted_rpc {
|
||||
&& bootstrap_config.only_known_rpc {
|
||||
warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, but will NOT abort \
|
||||
and try a different node as it is the only known validator and the --only-known-rpc flag \
|
||||
is set. \
|
||||
@ -1538,7 +1538,7 @@ pub fn main() {
|
||||
.help("Log when transactions are processed which reference a given key."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("no_untrusted_rpc")
|
||||
Arg::with_name("only_known_rpc")
|
||||
.alias("no-untrusted-rpc")
|
||||
.long("only-known-rpc")
|
||||
.takes_value(false)
|
||||
@ -2258,7 +2258,7 @@ pub fn main() {
|
||||
no_genesis_fetch: matches.is_present("no_genesis_fetch"),
|
||||
no_snapshot_fetch: matches.is_present("no_snapshot_fetch"),
|
||||
no_check_vote_account: matches.is_present("no_check_vote_account"),
|
||||
no_untrusted_rpc: matches.is_present("no_untrusted_rpc"),
|
||||
only_known_rpc: matches.is_present("only_known_rpc"),
|
||||
max_genesis_archive_unpacked_size: value_t_or_exit!(
|
||||
matches,
|
||||
"max_genesis_archive_unpacked_size",
|
||||
|
Reference in New Issue
Block a user