Accounts db replication replica skeleton (#18767)

This is the first installment of the AccountsDb replication. Summary of Changes:

The rpc-node for AccountsDb replication executable cherry-picked from Steven's branch
Auto figuring out the snapshot to download via gossip
The replica now can download the snapshot, initialize the bank and AccountsDb, start the JsonRpcService
Integration test of a validator paired with a replica.
This commit is contained in:
Lijun Wang
2021-07-28 09:31:43 -07:00
committed by GitHub
parent 9d0a937a05
commit 65ccfed868
9 changed files with 1642 additions and 82 deletions

View File

@ -0,0 +1,278 @@
#![allow(clippy::integer_arithmetic)]
use {
log::*,
serial_test::serial,
solana_core::validator::ValidatorConfig,
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_local_cluster::{
cluster::Cluster,
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::*,
},
solana_replica_node::{
replica_node::{ReplicaNode, ReplicaNodeConfig},
replica_util,
},
solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig},
solana_runtime::{
accounts_index::AccountSecondaryIndexes,
snapshot_config::SnapshotConfig,
snapshot_utils::{self, ArchiveFormat},
},
solana_sdk::{
client::SyncClient,
clock::Slot,
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
exit::Exit,
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{Arc, RwLock},
thread::sleep,
time::Duration,
},
tempfile::TempDir,
};
const RUST_LOG_FILTER: &str =
"error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info";
fn wait_for_next_snapshot(
cluster: &LocalCluster,
snapshot_package_output_path: &Path,
) -> (PathBuf, (Slot, Hash)) {
// Get slot after which this was generated
let client = cluster
.get_validator_client(&cluster.entry_point_info.id)
.unwrap();
let last_slot = client
.get_slot_with_commitment(CommitmentConfig::processed())
.expect("Couldn't get slot");
// Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot
// must include the transactions just pushed
trace!(
"Waiting for snapshot archive to be generated with slot > {}",
last_slot
);
loop {
if let Some(full_snapshot_archive_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_package_output_path)
{
trace!(
"full snapshot for slot {} exists",
full_snapshot_archive_info.slot()
);
if *full_snapshot_archive_info.slot() >= last_slot {
return (
full_snapshot_archive_info.path().clone(),
(
*full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
),
);
}
trace!(
"full snapshot slot {} < last_slot {}",
full_snapshot_archive_info.slot(),
last_slot
);
}
sleep(Duration::from_millis(1000));
}
}
fn farf_dir() -> PathBuf {
std::env::var("FARF_DIR")
.unwrap_or_else(|_| "farf".to_string())
.into()
}
fn generate_account_paths(num_account_paths: usize) -> (Vec<TempDir>, Vec<PathBuf>) {
let account_storage_dirs: Vec<TempDir> = (0..num_account_paths)
.map(|_| tempfile::tempdir_in(farf_dir()).unwrap())
.collect();
let account_storage_paths: Vec<_> = account_storage_dirs
.iter()
.map(|a| a.path().to_path_buf())
.collect();
(account_storage_dirs, account_storage_paths)
}
struct SnapshotValidatorConfig {
_snapshot_dir: TempDir,
snapshot_archives_dir: TempDir,
account_storage_dirs: Vec<TempDir>,
validator_config: ValidatorConfig,
}
fn setup_snapshot_validator_config(
snapshot_interval_slots: u64,
num_account_paths: usize,
) -> SnapshotValidatorConfig {
// Create the snapshot config
let snapshot_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_config = SnapshotConfig {
snapshot_interval_slots,
snapshot_package_output_path: snapshot_archives_dir.path().to_path_buf(),
snapshot_path: snapshot_dir.path().to_path_buf(),
archive_format: ArchiveFormat::TarBzip2,
snapshot_version: snapshot_utils::SnapshotVersion::default(),
maximum_snapshots_to_retain: snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
};
// Create the account paths
let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
// Create the validator config
let validator_config = ValidatorConfig {
snapshot_config: Some(snapshot_config),
account_paths: account_storage_paths,
accounts_hash_interval_slots: snapshot_interval_slots,
..ValidatorConfig::default()
};
SnapshotValidatorConfig {
_snapshot_dir: snapshot_dir,
snapshot_archives_dir,
account_storage_dirs,
validator_config,
}
}
fn test_local_cluster_start_and_exit_with_config(socket_addr_space: SocketAddrSpace) {
solana_logger::setup();
const NUM_NODES: usize = 1;
let mut config = ClusterConfig {
validator_configs: make_identical_validator_configs(&ValidatorConfig::default(), NUM_NODES),
node_stakes: vec![3; NUM_NODES],
cluster_lamports: 100,
ticks_per_slot: 8,
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH as u64,
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH as u64,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&mut config, socket_addr_space);
assert_eq!(cluster.validators.len(), NUM_NODES);
}
#[test]
#[serial]
fn test_replica_bootstrap() {
let socket_addr_space = SocketAddrSpace::new(true);
test_local_cluster_start_and_exit_with_config(socket_addr_space);
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 1 node
let snapshot_interval_slots = 50;
let num_account_paths = 3;
let leader_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
info!(
"Snapshot config for the leader: accounts: {:?}, snapshot: {:?}",
leader_snapshot_test_config.account_storage_dirs,
leader_snapshot_test_config.snapshot_archives_dir
);
let stake = 10_000;
let mut config = ClusterConfig {
node_stakes: vec![stake],
cluster_lamports: 1_000_000,
validator_configs: make_identical_validator_configs(
&leader_snapshot_test_config.validator_config,
1,
),
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&mut config, socket_addr_space);
assert_eq!(cluster.validators.len(), 1);
let contact_info = &cluster.entry_point_info;
info!("Contact info: {:?}", contact_info);
// Get slot after which this was generated
let snapshot_package_output_path = &leader_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.snapshot_package_output_path;
info!("Waiting for snapshot");
let (archive_filename, archive_snapshot_hash) =
wait_for_next_snapshot(&cluster, snapshot_package_output_path);
info!("found: {:?}", archive_filename);
let identity_keypair = Keypair::new();
// now bring up a replica to talk to it.
let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let port = solana_net_utils::find_available_port_in_range(ip_addr, (8101, 8200)).unwrap();
let rpc_addr = SocketAddr::new(ip_addr, port);
let port = solana_net_utils::find_available_port_in_range(ip_addr, (8201, 8300)).unwrap();
let rpc_pubsub_addr = SocketAddr::new(ip_addr, port);
let ledger_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let ledger_path = ledger_dir.path();
let snapshot_output_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_output_path = snapshot_output_dir.path();
let snapshot_path = snapshot_output_path.join("snapshot");
let account_paths: Vec<PathBuf> = vec![ledger_path.join("accounts")];
let port = solana_net_utils::find_available_port_in_range(ip_addr, (8301, 8400)).unwrap();
let gossip_addr = SocketAddr::new(ip_addr, port);
let dynamic_port_range = solana_net_utils::parse_port_range("8401-8500").unwrap();
let bind_address = solana_net_utils::parse_host("127.0.0.1").unwrap();
let node = Node::new_with_external_ip(
&identity_keypair.pubkey(),
&gossip_addr,
dynamic_port_range,
bind_address,
);
info!("The peer id: {:?}", &contact_info.id);
let entry_points = vec![ContactInfo::new_gossip_entry_point(&contact_info.gossip)];
let (cluster_info, _rpc_contact_info, _snapshot_info) = replica_util::get_rpc_peer_info(
identity_keypair,
&entry_points,
ledger_path,
&node,
None,
&contact_info.id,
snapshot_output_path,
socket_addr_space,
);
info!("The cluster info:\n{:?}", cluster_info.contact_info_trace());
let config = ReplicaNodeConfig {
rpc_source_addr: contact_info.rpc,
rpc_addr,
rpc_pubsub_addr,
ledger_path: ledger_path.to_path_buf(),
snapshot_output_dir: snapshot_output_path.to_path_buf(),
snapshot_path,
account_paths,
snapshot_info: archive_snapshot_hash,
cluster_info,
rpc_config: JsonRpcConfig::default(),
snapshot_config: None,
pubsub_config: PubSubConfig::default(),
socket_addr_space,
account_indexes: AccountSecondaryIndexes::default(),
accounts_db_caching_enabled: false,
replica_exit: Arc::new(RwLock::new(Exit::default())),
};
let _replica_node = ReplicaNode::new(config);
}