diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 1fe8490fcf..69758e46e7 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -7,6 +7,7 @@ use crate::cluster_info::FULLNODE_PORT_RANGE; use crate::contact_info::ContactInfo; use crate::service::Service; use crate::streamer; +use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -107,6 +108,7 @@ pub fn discover( )) } +/// Creates a ThinClient per valid node pub fn get_clients(nodes: &[ContactInfo]) -> Vec { nodes .iter() @@ -115,6 +117,16 @@ pub fn get_clients(nodes: &[ContactInfo]) -> Vec { .collect() } +/// Creates a ThinClient by selecting a valid node at random +pub fn get_client(nodes: &[ContactInfo]) -> ThinClient { + let nodes: Vec<_> = nodes + .iter() + .filter_map(ContactInfo::valid_client_facing_addr) + .collect(); + let select = thread_rng().gen_range(0, nodes.len()); + create_client(nodes[select], FULLNODE_PORT_RANGE) +} + fn spy( spy_ref: Arc>, num_nodes: Option, diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 5864d5e499..8e33db522d 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -206,8 +206,9 @@ impl Replicator { &exit, ); - info!("Looking for leader at {:?}", cluster_entrypoint); - crate::gossip_service::discover_nodes(&cluster_entrypoint.gossip, 1)?; + info!("Connecting to the cluster via {:?}", cluster_entrypoint); + let nodes = crate::gossip_service::discover_nodes(&cluster_entrypoint.gossip, 1)?; + let client = crate::gossip_service::get_client(&nodes); let (storage_blockhash, storage_slot) = Self::poll_for_blockhash_and_slot(&cluster_info)?; @@ -242,30 +243,33 @@ impl Replicator { &Hash::default(), ); - let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE); Self::setup_mining_account(&client, &keypair, &storage_keypair)?; - let mut thread_handles = 
create_request_processor(node.sockets.storage.unwrap(), &exit, slot); // receive blobs from retransmit and drop them. - let exit2 = exit.clone(); - let t_retransmit = spawn(move || loop { - let _ = retransmit_receiver.recv_timeout(Duration::from_secs(1)); - if exit2.load(Ordering::Relaxed) { - break; - } - }); + let t_retransmit = { + let exit = exit.clone(); + spawn(move || loop { + let _ = retransmit_receiver.recv_timeout(Duration::from_secs(1)); + if exit.load(Ordering::Relaxed) { + break; + } + }) + }; thread_handles.push(t_retransmit); - let exit3 = exit.clone(); - let blocktree1 = blocktree.clone(); - let t_replicate = spawn(move || loop { - Self::wait_for_ledger_download(slot, &blocktree1, &exit3, &node_info, &cluster_info); - if exit3.load(Ordering::Relaxed) { - break; - } - }); + let t_replicate = { + let exit = exit.clone(); + let blocktree = blocktree.clone(); + spawn(move || loop { + Self::wait_for_ledger_download(slot, &blocktree, &exit, &node_info, &cluster_info); + if exit.load(Ordering::Relaxed) { + break; + } + }) + }; + //always push this last thread_handles.push(t_replicate); Ok(Self { @@ -291,6 +295,8 @@ impl Replicator { } pub fn run(&mut self) { + info!("waiting for ledger download"); + self.thread_handles.pop().unwrap().join().unwrap(); self.encrypt_ledger() .expect("ledger encrypt not successful"); loop { @@ -310,7 +316,7 @@ impl Replicator { node_info: &ContactInfo, cluster_info: &Arc>, ) { - info!("window created, waiting for ledger download done"); + info!("window created, waiting for ledger download"); let mut _received_so_far = 0; let mut current_slot = start_slot; diff --git a/multinode-demo/common.sh b/multinode-demo/common.sh index d0547b65be..b65c098b94 100644 --- a/multinode-demo/common.sh +++ b/multinode-demo/common.sh @@ -36,6 +36,9 @@ else program=${BASH_REMATCH[1]} features+="cuda," fi + if [[ $program = replicator ]]; then + features+="chacha," + fi if [[ -r "$SOLANA_ROOT/$program"/Cargo.toml ]]; then maybe_package="--package 
solana-$program" @@ -60,6 +63,7 @@ solana_gossip=$(solana_program gossip) solana_keygen=$(solana_program keygen) solana_ledger_tool=$(solana_program ledger-tool) solana_wallet=$(solana_program wallet) +solana_replicator=$(solana_program replicator) export RUST_LOG=${RUST_LOG:-solana=info} # if RUST_LOG is unset, default to info export RUST_BACKTRACE=1 diff --git a/multinode-demo/fullnode.sh b/multinode-demo/fullnode.sh index 4c9800368d..1a631c8a75 100755 --- a/multinode-demo/fullnode.sh +++ b/multinode-demo/fullnode.sh @@ -15,9 +15,11 @@ fullnode_usage() { echo fi cat </dev/null 2>&1 && wait "$pid"' INT TERM ERR - if ! $bootstrap_leader && ((stake)); then + if [[ $node_type = validator ]] && ((stake)); then setup_vote_account "${entrypoint_address%:*}" "$fullnode_keypair_path" "$fullnode_vote_keypair_path" "$stake" + elif [[ $node_type = replicator ]] && ((stake)); then + setup_replicator_account "${entrypoint_address%:*}" "$replicator_keypair_path" "$stake" fi echo "$PS4$program ${args[*]}" @@ -265,7 +329,7 @@ while true; do pid=$! 
oom_score_adj "$pid" 1000 - if $bootstrap_leader; then + if [[ $node_type = bootstrap_leader ]]; then wait "$pid" sleep 1 else @@ -286,10 +350,10 @@ while true; do done - echo "############## New genesis detected, restarting fullnode ##############" + echo "############## New genesis detected, restarting $node_type ##############" kill "$pid" || true wait "$pid" || true - rm -rf "$ledger_config_dir" "$accounts_config_dir" "$fullnode_vote_keypair_path".configured + rm -rf "$ledger_config_dir" "$accounts_config_dir" "$fullnode_vote_keypair_path".configured "$replicator_storage_keypair_path".configured sleep 60 # give the network time to come back up fi diff --git a/multinode-demo/replicator.sh b/multinode-demo/replicator.sh new file mode 100644 index 0000000000..68102d9e89 --- /dev/null +++ b/multinode-demo/replicator.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +# +# Start a replicator +# + +here=$(dirname "$0") +exec "$here"/fullnode.sh --replicator "$@" + + diff --git a/replicator/src/main.rs b/replicator/src/main.rs index 1b7cef8f61..911a1ccab8 100644 --- a/replicator/src/main.rs +++ b/replicator/src/main.rs @@ -39,6 +39,15 @@ fn main() { .required(true) .help("use DIR as persistent ledger location"), ) + .arg( + Arg::with_name("storage_keypair") + .short("s") + .long("storage_id") + .value_name("DIR") + .takes_value(true) + .required(true) + .help("File containing the storage account keypair"), + ) .get_matches(); let ledger_path = matches.value_of("ledger").unwrap(); @@ -51,6 +60,14 @@ fn main() { } else { Keypair::new() }; + let storage_keypair = if let Some(storage_keypair) = matches.value_of("storage_keypair") { + read_keypair(storage_keypair).unwrap_or_else(|err| { + eprintln!("{}: Unable to open keypair file: {}", err, storage_keypair); + exit(1); + }) + } else { + Keypair::new() + }; let entrypoint_addr = matches .value_of("entrypoint") @@ -74,13 +91,12 @@ fn main() { ); let entrypoint_info = ContactInfo::new_gossip_entry_point(&entrypoint_addr); - let 
storage_keypair = Arc::new(Keypair::new()); let mut replicator = Replicator::new( ledger_path, node, entrypoint_info, Arc::new(keypair), - storage_keypair, + Arc::new(storage_keypair), ) .unwrap();