Compare commits
40 Commits: 0.7.0-rc.3 ... v0.7.0-rc.

SHA1:
a6cb2f1bcf
28af9a39b4
8cf5620b87
85d6627ee6
611a005ec9
90b3b90391
fd4f294fd3
145274c001
df5d6693f6
05c5603879
c2c48a5c3c
4af556f70e
8bad411962
5b0418793e
4423ee6902
f0c39cc84d
3d45b04da8
9e2f26a5d2
a016f6e82e
eb3e5fd204
72282dc493
47a22c66b4
fb11d8a909
7d872f52f4
d882bfe65c
103584ef27
1fb537deb9
2bd48b4207
f5a6db3dc0
dd0c1ac5b2
d8c9655128
09f2d273c5
f6eb85e7a3
0d85b43901
fdf94a77b4
af40ab0c04
015b7a1ddb
ab3e460e64
194a84c8dd
51d932dad1
@@ -1,7 +1,7 @@
 [package]
 name = "solana"
 description = "Blockchain, Rebuilt for Scale"
-version = "0.7.0-rc.2"
+version = "0.7.0-rc.5"
 documentation = "https://docs.rs/solana"
 homepage = "http://solana.com/"
 readme = "README.md"
@@ -32,7 +32,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {

        let rando1 = KeyPair::new();
        let tx = Transaction::new(&rando0, rando1.pubkey(), 1, last_id);
-       assert!(bank.process_transaction(&tx.clone()).is_ok());
+       assert!(bank.process_transaction(&tx).is_ok());

        // Finally, return the transaction to the benchmark.
        tx
@@ -2,6 +2,6 @@ FROM snapcraft/xenial-amd64

 # Update snapcraft to latest version
 RUN apt-get update -qq \
-    && apt-get install -y snapcraft \
+    && apt-get install -y snapcraft daemontools \
    && rm -rf /var/lib/apt/lists/* \
    && snapcraft --version
@@ -37,6 +37,12 @@ fi

 set -x

+echo --- checking for multilog
+if [[ ! -x /usr/bin/multilog ]]; then
+  echo "multilog not found, install with: sudo apt-get install -y daemontools"
+  exit 1
+fi
+
 echo --- build
 snapcraft

@@ -1,11 +1,13 @@
 #!/bin/bash -e
 #
-# Refreshes the Solana software running on the Testnet full nodes
+# Deploys the Solana software running on the testnet full nodes
 #
 # This script must be run by a user/machine that has successfully authenticated
 # with GCP and has sufficient permission.
 #

 cd "$(dirname "$0")/.."

 if [[ -z $SOLANA_METRICS_CONFIG ]]; then
   echo Error: SOLANA_METRICS_CONFIG environment variable is unset
   exit 1

@@ -19,10 +21,12 @@ fi
 case $SOLANA_SNAP_CHANNEL in
 edge)
-  resourcePrefix=master-testnet-solana-com
+  publicUrl=master.testnet.solana.com
+  publicIp=$(dig +short $publicUrl | head -n1)
   ;;
 beta)
-  resourcePrefix=testnet-solana-com
+  publicUrl=testnet.solana.com
+  publicIp= # Use default value
   ;;
 *)
   echo Error: Unknown SOLANA_SNAP_CHANNEL=$SOLANA_SNAP_CHANNEL

@@ -30,7 +34,7 @@ beta)
   ;;
 esac

-publicUrl=${resourcePrefix//-/.}
+resourcePrefix=${publicUrl//./-}
 vmlist=("$resourcePrefix":us-west1-b) # Leader is hard coded as the first entry
 validatorNamePrefix=$resourcePrefix-validator-

@@ -45,65 +49,87 @@ while read -r vmName vmZone status; do
   vmlist+=("$vmName:$vmZone")
 done < <(gcloud compute instances list --filter="$filter" --format 'value(name,zone,status)')

+wait_for_node() {
+  declare pid=$1
+
+  declare ok=true
+  wait "$pid" || ok=false
+  cat "log-$pid.txt"
+  if ! $ok; then
+    echo ^^^ +++
+    exit 1
+  fi
+}
+

 echo "--- Refreshing leader for $publicUrl"
 leader=true
-logfiles=()
+pids=()
+count=1
 for info in "${vmlist[@]}"; do
+  nodePosition="($count/${#vmlist[*]})"
+
   vmName=${info%:*}
   vmZone=${info#*:}
-  echo "Starting refresh for $vmName"
+  echo "Starting refresh for $vmName $nodePosition"

   (
     SECONDS=0
-    echo "--- $vmName in zone $vmZone"
+    echo "--- $vmName in zone $vmZone $nodePosition"
     if $leader; then
-      nodeConfig="mode=leader+drone enable-cuda=1 metrics-config=$SOLANA_METRICS_CONFIG"
+      nodeConfig="mode=leader+drone metrics-config=$SOLANA_METRICS_CONFIG"
+      if [[ -n $SOLANA_CUDA ]]; then
+        nodeConfig="$nodeConfig enable-cuda=1"
+      fi
     else
-      nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG"
+      nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG leader-address=$publicIp"
     fi
-    cat > "autogen-refresh-$vmName.sh" <<EOF
-set -x
-sudo snap remove solana
-sudo snap install solana --$SOLANA_SNAP_CHANNEL --devmode
-sudo snap set solana $nodeConfig
-snap info solana
-sudo snap logs solana -n200
-EOF

     set -x
-    gcloud compute scp --zone "$vmZone" "autogen-refresh-$vmName.sh" "$vmName":
     gcloud compute ssh "$vmName" --zone "$vmZone" \
       --ssh-flag="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -t" \
-      --command="bash ./autogen-refresh-$vmName.sh"
+      --command="\
+        set -ex; \
+        logmarker='solana deploy $(date)/$RANDOM'; \
+        sudo snap remove solana; \
+        logger \$logmarker; \
+        sudo snap install solana --$SOLANA_SNAP_CHANNEL --devmode; \
+        sudo snap set solana $nodeConfig; \
+        snap info solana; \
+        echo Slight delay to get more syslog output; \
+        sleep 2; \
+        sudo grep -Pzo \"\$logmarker(.|\\n)*\" /var/log/syslog \
+      "
     echo "Succeeded in ${SECONDS} seconds"
   ) > "log-$vmName.txt" 2>&1 &
+  pid=$!
+  # Rename log file so it can be discovered later by $pid
+  mv "log-$vmName.txt" "log-$pid.txt"

   if $leader; then
     echo Waiting for leader...
     # Wait for the leader to initialize before starting the validators
     # TODO: Remove this limitation eventually.
-    wait
+    wait_for_node "$pid"

-    cat "log-$vmName.txt"
     echo "--- Refreshing validators"
   else
-    # Slow down deployment to ~30 machines a minute to avoid triggering GCP login
-    # quota limits (the previous |scp| and |ssh| each count as a login)
-    sleep 2
+    # Slow down deployment to ~20 machines a minute to avoid triggering GCP login
+    # quota limits (each |ssh| counts as a login)
+    sleep 3

-    logfiles+=("log-$vmName.txt")
+    pids+=("$pid")
   fi
   leader=false
+  count=$((count + 1))
 done

 echo --- Waiting for validators
-wait
-
-for log in "${logfiles[@]}"; do
-  cat "$log"
+for pid in "${pids[@]}"; do
+  wait_for_node "$pid"
 done

 echo "--- $publicUrl sanity test"
-USE_SNAP=1 ./multinode-demo/test/wallet-sanity.sh $publicUrl
+USE_SNAP=1 ci/testnet-sanity.sh $publicUrl

 exit 0
ci/testnet-sanity.sh | 17 (new executable file)
@@ -0,0 +1,17 @@
+#!/bin/bash -e
+#
+# Perform a quick sanity test on the specific testnet
+#
+
+cd "$(dirname "$0")/.."
+
+TESTNET=$1
+if [[ -z $TESTNET ]]; then
+  TESTNET=testnet.solana.com
+fi
+
+echo "--- $TESTNET: wallet sanity"
+multinode-demo/test/wallet-sanity.sh $TESTNET
+
+echo --- fin
+exit 0
@@ -21,18 +21,13 @@ rsync_leader_url=$(rsync_url "$leader")

 set -ex
 mkdir -p "$SOLANA_CONFIG_CLIENT_DIR"
-if [[ ! -r "$SOLANA_CONFIG_CLIENT_DIR"/leader.json ]]; then
-  (
-    set -x
-    $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/
-  )
-fi
+$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/

 client_json="$SOLANA_CONFIG_CLIENT_DIR"/client.json
-if [[ ! -r $client_json ]]; then
-  $solana_keygen -o "$client_json"
-fi
+[[ -r $client_json ]] || $solana_keygen -o "$client_json"

-$solana_client_demo \
-  -n "$count" \
-  -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json \
-  -k "$SOLANA_CONFIG_CLIENT_DIR"/client.json \
+# shellcheck disable=SC2086 # $solana_client_demo should not be quoted
+exec $solana_client_demo \
+  -n "$count" -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$SOLANA_CONFIG_CLIENT_DIR"/client.json
@@ -10,6 +10,10 @@ if [[ -z $here ]]; then
 fi

 rsync=rsync
+leader_logger="cat"
+validator_logger="cat"
+drone_logger="cat"

 if [[ -d "$SNAP" ]]; then # Running inside a Linux Snap?
   solana_program() {
     declare program="$1"

@@ -22,8 +26,17 @@ if [[ -d "$SNAP" ]]; then # Running inside a Linux Snap?
     fi
   }
   rsync="$SNAP"/bin/rsync
+  multilog="$SNAP/bin/multilog t s16777215"
+  leader_logger="$multilog $SNAP_DATA/leader"
+  validator_logger="$multilog t $SNAP_DATA/validator"
+  drone_logger="$multilog $SNAP_DATA/drone"
+  # Create log directories manually to prevent multilog from creating them as
+  # 0700
+  mkdir -p "$SNAP_DATA"/{drone,leader,validator}

   SOLANA_METRICS_CONFIG="$(snapctl get metrics-config)"
   SOLANA_CUDA="$(snapctl get enable-cuda)"
   RUST_LOG="$(snapctl get rust-log)"

 elif [[ -n "$USE_SNAP" ]]; then # Use the Linux Snap binaries
   solana_program() {
@@ -36,6 +36,7 @@ set -ex
 mkdir -p "$SOLANA_CONFIG_DIR"
 $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/

-# shellcheck disable=SC2086 # $solana_drone should not be quoted
-exec $solana_drone \
-  -l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json
+set -o pipefail
+$solana_drone \
+  -l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
+  2>&1 | $drone_logger
@@ -25,15 +25,8 @@ fi

 tune_networking

-# migrate from old ledger format? why not...
-if [[ ! -f "$SOLANA_CONFIG_DIR"/ledger.log &&
-        -f "$SOLANA_CONFIG_DIR"/genesis.log ]]; then
-  (shopt -s nullglob &&
-    cat "$SOLANA_CONFIG_DIR"/genesis.log \
-        "$SOLANA_CONFIG_DIR"/tx-*.log) > "$SOLANA_CONFIG_DIR"/ledger.log
-fi
-
-# shellcheck disable=SC2086 # $program should not be quoted
-exec $program \
+set -xo pipefail
+$program \
   --identity "$SOLANA_CONFIG_DIR"/leader.json \
-  --ledger "$SOLANA_CONFIG_DIR"/ledger.log
+  --ledger "$SOLANA_CONFIG_DIR"/ledger.log \
+  2>&1 | $leader_logger
multinode-demo/remote_leader.sh | 14 (new executable file)
@@ -0,0 +1,14 @@
+#!/bin/bash -e
+
+[[ -n $FORCE ]] || exit
+
+chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa
+
+PATH="$HOME"/.cargo/bin:"$PATH"
+
+./fetch-perf-libs.sh
+
+# Run setup
+USE_INSTALL=1 ./multinode-demo/setup.sh -p
+USE_INSTALL=1 SOLANA_CUDA=1 ./multinode-demo/leader.sh >leader.log 2>&1 &
+USE_INSTALL=1 ./multinode-demo/drone.sh >drone.log 2>&1 &
multinode-demo/remote_nodes.sh | 186 (new executable file)
@@ -0,0 +1,186 @@
+#!/bin/bash
+
+command=$1
+ip_addr_file=
+remote_user=
+ssh_keys=
+
+shift
+
+usage() {
+  exitcode=0
+  if [[ -n "$1" ]]; then
+    exitcode=1
+    echo "Error: $*"
+  fi
+  cat <<EOF
+usage: $0 <start|stop> <-f IP Addr Array file> <-u username> [-k ssh-keys]
+
+Manage a GCE multinode network
+
+ start|stop  - Create or delete the network
+ -f file     - A bash script that exports an array of IP addresses, ip_addr_array.
+               Elements of the array are public IP address of remote nodes.
+ -u username - The username for logging into remote nodes.
+ -k ssh-keys - Path to public/private key pair that remote nodes can use to perform
+               rsync and ssh among themselves. Must contain pub and priv keys.
+
+EOF
+  exit $exitcode
+}
+
+while getopts "h?f:u:k:" opt; do
+  case $opt in
+  h | \?)
+    usage
+    ;;
+  f)
+    ip_addr_file=$OPTARG
+    ;;
+  u)
+    remote_user=$OPTARG
+    ;;
+  k)
+    ssh_keys=$OPTARG
+    ;;
+  *)
+    usage "Error: unhandled option: $opt"
+    ;;
+  esac
+done
+
+set -e
+
+# Sample IP Address array file contents
+# ip_addr_array=(192.168.1.1 192.168.1.5 192.168.2.2)
+
+[[ -n $command ]] || usage "Need a command (start|stop)"
+[[ -n $ip_addr_file ]] || usage "Need a file with IP address array"
+[[ -n $remote_user ]] || usage "Need the username for remote nodes"
+
+ip_addr_array=()
+# Get IP address array
+# shellcheck source=/dev/null
+source "$ip_addr_file"
+
+build_project() {
+  echo "Build started at $(date)"
+  SECONDS=0
+
+  # Build and install locally
+  PATH="$HOME"/.cargo/bin:"$PATH"
+  cargo install --force
+
+  echo "Build took $SECONDS seconds"
+}
+
+common_start_setup() {
+  ip_addr=$1
+
+  # Killing sshguard for now. TODO: Find a better solution
+  # sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
+  ssh -n -f "$remote_user@$ip_addr" " \
+    set -ex; \
+    sudo service sshguard stop; \
+    sudo apt-get --assume-yes install rsync libssl-dev; \
+    mkdir -p ~/.ssh ~/solana ~/.cargo/bin; \
+  " >log/"$ip_addr".log
+
+  # If provided, deploy SSH keys
+  if [[ -n $ssh_keys ]]; then
+    {
+      rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
+      rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
+      rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
+    } >>log/"$ip_addr".log
+  fi
+}
+
+start_leader() {
+  common_start_setup "$1"
+
+  {
+    rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/
+    rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
+    rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
+    ssh -n -f "$remote_user@$ip_addr" 'cd solana; FORCE=1 ./multinode-demo/remote_leader.sh'
+  } >>log/"$1".log
+
+  leader_ip=$1
+  leader_time=$SECONDS
+  SECONDS=0
+}
+
+start_validator() {
+  common_start_setup "$1"
+
+  ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/multinode-demo ~/solana/" >>log/"$1".log
+  ssh -n -f "$remote_user@$ip_addr" "cd solana; FORCE=1 ./multinode-demo/remote_validator.sh $leader_ip" >>log/"$1".log
+}
+
+start_all_nodes() {
+  echo "Deployment started at $(date)"
+  SECONDS=0
+  count=0
+  leader_ip=
+  leader_time=
+
+  mkdir -p log
+
+  for ip_addr in "${ip_addr_array[@]}"; do
+    ssh-keygen -R "$ip_addr" >log/local.log
+    ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
+
+    if ((!count)); then
+      # Start the leader on the first node
+      echo "Leader node $ip_addr, killing previous instance and restarting"
+      start_leader "$ip_addr"
+    else
+      # Start validator on all other nodes
+      echo "Validator[$count] node $ip_addr, killing previous instance and restarting"
+      start_validator "$ip_addr" &
+      # TBD: Remove the sleep or reduce time once GCP login quota is increased
+      sleep 2
+    fi
+
+    ((count = count + 1))
+  done
+
+  wait
+
+  ((validator_count = count - 1))
+
+  echo "Deployment finished at $(date)"
+  echo "Leader deployment took $leader_time seconds"
+  echo "$validator_count Validator deployment took $SECONDS seconds"
+}
+
+stop_all_nodes() {
+  SECONDS=0
+  local count=0
+  for ip_addr in "${ip_addr_array[@]}"; do
+    echo "Stopping node[$count] $ip_addr. Remote user $remote_user"
+
+    ssh -n -f "$remote_user@$ip_addr" " \
+      set -ex; \
+      sudo service sshguard stop; \
+      pkill -9 solana-; \
+      pkill -9 validator; \
+      pkill -9 leader; \
+    "
+    sleep 2
+    ((count = count + 1))
+    echo "Stopped node[$count] $ip_addr"
+  done
+  echo "Stopping $count nodes took $SECONDS seconds"
+}
+
+if [[ $command == "start" ]]; then
+  #build_project
+  stop_all_nodes
+  start_all_nodes
+elif [[ $command == "stop" ]]; then
+  stop_all_nodes
+else
+  usage "Unknown command: $command"
+fi
multinode-demo/remote_validator.sh | 16 (new executable file)
@@ -0,0 +1,16 @@
+#!/bin/bash -e
+
+[[ -n $FORCE ]] || exit
+
+chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa
+
+PATH="$HOME"/.cargo/bin:"$PATH"
+
+ssh-keygen -R "$1"
+ssh-keyscan "$1" >>~/.ssh/known_hosts 2>/dev/null
+
+rsync -vPrz "$1":~/.cargo/bin/solana* ~/.cargo/bin/
+
+# Run setup
+USE_INSTALL=1 ./multinode-demo/setup.sh -p
+USE_INSTALL=1 ./multinode-demo/validator.sh "$1":~/solana "$1" >validator.log 2>&1
@@ -1,127 +0,0 @@
-#!/bin/bash
-
-ip_addr_file=$1
-remote_user=$2
-ssh_keys=$3
-
-usage() {
-  echo -e "\\tUsage: $0 <IP Address array> <username> [path to ssh keys]\\n"
-  echo -e "\\t <IP Address array>: A bash script that exports an array of IP addresses, ip_addr_array. Elements of the array are public IP address of remote nodes."
-  echo -e "\\t <username>: The username for logging into remote nodes."
-  echo -e "\\t [path to ssh keys]: The public/private key pair that remote nodes can use to perform rsync and ssh among themselves. Must contain pub, priv and authorized_keys.\\n"
-  exit 1
-}
-
-# Sample IP Address array file contents
-# ip_addr_array=(192.168.1.1 192.168.1.5 192.168.2.2)
-
-if [[ -z "$ip_addr_file" ]]; then
-  usage
-fi
-
-if [[ -z "$remote_user" ]]; then
-  usage
-fi
-
-echo "Build started at $(date)"
-SECONDS=0
-# Build and install locally
-PATH="$HOME"/.cargo/bin:"$PATH"
-cargo install --force
-
-echo "Build took $SECONDS seconds"
-
-ip_addr_array=()
-# Get IP address array
-# shellcheck source=/dev/null
-source "$ip_addr_file"
-
-# shellcheck disable=SC2089,SC2016
-ssh_command_prefix='export PATH="$HOME/.cargo/bin:$PATH"; cd solana; USE_INSTALL=1'
-
-echo "Deployment started at $(date)"
-SECONDS=0
-count=0
-leader_ip=
-
-common_setup() {
-  ip_addr=$1
-
-  ssh -n -f "$remote_user@$ip_addr" 'mkdir -p ~/.ssh ~/solana ~/.cargo/bin'
-
-  # Killing sshguard for now. TODO: Find a better solution
-  # sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
-  ssh -n -f "$remote_user@$ip_addr" "sudo service sshguard stop"
-  ssh -n -f "$remote_user@$ip_addr" 'sudo apt-get --assume-yes install rsync libssl-dev'
-
-  # If provided, deploy SSH keys
-  if [[ -z $ssh_keys ]]; then
-    echo "skip copying the ssh keys"
-  else
-    rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
-    rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
-    rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
-    ssh -n -f "$remote_user@$ip_addr" 'chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa'
-  fi
-
-  # Stop current nodes
-  ssh "$remote_user@$ip_addr" 'pkill -9 solana-'
-}
-
-leader() {
-  common_setup "$1"
-
-  rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/ # Deploy build and scripts to remote node
-  rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
-  rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
-
-  # Run setup
-  ssh "$remote_user@$ip_addr" "$ssh_command_prefix"' ./multinode-demo/setup.sh -p "$ip_addr"'
-
-  echo "Starting leader node $ip_addr"
-  ssh -n -f "$remote_user@$ip_addr" 'cd solana; ./fetch-perf-libs.sh'
-  ssh -n -f "$remote_user@$ip_addr" "$ssh_command_prefix"' SOLANA_CUDA=1 ./multinode-demo/leader.sh > leader.log 2>&1'
-  ssh -n -f "$remote_user@$ip_addr" "$ssh_command_prefix"' ./multinode-demo/drone.sh > drone.log 2>&1'
-  leader_ip=${ip_addr_array[0]}
-}
-
-validator() {
-  common_setup "$1"
-
-  echo "Adding known hosts for $ip_addr"
-  ssh "$remote_user@$ip_addr" "ssh-keygen -R ""$leader_ip"
-  ssh "$remote_user@$ip_addr" "ssh-keyscan ""$leader_ip >> ~/.ssh/known_hosts"
-
-  ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/.cargo/bin/solana* ~/.cargo/bin/"
-  ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/multinode-demo ~/solana/"
-  ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/fetch-perf-libs.sh ~/solana/"
-
-  # Run setup
-  ssh "$remote_user@$ip_addr" "$ssh_command_prefix"' ./multinode-demo/setup.sh -p "$ip_addr"'
-
-  echo "Starting validator node $ip_addr"
-  ssh -n -f "$remote_user@$ip_addr" "$ssh_command_prefix"" ./multinode-demo/validator.sh $remote_user@$leader_ip:~/solana $leader_ip > validator.log 2>&1"
-}
-
-for ip_addr in "${ip_addr_array[@]}"; do
-  echo "$ip_addr"
-  ssh-keygen -R "$ip_addr"
-  ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts
-
-  if ((!count)); then
-    # Start the leader on the first node
-    leader "$ip_addr"
-  else
-    # Start validator on all other nodes
-    validator "$ip_addr" &
-    # TBD: Remove the sleep or reduce time once GCP login quota is increased
-    sleep 2
-  fi
-
-  ((count++))
-done
-
-wait
-
-echo "Deployment finished at $(date)"
-echo "Deployment took $SECONDS seconds"
@@ -88,8 +88,9 @@ while ! $solana_wallet \
   sleep 1
 done

-# shellcheck disable=SC2086 # $program should not be quoted
-exec $program \
+set -o pipefail
+$program \
   --identity "$SOLANA_CONFIG_DIR"/validator.json \
   --testnet "$leader_address:$leader_port" \
-  --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log
+  --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log \
+  2>&1 | $validator_logger
@@ -59,18 +59,24 @@ apps:
     plugs:
       - network
       - home

   daemon-validator:
     daemon: simple
     command: validator.sh
+    plugs:
+      - network
+      - network-bind
   daemon-leader:
     daemon: simple
     command: leader.sh
+    plugs:
+      - network
+      - network-bind
   daemon-drone:
     daemon: simple
     command: drone.sh
     plugs:
       - network
       - network-bind

 parts:
   solana:

@@ -104,8 +110,9 @@ parts:
       mkdir -p $SNAPCRAFT_PART_INSTALL/bin
       cp -av multinode-demo/* $SNAPCRAFT_PART_INSTALL/bin/

-      # TODO: build rsync from source instead of sneaking it in from the host
+      # TODO: build rsync/multilog from source instead of sneaking it in from the host
      # system...
      set -x
      mkdir -p $SNAPCRAFT_PART_INSTALL/bin
      cp -av /usr/bin/rsync $SNAPCRAFT_PART_INSTALL/bin/
+      cp -av /usr/bin/multilog $SNAPCRAFT_PART_INSTALL/bin/
src/bank.rs | 14
@@ -6,7 +6,7 @@
 extern crate libc;

 use chrono::prelude::*;
-use counter::{self, Counter};
+use counter::Counter;
 use entry::Entry;
 use hash::Hash;
 use itertools::Itertools;

@@ -204,17 +204,9 @@ impl Bank {
         let option = bals.get_mut(&tx.from);
         if option.is_none() {
             if let Instruction::NewVote(_) = &tx.instruction {
-                static mut COUNTER_VOTE_ACCOUNT_NOT_FOUND: Counter = create_counter!(
-                    "bank-appy_debits-vote_account_not_found",
-                    counter::DEFAULT_LOG_RATE
-                );
-                inc_counter!(COUNTER_VOTE_ACCOUNT_NOT_FOUND, 1);
+                inc_new_counter!("bank-appy_debits-vote_account_not_found", 1);
             } else {
-                static mut COUNTER_ACCOUNT_NOT_FOUND: Counter = create_counter!(
-                    "bank-appy_debits-generic_account_not_found",
-                    counter::DEFAULT_LOG_RATE
-                );
-                inc_counter!(COUNTER_ACCOUNT_NOT_FOUND, 1);
+                inc_new_counter!("bank-appy_debits-generic_account_not_found", 1);
             }
             return Err(BankError::AccountNotFound(tx.from));
         }
@@ -40,7 +40,7 @@ impl BankingStage {
             .name("solana-banking-stage".to_string())
             .spawn(move || loop {
                 if let Err(e) = Self::process_packets(
-                    &bank.clone(),
+                    &bank,
                     &verified_receiver,
                     &signal_sender,
                     &packet_recycler,

@@ -89,7 +89,6 @@ impl BankingStage {
             mms.len(),
         );
         let count = mms.iter().map(|x| x.1.len()).sum();
-        static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1);
         let proc_start = Instant::now();
         for (msgs, vers) in mms {
             let transactions = Self::deserialize_transactions(&msgs.read().unwrap());

@@ -125,7 +124,7 @@ impl BankingStage {
             reqs_len,
             (reqs_len as f32) / (total_time_s)
         );
-        inc_counter!(COUNTER, count);
+        inc_new_counter!("banking_stage-process_packets", count);
         Ok(())
     }
 }
src/bin/client-demo.rs | 10 (Normal file → Executable file)
@@ -211,7 +211,7 @@ fn main() {
         threads = t.to_string().parse().expect("integer");
     }

-    if let Some(n) = matches.value_of("nodes") {
+    if let Some(n) = matches.value_of("num_nodes") {
         num_nodes = n.to_string().parse().expect("integer");
     }

@@ -224,7 +224,7 @@ fn main() {

     let signal = Arc::new(AtomicBool::new(false));
     let mut c_threads = vec![];
-    let validators = converge(&leader, &signal.clone(), num_nodes, &mut c_threads);
+    let validators = converge(&leader, &signal, num_nodes, &mut c_threads);
     assert_eq!(validators.len(), num_nodes);

     let mut client = mk_client(&leader);

@@ -365,6 +365,8 @@ fn spy_node() -> (NodeInfo, UdpSocket) {
     let gossip_socket_pair = udp_public_bind("gossip", 8000, 10000);
     let pubkey = KeyPair::new().pubkey();
     let daddr = "0.0.0.0:0".parse().unwrap();
+    assert!(!gossip_socket_pair.addr.ip().is_unspecified());
+    assert!(!gossip_socket_pair.addr.ip().is_multicast());
     let node = NodeInfo::new(
         pubkey,
         //gossip.local_addr().unwrap(),

@@ -386,14 +388,14 @@ fn converge(
     //lets spy on the network
     let daddr = "0.0.0.0:0".parse().unwrap();
     let (spy, spy_gossip) = spy_node();
-    let mut spy_crdt = Crdt::new(spy);
+    let mut spy_crdt = Crdt::new(spy).expect("Crdt::new");
     spy_crdt.insert(&leader);
     spy_crdt.set_leader(leader.id);
     let spy_ref = Arc::new(RwLock::new(spy_crdt));
     let window = default_window();
     let gossip_send_socket = udp_random_bind(8000, 10000, 5).unwrap();
     let ncp = Ncp::new(
-        &spy_ref.clone(),
+        &spy_ref,
         window.clone(),
         spy_gossip,
         gossip_send_socket,
@@ -12,6 +12,7 @@ use clap::{App, Arg};
 use solana::crdt::NodeInfo;
 use solana::drone::{Drone, DroneRequest};
 use solana::fullnode::Config;
+use solana::metrics::set_panic_hook;
 use solana::signature::read_keypair;
 use std::fs::File;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};

@@ -23,7 +24,8 @@ use tokio_codec::{BytesCodec, Decoder};

 fn main() {
     env_logger::init();
-    let matches = App::new("solana-client-demo")
+    set_panic_hook("drone");
+    let matches = App::new("drone")
         .arg(
             Arg::with_name("leader")
                 .short("l")
@@ -8,6 +8,7 @@ extern crate solana;
 use clap::{App, Arg};
 use solana::crdt::{NodeInfo, TestNode};
 use solana::fullnode::{Config, FullNode, LedgerFile};
+use solana::metrics::set_panic_hook;
 use solana::service::Service;
 use solana::signature::{KeyPair, KeyPairUtil};
 use std::fs::File;

@@ -17,6 +18,7 @@ use std::process::exit;

 fn main() -> () {
     env_logger::init();
+    set_panic_hook("fullnode");
     let matches = App::new("fullnode")
         .arg(
             Arg::with_name("identity")
@@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
 impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
     fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
         if options.is_empty() {
-            Err(CrdtError::TooSmall)?;
+            Err(CrdtError::NoPeers)?;
         }

         let n = ((self.random)() as usize) % options.len();

@@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
 impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
     fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
         if options.is_empty() {
-            Err(CrdtError::TooSmall)?;
+            Err(CrdtError::NoPeers)?;
         }

         let mut weighted_peers = vec![];
@@ -1,10 +1,10 @@
 use influx_db_client as influxdb;
 use metrics;
+use std::env;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use timing;

 const INFLUX_RATE: usize = 100;
-pub const DEFAULT_LOG_RATE: usize = 10;
+const DEFAULT_METRICS_RATE: usize = 100;

 pub struct Counter {
     pub name: &'static str,

@@ -13,7 +13,7 @@ pub struct Counter {
     pub times: AtomicUsize,
     /// last accumulated value logged
     pub lastlog: AtomicUsize,
-    pub lograte: usize,
+    pub lograte: AtomicUsize,
 }

 macro_rules! create_counter {

@@ -23,7 +23,7 @@ macro_rules! create_counter {
             counts: AtomicUsize::new(0),
             times: AtomicUsize::new(0),
             lastlog: AtomicUsize::new(0),
-            lograte: $lograte,
+            lograte: AtomicUsize::new($lograte),
         }
     };
 }

@@ -34,12 +34,38 @@ macro_rules! inc_counter {
     };
 }

+macro_rules! inc_new_counter {
+    ($name:expr, $count:expr) => {{
+        static mut INC_NEW_COUNTER: Counter = create_counter!($name, 0);
+        inc_counter!(INC_NEW_COUNTER, $count);
+    }};
+    ($name:expr, $count:expr, $lograte:expr) => {{
+        static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte);
+        inc_counter!(INC_NEW_COUNTER, $count);
+    }};
+}
+
 impl Counter {
+    fn default_log_rate() -> usize {
+        let v = env::var("SOLANA_DEFAULT_METRICS_RATE")
+            .map(|x| x.parse().unwrap_or(DEFAULT_METRICS_RATE))
+            .unwrap_or(DEFAULT_METRICS_RATE);
+        if v == 0 {
+            DEFAULT_METRICS_RATE
+        } else {
+            v
+        }
+    }
     pub fn inc(&mut self, events: usize) {
         let counts = self.counts.fetch_add(events, Ordering::Relaxed);
         let times = self.times.fetch_add(1, Ordering::Relaxed);
-        let lastlog = self.lastlog.load(Ordering::Relaxed);
-        if times % self.lograte == 0 && times > 0 {
+        let mut lograte = self.lograte.load(Ordering::Relaxed);
+        if lograte == 0 {
+            lograte = Counter::default_log_rate();
+            self.lograte.store(lograte, Ordering::Relaxed);
+        }
+        if times % lograte == 0 && times > 0 {
             info!(
                 "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}}}",
                 self.name,

@@ -47,10 +73,8 @@ impl Counter {
                 times,
                 timing::timestamp(),
             );
         }
         if times % INFLUX_RATE == 0 && times > 0 {
+            let lastlog = self.lastlog.load(Ordering::Relaxed);
             metrics::submit(
-                influxdb::Point::new(&format!("counter_{}", self.name))
+                influxdb::Point::new(&format!("counter-{}", self.name))
                     .add_field(
                         "count",
                         influxdb::Value::Integer(counts as i64 - lastlog as i64),

@@ -64,7 +88,8 @@ impl Counter {
 }
 #[cfg(test)]
 mod tests {
-    use counter::Counter;
+    use counter::{Counter, DEFAULT_METRICS_RATE};
+    use std::env;
     use std::sync::atomic::{AtomicUsize, Ordering};
     #[test]
     fn test_counter() {

@@ -74,7 +99,7 @@ mod tests {
         unsafe {
             assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
             assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
-            assert_eq!(COUNTER.lograte, 100);
+            assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), 100);
             assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 0);
             assert_eq!(COUNTER.name, "test");
         }

@@ -89,4 +114,42 @@ mod tests {
             assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399);
         }
     }
+    #[test]
+    fn test_inc_new_counter() {
+        //make sure that macros are syntactically correct
+        //the variable is internal to the macro scope so there is no way to introspect it
+        inc_new_counter!("counter-1", 1);
+        inc_new_counter!("counter-2", 1, 2);
+    }
+    #[test]
+    fn test_lograte() {
+        static mut COUNTER: Counter = create_counter!("test_lograte", 0);
+        inc_counter!(COUNTER, 2);
+        unsafe {
+            assert_eq!(
+                COUNTER.lograte.load(Ordering::Relaxed),
+                DEFAULT_METRICS_RATE
+            );
+        }
+    }
+    #[test]
+    fn test_lograte_env() {
+        assert_ne!(DEFAULT_METRICS_RATE, 0);
+        static mut COUNTER: Counter = create_counter!("test_lograte_env", 0);
+        env::set_var("SOLANA_DEFAULT_METRICS_RATE", "50");
+        inc_counter!(COUNTER, 2);
+        unsafe {
+            assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), 50);
+        }
+
+        static mut COUNTER2: Counter = create_counter!("test_lograte_env", 0);
+        env::set_var("SOLANA_DEFAULT_METRICS_RATE", "0");
+        inc_counter!(COUNTER2, 2);
+        unsafe {
+            assert_eq!(
+                COUNTER2.lograte.load(Ordering::Relaxed),
+                DEFAULT_METRICS_RATE
+            );
+        }
+    }
 }
src/crdt.rs | 299
@@ -37,8 +37,6 @@ use streamer::{BlobReceiver, BlobSender, Window};
 use timing::timestamp;
 use transaction::Vote;

-const LOG_RATE: usize = 10;
-
 /// milliseconds we sleep for between gossip requests
 const GOSSIP_SLEEP_MILLIS: u64 = 100;
 const GOSSIP_PURGE_MILLIS: u64 = 15000;

@@ -48,8 +46,11 @@ const MIN_TABLE_SIZE: usize = 2;

 #[derive(Debug, PartialEq, Eq)]
 pub enum CrdtError {
-    TooSmall,
+    NoPeers,
     NoLeader,
+    BadContactInfo,
+    BadNodeInfo,
+    BadGossipAddress,
 }

 pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {

@@ -150,7 +151,7 @@ impl NodeInfo {
         rpu: SocketAddr,
         tpu: SocketAddr,
         tvu_window: SocketAddr,
-    ) -> NodeInfo {
+    ) -> Self {
         NodeInfo {
             id,
             version: 0,

@@ -169,6 +170,35 @@ impl NodeInfo {
             },
         }
     }
+    #[cfg(test)]
+    /// NodeInfo with unspecified addresses for adversarial testing.
+    pub fn new_unspecified() -> Self {
+        let addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
+        assert!(addr.ip().is_unspecified());
+        Self::new(
+            KeyPair::new().pubkey(),
+            addr.clone(),
+            addr.clone(),
+            addr.clone(),
+            addr.clone(),
+            addr.clone(),
+        )
+    }
+    #[cfg(test)]
+    /// NodeInfo with multicast addresses for adversarial testing.
+    pub fn new_multicast() -> Self {
+        let addr: SocketAddr = "224.0.1.255:1000".parse().unwrap();
+        assert!(addr.ip().is_multicast());
+        Self::new(
+            KeyPair::new().pubkey(),
+            addr.clone(),
+            addr.clone(),
+            addr.clone(),
+            addr.clone(),
+            addr.clone(),
+        )
+    }

     pub fn debug_id(&self) -> u64 {
         make_debug_id(&self.id)
     }
@@ -256,8 +286,31 @@ enum Protocol {
 }

 impl Crdt {
-    pub fn new(me: NodeInfo) -> Crdt {
-        assert_eq!(me.version, 0);
+    pub fn new(me: NodeInfo) -> Result<Crdt> {
+        if me.version != 0 {
+            return Err(Error::CrdtError(CrdtError::BadNodeInfo));
+        }
+        if me.contact_info.ncp.ip().is_unspecified()
+            || me.contact_info.ncp.port() == 0
+            || me.contact_info.ncp.ip().is_multicast()
+        {
+            return Err(Error::CrdtError(CrdtError::BadGossipAddress));
+        }
+        for addr in &[
+            me.contact_info.tvu,
+            me.contact_info.rpu,
+            me.contact_info.tpu,
+            me.contact_info.tvu_window,
+        ] {
+            //dummy address is allowed, services will filter them
+            if addr.ip().is_unspecified() && addr.port() == 0 {
+                continue;
+            }
+            //if addr is not a dummy address, then it must be valid
+            if addr.ip().is_unspecified() || addr.port() == 0 || addr.ip().is_multicast() {
+                return Err(Error::CrdtError(CrdtError::BadContactInfo));
+            }
+        }
         let mut g = Crdt {
             table: HashMap::new(),
             local: HashMap::new(),

@@ -269,7 +322,7 @@ impl Crdt {
         };
         g.local.insert(me.id, g.update_index);
         g.table.insert(me.id, me);
-        g
+        Ok(g)
     }
     pub fn debug_id(&self) -> u64 {
         make_debug_id(&self.me)
@@ -348,8 +401,7 @@ impl Crdt {
         }
     }
     pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) {
-        static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE);
-        inc_counter!(COUNTER_VOTE, votes.len());
+        inc_new_counter!("crdt-vote-count", votes.len());
         if !votes.is_empty() {
             info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
         }

@@ -369,12 +421,14 @@ impl Crdt {
                 v.debug_id(),
                 v.version
             );
+            if self.table.get(&v.id).is_none() {
+                inc_new_counter!("crdt-insert-new_entry", 1, 1);
+            }

             self.update_index += 1;
             let _ = self.table.insert(v.id, v.clone());
             let _ = self.local.insert(v.id, self.update_index);
-            static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE);
-            inc_counter!(COUNTER_UPDATE, 1);
+            inc_new_counter!("crdt-update-count", 1);
         } else {
             trace!(
                 "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",

@@ -446,8 +500,7 @@ impl Crdt {
             })
             .collect();

-        static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE);
-        inc_counter!(COUNTER_PURGE, dead_ids.len());
+        inc_new_counter!("crdt-purge-count", dead_ids.len());

         for id in &dead_ids {
             self.alive.remove(id);

@@ -522,7 +575,8 @@ impl Crdt {
     ) -> Result<()> {
         if broadcast_table.is_empty() {
             warn!("{:x}:not enough peers in crdt table", me.debug_id());
-            Err(CrdtError::TooSmall)?;
+            inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1, 1);
+            Err(CrdtError::NoPeers)?;
         }
         trace!("broadcast nodes {}", broadcast_table.len());

@@ -628,7 +682,8 @@ impl Crdt {
             .collect();
         for e in errs {
             if let Err(e) = &e {
-                error!("broadcast result {:?}", e);
+                inc_new_counter!("crdt-retransmit-send_to_error", 1, 1);
+                error!("retransmit result {:?}", e);
             }
             e?;
         }

@@ -669,7 +724,7 @@ impl Crdt {
             .filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr)
             .collect();
         if valid.is_empty() {
-            Err(CrdtError::TooSmall)?;
+            Err(CrdtError::NoPeers)?;
         }
         let n = (Self::random() as usize) % valid.len();
         let addr = valid[n].contact_info.ncp;

@@ -684,7 +739,14 @@ impl Crdt {
     /// * A - Address to send to
     /// * B - RequestUpdates protocol message
     fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
-        let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
+        let options: Vec<_> = self.table
+            .values()
+            .filter(|v| {
+                v.id != self.me
+                    && !v.contact_info.ncp.ip().is_unspecified()
+                    && !v.contact_info.ncp.ip().is_multicast()
+            })
+            .collect();

         let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
             &self.remote,

@@ -694,7 +756,7 @@ impl Crdt {

         let choose_peer_result = choose_peer_strategy.choose_peer(options);

-        if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result {
+        if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result {
             trace!(
                 "crdt too small for gossip {:x} {}",
                 self.debug_id(),
@@ -898,24 +960,18 @@ impl Crdt {
                 outblob.meta.set_addr(&from.contact_info.tvu_window);
                 outblob.set_id(sender_id).expect("blob set_id");
             }
-            static mut COUNTER_REQ_WINDOW_PASS: Counter =
-                create_counter!("crdt-window-request-pass", LOG_RATE);
-            inc_counter!(COUNTER_REQ_WINDOW_PASS, 1);
+            inc_new_counter!("crdt-window-request-pass", 1);

             return Some(out);
         } else {
-            static mut COUNTER_REQ_WINDOW_OUTSIDE: Counter =
-                create_counter!("crdt-window-request-outside", LOG_RATE);
-            inc_counter!(COUNTER_REQ_WINDOW_OUTSIDE, 1);
+            inc_new_counter!("crdt-window-request-outside", 1);
             info!(
                 "requested ix {} != blob_ix {}, outside window!",
                 ix, blob_ix
             );
         }
     } else {
-        static mut COUNTER_REQ_WINDOW_FAIL: Counter =
-            create_counter!("crdt-window-request-fail", LOG_RATE);
-        inc_counter!(COUNTER_REQ_WINDOW_FAIL, 1);
+        inc_new_counter!("crdt-window-request-fail", 1);
         assert!(window.read().unwrap()[pos].is_none());
         info!(
             "{:x}: failed RequestWindowIndex {:x} {} {}",

@@ -991,16 +1047,23 @@ impl Crdt {
                 //TODO verify from is signed
                 obj.write().unwrap().insert(&from);
                 let me = obj.read().unwrap().my_data().clone();
-                static mut COUNTER_REQ_WINDOW: Counter =
-                    create_counter!("crdt-window-request-recv", LOG_RATE);
-                inc_counter!(COUNTER_REQ_WINDOW, 1);
+                inc_new_counter!("crdt-window-request-recv", 1);
                 trace!(
                     "{:x}:received RequestWindowIndex {:x} {} ",
                     me.debug_id(),
                     from.debug_id(),
                     ix,
                 );
-                assert_ne!(from.contact_info.tvu_window, me.contact_info.tvu_window);
+                if from.contact_info.tvu_window == me.contact_info.tvu_window {
+                    warn!(
+                        "Ignored {:x}:received RequestWindowIndex from ME {:x} {} ",
+                        me.debug_id(),
+                        from.debug_id(),
+                        ix,
+                    );
+                    inc_new_counter!("crdt-window-request-address-eq", 1);
+                    return None;
+                }
                 Self::run_window_request(&window, &me, &from, ix, blob_recycler)
             }
             Err(_) => {
@@ -1084,23 +1147,18 @@ pub struct TestNode {
     pub sockets: Sockets,
 }

-impl Default for TestNode {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl TestNode {
-    pub fn new() -> Self {
+    pub fn new_localhost() -> Self {
         let pubkey = KeyPair::new().pubkey();
-        Self::new_with_pubkey(pubkey)
+        Self::new_localhost_with_pubkey(pubkey)
     }
-    pub fn new_with_pubkey(pubkey: PublicKey) -> Self {
-        let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
-        let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
-        let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
-        let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
-        let repair = UdpSocket::bind("0.0.0.0:0").unwrap();
+    pub fn new_localhost_with_pubkey(pubkey: PublicKey) -> Self {
+        let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
+        let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
+        let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
+        let requests = UdpSocket::bind("127.0.0.1:0").unwrap();
+        let repair = UdpSocket::bind("127.0.0.1:0").unwrap();

         let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
         let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
         let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
@@ -1203,6 +1261,107 @@ mod tests {
         let p3 = parse_port_or_addr(None);
         assert_eq!(p3.port(), 8000);
     }
+    #[test]
+    fn test_bad_address() {
+        let d1 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.0:1234".parse().unwrap(),
+            "0.0.0.0:1235".parse().unwrap(),
+            "0.0.0.0:1236".parse().unwrap(),
+            "0.0.0.0:1237".parse().unwrap(),
+            "0.0.0.0:1238".parse().unwrap(),
+        );
+        assert_matches!(
+            Crdt::new(d1).err(),
+            Some(Error::CrdtError(CrdtError::BadGossipAddress))
+        );
+        let d1_1 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.1:1234".parse().unwrap(),
+            "0.0.0.0:1235".parse().unwrap(),
+            "0.0.0.0:1236".parse().unwrap(),
+            "0.0.0.0:1237".parse().unwrap(),
+            "0.0.0.0:1238".parse().unwrap(),
+        );
+        assert_matches!(
+            Crdt::new(d1_1).err(),
+            Some(Error::CrdtError(CrdtError::BadContactInfo))
+        );
+        let d2 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+        );
+        assert_matches!(
+            Crdt::new(d2).err(),
+            Some(Error::CrdtError(CrdtError::BadGossipAddress))
+        );
+        let d2_1 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.1:1234".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.1:0".parse().unwrap(),
+        );
+        assert_matches!(
+            Crdt::new(d2_1).err(),
+            Some(Error::CrdtError(CrdtError::BadContactInfo))
+        );
+        let d3 = NodeInfo::new_unspecified();
+        assert_matches!(
+            Crdt::new(d3).err(),
+            Some(Error::CrdtError(CrdtError::BadGossipAddress))
+        );
+        let d4 = NodeInfo::new_multicast();
+        assert_matches!(
+            Crdt::new(d4).err(),
+            Some(Error::CrdtError(CrdtError::BadGossipAddress))
+        );
+        let mut d5 = NodeInfo::new_multicast();
+        d5.version = 1;
+        assert_matches!(
+            Crdt::new(d5).err(),
+            Some(Error::CrdtError(CrdtError::BadNodeInfo))
+        );
+        let d6 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.0:1234".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+        );
+        assert_matches!(
+            Crdt::new(d6).err(),
+            Some(Error::CrdtError(CrdtError::BadGossipAddress))
+        );
+        let d7 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.1:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+        );
+        assert_matches!(
+            Crdt::new(d7).err(),
+            Some(Error::CrdtError(CrdtError::BadGossipAddress))
+        );
+        let d8 = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "0.0.0.1:1234".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+            "0.0.0.0:0".parse().unwrap(),
+        );
+        assert_eq!(Crdt::new(d8).is_ok(), true);
+    }

     #[test]
     fn insert_test() {
         let mut d = NodeInfo::new(
@@ -1214,7 +1373,7 @@ mod tests {
             "127.0.0.1:1238".parse().unwrap(),
         );
         assert_eq!(d.version, 0);
-        let mut crdt = Crdt::new(d.clone());
+        let mut crdt = Crdt::new(d.clone()).unwrap();
         assert_eq!(crdt.table[&d.id].version, 0);
         d.version = 2;
         crdt.insert(&d);

@@ -1227,7 +1386,7 @@ mod tests {
     fn test_new_vote() {
         let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
         assert_eq!(d.version, 0);
-        let mut crdt = Crdt::new(d.clone());
+        let mut crdt = Crdt::new(d.clone()).unwrap();
         assert_eq!(crdt.table[&d.id].version, 0);
         let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
         assert_ne!(d.id, leader.id);

@@ -1254,7 +1413,7 @@ mod tests {
     fn test_insert_vote() {
         let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
         assert_eq!(d.version, 0);
-        let mut crdt = Crdt::new(d.clone());
+        let mut crdt = Crdt::new(d.clone()).unwrap();
         assert_eq!(crdt.table[&d.id].version, 0);
         let vote_same_version = Vote {
             version: d.version,

@@ -1286,7 +1445,7 @@ mod tests {
         // TODO: remove this test once leaders vote
         let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
         assert_eq!(d.version, 0);
-        let mut crdt = Crdt::new(d.clone());
+        let mut crdt = Crdt::new(d.clone()).unwrap();
         let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
         assert_ne!(d.id, leader.id);
         crdt.insert(&leader);

@@ -1355,7 +1514,7 @@ mod tests {
             "127.0.0.1:1237".parse().unwrap(),
             "127.0.0.1:1238".parse().unwrap(),
         );
-        let mut crdt = Crdt::new(d1.clone());
+        let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new");
         let (key, ix, ups) = crdt.get_updates_since(0);
         assert_eq!(key, d1.id);
         assert_eq!(ix, 1);

@@ -1376,7 +1535,7 @@ mod tests {
             sorted(&ups),
             sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
         );
-        let mut crdt2 = Crdt::new(d2.clone());
+        let mut crdt2 = Crdt::new(d2.clone()).expect("Crdt::new");
         crdt2.apply_updates(key, ix, &ups, &vec![]);
         assert_eq!(crdt2.table.values().len(), 3);
         assert_eq!(

@@ -1398,9 +1557,9 @@ mod tests {
             "127.0.0.1:1237".parse().unwrap(),
             "127.0.0.1:1238".parse().unwrap(),
         );
-        let mut crdt = Crdt::new(me.clone());
+        let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
         let rv = crdt.window_index_request(0);
-        assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
+        assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
         let nxt = NodeInfo::new(
             KeyPair::new().pubkey(),
             "127.0.0.1:1234".parse().unwrap(),

@@ -1411,7 +1570,7 @@ mod tests {
         );
         crdt.insert(&nxt);
         let rv = crdt.window_index_request(0);
-        assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
+        assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
         let nxt = NodeInfo::new(
             KeyPair::new().pubkey(),
             "127.0.0.2:1234".parse().unwrap(),

@@ -1449,6 +1608,30 @@ mod tests {
         assert!(one && two);
     }

+    #[test]
+    fn gossip_request_bad_addr() {
+        let me = NodeInfo::new(
+            KeyPair::new().pubkey(),
+            "127.0.0.1:127".parse().unwrap(),
+            "127.0.0.1:127".parse().unwrap(),
+            "127.0.0.1:127".parse().unwrap(),
+            "127.0.0.1:127".parse().unwrap(),
+            "127.0.0.1:127".parse().unwrap(),
+        );
+
+        let mut crdt = Crdt::new(me).expect("Crdt::new");
+        let nxt1 = NodeInfo::new_unspecified();
+        // Filter out unspecified addresses
+        crdt.insert(&nxt1); //<--- attack!
+        let rv = crdt.gossip_request();
+        assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
+        let nxt2 = NodeInfo::new_multicast();
+        // Filter out multicast addresses
+        crdt.insert(&nxt2); //<--- attack!
+        let rv = crdt.gossip_request();
+        assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
+    }
+
     /// test that gossip requests are eventually generated for all nodes
     #[test]
     fn gossip_request() {

@@ -1460,9 +1643,9 @@ mod tests {
             "127.0.0.1:1237".parse().unwrap(),
             "127.0.0.1:1238".parse().unwrap(),
         );
-        let mut crdt = Crdt::new(me.clone());
+        let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
         let rv = crdt.gossip_request();
-        assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
+        assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
         let nxt1 = NodeInfo::new(
             KeyPair::new().pubkey(),
             "127.0.0.2:1234".parse().unwrap(),

@@ -1519,7 +1702,7 @@ mod tests {
     fn purge_test() {
         logger::setup();
         let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
-        let mut crdt = Crdt::new(me.clone());
+        let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
         let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
         assert_ne!(me.id, nxt.id);
         crdt.set_leader(me.id);

@@ -1630,7 +1813,7 @@ mod tests {
         let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
         let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
         let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
-        let mut crdt = Crdt::new(me.clone());
+        let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
         assert_eq!(crdt.top_leader(), None);
         crdt.set_leader(leader0.id);
         assert_eq!(crdt.top_leader().unwrap(), leader0.id);
@@ -265,7 +265,7 @@ mod tests {
     const TPS_BATCH: i64 = 5_000_000;

     logger::setup();
-    let leader = TestNode::new();
+    let leader = TestNode::new_localhost();

     let alice = Mint::new(10_000_000);
     let bank = Bank::new(&alice);
@@ -195,7 +195,7 @@ impl FullNode {
|
||||
let bank = Arc::new(bank);
|
||||
let mut thread_hdls = vec![];
|
||||
let rpu = Rpu::new(
|
||||
&bank.clone(),
|
||||
&bank,
|
||||
node.sockets.requests,
|
||||
node.sockets.respond,
|
||||
exit.clone(),
|
||||
@@ -203,20 +203,20 @@ impl FullNode {
|
||||
thread_hdls.extend(rpu.thread_hdls());
|
||||
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
|
||||
let (tpu, blob_receiver) = Tpu::new(
|
||||
&bank.clone(),
|
||||
&crdt.clone(),
|
||||
&bank,
|
||||
&crdt,
|
||||
tick_duration,
|
||||
node.sockets.transaction,
|
||||
&blob_recycler.clone(),
|
||||
&blob_recycler,
|
||||
exit.clone(),
|
||||
writer,
|
||||
);
|
||||
thread_hdls.extend(tpu.thread_hdls());
|
||||
let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);
|
||||
let ncp = Ncp::new(
|
||||
&crdt.clone(),
|
||||
&crdt,
|
||||
window.clone(),
|
||||
node.sockets.gossip,
|
||||
node.sockets.gossip_send,
|
||||
@@ -278,14 +278,14 @@ impl FullNode {
|
||||
let bank = Arc::new(bank);
|
||||
let mut thread_hdls = vec![];
|
||||
let rpu = Rpu::new(
|
||||
&bank.clone(),
|
||||
&bank,
|
||||
node.sockets.requests,
|
||||
node.sockets.respond,
|
||||
exit.clone(),
|
||||
);
|
||||
thread_hdls.extend(rpu.thread_hdls());
|
||||
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
|
||||
crdt.write()
|
||||
.expect("'crdt' write lock before insert() in pub fn replicate")
|
||||
.insert(&entry_point);
|
||||
@@ -295,7 +295,7 @@ impl FullNode {
|
||||
let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);
|
||||
|
||||
let ncp = Ncp::new(
|
||||
&crdt.clone(),
|
||||
&crdt,
|
||||
window.clone(),
|
||||
node.sockets.gossip,
|
||||
node.sockets.gossip_send,
|
||||
@@ -304,7 +304,7 @@ impl FullNode {
|
||||
|
||||
let tvu = Tvu::new(
|
||||
keypair,
|
||||
bank.clone(),
|
||||
&bank,
|
||||
entry_height,
|
||||
crdt.clone(),
|
||||
window.clone(),
|
||||
@@ -318,8 +318,12 @@ impl FullNode {
         FullNode { exit, thread_hdls }
     }
 
-    pub fn close(self) -> Result<()> {
+    //used for notifying many nodes in parallel to exit
+    pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
+    }
+    pub fn close(self) -> Result<()> {
+        self.exit();
         self.join()
     }
 }
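The hunk above splits shutdown in two: a non-consuming exit() that only flips the shared flag, and a consuming close() that signals and then joins. A minimal sketch of the same pattern; the names are illustrative stand-ins, not the crate's FullNode:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread::{self, JoinHandle};

    // Illustrative stand-in for a node's shutdown plumbing.
    struct Node {
        exit: Arc<AtomicBool>,
        thread_hdls: Vec<JoinHandle<()>>,
    }

    impl Node {
        fn new() -> Self {
            let exit = Arc::new(AtomicBool::new(false));
            let flag = exit.clone();
            // Worker loop polls the shared flag and returns when it flips.
            let hdl = thread::spawn(move || while !flag.load(Ordering::Relaxed) {});
            Node { exit, thread_hdls: vec![hdl] }
        }
        // Non-consuming: may be called on many nodes before any join.
        fn exit(&self) {
            self.exit.store(true, Ordering::Relaxed);
        }
        // Consuming: signal, then block until every thread is done.
        fn close(self) -> thread::Result<()> {
            self.exit();
            for hdl in self.thread_hdls {
                hdl.join()?;
            }
            Ok(())
        }
    }

    fn main() {
        let nodes: Vec<Node> = (0..2).map(|_| Node::new()).collect();
        nodes.iter().for_each(Node::exit); // signal everyone first...
        nodes.into_iter().for_each(|n| n.close().unwrap()); // ...then join
    }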
@@ -343,18 +347,39 @@ mod tests {
     use crdt::TestNode;
     use fullnode::FullNode;
     use mint::Mint;
+    use service::Service;
     use signature::{KeyPair, KeyPairUtil};
     use std::sync::atomic::AtomicBool;
     use std::sync::Arc;
     #[test]
     fn validator_exit() {
         let kp = KeyPair::new();
-        let tn = TestNode::new_with_pubkey(kp.pubkey());
+        let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
         let alice = Mint::new(10_000);
         let bank = Bank::new(&alice);
         let exit = Arc::new(AtomicBool::new(false));
         let entry = tn.data.clone();
         let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit);
-        v.close().unwrap();
+        v.exit();
+        v.join().unwrap();
     }
+    #[test]
+    fn validator_parallel_exit() {
+        let vals: Vec<FullNode> = (0..2)
+            .map(|_| {
+                let kp = KeyPair::new();
+                let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
+                let alice = Mint::new(10_000);
+                let bank = Bank::new(&alice);
+                let exit = Arc::new(AtomicBool::new(false));
+                let entry = tn.data.clone();
+                FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit)
+            })
+            .collect();
+        //each validator can exit in parallel to speed many sequential calls to `join`
+        vals.iter().for_each(|v| v.exit());
+        //while join is called sequentially, the above exit call notified all the
+        //validators to exit from all their threads
+        vals.into_iter().for_each(|v| v.join().unwrap());
+    }
 }
@@ -184,6 +184,50 @@ pub fn flush() {
     agent.flush();
 }
 
+/// Hook the panic handler to generate a data point on each panic
+pub fn set_panic_hook(program: &'static str) {
+    use std::panic;
+    use std::sync::{Once, ONCE_INIT};
+    static SET_HOOK: Once = ONCE_INIT;
+    SET_HOOK.call_once(|| {
+        let default_hook = panic::take_hook();
+        panic::set_hook(Box::new(move |ono| {
+            default_hook(ono);
+            submit(
+                influxdb::Point::new("panic")
+                    .add_tag("program", influxdb::Value::String(program.to_string()))
+                    .add_tag(
+                        "thread",
+                        influxdb::Value::String(
+                            thread::current().name().unwrap_or("?").to_string(),
+                        ),
+                    )
+                    // The 'one' field exists to give Kapacitor Alerts a numerical value
+                    // to filter on
+                    .add_field("one", influxdb::Value::Integer(1))
+                    .add_field(
+                        "message",
+                        influxdb::Value::String(
+                            // TODO: use ono.message() when it becomes stable
+                            ono.to_string(),
+                        ),
+                    )
+                    .add_field(
+                        "location",
+                        influxdb::Value::String(match ono.location() {
+                            Some(location) => location.to_string(),
+                            None => "?".to_string(),
+                        }),
+                    )
+                    .to_owned(),
+            );
+            // Flush metrics immediately in case the process exits immediately
+            // upon return
+            flush();
+        }));
+    });
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
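set_panic_hook chains the previous hook rather than replacing it, so the default backtrace output still runs before the metric is submitted. A standalone sketch of that chaining pattern (eprintln! stands in for the submit() and flush() calls above):

    use std::panic;

    fn main() {
        // Keep the default handler (backtrace printing), then run our
        // own reporting after it, exactly as the hunk above does.
        let default_hook = panic::take_hook();
        panic::set_hook(Box::new(move |info| {
            default_hook(info);
            let location = info
                .location()
                .map(|l| l.to_string())
                .unwrap_or_else(|| "?".to_string());
            // Stand-in for `submit(...)` + `flush()`.
            eprintln!("metric: panic at {} ({})", location, info);
        }));
        panic!("boom");
    }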
src/nat.rs (8 changes; Normal file → Executable file)
@@ -106,9 +106,13 @@ pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPa
         Err(_) => {
             let sender = udp_random_bind(startport, endport, 5).unwrap();
             let local_addr = sender.local_addr().unwrap();
-            info!("Using local address {} for {}", local_addr, label);
+
+            let pub_ip = get_public_ip_addr().unwrap();
+            let pub_addr = SocketAddr::new(pub_ip, local_addr.port());
+
+            info!("Using source address {} for {}", pub_addr, label);
             UdpSocketPair {
-                addr: private_addr,
+                addr: pub_addr,
                 receiver: sender.try_clone().unwrap(),
                 sender,
             }
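The change keeps receiving on the locally bound socket but advertises the public IP paired with the local port, on the assumption that the NAT forwards that port back in. A sketch of the idea, with a documentation-range address standing in for get_public_ip_addr(), whose body this view doesn't show:

    use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};

    fn main() {
        // Bind locally on any free port.
        let sender = UdpSocket::bind("0.0.0.0:0").unwrap();
        let local_addr = sender.local_addr().unwrap();
        // Assumption: in the real code this comes from an external service.
        let pub_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7));
        // Advertise public_ip:local_port; keep receiving on `sender`.
        let pub_addr = SocketAddr::new(pub_ip, local_addr.port());
        println!("receive on {}, advertise {}", local_addr, pub_addr);
    }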
@@ -88,12 +88,12 @@ mod tests {
     // test that stage will exit when flag is set
     fn test_exit() {
         let exit = Arc::new(AtomicBool::new(false));
-        let tn = TestNode::new();
-        let crdt = Crdt::new(tn.data.clone());
+        let tn = TestNode::new_localhost();
+        let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
         let c = Arc::new(RwLock::new(crdt));
         let w = Arc::new(RwLock::new(vec![]));
         let d = Ncp::new(
-            &c.clone(),
+            &c,
             w,
             tn.sockets.gossip,
             tn.sockets.gossip_send,
@@ -19,7 +19,6 @@ pub type SharedBlobs = VecDeque<SharedBlob>;
 pub type PacketRecycler = Recycler<Packets>;
 pub type BlobRecycler = Recycler<Blob>;
 
-const LOG_RATE: usize = 10;
 pub const NUM_PACKETS: usize = 1024 * 8;
 pub const BLOB_SIZE: usize = 64 * 1024;
 pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
@@ -188,7 +187,6 @@ impl<T: Default> Recycler<T> {
 
 impl Packets {
     fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
-        static mut COUNTER: Counter = create_counter!("packets", LOG_RATE);
         self.packets.resize(NUM_PACKETS, Packet::default());
         let mut i = 0;
         //DOCUMENTED SIDE-EFFECT
@@ -203,7 +201,7 @@ impl Packets {
             trace!("receiving on {}", socket.local_addr().unwrap());
             match socket.recv_from(&mut p.data) {
                 Err(_) if i > 0 => {
-                    inc_counter!(COUNTER, i);
+                    inc_new_counter!("packets-recv_count", 1);
                     debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
                     break;
                 }
@@ -423,7 +421,7 @@ impl Blob {
         let p = r.read().expect("'r' read lock in pub fn send_to");
         let a = p.meta.addr();
         if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) {
-            info!(
+            warn!(
                 "error sending {} byte packet to {:?}: {:?}",
                 p.meta.size, a, e
            );
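This is the first of many hunks that replace a per-site `static mut COUNTER` plus `inc_counter!` with a single `inc_new_counter!("name", count)` call. The new macro's body is not part of this compare view; the sketch below only illustrates how such a macro can hide the static inside its own expansion so call sites stay one line (the name and semantics here are assumptions):

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Each invocation expands to its own static, so every call site
    // keeps an independent running total, as the old per-site
    // `static mut Counter` did.
    macro_rules! inc_new_counter_sketch {
        ($name:expr, $count:expr) => {{
            static COUNTER: AtomicUsize = AtomicUsize::new(0);
            let total = COUNTER.fetch_add($count, Ordering::Relaxed) + $count;
            println!("counter {}: {}", $name, total);
        }};
    }

    fn main() {
        inc_new_counter_sketch!("packets-recv_count", 1);
        inc_new_counter_sketch!("packets-recv_count", 3);
    }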
@@ -27,7 +27,6 @@ pub struct ReplicateStage {
 }
 
 const VOTE_TIMEOUT_MS: u64 = 1000;
-const LOG_RATE: usize = 10;
 
 impl ReplicateStage {
     /// Process entry blobs, already in order
@@ -48,11 +47,13 @@ impl ReplicateStage {
         }
         let blobs_len = blobs.len();
         let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
-        let votes = entries_to_votes(&entries);
-
-        static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE);
-        inc_counter!(
-            COUNTER_REPLICATE,
+        {
+            let votes = entries_to_votes(&entries);
+            let mut wcrdt = crdt.write().unwrap();
+            wcrdt.insert_votes(&votes);
+        };
+        inc_new_counter!(
+            "replicate-transactions",
             entries.iter().map(|x| x.transactions.len()).sum()
         );
         let res = bank.process_entries(entries);
@@ -66,7 +67,6 @@ impl ReplicateStage {
             let shared_blob = blob_recycler.allocate();
             let (vote, addr) = {
                 let mut wcrdt = crdt.write().unwrap();
-                wcrdt.insert_votes(&votes);
                 //TODO: doesn't seem like there is a synchronous call to get height and id
                 info!("replicate_stage {} {:?}", height, &last_id[..8]);
                 wcrdt.new_vote(height, last_id)
@@ -80,7 +80,9 @@ impl ReplicateStage {
                 blob.meta.set_addr(&addr);
                 blob.meta.size = len;
             }
+            inc_new_counter!("replicate-vote_sent", 1);
             *last_vote = now;
+
             vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
         }
         while let Some(blob) = blobs.pop_front() {
@@ -77,7 +77,6 @@ fn batch_size(batches: &[SharedPackets]) -> usize {
 #[cfg(not(feature = "cuda"))]
 pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
     use rayon::prelude::*;
-    static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
     let count = batch_size(batches);
     info!("CPU ECDSA for {}", batch_size(batches));
     let rv = batches
@@ -91,7 +90,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
                 .collect()
         })
         .collect();
-    inc_counter!(COUNTER, count);
+    inc_new_counter!("ed25519_verify", count);
     rv
 }
 
@@ -109,7 +108,6 @@ pub fn init() {
 #[cfg(feature = "cuda")]
 pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
     use packet::PACKET_DATA_SIZE;
-    static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
     let count = batch_size(batches);
     info!("CUDA ECDSA for {}", batch_size(batches));
     let mut out = Vec::new();
@@ -169,7 +167,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
             num += 1;
         }
     }
-    inc_counter!(COUNTER, count);
+    inc_new_counter!("ed25519_verify", count);
     rvs
 }
 
@@ -78,7 +78,7 @@ impl SigVerifyStage {
         verified_sender: Arc<Mutex<Sender<VerifiedPackets>>>,
     ) -> JoinHandle<()> {
         spawn(move || loop {
-            if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) {
+            if let Err(e) = Self::verifier(&packet_receiver, &verified_sender) {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock};
 use std::thread::{Builder, JoinHandle};
 use std::time::Duration;
 
-const LOG_RATE: usize = 10;
 pub const WINDOW_SIZE: u64 = 2 * 1024;
 pub type PacketReceiver = Receiver<SharedPackets>;
 pub type PacketSender = Sender<SharedPackets>;
@@ -117,7 +116,7 @@ pub fn responder(
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    _ => error!("{} responder error: {:?}", name, e),
+                    _ => warn!("{} responder error: {:?}", name, e),
                 }
             }
         })
@@ -224,9 +223,7 @@ fn repair_window(
     let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
     trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
     if !reqs.is_empty() {
-        static mut COUNTER_REPAIR: Counter =
-            create_counter!("streamer-repair_window-repair", LOG_RATE);
-        inc_counter!(COUNTER_REPAIR, reqs.len());
+        inc_new_counter!("streamer-repair_window-repair", reqs.len());
         debug!(
             "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
             debug_id,
@@ -301,9 +298,7 @@ fn retransmit_all_leader_blocks(
             *received,
             retransmit_queue.len(),
         );
-        static mut COUNTER_RETRANSMIT: Counter =
-            create_counter!("streamer-recv_window-retransmit", LOG_RATE);
-        inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len());
+        inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len());
         retransmit.send(retransmit_queue)?;
     }
     Ok(())
@@ -413,8 +408,7 @@ fn recv_window(
     while let Ok(mut nq) = r.try_recv() {
         dq.append(&mut nq)
     }
-    static mut COUNTER_RECV: Counter = create_counter!("streamer-recv_window-recv", LOG_RATE);
-    inc_counter!(COUNTER_RECV, dq.len());
+    inc_new_counter!("streamer-recv_window-recv", dq.len());
     debug!(
         "{:x}: RECV_WINDOW {} {}: got packets {}",
         debug_id,
@@ -480,9 +474,7 @@ fn recv_window(
             consume_queue.len(),
         );
         trace!("sending consume_queue.len: {}", consume_queue.len());
-        static mut COUNTER_CONSUME: Counter =
-            create_counter!("streamer-recv_window-consume", LOG_RATE);
-        inc_counter!(COUNTER_CONSUME, consume_queue.len());
+        inc_new_counter!("streamer-recv_window-consume", consume_queue.len());
         s.send(consume_queue)?;
     }
     Ok(())
@@ -591,7 +583,10 @@ pub fn window(
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    _ => error!("window error: {:?}", e),
+                    _ => {
+                        inc_new_counter!("streamer-window-error", 1, 1);
+                        error!("window error: {:?}", e);
+                    }
                 }
             }
             let _ = repair_window(
@@ -647,9 +642,7 @@ fn broadcast(
     // Index the blobs
     Crdt::index_blobs(&me, &blobs, receive_index)?;
     // keep the cache of blobs that are broadcast
-    static mut COUNTER_BROADCAST: Counter =
-        create_counter!("streamer-broadcast-sent", LOG_RATE);
-    inc_counter!(COUNTER_BROADCAST, blobs.len());
+    inc_new_counter!("streamer-broadcast-sent", blobs.len());
     {
         let mut win = window.write().unwrap();
         assert!(blobs.len() <= win.len());
@@ -738,8 +731,11 @@ pub fn broadcaster(
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
-                    _ => error!("broadcaster error: {:?}", e),
+                    Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
+                    _ => {
+                        inc_new_counter!("streamer-broadcaster-error", 1, 1);
+                        error!("broadcaster error: {:?}", e);
+                    }
                 }
             }
         }
@@ -792,7 +788,10 @@ pub fn retransmitter(
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    _ => error!("retransmitter error: {:?}", e),
+                    _ => {
+                        inc_new_counter!("streamer-retransmit-error", 1, 1);
+                        error!("retransmitter error: {:?}", e);
+                    }
                 }
             }
         }
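The window, broadcaster, and retransmitter loops above converge on the same receive-loop idiom: a disconnected channel ends the loop, a timeout is routine, and anything else bumps an error counter before logging. A self-contained sketch of that triage, with an illustrative Error enum standing in for the crate's:

    use std::sync::mpsc::RecvTimeoutError;

    // Illustrative stand-in for the crate's `Error` enum.
    enum Error {
        RecvTimeout(RecvTimeoutError),
        Other(&'static str),
    }

    // Returns true when the loop should break.
    fn handle(e: Error, errors: &mut usize) -> bool {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Other(msg) => {
                *errors += 1; // stand-in for inc_new_counter!(...)
                eprintln!("stream error: {}", msg);
                false
            }
        }
    }

    fn main() {
        let mut errors = 0;
        assert!(!handle(Error::Other("send failed"), &mut errors));
        assert!(handle(Error::RecvTimeout(RecvTimeoutError::Disconnected), &mut errors));
        assert_eq!(errors, 1);
    }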
@@ -899,9 +898,9 @@ mod test {
     #[test]
     pub fn window_send_test() {
         logger::setup();
-        let tn = TestNode::new();
+        let tn = TestNode::new_localhost();
         let exit = Arc::new(AtomicBool::new(false));
-        let mut crdt_me = Crdt::new(tn.data.clone());
+        let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
         let me_id = crdt_me.my_data().id;
         crdt_me.set_leader(me_id);
         let subs = Arc::new(RwLock::new(crdt_me));
@@ -274,7 +274,7 @@ mod tests {
     #[test]
     fn test_thin_client() {
         logger::setup();
-        let leader = TestNode::new();
+        let leader = TestNode::new_localhost();
         let leader_data = leader.data.clone();
 
         let alice = Mint::new(10_000);
@@ -317,7 +317,7 @@ mod tests {
     #[ignore]
     fn test_bad_sig() {
         logger::setup();
-        let leader = TestNode::new();
+        let leader = TestNode::new_localhost();
         let alice = Mint::new(10_000);
         let bank = Bank::new(&alice);
         let bob_pubkey = KeyPair::new().pubkey();
@@ -371,7 +371,7 @@ mod tests {
     #[test]
     fn test_client_check_signature() {
         logger::setup();
-        let leader = TestNode::new();
+        let leader = TestNode::new_localhost();
         let alice = Mint::new(10_000);
         let bank = Bank::new(&alice);
         let bob_pubkey = KeyPair::new().pubkey();
@@ -63,7 +63,7 @@ impl Tpu {
         let packet_recycler = PacketRecycler::default();
 
         let (fetch_stage, packet_receiver) =
-            FetchStage::new(transactions_socket, exit, &packet_recycler.clone());
+            FetchStage::new(transactions_socket, exit, &packet_recycler);
 
         let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver);
 
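Passing `&packet_recycler.clone()` builds a clone only to borrow it for the call and drop it; when the callee takes a reference and clones internally where it keeps a copy, a plain borrow does. A tiny sketch of the difference:

    #[derive(Clone, Default)]
    struct Recycler;

    // Callee borrows, and clones only if it actually keeps a copy.
    fn new_stage(recycler: &Recycler) -> Recycler {
        recycler.clone()
    }

    fn main() {
        let recycler = Recycler::default();
        let _stage_a = new_stage(&recycler.clone()); // old style: clones twice
        let _stage_b = new_stage(&recycler); // new style: one clone, inside
    }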
src/tvu.rs (31 changes)
@@ -70,7 +70,7 @@ impl Tvu {
     /// * `exit` - The exit signal.
     pub fn new(
         keypair: KeyPair,
-        bank: Arc<Bank>,
+        bank: &Arc<Bank>,
         entry_height: u64,
         crdt: Arc<RwLock<Crdt>>,
         window: Window,
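Changing the parameter from `Arc<Bank>` to `&Arc<Bank>` moves the clone of the handle inside the constructor, so call sites pass `&bank` instead of `bank.clone()`. A minimal sketch:

    use std::sync::Arc;

    struct Bank;
    struct Stage {
        _bank: Arc<Bank>,
    }

    // New-style constructor: borrow the Arc, clone the cheap handle
    // internally where an owned copy is required.
    fn new_stage(bank: &Arc<Bank>) -> Stage {
        Stage { _bank: bank.clone() }
    }

    fn main() {
        let bank = Arc::new(Bank);
        let _tvu = new_stage(&bank); // call site: no clone needed
        let _rpu = new_stage(&bank);
    }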
@@ -83,22 +83,27 @@ impl Tvu {
         let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(
             vec![replicate_socket, repair_socket],
             exit,
-            &blob_recycler.clone(),
+            &blob_recycler,
         );
         //TODO
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
         //then sent to the window, which does the erasure coding reconstruction
         let (window_stage, blob_window_receiver) = WindowStage::new(
-            &crdt.clone(),
+            &crdt,
             window,
             entry_height,
             retransmit_socket,
-            &blob_recycler.clone(),
+            &blob_recycler,
             blob_fetch_receiver,
         );
 
-        let replicate_stage =
-            ReplicateStage::new(keypair, bank, crdt, blob_recycler, blob_window_receiver);
+        let replicate_stage = ReplicateStage::new(
+            keypair,
+            bank.clone(),
+            crdt,
+            blob_recycler,
+            blob_window_receiver,
+        );
 
         Tvu {
             replicate_stage,
@@ -168,21 +173,21 @@ pub mod tests {
     #[test]
     fn test_replicate() {
         logger::setup();
-        let leader = TestNode::new();
+        let leader = TestNode::new_localhost();
         let target1_kp = KeyPair::new();
-        let target1 = TestNode::new_with_pubkey(target1_kp.pubkey());
-        let target2 = TestNode::new();
+        let target1 = TestNode::new_localhost_with_pubkey(target1_kp.pubkey());
+        let target2 = TestNode::new_localhost();
         let exit = Arc::new(AtomicBool::new(false));
 
         //start crdt_leader
-        let mut crdt_l = Crdt::new(leader.data.clone());
+        let mut crdt_l = Crdt::new(leader.data.clone()).expect("Crdt::new");
         crdt_l.set_leader(leader.data.id);
 
         let cref_l = Arc::new(RwLock::new(crdt_l));
         let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
 
         //start crdt2
-        let mut crdt2 = Crdt::new(target2.data.clone());
+        let mut crdt2 = Crdt::new(target2.data.clone()).expect("Crdt::new");
         crdt2.insert(&leader.data);
         crdt2.set_leader(leader.data.id);
         let leader_id = leader.data.id;
@@ -217,7 +222,7 @@ pub mod tests {
         let bank = Arc::new(Bank::new(&mint));
 
         //start crdt1
-        let mut crdt1 = Crdt::new(target1.data.clone());
+        let mut crdt1 = Crdt::new(target1.data.clone()).expect("Crdt::new");
         crdt1.insert(&leader.data);
         crdt1.set_leader(leader.data.id);
         let cref1 = Arc::new(RwLock::new(crdt1));
@@ -225,7 +230,7 @@ pub mod tests {
 
         let tvu = Tvu::new(
             target1_kp,
-            bank.clone(),
+            &bank,
             0,
             cref1,
             dr_1.1,
@@ -3,6 +3,7 @@
 //! stdout, and then sends the Entry to its output channel.
 
 use bank::Bank;
+use counter::Counter;
 use crdt::Crdt;
 use entry::Entry;
 use entry_writer::EntryWriter;
@@ -12,6 +13,7 @@ use result::{Error, Result};
 use service::Service;
 use std::collections::VecDeque;
 use std::io::Write;
+use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
@@ -41,6 +43,8 @@ impl WriteStage {
         let mut blobs = VecDeque::new();
         entries.to_blobs(blob_recycler, &mut blobs);
         if !blobs.is_empty() {
+            inc_new_counter!("write_stage-broadcast_vote-count", votes.len());
+            inc_new_counter!("write_stage-broadcast_blobs-count", blobs.len());
             trace!("broadcasting {}", blobs.len());
             blob_sender.send(blobs)?;
         }
@@ -71,7 +75,10 @@ impl WriteStage {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    _ => error!("{:?}", e),
+                    _ => {
+                        inc_new_counter!("write_stage-error", 1);
+                        error!("{:?}", e);
+                    }
                 }
             };
         }
@@ -16,8 +16,8 @@ use std::thread::sleep;
 use std::time::Duration;
 
 fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
-    let tn = TestNode::new();
-    let crdt = Crdt::new(tn.data.clone());
+    let tn = TestNode::new_localhost();
+    let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
     let c = Arc::new(RwLock::new(crdt));
     let w = Arc::new(RwLock::new(vec![]));
     let d = Ncp::new(
@@ -11,6 +11,7 @@ use solana::fullnode::{FullNode, LedgerFile};
 use solana::logger;
 use solana::mint::Mint;
 use solana::ncp::Ncp;
+use solana::service::Service;
 use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
 use solana::streamer::default_window;
 use solana::thin_client::ThinClient;
@@ -24,18 +25,18 @@ use std::time::Duration;
 fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
     //lets spy on the network
     let exit = Arc::new(AtomicBool::new(false));
-    let mut spy = TestNode::new();
+    let mut spy = TestNode::new_localhost();
     let daddr = "0.0.0.0:0".parse().unwrap();
     let me = spy.data.id.clone();
     spy.data.contact_info.tvu = daddr;
     spy.data.contact_info.rpu = daddr;
-    let mut spy_crdt = Crdt::new(spy.data);
+    let mut spy_crdt = Crdt::new(spy.data).expect("Crdt::new");
     spy_crdt.insert(&leader);
     spy_crdt.set_leader(leader.id);
     let spy_ref = Arc::new(RwLock::new(spy_crdt));
     let spy_window = default_window();
     let ncp = Ncp::new(
-        &spy_ref.clone(),
+        &spy_ref,
         spy_window,
         spy.sockets.gossip,
         spy.sockets.gossip_send,
@@ -86,7 +87,7 @@ fn test_multi_node_validator_catchup_from_zero() {
     logger::setup();
     const N: usize = 5;
     trace!("test_multi_node_validator_catchup_from_zero");
-    let leader = TestNode::new();
+    let leader = TestNode::new_localhost();
     let leader_data = leader.data.clone();
     let bob_pubkey = KeyPair::new().pubkey();
 
@@ -101,7 +102,7 @@ fn test_multi_node_validator_catchup_from_zero() {
     let mut nodes = vec![server];
     for _ in 0..N {
         let keypair = KeyPair::new();
-        let validator = TestNode::new_with_pubkey(keypair.pubkey());
+        let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
         let mut val = FullNode::new(
             validator,
             false,
@@ -135,7 +136,7 @@ fn test_multi_node_validator_catchup_from_zero() {
     success = 0;
     // start up another validator, converge and then check everyone's balances
     let keypair = KeyPair::new();
-    let validator = TestNode::new_with_pubkey(keypair.pubkey());
+    let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
     let val = FullNode::new(
         validator,
         false,
@@ -186,7 +187,7 @@ fn test_multi_node_basic() {
     logger::setup();
     const N: usize = 5;
     trace!("test_multi_node_basic");
-    let leader = TestNode::new();
+    let leader = TestNode::new_localhost();
     let leader_data = leader.data.clone();
     let bob_pubkey = KeyPair::new().pubkey();
     let (alice, ledger_path) = genesis(10_000);
@@ -200,7 +201,7 @@ fn test_multi_node_basic() {
     let mut nodes = vec![server];
     for _ in 0..N {
         let keypair = KeyPair::new();
-        let validator = TestNode::new_with_pubkey(keypair.pubkey());
+        let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
         let val = FullNode::new(
             validator,
             false,
@@ -239,7 +240,7 @@ fn test_multi_node_basic() {
 #[test]
 fn test_boot_validator_from_file() {
     logger::setup();
-    let leader = TestNode::new();
+    let leader = TestNode::new_localhost();
     let bob_pubkey = KeyPair::new().pubkey();
     let (alice, ledger_path) = genesis(100_000);
     let leader_data = leader.data.clone();
@@ -258,7 +259,7 @@ fn test_boot_validator_from_file() {
     assert_eq!(leader_balance, 1000);
 
     let keypair = KeyPair::new();
-    let validator = TestNode::new_with_pubkey(keypair.pubkey());
+    let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
     let validator_data = validator.data.clone();
     let val_fullnode = FullNode::new(
         validator,
@@ -277,7 +278,7 @@ fn test_boot_validator_from_file() {
 }
 
 fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
-    let leader = TestNode::new();
+    let leader = TestNode::new_localhost();
     let leader_data = leader.data.clone();
     let leader_fullnode = FullNode::new(
         leader,
@@ -328,7 +329,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
 
     // start validator from old ledger
     let keypair = KeyPair::new();
-    let validator = TestNode::new_with_pubkey(keypair.pubkey());
+    let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
     let validator_data = validator.data.clone();
     let val_fullnode = FullNode::new(
         validator,
@@ -369,7 +370,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
 fn test_multi_node_dynamic_network() {
     logger::setup();
     const N: usize = 60;
-    let leader = TestNode::new();
+    let leader = TestNode::new_localhost();
     let bob_pubkey = KeyPair::new().pubkey();
     let (alice, ledger_path) = genesis(100_000);
     let leader_data = leader.data.clone();
@@ -392,7 +393,7 @@ fn test_multi_node_dynamic_network() {
         .into_iter()
         .map(|n| {
             let keypair = KeyPair::new();
-            let validator = TestNode::new_with_pubkey(keypair.pubkey());
+            let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
             let rd = validator.data.clone();
             //send some tokens to the new validator
             let bal =
@@ -410,6 +411,7 @@ fn test_multi_node_dynamic_network() {
         })
         .collect();
 
+    let mut consecutive_success = 0;
     for i in 0..N {
         //verify leader can do transfer
         let expected = ((i + 3) * 500) as i64;
@@ -452,13 +454,25 @@ fn test_multi_node_dynamic_network() {
             validators.len(),
             distance
         );
-        //assert_eq!(success, validators.len());
+        if success == validators.len() && distance == 0 {
+            consecutive_success += 1;
+        } else {
+            consecutive_success = 0;
+        }
+        if consecutive_success == 10 {
+            break;
+        }
         }
     }
-    for (_, node) in validators {
-        node.close().unwrap();
+    assert_eq!(consecutive_success, 10);
+    for (_, node) in &validators {
+        node.exit();
     }
-    server.close().unwrap();
+    server.exit();
+    for (_, node) in validators {
+        node.join().unwrap();
+    }
+    server.join().unwrap();
 
     std::fs::remove_file(ledger_path).unwrap();
 }