diff --git a/ci/localnet-sanity.sh b/ci/localnet-sanity.sh index 570c77f47a..3809e1eba2 100755 --- a/ci/localnet-sanity.sh +++ b/ci/localnet-sanity.sh @@ -70,7 +70,7 @@ echo "--- Wallet sanity" echo "--- Node count" ( set -x - ./multinode-demo/client.sh "$PWD" 3 -c + ./multinode-demo/client.sh localhost --num-nodes 3 --converge-only ) || flag_error killBackgroundCommands diff --git a/multinode-demo/client.sh b/multinode-demo/client.sh index eaa41f9761..9ef5be606e 100755 --- a/multinode-demo/client.sh +++ b/multinode-demo/client.sh @@ -1,68 +1,37 @@ #!/bin/bash -e -# -USAGE=" usage: $0 [leader_url] [num_nodes] [--loop] [extra args] - - Run bench-tps against the specified network - - leader_url URL to the leader (defaults to ..) - num_nodes Minimum number of nodes to look for while converging - --loop Add this flag to cause the program to loop infinitely - \"extra args\" Any additional arguments are pass along to solana-bench-tps -" here=$(dirname "$0") # shellcheck source=multinode-demo/common.sh source "$here"/common.sh -leader=$1 -if [[ -n $leader ]]; then - if [[ $leader == "-h" || $leader == "--help" ]]; then - echo "$USAGE" - exit 0 +usage() { + if [[ -n $1 ]]; then + echo "$*" + echo fi - shift + echo "usage: $0 [network entry point] [extra args]" + echo + echo " Run bench-tps against the specified network" + echo + echo " extra args: additional arguments are pass along to solana-bench-tps" + echo + exit 1 +} + +# this is a little hacky +if [[ ${1:0:2} != "--" ]]; then + read -r _ leader_address shift < <(find_leader "${@:1:1}") else - if [[ -d "$SNAP" ]]; then - leader=testnet.solana.com # Default to testnet when running as a Snap - else - leader=$here/.. # Default to local solana repo - fi + read -r _ leader_address shift < <(find_leader) fi +shift "$shift" -count=$1 -if [[ -n $count ]]; then - shift -else - count=1 -fi -loop= -if [[ $1 = --loop ]]; then - loop=1 - shift -fi +client_json="$SOLANA_CONFIG_CLIENT_DIR"/client.json +[[ -r $client_json ]] || $solana_keygen -o "$client_json" -rsync_leader_url=$(rsync_url "$leader") -( - set -x - mkdir -p "$SOLANA_CONFIG_CLIENT_DIR" - $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/ - - client_json="$SOLANA_CONFIG_CLIENT_DIR"/client.json - [[ -r $client_json ]] || $solana_keygen -o "$client_json" -) - -iteration=0 set -x -while true; do - $solana_bench_tps \ - -n "$count" \ - -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json \ - -k "$SOLANA_CONFIG_CLIENT_DIR"/client.json \ - "$@" - [[ -n $loop ]] || exit 0 - iteration=$((iteration + 1)) - echo ------------------------------------------------------------------------ - echo "Iteration: $iteration" - echo ------------------------------------------------------------------------ -done +$solana_bench_tps \ + --network "$leader_address" \ + --keypair "$SOLANA_CONFIG_CLIENT_DIR"/client.json \ + "$@" diff --git a/multinode-demo/common.sh b/multinode-demo/common.sh index ae2bc8a71d..03de7d6b39 100644 --- a/multinode-demo/common.sh +++ b/multinode-demo/common.sh @@ -159,3 +159,46 @@ rsync_url() { # adds the 'rsync://` prefix to URLs that need it # Default to rsync:// URL echo "rsync://$url" } + +# called from drone, validator, client +find_leader() { + declare leader leader_address + declare shift=0 + + if [[ -d $SNAP ]]; then + # Exit if mode is not yet configured + # (typically the case after the Snap is first installed) + [[ -n $(snapctl get mode) ]] || exit 0 + + # Select leader from the Snap configuration + leader_address=$(snapctl get leader-address) + if [[ -z $leader_address ]]; then + # Assume public testnet by default + leader_address=35.227.93.37:8001 # testnet.solana.com + fi + leader=$leader_address + else + if [[ -z $1 ]]; then + leader=${here}/.. # Default to local tree for rsync + leader_address=127.0.0.1:8001 # Default to local leader + elif [[ -z $2 ]]; then + leader=$1 + + declare leader_ip + leader_ip=$(dig +short "${leader%:*}" | head -n1) + + if [[ -z $leader_ip ]]; then + usage "Error: unable to resolve IP address for $leader" + fi + + leader_address=${leader_ip}:8001 + shift=1 + else + leader=$1 + leader_address=$2 + shift=2 + fi + fi + + echo "$leader" "$leader_address" "$shift" +} diff --git a/multinode-demo/drone.sh b/multinode-demo/drone.sh index f57c755931..205c0b99db 100755 --- a/multinode-demo/drone.sh +++ b/multinode-demo/drone.sh @@ -2,32 +2,25 @@ # # Starts an instance of solana-drone # -# usage: $0 -# - here=$(dirname "$0") + # shellcheck source=multinode-demo/common.sh source "$here"/common.sh -SOLANA_CONFIG_DIR="$SOLANA_CONFIG_DIR"-drone -# shellcheck source=scripts/oom-score-adj.sh -source "$here"/../scripts/oom-score-adj.sh - -if [[ -d "$SNAP" ]]; then - # Exit if mode is not yet configured - # (typically the case after the Snap is first installed) - [[ -n "$(snapctl get mode)" ]] || exit 0 - - # Select leader from the Snap configuration - leader_address="$(snapctl get leader-address)" - if [[ -z "$leader_address" ]]; then - # Assume drone is running on the same node as the leader by default - leader_address="localhost" +usage() { + if [[ -n $1 ]]; then + echo "$*" + echo fi - leader="$leader_address" -else - leader=${1:-${here}/..} # Default to local tree for data -fi + echo "usage: $0 [network entry point]" + echo + echo " Run an airdrop drone for the specified network" + echo + exit 1 +} + +read -r _ leader_address shift < <(find_leader "${@:1:1}") +shift "$shift" [[ -f "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json ]] || { echo "$SOLANA_CONFIG_PRIVATE_DIR/mint.json not found, create it by running:" @@ -36,17 +29,12 @@ fi exit 1 } -rsync_leader_url=$(rsync_url "$leader") set -ex -mkdir -p "$SOLANA_CONFIG_DIR" -$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/ - trap 'kill "$pid" && wait "$pid"' INT TERM $solana_drone \ - -l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \ - --timeout 120 \ + --keypair "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \ + --network "$leader_address" \ > >($drone_logger) 2>&1 & pid=$! -oom_score_adj "$pid" 1000 wait "$pid" diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index d8cd2c229c..999c10f365 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -14,9 +14,12 @@ usage() { echo "$*" echo fi - echo "usage: $0 [-x] [rsync network path to solana repo on leader machine] [network ip address of leader]" - echo "" - echo " -x: runs a new, dynamically-configured validator" + echo "usage: $0 [-x] [rsync network path to leader] [network entry point]" + echo + echo " Start a validator on the specified network" + echo + echo " -x: runs a new, dynamically-configured validator" + echo exit 1 } @@ -35,34 +38,8 @@ if [[ -n $3 ]]; then usage fi -if [[ -d $SNAP ]]; then - # Exit if mode is not yet configured - # (typically the case after the Snap is first installed) - [[ -n $(snapctl get mode) ]] || exit 0 - - # Select leader from the Snap configuration - leader_address=$(snapctl get leader-address) - if [[ -z $leader_address ]]; then - # Assume public testnet by default - leader_address=35.227.93.37 # testnet.solana.com - fi - leader=$leader_address -else - if [[ -z $1 ]]; then - leader=${1:-${here}/..} # Default to local tree for data - leader_address=${2:-127.0.0.1} # Default to local leader - elif [[ -z $2 ]]; then - leader=$1 - leader_address=$(dig +short "${leader%:*}" | head -n1) - if [[ -z $leader_address ]]; then - usage "Error: unable to resolve IP address for $leader" - fi - else - leader=$1 - leader_address=$2 - fi -fi -leader_port=8001 +read -r leader leader_address shift < <(find_leader "${@:1:2}") +shift "$shift" if [[ -n $SOLANA_CUDA ]]; then program=$solana_fullnode_cuda @@ -109,7 +86,7 @@ $rsync -vPr "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR" trap 'kill "$pid" && wait "$pid"' INT TERM $program \ --identity "$validator_json_path" \ - --testnet "$leader_address:$leader_port" \ + --network "$leader_address" \ --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger \ > >($validator_logger) 2>&1 & pid=$! diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index c7dca9fa41..473fe9dddf 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -12,21 +12,19 @@ use rayon::prelude::*; use solana::client::mk_client; use solana::crdt::{Crdt, NodeInfo}; use solana::drone::DRONE_PORT; -use solana::fullnode::Config; use solana::hash::Hash; use solana::logger; use solana::metrics; use solana::ncp::Ncp; use solana::service::Service; use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; -use solana::thin_client::ThinClient; +use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; use solana::wallet::request_airdrop; use solana::window::default_window; use std::collections::VecDeque; -use std::fs::File; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::process::exit; use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -282,6 +280,7 @@ fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_c let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0); metrics_submit_token_balance(starting_balance); + println!("starting balance {}", starting_balance); if starting_balance < tx_count { let airdrop_amount = tx_count - starting_balance; @@ -299,13 +298,14 @@ fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_c let mut current_balance = starting_balance; for _ in 0..20 { sleep(Duration::from_millis(500)); - current_balance = client - .poll_get_balance(&id.pubkey()) - .unwrap_or(starting_balance); + current_balance = client.poll_get_balance(&id.pubkey()).unwrap_or_else(|e| { + println!("airdrop error {}", e); + starting_balance + }); if starting_balance != current_balance { break; } - println!("."); + println!("current balance {}...", current_balance); } metrics_submit_token_balance(current_balance); if current_balance - starting_balance != airdrop_amount { @@ -394,12 +394,13 @@ fn main() { let matches = App::new("solana-bench-tps") .version(crate_version!()) .arg( - Arg::with_name("leader") - .short("l") - .long("leader") - .value_name("PATH") + Arg::with_name("network") + .short("n") + .long("network") + .value_name("HOST:PORT") .takes_value(true) - .help("/path/to/leader.json"), + .required(true) + .help("rendezvous with the network at this gossip entry point"), ) .arg( Arg::with_name("keypair") @@ -411,32 +412,33 @@ fn main() { .help("/path/to/id.json"), ) .arg( - Arg::with_name("num_nodes") - .short("n") - .long("nodes") - .value_name("NUMBER") + Arg::with_name("num-nodes") + .short("N") + .long("num-nodes") + .value_name("NUM") .takes_value(true) - .help("number of nodes to converge to"), + .help("wait for NUM nodes to converge"), ) .arg( Arg::with_name("threads") .short("t") .long("threads") - .value_name("NUMBER") + .value_name("NUM") .takes_value(true) .help("number of threads"), ) .arg( Arg::with_name("seconds") .short("s") - .long("sec") - .value_name("NUMBER") + .long("seconds") + .value_name("NUM") .takes_value(true) .help("send transactions for this many seconds"), ) .arg( - Arg::with_name("converge_only") + Arg::with_name("converge-only") .short("c") + .long("converge-only") .help("exit immediately after converging"), ) .arg( @@ -453,13 +455,14 @@ fn main() { ) .get_matches(); - let leader: NodeInfo; - if let Some(l) = matches.value_of("leader") { - leader = read_leader(l).node_info; - } else { - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = NodeInfo::new_leader(&server_addr); - }; + let network = matches + .value_of("network") + .unwrap() + .parse() + .unwrap_or_else(|e| { + eprintln!("failed to parse network: {}", e); + exit(1) + }); let id = read_keypair(matches.value_of("keypair").unwrap()).expect("client keypair"); @@ -483,6 +486,8 @@ fn main() { sustained = true; } + let leader = poll_gossip_for_leader(network, None).expect("unable to find leader on network"); + let exit_signal = Arc::new(AtomicBool::new(false)); let mut c_threads = vec![]; let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads); @@ -510,9 +515,10 @@ fn main() { exit(1); } - if matches.is_present("converge_only") { + if matches.is_present("converge-only") { return; } + let leader = leader.unwrap(); println!("leader is at {} {}", leader.contact_info.rpu, leader.id); @@ -678,7 +684,7 @@ fn converge( .unwrap() .table .values() - .filter(|x| Crdt::is_valid_address(x.contact_info.rpu)) + .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) .cloned() .collect(); @@ -698,8 +704,3 @@ fn converge( let leader = spy_ref.read().unwrap().leader_data().cloned(); (v, leader) } - -fn read_leader(path: &str) -> Config { - let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); - serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) -} diff --git a/src/bin/drone.rs b/src/bin/drone.rs index f19cd1f99c..b6c9cb091d 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -10,35 +10,43 @@ extern crate tokio_codec; use bincode::{deserialize, serialize}; use bytes::Bytes; use clap::{App, Arg}; -use solana::crdt::NodeInfo; use solana::drone::{Drone, DroneRequest, DRONE_PORT}; -use solana::fullnode::Config; use solana::logger; use solana::metrics::set_panic_hook; use solana::signature::read_keypair; use solana::thin_client::poll_gossip_for_leader; use std::error; -use std::fs::File; -use std::io; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{Ipv4Addr, SocketAddr}; +use std::process::exit; use std::sync::{Arc, Mutex}; use std::thread; use tokio::net::TcpListener; use tokio::prelude::*; use tokio_codec::{BytesCodec, Decoder}; +macro_rules! socketaddr { + ($ip:expr, $port:expr) => { + SocketAddr::from((Ipv4Addr::from($ip), $port)) + }; + ($str:expr) => {{ + let a: SocketAddr = $str.parse().unwrap(); + a + }}; +} + fn main() -> Result<(), Box> { logger::setup(); set_panic_hook("drone"); let matches = App::new("drone") .version(crate_version!()) .arg( - Arg::with_name("leader") - .short("l") - .long("leader") - .value_name("PATH") + Arg::with_name("network") + .short("n") + .long("network") + .value_name("HOST:PORT") .takes_value(true) - .help("/path/to/leader.json"), + .required(true) + .help("rendezvous with the network at this gossip entry point"), ) .arg( Arg::with_name("keypair") @@ -47,7 +55,7 @@ fn main() -> Result<(), Box> { .value_name("PATH") .takes_value(true) .required(true) - .help("/path/to/mint.json"), + .help("File to read the client's keypair from"), ) .arg( Arg::with_name("slice") @@ -72,39 +80,40 @@ fn main() -> Result<(), Box> { ) .get_matches(); - let leader: NodeInfo; - if let Some(l) = matches.value_of("leader") { - leader = read_leader(l).node_info; - } else { - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = NodeInfo::new_leader(&server_addr); - }; + let network = matches + .value_of("network") + .unwrap() + .parse() + .unwrap_or_else(|e| { + eprintln!("failed to parse network: {}", e); + exit(1) + }); let mint_keypair = - read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair"); + read_keypair(matches.value_of("keypair").unwrap()).expect("failed to read client keypair"); let time_slice: Option; if let Some(secs) = matches.value_of("slice") { - time_slice = Some(secs.to_string().parse().expect("integer")); + time_slice = Some(secs.to_string().parse().expect("failed to parse slice")); } else { time_slice = None; } let request_cap: Option; if let Some(c) = matches.value_of("cap") { - request_cap = Some(c.to_string().parse().expect("integer")); + request_cap = Some(c.to_string().parse().expect("failed to parse cap")); } else { request_cap = None; } let timeout: Option; if let Some(secs) = matches.value_of("timeout") { - timeout = Some(secs.to_string().parse().expect("integer")); + timeout = Some(secs.to_string().parse().expect("failed to parse timeout")); } else { timeout = None; } - let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?; + let leader = poll_gossip_for_leader(network, timeout)?; - let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap(); + let drone_addr = socketaddr!(0, DRONE_PORT); let drone = Arc::new(Mutex::new(Drone::new( mint_keypair, @@ -172,8 +181,3 @@ fn main() -> Result<(), Box> { tokio::run(done); Ok(()) } - -fn read_leader(path: &str) -> Config { - let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); - serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) -} diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 64e119cb64..338facf7f7 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -1,23 +1,27 @@ #[macro_use] extern crate clap; extern crate getopts; +#[macro_use] extern crate log; extern crate serde_json; +#[macro_use] extern crate solana; use clap::{App, Arg}; use solana::client::mk_client; -use solana::crdt::{Node, NodeInfo}; +use solana::crdt::Node; use solana::drone::DRONE_PORT; use solana::fullnode::{Config, Fullnode}; use solana::logger; use solana::metrics::set_panic_hook; use solana::service::Service; use solana::signature::{Keypair, KeypairUtil}; +use solana::thin_client::poll_gossip_for_leader; use solana::wallet::request_airdrop; use std::fs::File; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{Ipv4Addr, SocketAddr}; use std::process::exit; +use std::thread::sleep; use std::time::Duration; fn main() -> () { @@ -34,12 +38,12 @@ fn main() -> () { .help("run with the identity found in FILE"), ) .arg( - Arg::with_name("testnet") - .short("t") - .long("testnet") + Arg::with_name("network") + .short("n") + .long("network") .value_name("HOST:PORT") .takes_value(true) - .help("connect to the network at this gossip entry point"), + .help("connect/rendezvous with the network at this gossip entry point"), ) .arg( Arg::with_name("ledger") @@ -52,16 +56,12 @@ fn main() -> () { ) .get_matches(); - let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - let mut keypair = Keypair::new(); - let mut repl_data = NodeInfo::new_leader_with_pubkey(keypair.pubkey(), &bind_addr); - if let Some(i) = matches.value_of("identity") { + let (keypair, ncp) = if let Some(i) = matches.value_of("identity") { let path = i.to_string(); if let Ok(file) = File::open(path.clone()) { let parse: serde_json::Result = serde_json::from_reader(file); if let Ok(data) = parse { - keypair = data.keypair(); - repl_data = data.node_info; + (data.keypair(), data.node_info.contact_info.ncp) } else { eprintln!("failed to parse {}", path); exit(1); @@ -70,60 +70,62 @@ fn main() -> () { eprintln!("failed to read {}", path); exit(1); } - } - - let leader_pubkey = keypair.pubkey(); + } else { + (Keypair::new(), socketaddr!(0, 8000)) + }; let ledger_path = matches.value_of("ledger").unwrap(); - let port_range = (8100, 10000); - let node = if let Some(_t) = matches.value_of("testnet") { - Node::new_with_external_ip( - leader_pubkey, - repl_data.contact_info.ncp.ip(), - port_range, - 0, - ) - } else { - Node::new_with_external_ip( - leader_pubkey, - repl_data.contact_info.ncp.ip(), - port_range, - repl_data.contact_info.ncp.port(), - ) + // socketaddr that is initial pointer into the network's gossip (ncp) + let network = matches + .value_of("network") + .map(|network| network.parse().expect("failed to parse network address")); + + let node = Node::new_with_external_ip(keypair.pubkey(), &ncp); + + // save off some stuff for airdrop + let node_info = node.info.clone(); + let pubkey = keypair.pubkey(); + + let fullnode = Fullnode::new(node, ledger_path, keypair, network, false); + + // airdrop stuff, probably goes away at some point + let leader = match network { + Some(network) => { + poll_gossip_for_leader(network, None).expect("can't find leader on network") + } + None => node_info, }; - let repl_clone = node.data.clone(); - let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT); - let testnet_addr = matches.value_of("testnet").map(|addr_str| { - let addr: SocketAddr = addr_str.parse().unwrap(); - drone_addr.set_ip(addr.ip()); - addr - }); - let fullnode = Fullnode::new(node, ledger_path, keypair, testnet_addr, false); + let mut client = mk_client(&leader); - let mut client = mk_client(&repl_clone); - let previous_balance = client.poll_get_balance(&leader_pubkey).unwrap_or(0); - eprintln!("balance is {}", previous_balance); + // TODO: maybe have the drone put itself in gossip somewhere instead of hardcoding? + let drone_addr = match network { + Some(network) => SocketAddr::new(network.ip(), DRONE_PORT), + None => SocketAddr::new(ncp.ip(), DRONE_PORT), + }; - if previous_balance == 0 { - eprintln!("requesting airdrop from {}", drone_addr); - request_airdrop(&drone_addr, &leader_pubkey, 50).unwrap_or_else(|_| { - panic!( - "Airdrop failed, is the drone address correct {:?} drone running?", + loop { + let balance = client.poll_get_balance(&pubkey).unwrap_or(0); + info!("balance is {}", balance); + + if balance >= 50 { + info!("good to go!"); + break; + } + + info!("requesting airdrop from {}", drone_addr); + loop { + if request_airdrop(&drone_addr, &pubkey, 50).is_ok() { + break; + } + info!( + "airdrop request, is the drone address correct {:?}, drone running?", drone_addr - ) - }); - - let balance_ok = client - .poll_balance_with_timeout( - &leader_pubkey, - &Duration::from_millis(100), - &Duration::from_secs(30), - ) - .unwrap() > 0; - assert!(balance_ok, "0 balance, airdrop failed?"); + ); + sleep(Duration::from_secs(2)); + } } - fullnode.join().expect("join"); + fullnode.join().expect("to never happen"); } diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 09e07b68db..aac5e8908f 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -5,6 +5,7 @@ extern crate bs58; extern crate clap; extern crate dirs; extern crate serde_json; +#[macro_use] extern crate solana; use clap::{App, Arg, SubCommand}; @@ -63,9 +64,9 @@ struct WalletConfig { impl Default for WalletConfig { fn default() -> WalletConfig { - let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); + let default_addr = socketaddr!(0, 8000); WalletConfig { - leader: NodeInfo::new_leader(&default_addr), + leader: NodeInfo::new_with_socketaddr(&default_addr), id: Keypair::new(), drone_addr: default_addr, command: WalletCommand::Balance, @@ -150,7 +151,7 @@ fn parse_args() -> Result> { leader = read_leader(l)?.node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = NodeInfo::new_leader(&server_addr); + leader = NodeInfo::new_with_socketaddr(&server_addr); }; let timeout: Option; if let Some(secs) = matches.value_of("timeout") { diff --git a/src/crdt.rs b/src/crdt.rs index 42764d0236..2084bf789d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -12,7 +12,6 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight - use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; @@ -48,6 +47,7 @@ const GOSSIP_PURGE_MILLIS: u64 = 15000; /// minimum membership table size before we start purging dead nodes const MIN_TABLE_SIZE: usize = 2; +#[macro_export] macro_rules! socketaddr { ($ip:expr, $port:expr) => { SocketAddr::from((Ipv4Addr::from($ip), $port)) @@ -57,6 +57,7 @@ macro_rules! socketaddr { a }}; } +#[macro_export] macro_rules! socketaddr_any { () => { socketaddr!(0, 0) @@ -164,7 +165,7 @@ impl NodeInfo { nxt_addr.set_port(addr.port() + nxt); nxt_addr } - pub fn new_leader_with_pubkey(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self { + pub fn new_with_pubkey_socketaddr(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self { let transactions_addr = *bind_addr; let gossip_addr = Self::next_port(&bind_addr, 1); let replicate_addr = Self::next_port(&bind_addr, 2); @@ -177,13 +178,14 @@ impl NodeInfo { transactions_addr, ) } - pub fn new_leader(bind_addr: &SocketAddr) -> Self { + pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self { let keypair = Keypair::new(); - Self::new_leader_with_pubkey(keypair.pubkey(), bind_addr) + Self::new_with_pubkey_socketaddr(keypair.pubkey(), bind_addr) } - pub fn new_entry_point(gossip_addr: SocketAddr) -> Self { + // + pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self { let daddr: SocketAddr = socketaddr!("0.0.0.0:0"); - NodeInfo::new(Pubkey::default(), gossip_addr, daddr, daddr, daddr) + NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr) } } @@ -460,7 +462,7 @@ impl Crdt { if me.id == v.id { //filter myself false - } else if !(Self::is_valid_address(v.contact_info.tvu)) { + } else if !(Self::is_valid_address(&v.contact_info.tvu)) { trace!( "{:x}:broadcast skip not listening {:x} {}", me.debug_id(), @@ -624,7 +626,7 @@ impl Crdt { } else if me.leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false - } else if !(Self::is_valid_address(v.contact_info.tvu)) { + } else if !(Self::is_valid_address(&v.contact_info.tvu)) { trace!( "skip nodes that are not listening {:?} {}", v.id, @@ -691,8 +693,8 @@ impl Crdt { .values() .filter(|r| { r.id != Pubkey::default() - && (Self::is_valid_address(r.contact_info.tpu) - || Self::is_valid_address(r.contact_info.tvu)) + && (Self::is_valid_address(&r.contact_info.tpu) + || Self::is_valid_address(&r.contact_info.tvu)) }) .map(|x| x.ledger_state.last_id) .collect() @@ -704,7 +706,7 @@ impl Crdt { let valid: Vec<_> = self .table .values() - .filter(|r| r.id != self.id && Self::is_valid_address(r.contact_info.tvu)) + .filter(|r| r.id != self.id && Self::is_valid_address(&r.contact_info.tvu)) .collect(); if valid.is_empty() { Err(CrdtError::NoPeers)?; @@ -1002,7 +1004,7 @@ impl Crdt { match deserialize(&blob.data[..blob.meta.size]) { Ok(request) => Crdt::handle_protocol( obj, - blob.meta.addr(), + &blob.meta.addr(), request, window, ledger_window, @@ -1017,7 +1019,7 @@ impl Crdt { fn handle_protocol( me: &Arc>, - from_addr: SocketAddr, + from_addr: &SocketAddr, request: Protocol, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, @@ -1051,7 +1053,7 @@ impl Crdt { // an unspecified address in our table if from.contact_info.ncp.ip().is_unspecified() { inc_new_counter_info!("crdt-window-request-updates-unspec-ncp", 1); - from.contact_info.ncp = from_addr; + from.contact_info.ncp = *from_addr; } let (from_id, ups, data, liveness) = { @@ -1247,7 +1249,7 @@ impl Crdt { /// port must not be 0 /// ip must be specified and not mulitcast /// loopback ip is only allowed in tests - pub fn is_valid_address(addr: SocketAddr) -> bool { + pub fn is_valid_address(addr: &SocketAddr) -> bool { (addr.port() != 0) && Self::is_valid_ip(addr.ip()) } @@ -1273,7 +1275,7 @@ pub struct Sockets { } pub struct Node { - pub data: NodeInfo, + pub info: NodeInfo, pub sockets: Sockets, } @@ -1292,7 +1294,7 @@ impl Node { let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); - let data = NodeInfo::new( + let info = NodeInfo::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), @@ -1300,7 +1302,7 @@ impl Node { transaction.local_addr().unwrap(), ); Node { - data, + info, sockets: Sockets { gossip, requests, @@ -1313,17 +1315,12 @@ impl Node { }, } } - pub fn new_with_external_ip( - pubkey: Pubkey, - ip: IpAddr, - port_range: (u16, u16), - ncp_port: u16, - ) -> Node { - fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) { - match bind_in_range(port_range) { + pub fn new_with_external_ip(pubkey: Pubkey, ncp: &SocketAddr) -> Node { + fn bind() -> (u16, UdpSocket) { + match bind_in_range(SOLANA_PORT_RANGE) { Ok(socket) => (socket.local_addr().unwrap().port(), socket), Err(err) => { - panic!("Failed to bind to {:?}", err); + panic!("Failed to bind err: {}", err); } } }; @@ -1333,39 +1330,40 @@ impl Node { match UdpSocket::bind(addr) { Ok(socket) => socket, Err(err) => { - panic!("Failed to bind to {:?}: {:?}", addr, err); + panic!("Failed to bind to {:?}, err: {}", addr, err); } } }; - let (gossip_port, gossip) = if ncp_port != 0 { - (ncp_port, bind_to(ncp_port)) + let (gossip_port, gossip) = if ncp.port() != 0 { + (ncp.port(), bind_to(ncp.port())) } else { - bind(port_range) + bind() }; - let (replicate_port, replicate) = bind(port_range); - let (requests_port, requests) = bind(port_range); - let (transaction_port, transaction) = bind(port_range); - let (_, repair) = bind(port_range); - let (_, broadcast) = bind(port_range); - let (_, retransmit) = bind(port_range); + let (replicate_port, replicate) = bind(); + let (requests_port, requests) = bind(); + let (transaction_port, transaction) = bind(); + + let (_, repair) = bind(); + let (_, broadcast) = bind(); + let (_, retransmit) = bind(); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the // response Udp packet correctly back to the requester. let respond = requests.try_clone().unwrap(); - let node_info = NodeInfo::new( + let info = NodeInfo::new( pubkey, - SocketAddr::new(ip, gossip_port), - SocketAddr::new(ip, replicate_port), - SocketAddr::new(ip, requests_port), - SocketAddr::new(ip, transaction_port), + SocketAddr::new(ncp.ip(), gossip_port), + SocketAddr::new(ncp.ip(), replicate_port), + SocketAddr::new(ncp.ip(), requests_port), + SocketAddr::new(ncp.ip(), transaction_port), ); Node { - data: node_info, + info, sockets: Sockets { gossip, requests, @@ -1391,7 +1389,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { mod tests { use crdt::{ Crdt, CrdtError, Node, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, - MIN_TABLE_SIZE, + MIN_TABLE_SIZE, SOLANA_PORT_RANGE, }; use entry::Entry; use hash::{hash, Hash}; @@ -1444,11 +1442,11 @@ mod tests { } #[test] fn test_new_vote() { - let d = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); - let leader = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1235")); + let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235")); assert_ne!(d.id, leader.id); assert_matches!( crdt.new_vote(Hash::default()).err(), @@ -1471,7 +1469,7 @@ mod tests { #[test] fn test_insert_vote() { - let d = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); @@ -1504,9 +1502,9 @@ mod tests { copy } #[test] - fn replicated_data_new_leader_with_pubkey() { + fn replicated_data_new_with_socketaddr_with_pubkey() { let keypair = Keypair::new(); - let d1 = NodeInfo::new_leader_with_pubkey( + let d1 = NodeInfo::new_with_pubkey_socketaddr( keypair.pubkey().clone(), &socketaddr!("127.0.0.1:1234"), ); @@ -1567,7 +1565,7 @@ mod tests { sorted(&crdt2.table.values().map(|x| x.clone()).collect()), sorted(&crdt.table.values().map(|x| x.clone()).collect()) ); - let d4 = NodeInfo::new_entry_point(socketaddr!("127.0.0.4:1234")); + let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234")); crdt.insert(&d4); let (_key, _ix, ups) = crdt.get_updates_since(0); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); @@ -1671,7 +1669,7 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt1.contact_info.ncp); - let nxt2 = NodeInfo::new_entry_point(socketaddr!("127.0.0.3:1234")); + let nxt2 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.3:1234")); crdt.insert(&nxt2); // check that the service works // and that it eventually produces a request for both nodes @@ -1712,9 +1710,9 @@ mod tests { #[test] fn purge_test() { logger::setup(); - let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); - let nxt = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); + let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt.id); crdt.set_leader(me.id); crdt.insert(&nxt); @@ -1733,7 +1731,7 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); - let mut nxt2 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); + let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt2.id); assert_ne!(nxt.id, nxt2.id); crdt.insert(&nxt2); @@ -1755,14 +1753,14 @@ mod tests { #[test] fn purge_leader_test() { logger::setup(); - let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); - let nxt = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); + let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt.id); crdt.insert(&nxt); crdt.set_leader(nxt.id); let now = crdt.alive[&nxt.id]; - let mut nxt2 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); + let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); crdt.insert(&nxt2); while now == crdt.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); @@ -1865,10 +1863,10 @@ mod tests { fn run_window_request_with_backoff() { let window = default_window(); - let mut me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let mut me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); me.leader_id = me.id; - let mock_peer = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let recycler = BlobRecycler::default(); @@ -1916,16 +1914,16 @@ mod tests { #[test] fn test_update_leader() { logger::setup(); - let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); - let leader0 = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); - let leader1 = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); assert_eq!(crdt.top_leader(), None); crdt.set_leader(leader0.id); assert_eq!(crdt.top_leader().unwrap(), leader0.id); //add a bunch of nodes with a new leader for _ in 0..10 { - let mut dum = NodeInfo::new_entry_point(socketaddr!("127.0.0.1:1234")); + let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234")); dum.id = Keypair::new().pubkey(); dum.leader_id = leader1.id; crdt.insert(&dum); @@ -1941,12 +1939,12 @@ mod tests { #[test] fn test_valid_last_ids() { logger::setup(); - let mut leader0 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); + let mut leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); leader0.ledger_state.last_id = hash(b"0"); let mut leader1 = NodeInfo::new_multicast(); leader1.ledger_state.last_id = hash(b"1"); let mut leader2 = - NodeInfo::new_leader_with_pubkey(Pubkey::default(), &socketaddr!("127.0.0.2:1234")); + NodeInfo::new_with_pubkey_socketaddr(Pubkey::default(), &socketaddr!("127.0.0.2:1234")); leader2.ledger_state.last_id = hash(b"2"); // test that only valid tvu or tpu are retured as nodes let mut leader3 = NodeInfo::new( @@ -1973,10 +1971,10 @@ mod tests { let window = default_window(); let recycler = BlobRecycler::default(); - let node = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); - let node_with_same_addr = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + let node_with_same_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); assert_ne!(node.id, node_with_same_addr.id); - let node_with_diff_addr = NodeInfo::new_leader(&socketaddr!("127.0.0.1:4321")); + let node_with_diff_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:4321")); let crdt = Crdt::new(node.clone()).expect("Crdt::new"); assert_eq!(crdt.alive.len(), 0); @@ -1987,7 +1985,7 @@ mod tests { assert!( Crdt::handle_protocol( &obj, - node.contact_info.ncp, + &node.contact_info.ncp, request, &window, &mut None, @@ -1999,7 +1997,7 @@ mod tests { assert!( Crdt::handle_protocol( &obj, - node.contact_info.ncp, + &node.contact_info.ncp, request, &window, &mut None, @@ -2010,7 +2008,7 @@ mod tests { let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); Crdt::handle_protocol( &obj, - node.contact_info.ncp, + &node.contact_info.ncp, request, &window, &mut None, @@ -2031,13 +2029,13 @@ mod tests { fn test_is_valid_address() { assert!(cfg!(test)); let bad_address_port = socketaddr!("127.0.0.1:0"); - assert!(!Crdt::is_valid_address(bad_address_port)); + assert!(!Crdt::is_valid_address(&bad_address_port)); let bad_address_unspecified = socketaddr!(0, 1234); - assert!(!Crdt::is_valid_address(bad_address_unspecified)); + assert!(!Crdt::is_valid_address(&bad_address_unspecified)); let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234); - assert!(!Crdt::is_valid_address(bad_address_multicast)); + assert!(!Crdt::is_valid_address(&bad_address_multicast)); let loopback = socketaddr!("127.0.0.1:1234"); - assert!(Crdt::is_valid_address(loopback)); + assert!(Crdt::is_valid_address(&loopback)); // assert!(!Crdt::is_valid_ip_internal(loopback.ip(), false)); } @@ -2052,37 +2050,37 @@ mod tests { socketaddr!("127.0.0.1:1237"), ); let mut crdt = Crdt::new(node_info).unwrap(); - let network_entry_point = NodeInfo::new_entry_point(socketaddr!("127.0.0.1:1239")); + let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239")); crdt.insert(&network_entry_point); assert!(crdt.leader_data().is_none()); } #[test] fn new_with_external_ip_test_random() { - let ip = IpAddr::V4(Ipv4Addr::from(0)); - let node = Node::new_with_external_ip(Keypair::new().pubkey(), ip, (8100, 8200), 0); + let ip = Ipv4Addr::from(0); + let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0)); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); - assert!(node.sockets.gossip.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.gossip.local_addr().unwrap().port() < 8200); - assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.replicate.local_addr().unwrap().port() < 8200); - assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.requests.local_addr().unwrap().port() < 8200); - assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.transaction.local_addr().unwrap().port() < 8200); - assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.repair.local_addr().unwrap().port() < 8200); + assert!(node.sockets.gossip.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.gossip.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.replicate.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.replicate.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.requests.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.requests.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.transaction.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.transaction.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.repair.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.repair.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); } #[test] fn new_with_external_ip_test_gossip() { let ip = IpAddr::V4(Ipv4Addr::from(0)); - let node = Node::new_with_external_ip(Keypair::new().pubkey(), ip, (8100, 8200), 8050); + let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050)); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); @@ -2090,13 +2088,13 @@ mod tests { assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); - assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.replicate.local_addr().unwrap().port() < 8200); - assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.requests.local_addr().unwrap().port() < 8200); - assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.transaction.local_addr().unwrap().port() < 8200); - assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.repair.local_addr().unwrap().port() < 8200); + assert!(node.sockets.replicate.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.replicate.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.requests.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.requests.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.transaction.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.transaction.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); + assert!(node.sockets.repair.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0); + assert!(node.sockets.repair.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1); } } diff --git a/src/drone.rs b/src/drone.rs index 88c61282a3..94f64b2498 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -283,7 +283,7 @@ mod tests { let bob_pubkey = Keypair::new().pubkey(); let carlos_pubkey = Keypair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let ledger_path = tmp_ledger_path("send_airdrop"); let server = Fullnode::new_with_bank( diff --git a/src/fullnode.rs b/src/fullnode.rs index 1794b82dca..f92662da4c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -39,7 +39,7 @@ impl Config { let keypair = Keypair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in fullnode::Config new"); let pubkey = keypair.pubkey(); - let node_info = NodeInfo::new_leader_with_pubkey(pubkey, bind_addr); + let node_info = NodeInfo::new_with_pubkey_socketaddr(pubkey, bind_addr); Config { node_info, pkcs8 } } pub fn keypair(&self) -> Keypair { @@ -74,12 +74,12 @@ impl Fullnode { let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); info!( "starting... local gossip address: {} (advertising {})", - local_gossip_addr, node.data.contact_info.ncp + local_gossip_addr, node.info.contact_info.ncp ); let exit = Arc::new(AtomicBool::new(false)); let local_requests_addr = node.sockets.requests.local_addr().unwrap(); - let requests_addr = node.data.contact_info.rpu; - let leader_info = leader_addr.map(NodeInfo::new_entry_point); + let requests_addr = node.info.contact_info.rpu; + let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i)); let server = Self::new_with_bank( keypair, bank, @@ -172,7 +172,7 @@ impl Fullnode { sigverify_disabled: bool, ) -> Self { if leader_info.is_none() { - node.data.leader_id = node.data.id; + node.info.leader_id = node.info.id; } let bank = Arc::new(bank); @@ -186,12 +186,13 @@ impl Fullnode { ); thread_hdls.extend(rpu.thread_hdls()); - let mut drone_addr = node.data.contact_info.tpu; + // TODO: this code assumes this node is the leader + let mut drone_addr = node.info.contact_info.tpu; drone_addr.set_port(DRONE_PORT); - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT); let rpc_service = JsonRpcService::new( &bank, - node.data.contact_info.tpu, + node.info.contact_info.tpu, drone_addr, rpc_addr, exit.clone(), @@ -200,9 +201,9 @@ impl Fullnode { let blob_recycler = BlobRecycler::default(); let window = - window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler); + window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); - let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); + let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); let ncp = Ncp::new( &crdt, @@ -216,6 +217,7 @@ impl Fullnode { match leader_info { Some(leader_info) => { // Start in validator mode. + // TODO: let Crdt get that data from the network? crdt.write().unwrap().insert(leader_info); let tvu = Tvu::new( keypair, @@ -307,7 +309,7 @@ mod tests { let alice = Mint::new(10_000); let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); - let entry = tn.data.clone(); + let entry = tn.info.clone(); let v = Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), exit, None, false); v.exit(); v.join().unwrap(); @@ -321,7 +323,7 @@ mod tests { let alice = Mint::new(10_000); let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); - let entry = tn.data.clone(); + let entry = tn.info.clone(); Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), exit, None, false) }) .collect(); diff --git a/src/ncp.rs b/src/ncp.rs index 7ebdc3f757..bef2191487 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -92,7 +92,7 @@ mod tests { fn test_exit() { let exit = Arc::new(AtomicBool::new(false)); let tn = Node::new_localhost(); - let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new"); + let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()).unwrap(); diff --git a/src/thin_client.rs b/src/thin_client.rs index c1a49bbc91..6e770cfece 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -64,8 +64,8 @@ impl ThinClient { pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; trace!("start recv_from"); - self.requests_socket.recv_from(&mut buf)?; - trace!("end recv_from"); + let (len, from) = self.requests_socket.recv_from(&mut buf)?; + trace!("end recv_from got {} {}", len, from); deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize"))) } @@ -161,7 +161,7 @@ impl ThinClient { /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result { - trace!("get_balance"); + trace!("get_balance sending request to {}", self.requests_addr); let req = Request::GetAccount { key: *pubkey }; let data = serialize(&req).expect("serialize GetAccount in pub fn get_balance"); self.requests_socket @@ -367,7 +367,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()).unwrap(); - let leader_entry_point = NodeInfo::new_entry_point(leader_ncp); + let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); sleep(Duration::from_millis(100)); @@ -420,7 +420,7 @@ mod tests { logger::setup(); let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); @@ -473,7 +473,7 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("bad_sig", &alice); let server = Fullnode::new_with_bank( @@ -533,7 +533,7 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("client_check_signature", &alice); let server = Fullnode::new_with_bank( diff --git a/src/tvu.rs b/src/tvu.rs index 31cf097bda..6f012d1248 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -187,17 +187,17 @@ pub mod tests { let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader - let mut crdt_l = Crdt::new(leader.data.clone()).expect("Crdt::new"); - crdt_l.set_leader(leader.data.id); + let mut crdt_l = Crdt::new(leader.info.clone()).expect("Crdt::new"); + crdt_l.set_leader(leader.info.id); let cref_l = Arc::new(RwLock::new(crdt_l)); let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap(); //start crdt2 - let mut crdt2 = Crdt::new(target2.data.clone()).expect("Crdt::new"); - crdt2.insert(&leader.data); - crdt2.set_leader(leader.data.id); - let leader_id = leader.data.id; + let mut crdt2 = Crdt::new(target2.info.clone()).expect("Crdt::new"); + crdt2.insert(&leader.info); + crdt2.set_leader(leader.info.id); + let leader_id = leader.info.id; let cref2 = Arc::new(RwLock::new(crdt2)); let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone()).unwrap(); @@ -225,13 +225,13 @@ pub mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let replicate_addr = target1.data.contact_info.tvu; + let replicate_addr = target1.info.contact_info.tvu; let bank = Arc::new(Bank::new(&mint)); //start crdt1 - let mut crdt1 = Crdt::new(target1.data.clone()).expect("Crdt::new"); - crdt1.insert(&leader.data); - crdt1.set_leader(leader.data.id); + let mut crdt1 = Crdt::new(target1.info.clone()).expect("Crdt::new"); + crdt1.insert(&leader.info); + crdt1.set_leader(leader.info.id); let cref1 = Arc::new(RwLock::new(crdt1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap(); diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 5ae720d966..0398ee32d7 100755 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -254,8 +254,8 @@ pub mod tests { let bank = Arc::new(Bank::new(&mint)); let node = Node::new_localhost(); - let mut crdt = Crdt::new(node.data.clone()).expect("Crdt::new"); - crdt.set_leader(node.data.id); + let mut crdt = Crdt::new(node.info.clone()).expect("Crdt::new"); + crdt.set_leader(node.info.id); let blob_recycler = BlobRecycler::default(); let (sender, receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); @@ -293,7 +293,7 @@ pub mod tests { bank.register_entry_id(&entry.id); // Create a leader - let leader_data = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap()); let leader_pubkey = leader_data.id.clone(); let mut leader_crdt = Crdt::new(leader_data).unwrap(); @@ -308,7 +308,7 @@ pub mod tests { // and votes for new last_id for i in 0..10 { let mut validator = - NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap()); + NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap()); let vote = Vote { version: validator.version + 1, @@ -350,7 +350,7 @@ pub mod tests { // add two more nodes and see that it succeeds for i in 0..2 { let mut validator = - NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap()); + NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap()); let vote = Vote { version: validator.version + 1, diff --git a/src/window.rs b/src/window.rs index 22570feeca..a503e181b3 100644 --- a/src/window.rs +++ b/src/window.rs @@ -787,7 +787,7 @@ mod test { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); + let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; crdt_me.set_leader(me_id); let subs = Arc::new(RwLock::new(crdt_me)); @@ -831,7 +831,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.contact_info.ncp); + w.meta.set_addr(&tn.info.contact_info.ncp); } msgs.push_back(b); } @@ -858,7 +858,7 @@ mod test { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); + let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; let subs = Arc::new(RwLock::new(crdt_me)); @@ -901,7 +901,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.contact_info.ncp); + w.meta.set_addr(&tn.info.contact_info.ncp); } msgs.push_back(b); } @@ -921,7 +921,7 @@ mod test { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); + let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; let subs = Arc::new(RwLock::new(crdt_me)); @@ -964,7 +964,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.contact_info.ncp); + w.meta.set_addr(&tn.info.contact_info.ncp); } msgs.push_back(b); } @@ -984,7 +984,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.contact_info.ncp); + w.meta.set_addr(&tn.info.contact_info.ncp); } msgs1.push_back(b); } diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 5eed945157..fd9f8ec0f2 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -18,7 +18,7 @@ use std::time::Duration; fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let tn = Node::new_localhost(); - let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new"); + let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit).unwrap(); diff --git a/tests/multinode.rs b/tests/multinode.rs index 6ecf730794..c54401401a 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -34,10 +34,10 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { let exit = Arc::new(AtomicBool::new(false)); let mut spy = Node::new_localhost(); let daddr = "0.0.0.0:0".parse().unwrap(); - let me = spy.data.id.clone(); - spy.data.contact_info.tvu = daddr; - spy.data.contact_info.rpu = daddr; - let mut spy_crdt = Crdt::new(spy.data).expect("Crdt::new"); + let me = spy.info.id.clone(); + spy.info.contact_info.tvu = daddr; + spy.info.contact_info.rpu = daddr; + let mut spy_crdt = Crdt::new(spy.info).expect("Crdt::new"); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); @@ -55,7 +55,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { .values() .into_iter() .filter(|x| x.id != me) - .filter(|x| Crdt::is_valid_address(x.contact_info.rpu)) + .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) .cloned() .collect(); if num >= num_nodes as u64 && v.len() >= num_nodes { @@ -118,7 +118,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let bob_pubkey = Keypair::new().pubkey(); let mut ledger_paths = Vec::new(); @@ -149,7 +149,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // balances let keypair = Keypair::new(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); - let validator_data = validator.data.clone(); + let validator_data = validator.info.clone(); let validator = Fullnode::new( validator, &zero_ledger_path, @@ -198,7 +198,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let bob_pubkey = Keypair::new().pubkey(); let mut ledger_paths = Vec::new(); @@ -322,7 +322,7 @@ fn test_multi_node_basic() { let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let bob_pubkey = Keypair::new().pubkey(); let mut ledger_paths = Vec::new(); @@ -388,7 +388,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { let mut ledger_paths = Vec::new(); ledger_paths.push(leader_ledger_path.clone()); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let leader_fullnode = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); @@ -399,7 +399,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { let keypair = Keypair::new(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); - let validator_data = validator.data.clone(); + let validator_data = validator.info.clone(); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); ledger_paths.push(ledger_path.clone()); let val_fullnode = Fullnode::new( @@ -425,7 +425,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) { let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false); (leader_data, leader_fullnode) } @@ -472,7 +472,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { // start validator from old ledger let keypair = Keypair::new(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); - let validator_data = validator.data.clone(); + let validator_data = validator.info.clone(); let val_fullnode = Fullnode::new( validator, @@ -535,7 +535,7 @@ fn test_multi_node_dynamic_network() { ledger_paths.push(leader_ledger_path.clone()); let alice_arc = Arc::new(RwLock::new(alice)); - let leader_data = leader.data.clone(); + let leader_data = leader.info.clone(); let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, true); @@ -603,7 +603,7 @@ fn test_multi_node_dynamic_network() { .name("validator-launch-thread".to_string()) .spawn(move || { let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); - let rd = validator.data.clone(); + let rd = validator.info.clone(); info!("starting {} {:x}", keypair.pubkey(), rd.debug_id()); let val = Fullnode::new( validator, @@ -712,8 +712,8 @@ fn mk_client(leader: &NodeInfo) -> ThinClient { .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - assert!(Crdt::is_valid_address(leader.contact_info.rpu)); - assert!(Crdt::is_valid_address(leader.contact_info.tpu)); + assert!(Crdt::is_valid_address(&leader.contact_info.rpu)); + assert!(Crdt::is_valid_address(&leader.contact_info.tpu)); ThinClient::new( leader.contact_info.rpu, requests_socket,