diff --git a/core/src/validator.rs b/core/src/validator.rs
index cbfc5593d7..2f5346cd71 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -497,7 +497,7 @@ impl Validator {
         cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
         let cluster_info = Arc::new(cluster_info);
         let mut block_commitment_cache = BlockCommitmentCache::default();
-        block_commitment_cache.initialize_slots(bank.slot());
+        block_commitment_cache.initialize_slots(bank.slot(), bank_forks.read().unwrap().root());
         let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
 
         let optimistically_confirmed_bank =
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 1c1bfd3800..6ab6cb1634 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -1342,13 +1342,10 @@ fn test_validator_saves_tower() {
         .clone();
 
     // Wait for some votes to be generated
-    let mut last_replayed_root;
     loop {
         if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) {
             trace!("current slot: {}", slot);
             if slot > 2 {
-                // this will be the root next time a validator starts
-                last_replayed_root = slot;
                 break;
             }
         }
@@ -1360,35 +1357,31 @@
     let tower1 = Tower::restore(&ledger_path, &validator_id).unwrap();
     trace!("tower1: {:?}", tower1);
     assert_eq!(tower1.root(), 0);
+    assert!(tower1.last_voted_slot().is_some());
 
     // Restart the validator and wait for a new root
     cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
     let validator_client = cluster.get_validator_client(&validator_id).unwrap();
 
-    // Wait for the first root
-    loop {
+    // Wait for the first new root
+    let last_replayed_root = loop {
         #[allow(deprecated)]
         // This test depends on knowing the immediate root, without any delay from the commitment
         // service, so the deprecated CommitmentConfig::root() is retained
         if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
             trace!("current root: {}", root);
-            if root > last_replayed_root + 1 {
-                last_replayed_root = root;
-                break;
+            if root > 0 {
+                break root;
             }
         }
         sleep(Duration::from_millis(50));
-    }
+    };
 
     // Stop validator, and check saved tower
-    let recent_slot = validator_client
-        .get_slot_with_commitment(CommitmentConfig::processed())
-        .unwrap();
     let validator_info = cluster.exit_node(&validator_id);
     let tower2 = Tower::restore(&ledger_path, &validator_id).unwrap();
     trace!("tower2: {:?}", tower2);
     assert_eq!(tower2.root(), last_replayed_root);
-    last_replayed_root = recent_slot;
 
     // Rollback saved tower to `tower1` to simulate a validator starting from a newer snapshot
     // without having to wait for that snapshot to be generated in this test
@@ -1398,7 +1391,7 @@
     let validator_client = cluster.get_validator_client(&validator_id).unwrap();
 
     // Wait for a new root, demonstrating the validator was able to make progress from the older `tower1`
-    loop {
+    let new_root = loop {
         #[allow(deprecated)]
         // This test depends on knowing the immediate root, without any delay from the commitment
         // service, so the deprecated CommitmentConfig::root() is retained
@@ -1409,17 +1402,18 @@
                 last_replayed_root
             );
             if root > last_replayed_root {
-                break;
+                break root;
             }
         }
         sleep(Duration::from_millis(50));
-    }
+    };
 
     // Check the new root is reflected in the saved tower state
     let mut validator_info = cluster.exit_node(&validator_id);
     let tower3 = Tower::restore(&ledger_path, &validator_id).unwrap();
     trace!("tower3: {:?}", tower3);
-    assert!(tower3.root() > last_replayed_root);
+    let tower3_root = tower3.root();
+    assert!(tower3_root >= new_root);
 
     // Remove the tower file entirely and allow the validator to start without a tower. It will
     // rebuild tower from its vote account contents
@@ -1429,26 +1423,25 @@
     cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
     let validator_client = cluster.get_validator_client(&validator_id).unwrap();
 
-    // Wait for a couple more slots to pass so another vote occurs
-    let current_slot = validator_client
-        .get_slot_with_commitment(CommitmentConfig::processed())
-        .unwrap();
-    loop {
-        if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) {
-            trace!("current_slot: {}, slot: {}", current_slot, slot);
-            if slot > current_slot + 1 {
-                break;
+    // Wait for another new root
+    let new_root = loop {
+        #[allow(deprecated)]
+        // This test depends on knowing the immediate root, without any delay from the commitment
+        // service, so the deprecated CommitmentConfig::root() is retained
+        if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
+            trace!("current root: {}, last tower root: {}", root, tower3_root);
+            if root > tower3_root {
+                break root;
             }
         }
         sleep(Duration::from_millis(50));
-    }
+    };
 
     cluster.close_preserve_ledgers();
 
     let tower4 = Tower::restore(&ledger_path, &validator_id).unwrap();
     trace!("tower4: {:?}", tower4);
-    // should tower4 advance 1 slot compared to tower3????
-    assert_eq!(tower4.root(), tower3.root() + 1);
+    assert!(tower4.root() >= new_root);
 }
 
 fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs
new file mode 100644
index 0000000000..7f2a55f048
--- /dev/null
+++ b/replica-node/src/replica_node.rs
@@ -0,0 +1,346 @@
+use {
+    crate::accountsdb_repl_service::AccountsDbReplService,
+    crossbeam_channel::unbounded,
+    log::*,
+    solana_download_utils::download_snapshot_archive,
+    solana_genesis_utils::download_then_check_genesis_hash,
+    solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
+    solana_ledger::{
+        blockstore::Blockstore, blockstore_db::BlockstoreOptions, blockstore_processor,
+        leader_schedule_cache::LeaderScheduleCache,
+    },
+    solana_replica_lib::accountsdb_repl_client::AccountsDbReplClientServiceConfig,
+    solana_rpc::{
+        max_slots::MaxSlots,
+        optimistically_confirmed_bank_tracker::{
+            OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
+        },
+        rpc::JsonRpcConfig,
+        rpc_pubsub_service::{PubSubConfig, PubSubService},
+        rpc_service::JsonRpcService,
+        rpc_subscriptions::RpcSubscriptions,
+    },
+    solana_runtime::{
+        accounts_index::AccountSecondaryIndexes, bank_forks::BankForks,
+        commitment::BlockCommitmentCache, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
+        snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, snapshot_utils,
+    },
+    solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash},
+    solana_send_transaction_service::send_transaction_service,
+    solana_streamer::socket::SocketAddrSpace,
+    std::{
+        fs,
+        net::SocketAddr,
+        path::PathBuf,
+        sync::{
+            atomic::{AtomicBool, AtomicU64},
+            Arc, RwLock,
+        },
+    },
+};
+
+pub struct ReplicaNodeConfig {
+    pub rpc_peer_addr: SocketAddr,
+    pub accountsdb_repl_peer_addr: Option<SocketAddr>,
+    pub rpc_addr: SocketAddr,
+    pub rpc_pubsub_addr: SocketAddr,
+    pub ledger_path: PathBuf,
+    pub snapshot_archives_dir: PathBuf,
+    pub bank_snapshots_dir: PathBuf,
+    pub account_paths: Vec<PathBuf>,
+    pub snapshot_info: (Slot, Hash),
+    pub cluster_info: Arc<ClusterInfo>,
+    pub rpc_config: JsonRpcConfig,
+    pub snapshot_config: Option<SnapshotConfig>,
+    pub pubsub_config: PubSubConfig,
+    pub account_indexes: AccountSecondaryIndexes,
+    pub accounts_db_caching_enabled: bool,
+    pub replica_exit: Arc<RwLock<Exit>>,
+    pub socket_addr_space: SocketAddrSpace,
+}
+
+pub struct ReplicaNode {
+    json_rpc_service: Option<JsonRpcService>,
+    pubsub_service: Option<PubSubService>,
+    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
+    accountsdb_repl_service: Option<AccountsDbReplService>,
+}
+
+// Struct maintaining information about banks
+struct ReplicaBankInfo {
+    bank_forks: Arc<RwLock<BankForks>>,
+    optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
+    leader_schedule_cache: Arc<LeaderScheduleCache>,
+    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+}
+
+// Initialize the replica by downloading snapshot from the peer, initialize
+// the BankForks, OptimisticallyConfirmedBank, LeaderScheduleCache and
+// BlockCommitmentCache and return the info wrapped as ReplicaBankInfo.
+fn initialize_from_snapshot(
+    replica_config: &ReplicaNodeConfig,
+    snapshot_config: &SnapshotConfig,
+    genesis_config: &GenesisConfig,
+) -> ReplicaBankInfo {
+    info!(
+        "Downloading snapshot from the peer into {:?}",
+        replica_config.snapshot_archives_dir
+    );
+
+    download_snapshot_archive(
+        &replica_config.rpc_peer_addr,
+        &replica_config.snapshot_archives_dir,
+        replica_config.snapshot_info,
+        SnapshotType::FullSnapshot,
+        snapshot_config.maximum_full_snapshot_archives_to_retain,
+        snapshot_config.maximum_incremental_snapshot_archives_to_retain,
+        false,
+        &mut None,
+    )
+    .unwrap();
+
+    fs::create_dir_all(&snapshot_config.bank_snapshots_dir)
+        .expect("Couldn't create bank snapshot directory");
+
+    let archive_info = snapshot_utils::get_highest_full_snapshot_archive_info(
+        &replica_config.snapshot_archives_dir,
+    )
+    .unwrap();
+
+    let process_options = blockstore_processor::ProcessOptions {
+        account_indexes: replica_config.account_indexes.clone(),
+        accounts_db_caching_enabled: replica_config.accounts_db_caching_enabled,
+        ..blockstore_processor::ProcessOptions::default()
+    };
+
+    info!(
+        "Build bank from snapshot archive: {:?}",
+        &snapshot_config.bank_snapshots_dir
+    );
+    let (bank0, _) = snapshot_utils::bank_from_snapshot_archives(
+        &replica_config.account_paths,
+        &snapshot_config.bank_snapshots_dir,
+        &archive_info,
+        None,
+        genesis_config,
+        process_options.debug_keys.clone(),
+        None,
+        process_options.account_indexes.clone(),
+        process_options.accounts_db_caching_enabled,
+        process_options.limit_load_slot_count_from_snapshot,
+        process_options.shrink_ratio,
+        process_options.accounts_db_test_hash_calculation,
+        false,
+        process_options.verify_index,
+        process_options.accounts_db_config,
+        None,
+    )
+    .unwrap();
+
+    let bank0_slot = bank0.slot();
+    let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
+
+    let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
+
+    let optimistically_confirmed_bank =
+        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
+
+    let mut block_commitment_cache = BlockCommitmentCache::default();
+    block_commitment_cache.initialize_slots(bank0_slot, bank0_slot);
+    let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
+
+    ReplicaBankInfo {
+        bank_forks,
+        optimistically_confirmed_bank,
+        leader_schedule_cache,
+        block_commitment_cache,
+    }
+}
+
+fn start_client_rpc_services(
+    replica_config: &ReplicaNodeConfig,
+    genesis_config: &GenesisConfig,
+    cluster_info: Arc<ClusterInfo>,
+    bank_info: &ReplicaBankInfo,
+    socket_addr_space: &SocketAddrSpace,
+) -> (
+    Option<JsonRpcService>,
+    Option<PubSubService>,
+    Option<OptimisticallyConfirmedBankTracker>,
+) {
+    let ReplicaBankInfo {
+        bank_forks,
+        optimistically_confirmed_bank,
+        leader_schedule_cache,
+        block_commitment_cache,
+    } = bank_info;
+    let blockstore = Arc::new(
+        Blockstore::open_with_options(
+            &replica_config.ledger_path,
+            BlockstoreOptions {
+                enforce_ulimit_nofile: false,
+                ..BlockstoreOptions::default()
+            },
+        )
+        .unwrap(),
+    );
+
+    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0));
+
+    let max_slots = Arc::new(MaxSlots::default());
+    let exit = Arc::new(AtomicBool::new(false));
+
+    let subscriptions = Arc::new(RpcSubscriptions::new(
+        &exit,
+        max_complete_transaction_status_slot.clone(),
+        blockstore.clone(),
+        bank_forks.clone(),
+        block_commitment_cache.clone(),
+        optimistically_confirmed_bank.clone(),
+    ));
+
+    let rpc_override_health_check = Arc::new(AtomicBool::new(false));
+    if ContactInfo::is_valid_address(&replica_config.rpc_addr, socket_addr_space) {
+        assert!(ContactInfo::is_valid_address(
+            &replica_config.rpc_pubsub_addr,
+            socket_addr_space
+        ));
+    } else {
+        assert!(!ContactInfo::is_valid_address(
+            &replica_config.rpc_pubsub_addr,
+            socket_addr_space
+        ));
+    }
+
+    let (trigger, pubsub_service) = PubSubService::new(
+        replica_config.pubsub_config.clone(),
+        &subscriptions,
+        replica_config.rpc_pubsub_addr,
+    );
+    replica_config
+        .replica_exit
+        .write()
+        .unwrap()
+        .register_exit(Box::new(move || trigger.cancel()));
+
+    let (_bank_notification_sender, bank_notification_receiver) = unbounded();
+    (
+        Some(JsonRpcService::new(
+            replica_config.rpc_addr,
+            replica_config.rpc_config.clone(),
+            replica_config.snapshot_config.clone(),
+            bank_forks.clone(),
+            block_commitment_cache.clone(),
+            blockstore,
+            cluster_info,
+            None,
+            genesis_config.hash(),
+            &replica_config.ledger_path,
+            replica_config.replica_exit.clone(),
+            None,
+            rpc_override_health_check,
+            optimistically_confirmed_bank.clone(),
+            send_transaction_service::Config {
+                retry_rate_ms: 0,
+                leader_forward_count: 0,
+                ..send_transaction_service::Config::default()
+            },
+            max_slots,
+            leader_schedule_cache.clone(),
+            max_complete_transaction_status_slot,
+        )),
+        Some(pubsub_service),
+        Some(OptimisticallyConfirmedBankTracker::new(
+            bank_notification_receiver,
+            &exit,
+            bank_forks.clone(),
+            optimistically_confirmed_bank.clone(),
+            subscriptions,
+            None,
+        )),
+    )
+}
+
+impl ReplicaNode {
+    pub fn new(replica_config: ReplicaNodeConfig) -> Self {
+        let genesis_config = download_then_check_genesis_hash(
+            &replica_config.rpc_peer_addr,
+            &replica_config.ledger_path,
+            None,
+            MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
+            false,
+            true,
+        )
+        .unwrap();
+
+        let snapshot_config = SnapshotConfig {
+            full_snapshot_archive_interval_slots: Slot::MAX,
+            incremental_snapshot_archive_interval_slots: Slot::MAX,
+            snapshot_archives_dir: replica_config.snapshot_archives_dir.clone(),
+            bank_snapshots_dir: replica_config.bank_snapshots_dir.clone(),
+            ..SnapshotConfig::default()
+        };
+
+        let bank_info =
+            initialize_from_snapshot(&replica_config, &snapshot_config, &genesis_config);
+
+        let (json_rpc_service, pubsub_service, optimistically_confirmed_bank_tracker) =
+            start_client_rpc_services(
+                &replica_config,
+                &genesis_config,
+                replica_config.cluster_info.clone(),
+                &bank_info,
+                &replica_config.socket_addr_space,
+            );
+
+        let accountsdb_repl_client_config = AccountsDbReplClientServiceConfig {
+            worker_threads: 1,
+            replica_server_addr: replica_config.accountsdb_repl_peer_addr.unwrap(),
+        };
+
+        let last_replicated_slot = bank_info.bank_forks.read().unwrap().root_bank().slot();
+        info!(
+            "Starting AccountsDbReplService from slot {:?}",
+            last_replicated_slot
+        );
+        let accountsdb_repl_service = Some(
+            AccountsDbReplService::new(last_replicated_slot, accountsdb_repl_client_config)
+                .expect("Failed to start AccountsDb replication service"),
+        );
+
+        info!(
+            "Started AccountsDbReplService from slot {:?}",
+            last_replicated_slot
+        );
+
+        ReplicaNode {
+            json_rpc_service,
+            pubsub_service,
+            optimistically_confirmed_bank_tracker,
+            accountsdb_repl_service,
+        }
+    }
+
+    pub fn join(self) {
+        if let Some(json_rpc_service) = self.json_rpc_service {
+            json_rpc_service.join().expect("rpc_service");
+        }
+
+        if let Some(pubsub_service) = self.pubsub_service {
+            pubsub_service.join().expect("pubsub_service");
+        }
+
+        if let Some(optimistically_confirmed_bank_tracker) =
+            self.optimistically_confirmed_bank_tracker
+        {
+            optimistically_confirmed_bank_tracker
+                .join()
+                .expect("optimistically_confirmed_bank_tracker");
+        }
+        if let Some(accountsdb_repl_service) = self.accountsdb_repl_service {
+            accountsdb_repl_service
+                .join()
+                .expect("accountsdb_repl_service");
+        }
+    }
+}
diff --git a/runtime/src/commitment.rs b/runtime/src/commitment.rs
index cb420e05bb..192bdbd280 100644
--- a/runtime/src/commitment.rs
+++ b/runtime/src/commitment.rs
@@ -193,9 +193,9 @@ impl BlockCommitmentCache {
         self.commitment_slots.highest_confirmed_root = root;
     }
 
-    pub fn initialize_slots(&mut self, slot: Slot) {
+    pub fn initialize_slots(&mut self, slot: Slot, root: Slot) {
         self.commitment_slots.slot = slot;
-        self.commitment_slots.root = slot;
+        self.commitment_slots.root = root;
     }
 
     pub fn set_all_slots(&mut self, slot: Slot, root: Slot) {