Compare commits

..

64 Commits

Author SHA1 Message Date
Michael Vines
85d6627ee6 Deploy in one ssh login in a further attempt to avoid hitting GCP login quota 2018-07-17 20:45:52 -07:00
Michael Vines
611a005ec9 Avoid |wait| as it masks failures 2018-07-17 19:52:39 -07:00
Michael Vines
90b3b90391 -p 2018-07-17 19:42:00 -07:00
Michael Vines
fd4f294fd3 Rotate logs at 16MB 2018-07-17 19:42:00 -07:00
Michael Vines
145274c001 Ensure log directories are go+r 2018-07-17 18:16:40 -07:00
Michael Vines
df5d6693f6 Don't cache leader.json to make it easier to switch between nets 2018-07-17 18:16:40 -07:00
Anatoly Yakovenko
05c5603879 error counter 2018-07-17 17:28:23 -07:00
Anatoly Yakovenko
c2c48a5c3c write stage broadcast counters 2018-07-17 17:28:23 -07:00
pgarg66
4af556f70e Added tests for bad gossip address (#672) 2018-07-17 16:27:46 -07:00
anatoly yakovenko
8bad411962 env variable for default metrics rate that gets set for counters (#670)
* env variable for default metrics rate that gets set for counters

* ignore if env rate is set to 0

* use a slow rate by default

* fixed test
2018-07-17 15:26:10 -07:00
Michael Vines
5b0418793e Keep Snap fullnode/drone logs out of syslog, we're too spammy 2018-07-17 15:08:35 -07:00
pgarg66
4423ee6902 Renamed start_nodes.sh to remote_nodes.sh (#669) 2018-07-17 15:01:53 -07:00
pgarg66
f0c39cc84d Remote multinode scripts cleanup (#666)
- Also added support for stop nodes
2018-07-17 13:48:25 -07:00
Anatoly Yakovenko
3d45b04da8 review comments 2018-07-17 15:51:32 -04:00
Anatoly Yakovenko
9e2f26a5d2 review comments 2018-07-17 15:51:32 -04:00
Anatoly Yakovenko
a016f6e82e bulds 2018-07-17 15:51:32 -04:00
Anatoly Yakovenko
eb3e5fd204 server too 2018-07-17 15:51:32 -04:00
Anatoly Yakovenko
72282dc493 fast exit dynamic test 2018-07-17 15:51:32 -04:00
Michael Vines
47a22c66b4 Include program name in panic metric 2018-07-17 12:13:22 -07:00
Michael Vines
fb11d8a909 Install panic hook 2018-07-17 12:13:22 -07:00
Michael Vines
7d872f52f4 Add set_panic_hook 2018-07-17 12:13:22 -07:00
Michael Vines
d882bfe65c Ignore/log RequestWindowIndex from self 2018-07-17 12:12:54 -07:00
pgarg66
103584ef27 Use public IP for client gossip, if UPnP fails (#665) 2018-07-17 11:23:32 -07:00
anatoly yakovenko
1fb537deb9 Do not generate gossip requests to unspecified addresses (#657)
* Do not generate gossip requests to unspecified addresses

* review comments
2018-07-17 09:44:48 -07:00
Michael Vines
2bd48b4207 Display better deploy logs 2018-07-17 09:10:55 -07:00
Michael Vines
f5a6db3dc0 Add daemon plugs 2018-07-17 08:24:37 -07:00
anatoly yakovenko
dd0c1ac5b2 Error counters for streamer (#658)
* error counters for streamer

* more counters
2018-07-17 08:20:35 -07:00
anatoly yakovenko
d8c9655128 Dynamic test assert (#643)
* log responder error to warn

* log responder error to warn

* fixup!

* fixed assert

* fixed bad ports issue

* comments

* test for dummy address in Crdt::new instaad of NodeInfo::new

* return error if ContactInfo supplied to Crdt::new cannot be used to connect to network

* comments
2018-07-16 19:31:52 -07:00
anatoly yakovenko
09f2d273c5 less intrusive counters (#655)
* less intrusive counters

* fixed arg

* tests

* comments
2018-07-16 18:33:50 -07:00
Michael Vines
f6eb85e7a3 Permit Snap RUST_LOG to be overridden 2018-07-16 17:44:54 -07:00
pgarg66
0d85b43901 Fix input parameter processing for client num nodes (#653) 2018-07-16 17:23:35 -07:00
Michael Vines
fdf94a77b4 CUDA is now configurable 2018-07-16 16:23:45 -07:00
pgarg66
af40ab0c04 Split start_nodes script ssh commands to individual scripts (#642) 2018-07-16 16:21:32 -07:00
anatoly yakovenko
015b7a1ddb dash for namespaces (#649) 2018-07-16 15:55:54 -07:00
Anatoly Yakovenko
ab3e460e64 insert votes as they are observed 2018-07-16 13:39:20 -07:00
Michael Vines
194a84c8dd Add testnet-sanity.sh 2018-07-16 12:17:39 -07:00
Michael Vines
51d932dad1 Connect validators to the right leader 2018-07-16 11:05:01 -07:00
Michael Vines
561d31cc13 Add support for master.testnet.s.c 2018-07-16 10:08:58 -07:00
Michael Vines
d6a8e437bb Temporarily disable erasure 2018-07-16 08:15:47 -07:00
Anatoly Yakovenko
4631af5011 counters for vote not found 2018-07-15 20:31:23 -06:00
Michael Vines
5d28729b2a Use ed25519_init() for faster failures 2018-07-15 20:30:32 -06:00
Pankaj Garg
8c08e614b7 Start validator nodes in parallel
- This speeds up overall network startup time
2018-07-15 19:11:52 -06:00
Michael Vines
e76bf1438b A validator and leader running from the same workspace no longer share an identity 2018-07-15 13:34:48 -07:00
Michael Vines
4e177877c9 Add more error checking, better logging, avoid hitting GCP login quota 2018-07-15 09:27:25 -07:00
Michael Vines
60848b9d95 Testnet sanity test failures will now turn the build red 2018-07-14 21:27:27 -07:00
Michael Vines
79b3564a26 Log metrics params to stderr
Keep stdout clean for the actual program.  This is a specific concern for the
wallet command, where there exists tests that capture stdout from the wallet to
confirm transactions.
2018-07-14 21:24:22 -07:00
Michael Vines
1e8c36c555 Be less noisy 2018-07-14 20:42:00 -07:00
Michael Vines
94d015b089 Demote log level 2018-07-14 20:42:00 -07:00
Michael Vines
cfb3736372 Update buildkite-snap.yml 2018-07-14 17:55:03 -07:00
Michael Vines
2b77f62233 Poll longer while waiting for an airdrop 2018-07-14 17:10:44 -07:00
Michael Vines
e8d23c17ca timeout++ 2018-07-14 15:51:32 -07:00
Michael Vines
a7ed2a304a Add CUDA libraries 2018-07-14 15:27:24 -07:00
Michael Vines
0025b42c26 Locate perf libs 2018-07-14 10:24:20 -07:00
Michael Vines
3f7f492cc0 Fix snap client-demo fixes 2018-07-14 00:18:54 -07:00
Michael Vines
490d7875dd Snap client-demo fixes 2018-07-13 23:51:33 -07:00
Michael Vines
4240edf710 solana.client-demo now runs client.sh for the bash extras 2018-07-13 22:57:38 -07:00
Michael Vines
30e50d0f70 Log airdrop amount and client public key 2018-07-13 22:41:52 -07:00
Michael Vines
751c1eba32 Run wallet-sanity against the new testnet 2018-07-13 22:21:41 -07:00
Michael Vines
d349d6aa98 USE_SNAP=1 is now supported 2018-07-13 22:21:41 -07:00
Michael Vines
1f9152dc72 Detect and report airdrop failures 2018-07-13 18:08:28 -07:00
Michael Vines
1b9d50172b Correct log message 2018-07-13 18:08:28 -07:00
Michael Vines
084dbd7f58 Fail gracefully when leader.json is missing 2018-07-13 17:24:25 -07:00
Rob Walker
58c0508f94 add drone information to multinode demo instructions 2018-07-13 17:16:55 -07:00
Michael Vines
dcf82c024f Surface hidden call to solana-keygen 2018-07-13 16:16:46 -07:00
44 changed files with 1070 additions and 436 deletions

View File

@@ -47,7 +47,7 @@ $ source $HOME/.cargo/env
Now checkout the code from github:
```bash
$ git clone https://github.com/solana-labs/solana.git
$ git clone https://github.com/solana-labs/solana.git
$ cd solana
```
@@ -84,17 +84,24 @@ Now start the server:
$ ./multinode-demo/leader.sh
```
To run a performance-enhanced fullnode on Linux,
[CUDA 9.2](https://developer.nvidia.com/cuda-downloads) must be installed on
your system:
```bash
$ ./fetch-perf-libs.sh
$ SOLANA_CUDA=1 ./multinode-demo/leader.sh
```
Wait a few seconds for the server to initialize. It will print "Ready." when it's ready to
receive transactions.
Drone
---
In order for the below test client and validators to work, we'll also need to
spin up a drone to give out some test tokens. The drone delivers Milton
Friedman-style "air drops" (free tokens to requesting clients) to be used in
test transactions.
Start the drone on the leader node with:
```bash
$ ./multinode-demo/drone.sh
```
Multinode Testnet
---
@@ -104,15 +111,18 @@ To run a multinode testnet, after starting a leader node, spin up some validator
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
```
To run a performance-enhanced fullnode on Linux,
To run a performance-enhanced leader or validator (on Linux),
[CUDA 9.2](https://developer.nvidia.com/cuda-downloads) must be installed on
your system:
```bash
$ ./fetch-perf-libs.sh
$ SOLANA_CUDA=1 ./multinode-demo/leader.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
$ SOLANA_CUDA=1 ./multinode-demo/leader.sh
$ SOLANA_CUDA=1 ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
```
Testnet Client Demo
---

View File

@@ -1,4 +1,4 @@
steps:
- command: "ci/snap.sh"
timeout_in_minutes: 20
timeout_in_minutes: 40
name: "snap [public]"

View File

@@ -2,6 +2,6 @@ FROM snapcraft/xenial-amd64
# Update snapcraft to latest version
RUN apt-get update -qq \
&& apt-get install -y snapcraft \
&& apt-get install -y snapcraft daemontools \
&& rm -rf /var/lib/apt/lists/* \
&& snapcraft --version

View File

@@ -1,80 +0,0 @@
#!/bin/bash
#
# Refreshes the Solana software running on the Testnet full nodes
#
# This script must be run by a user/machine that has successfully authenticated
# with GCP and has sufficient permission.
#
if [[ -z $SOLANA_METRICS_CONFIG ]]; then
echo Error: SOLANA_METRICS_CONFIG environment variable is unset
exit 1
fi
# Default to --edge channel. To select the beta channel:
# export SOLANA_SNAP_CHANNEL=--beta
if [[ -z $SOLANA_SNAP_CHANNEL ]]; then
SOLANA_SNAP_CHANNEL=--edge
fi
vmlist=(testnet-solana-com:us-west1-b) # Leader is hard coded as the first entry
echo "--- Available validators"
gcloud compute instances list --filter="labels.testnet-mode=validator"
while read -r vmName vmZone status; do
if [[ $status != RUNNING ]]; then
echo "Warning: $vmName is not RUNNING, ignoring it."
continue
fi
vmlist+=("$vmName:$vmZone")
done < <(gcloud compute instances list --filter="labels.testnet-mode=validator" --format 'value(name,zone,status)')
echo "--- Refreshing"
leader=true
for info in "${vmlist[@]}"; do
vmName=${info%:*}
vmZone=${info#*:}
echo "Starting refresh for $vmName"
(
echo "--- Processing $vmName in zone $vmZone"
if $leader; then
nodeConfig="mode=leader+drone enable-cuda=1 metrics-config=$SOLANA_METRICS_CONFIG"
else
nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG"
fi
cat > "autogen-refresh-$vmName.sh" <<EOF
set -x
sudo snap remove solana
sudo snap install solana $SOLANA_SNAP_CHANNEL --devmode
sudo snap set solana $nodeConfig
snap info solana
sudo snap logs solana -n200
EOF
set -x
gcloud compute scp --zone "$vmZone" "autogen-refresh-$vmName.sh" "$vmName":
gcloud compute ssh "$vmName" --zone "$vmZone" \
--ssh-flag="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -t" \
--command="bash ./autogen-refresh-$vmName.sh"
) > "log-$vmName.txt" 2>&1 &
if $leader; then
echo Waiting for leader...
# Wait for the leader to initialize before starting the validators
# TODO: Remove this limitation eventually.
wait
fi
leader=false
done
echo Waiting for validators...
wait
for info in "${vmlist[@]}"; do
vmName=${info%:*}
cat "log-$vmName.txt"
done
echo "--- done"
exit 0

View File

@@ -37,6 +37,12 @@ fi
set -x
echo --- checking for multilog
if [[ ! -x /usr/bin/multilog ]]; then
echo "multilog not found, install with: sudo apt-get install -y daemontools"
exit 1
fi
echo --- build
snapcraft

135
ci/testnet-deploy.sh Executable file
View File

@@ -0,0 +1,135 @@
#!/bin/bash -e
#
# Deploys the Solana software running on the testnet full nodes
#
# This script must be run by a user/machine that has successfully authenticated
# with GCP and has sufficient permission.
#
cd "$(dirname "$0")/.."
if [[ -z $SOLANA_METRICS_CONFIG ]]; then
echo Error: SOLANA_METRICS_CONFIG environment variable is unset
exit 1
fi
# Default to edge channel. To select the beta channel:
# export SOLANA_SNAP_CHANNEL=beta
if [[ -z $SOLANA_SNAP_CHANNEL ]]; then
SOLANA_SNAP_CHANNEL=edge
fi
case $SOLANA_SNAP_CHANNEL in
edge)
publicUrl=master.testnet.solana.com
publicIp=$(dig +short $publicUrl | head -n1)
;;
beta)
publicUrl=testnet.solana.com
publicIp=# # Use default value
;;
*)
echo Error: Unknown SOLANA_SNAP_CHANNEL=$SOLANA_SNAP_CHANNEL
exit 1
;;
esac
resourcePrefix=${publicUrl//./-}
vmlist=("$resourcePrefix":us-west1-b) # Leader is hard coded as the first entry
validatorNamePrefix=$resourcePrefix-validator-
echo "--- Available validators for $publicUrl"
filter="name~^$validatorNamePrefix"
gcloud compute instances list --filter="$filter"
while read -r vmName vmZone status; do
if [[ $status != RUNNING ]]; then
echo "Warning: $vmName is not RUNNING, ignoring it."
continue
fi
vmlist+=("$vmName:$vmZone")
done < <(gcloud compute instances list --filter="$filter" --format 'value(name,zone,status)')
wait_for_node() {
declare pid=$1
declare ok=true
wait "$pid" || ok=false
cat "log-$pid.txt"
if ! $ok; then
echo ^^^ +++
exit 1
fi
}
echo "--- Refreshing leader for $publicUrl"
leader=true
pids=()
count=1
for info in "${vmlist[@]}"; do
nodePosition="($count/${#vmlist[*]})"
vmName=${info%:*}
vmZone=${info#*:}
echo "Starting refresh for $vmName $nodePosition"
(
SECONDS=0
echo "--- $vmName in zone $vmZone $nodePosition"
if $leader; then
nodeConfig="mode=leader+drone metrics-config=$SOLANA_METRICS_CONFIG"
if [[ -n $SOLANA_CUDA ]]; then
nodeConfig="$nodeConfig enable-cuda=1"
fi
else
nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG leader-address=$publicIp"
fi
set -x
gcloud compute ssh "$vmName" --zone "$vmZone" \
--ssh-flag="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -t" \
--command="\
set -ex; \
logmarker='solana deploy $(date)/$RANDOM'; \
sudo snap remove solana; \
logger \$logmarker; \
sudo snap install solana --$SOLANA_SNAP_CHANNEL --devmode; \
sudo snap set solana $nodeConfig; \
snap info solana; \
echo Slight delay to get more syslog output; \
sleep 2; \
sudo grep -Pzo \"\$logmarker(.|\\n)*\" /var/log/syslog \
"
echo "Succeeded in ${SECONDS} seconds"
) > "log-$vmName.txt" 2>&1 &
pid=$!
# Rename log file so it can be discovered later by $pid
mv "log-$vmName.txt" "log-$pid.txt"
if $leader; then
echo Waiting for leader...
# Wait for the leader to initialize before starting the validators
# TODO: Remove this limitation eventually.
wait_for_node "$pid"
echo "--- Refreshing validators"
else
# Slow down deployment to ~20 machines a minute to avoid triggering GCP login
# quota limits (each |ssh| counts as a login)
sleep 3
pids+=("$pid")
fi
leader=false
count=$((count + 1))
done
echo --- Waiting for validators
for pid in "${pids[@]}"; do
wait_for_node "$pid"
done
echo "--- $publicUrl sanity test"
USE_SNAP=1 ci/testnet-sanity.sh $publicUrl
exit 0

17
ci/testnet-sanity.sh Executable file
View File

@@ -0,0 +1,17 @@
#!/bin/bash -e
#
# Perform a quick sanity test on the specific testnet
#
cd "$(dirname "$0")/.."
TESTNET=$1
if [[ -z $TESTNET ]]; then
TESTNET=testnet.solana.com
fi
echo "--- $TESTNET: wallet sanity"
multinode-demo/test/wallet-sanity.sh $TESTNET
echo --- fin
exit 0

View File

@@ -7,25 +7,27 @@ here=$(dirname "$0")
# shellcheck source=multinode-demo/common.sh
source "$here"/common.sh
leader=${1:-${here}/..} # Default to local solana repo
leader=$1
if [[ -z $leader ]]; then
if [[ -d "$SNAP" ]]; then
leader=testnet.solana.com # Default to testnet when running as a Snap
else
leader=$here/.. # Default to local solana repo
fi
fi
count=${2:-1}
rsync_leader_url=$(rsync_url "$leader")
set -ex
mkdir -p "$SOLANA_CONFIG_CLIENT_DIR"
if [[ ! -r "$SOLANA_CONFIG_CLIENT_DIR"/leader.json ]]; then
(
set -x
$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/
)
fi
$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/
client_json="$SOLANA_CONFIG_CLIENT_DIR"/client.json
if [[ ! -r $client_json ]]; then
$solana_keygen -o "$client_json"
fi
[[ -r $client_json ]] || $solana_keygen -o "$client_json"
$solana_client_demo \
-n "$count" \
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json \
-k "$SOLANA_CONFIG_CLIENT_DIR"/client.json \
# shellcheck disable=SC2086 # $solana_client_demo should not be quoted
exec $solana_client_demo \
-n "$count" -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$SOLANA_CONFIG_CLIENT_DIR"/client.json

View File

@@ -3,21 +3,40 @@
# Disable complaints about unused variables in this file:
# shellcheck disable=2034
# shellcheck disable=2154 # 'here' is referenced but not assigned
if [[ -z $here ]]; then
echo "|here| is not defined"
exit 1
fi
rsync=rsync
leader_logger="cat"
validator_logger="cat"
drone_logger="cat"
if [[ -d "$SNAP" ]]; then # Running inside a Linux Snap?
solana_program() {
declare program="$1"
if [[ "$program" = wallet ]]; then
# TODO: Merge wallet.sh functionality into solana-wallet proper and
# remove this special case
if [[ "$program" = wallet || "$program" = client-demo ]]; then
# TODO: Merge wallet.sh/client.sh functionality into
# solana-wallet/solana-demo-client proper and remove this special case
printf "%s/bin/solana-%s" "$SNAP" "$program"
else
printf "%s/command-%s.wrapper" "$SNAP" "$program"
fi
}
rsync="$SNAP"/bin/rsync
multilog="$SNAP/bin/multilog t s16777215"
leader_logger="$multilog $SNAP_DATA/leader"
validator_logger="$multilog t $SNAP_DATA/validator"
drone_logger="$multilog $SNAP_DATA/drone"
# Create log directories manually to prevent multilog from creating them as
# 0700
mkdir -p "$SNAP_DATA"/{drone,leader,validator}
SOLANA_METRICS_CONFIG="$(snapctl get metrics-config)"
SOLANA_CUDA="$(snapctl get enable-cuda)"
RUST_LOG="$(snapctl get rust-log)"
elif [[ -n "$USE_SNAP" ]]; then # Use the Linux Snap binaries
solana_program() {
@@ -37,13 +56,18 @@ else
declare features=""
if [[ "$program" =~ ^(.*)-cuda$ ]]; then
program=${BASH_REMATCH[1]}
features="--features=cuda,erasure"
features="--features=cuda"
fi
if [[ -z "$DEBUG" ]]; then
maybe_release=--release
fi
printf "cargo run $maybe_release --bin solana-%s %s -- " "$program" "$features"
}
if [[ -n $SOLANA_CUDA ]]; then
# Locate perf libs downloaded by |./fetch-perf-libs.sh|
LD_LIBRARY_PATH=$(cd "$here" && dirname "$PWD"):$LD_LIBRARY_PATH
export LD_LIBRARY_PATH
fi
fi
solana_client_demo=$(solana_program client-demo)
@@ -73,29 +97,29 @@ configure_metrics() {
for param in "${metrics_params[@]}"; do
IFS='=' read -r -a pair <<< "$param"
if [[ "${#pair[@]}" != 2 ]]; then
echo Error: invalid metrics parameter: "$param"
echo Error: invalid metrics parameter: "$param" >&2
else
declare name="${pair[0]}"
declare value="${pair[1]}"
case "$name" in
host)
export INFLUX_HOST="$value"
echo INFLUX_HOST="$INFLUX_HOST"
echo INFLUX_HOST="$INFLUX_HOST" >&2
;;
db)
export INFLUX_DATABASE="$value"
echo INFLUX_DATABASE="$INFLUX_DATABASE"
echo INFLUX_DATABASE="$INFLUX_DATABASE" >&2
;;
u)
export INFLUX_USERNAME="$value"
echo INFLUX_USERNAME="$INFLUX_USERNAME"
echo INFLUX_USERNAME="$INFLUX_USERNAME" >&2
;;
p)
export INFLUX_PASSWORD="$value"
echo INFLUX_PASSWORD="********"
echo INFLUX_PASSWORD="********" >&2
;;
*)
echo Error: Unknown metrics parameter name: "$name"
echo Error: Unknown metrics parameter name: "$name" >&2
;;
esac
fi
@@ -105,16 +129,18 @@ configure_metrics
tune_networking() {
# Reference: https://medium.com/@CameronSparr/increase-os-udp-buffers-to-improve-performance-51d167bb1360
[[ $(uname) = Linux ]] && (
set -x
# test the existence of the sysctls before trying to set them
sysctl net.core.rmem_max 2>/dev/null 1>/dev/null &&
sudo sysctl -w net.core.rmem_max=26214400 1>/dev/null 2>/dev/null
if [[ $(uname) = Linux ]]; then
(
set -x +e
# test the existence of the sysctls before trying to set them
# go ahead and return true and don't exit if these calls fail
sysctl net.core.rmem_max 2>/dev/null 1>/dev/null &&
sudo sysctl -w net.core.rmem_max=26214400 1>/dev/null 2>/dev/null
sysctl net.core.rmem_default 2>/dev/null 1>/dev/null &&
sudo sysctl -w net.core.rmem_default=26214400 1>/dev/null 2>/dev/null
)
return 0
sysctl net.core.rmem_default 2>/dev/null 1>/dev/null &&
sudo sysctl -w net.core.rmem_default=26214400 1>/dev/null 2>/dev/null
) || true
fi
}
SOLANA_CONFIG_DIR=${SNAP_DATA:-$PWD}/config

View File

@@ -36,6 +36,7 @@ set -ex
mkdir -p "$SOLANA_CONFIG_DIR"
$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/
# shellcheck disable=SC2086 # $solana_drone should not be quoted
exec $solana_drone \
-l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json
set -o pipefail
$solana_drone \
-l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
2>&1 | $drone_logger

View File

@@ -25,15 +25,8 @@ fi
tune_networking
# migrate from old ledger format? why not...
if [[ ! -f "$SOLANA_CONFIG_DIR"/ledger.log &&
-f "$SOLANA_CONFIG_DIR"/genesis.log ]]; then
(shopt -s nullglob &&
cat "$SOLANA_CONFIG_DIR"/genesis.log \
"$SOLANA_CONFIG_DIR"/tx-*.log) > "$SOLANA_CONFIG_DIR"/ledger.log
fi
# shellcheck disable=SC2086 # $program should not be quoted
exec $program \
set -xo pipefail
$program \
--identity "$SOLANA_CONFIG_DIR"/leader.json \
--ledger "$SOLANA_CONFIG_DIR"/ledger.log
--ledger "$SOLANA_CONFIG_DIR"/ledger.log \
2>&1 | $leader_logger

14
multinode-demo/remote_leader.sh Executable file
View File

@@ -0,0 +1,14 @@
#!/bin/bash -e
[[ -n $FORCE ]] || exit
chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa
PATH="$HOME"/.cargo/bin:"$PATH"
./fetch-perf-libs.sh
# Run setup
USE_INSTALL=1 ./multinode-demo/setup.sh -p
USE_INSTALL=1 SOLANA_CUDA=1 ./multinode-demo/leader.sh >leader.log 2>&1 &
USE_INSTALL=1 ./multinode-demo/drone.sh >drone.log 2>&1 &

186
multinode-demo/remote_nodes.sh Executable file
View File

@@ -0,0 +1,186 @@
#!/bin/bash
command=$1
ip_addr_file=
remote_user=
ssh_keys=
shift
usage() {
exitcode=0
if [[ -n "$1" ]]; then
exitcode=1
echo "Error: $*"
fi
cat <<EOF
usage: $0 <start|stop> <-f IP Addr Array file> <-u username> [-k ssh-keys]
Manage a GCE multinode network
start|stop - Create or delete the network
-f file - A bash script that exports an array of IP addresses, ip_addr_array.
Elements of the array are public IP address of remote nodes.
-u username - The username for logging into remote nodes.
-k ssh-keys - Path to public/private key pair that remote nodes can use to perform
rsync and ssh among themselves. Must contain pub, and priv keys.
EOF
exit $exitcode
}
while getopts "h?f:u:k:" opt; do
case $opt in
h | \?)
usage
;;
f)
ip_addr_file=$OPTARG
;;
u)
remote_user=$OPTARG
;;
k)
ssh_keys=$OPTARG
;;
*)
usage "Error: unhandled option: $opt"
;;
esac
done
set -e
# Sample IP Address array file contents
# ip_addr_array=(192.168.1.1 192.168.1.5 192.168.2.2)
[[ -n $command ]] || usage "Need a command (start|stop)"
[[ -n $ip_addr_file ]] || usage "Need a file with IP address array"
[[ -n $remote_user ]] || usage "Need the username for remote nodes"
ip_addr_array=()
# Get IP address array
# shellcheck source=/dev/null
source "$ip_addr_file"
build_project() {
echo "Build started at $(date)"
SECONDS=0
# Build and install locally
PATH="$HOME"/.cargo/bin:"$PATH"
cargo install --force
echo "Build took $SECONDS seconds"
}
common_start_setup() {
ip_addr=$1
# Killing sshguard for now. TODO: Find a better solution
# sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
ssh -n -f "$remote_user@$ip_addr" " \
set -ex; \
sudo service sshguard stop; \
sudo apt-get --assume-yes install rsync libssl-dev; \
mkdir -p ~/.ssh ~/solana ~/.cargo/bin; \
" >log/"$ip_addr".log
# If provided, deploy SSH keys
if [[ -n $ssh_keys ]]; then
{
rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
} >>log/"$ip_addr".log
fi
}
start_leader() {
common_start_setup "$1"
{
rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/
rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
ssh -n -f "$remote_user@$ip_addr" 'cd solana; FORCE=1 ./multinode-demo/remote_leader.sh'
} >>log/"$1".log
leader_ip=$1
leader_time=$SECONDS
SECONDS=0
}
start_validator() {
common_start_setup "$1"
ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/multinode-demo ~/solana/" >>log/"$1".log
ssh -n -f "$remote_user@$ip_addr" "cd solana; FORCE=1 ./multinode-demo/remote_validator.sh $leader_ip" >>log/"$1".log
}
start_all_nodes() {
echo "Deployment started at $(date)"
SECONDS=0
count=0
leader_ip=
leader_time=
mkdir -p log
for ip_addr in "${ip_addr_array[@]}"; do
ssh-keygen -R "$ip_addr" >log/local.log
ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
if ((!count)); then
# Start the leader on the first node
echo "Leader node $ip_addr, killing previous instance and restarting"
start_leader "$ip_addr"
else
# Start validator on all other nodes
echo "Validator[$count] node $ip_addr, killing previous instance and restarting"
start_validator "$ip_addr" &
# TBD: Remove the sleep or reduce time once GCP login quota is increased
sleep 2
fi
((count = count + 1))
done
wait
((validator_count = count - 1))
echo "Deployment finished at $(date)"
echo "Leader deployment too $leader_time seconds"
echo "$validator_count Validator deployment took $SECONDS seconds"
}
stop_all_nodes() {
SECONDS=0
local count=0
for ip_addr in "${ip_addr_array[@]}"; do
echo "Stopping node[$count] $ip_addr. Remote user $remote_user"
ssh -n -f "$remote_user@$ip_addr" " \
set -ex; \
sudo service sshguard stop; \
pkill -9 solana-; \
pkill -9 validator; \
pkill -9 leader; \
"
sleep 2
((count = count + 1))
echo "Stopped node[$count] $ip_addr"
done
echo "Stopping $count nodes took $SECONDS seconds"
}
if [[ $command == "start" ]]; then
#build_project
stop_all_nodes
start_all_nodes
elif [[ $command == "stop" ]]; then
stop_all_nodes
else
usage "Unknown command: $command"
fi

View File

@@ -0,0 +1,16 @@
#!/bin/bash -e
[[ -n $FORCE ]] || exit
chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa
PATH="$HOME"/.cargo/bin:"$PATH"
ssh-keygen -R "$1"
ssh-keyscan "$1" >>~/.ssh/known_hosts 2>/dev/null
rsync -vPrz "$1":~/.cargo/bin/solana* ~/.cargo/bin/
# Run setup
USE_INSTALL=1 ./multinode-demo/setup.sh -p
USE_INSTALL=1 ./multinode-demo/validator.sh "$1":~/solana "$1" >validator.log 2>&1

View File

@@ -71,7 +71,8 @@ done
leader_address_args=("$ip_address_arg")
validator_address_args=("$ip_address_arg" -b 9000)
id_path="$SOLANA_CONFIG_PRIVATE_DIR"/id.json
leader_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/leader-id.json
validator_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json
mint_path="$SOLANA_CONFIG_PRIVATE_DIR"/mint.json
set -e
@@ -83,23 +84,24 @@ mkdir -p "$SOLANA_CONFIG_DIR"
rm -rvf "$SOLANA_CONFIG_PRIVATE_DIR"
mkdir -p "$SOLANA_CONFIG_PRIVATE_DIR"
$solana_keygen -o "$id_path"
$solana_keygen -o "$leader_id_path"
$solana_keygen -o "$validator_id_path"
if $node_type_leader; then
echo "Creating $SOLANA_CONFIG_DIR/mint.json with $num_tokens tokens"
echo "Creating $mint_path with $num_tokens tokens"
$solana_keygen -o "$mint_path"
echo "Creating $SOLANA_CONFIG_DIR/ledger.log"
$solana_genesis --tokens="$num_tokens" < "$mint_path" > "$SOLANA_CONFIG_DIR"/ledger.log
echo "Creating $SOLANA_CONFIG_DIR/leader.json"
$solana_fullnode_config --keypair="$id_path" "${leader_address_args[@]}" > "$SOLANA_CONFIG_DIR"/leader.json
$solana_fullnode_config --keypair="$leader_id_path" "${leader_address_args[@]}" > "$SOLANA_CONFIG_DIR"/leader.json
fi
if $node_type_validator; then
echo "Creating $SOLANA_CONFIG_DIR/validator.json"
$solana_fullnode_config --keypair="$id_path" "${validator_address_args[@]}" > "$SOLANA_CONFIG_DIR"/validator.json
$solana_fullnode_config --keypair="$validator_id_path" "${validator_address_args[@]}" > "$SOLANA_CONFIG_DIR"/validator.json
fi
ls -lh "$SOLANA_CONFIG_DIR"/

View File

@@ -1,107 +0,0 @@
#!/bin/bash
ip_addr_file=$1
remote_user=$2
ssh_keys=$3
usage() {
echo -e "\\tUsage: $0 <IP Address array> <username> [path to ssh keys]\\n"
echo -e "\\t <IP Address array>: A bash script that exports an array of IP addresses, ip_addr_array. Elements of the array are public IP address of remote nodes."
echo -e "\\t <username>: The username for logging into remote nodes."
echo -e "\\t [path to ssh keys]: The public/private key pair that remote nodes can use to perform rsync and ssh among themselves. Must contain pub, priv and authorized_keys.\\n"
exit 1
}
# Sample IP Address array file contents
# ip_addr_array=(192.168.1.1 192.168.1.5 192.168.2.2)
if [[ -z "$ip_addr_file" ]]; then
usage
fi
if [[ -z "$remote_user" ]]; then
usage
fi
echo "Build started at $(date)"
SECONDS=0
# Build and install locally
PATH="$HOME"/.cargo/bin:"$PATH"
cargo install --force
echo "Build took $SECONDS seconds"
ip_addr_array=()
# Get IP address array
# shellcheck source=/dev/null
source "$ip_addr_file"
# shellcheck disable=SC2089,SC2016
ssh_command_prefix='export PATH="$HOME/.cargo/bin:$PATH"; cd solana; USE_INSTALL=1'
echo "Deployment started at $(date)"
SECONDS=0
count=0
leader=
for ip_addr in "${ip_addr_array[@]}"; do
echo "$ip_addr"
ssh-keygen -R "$ip_addr"
ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts
ssh -n -f "$remote_user@$ip_addr" 'mkdir -p ~/.ssh ~/solana ~/.cargo/bin'
# Killing sshguard for now. TODO: Find a better solution
# sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
ssh -n -f "$remote_user@$ip_addr" "sudo service sshguard stop"
ssh -n -f "$remote_user@$ip_addr" 'sudo apt-get --assume-yes install rsync libssl-dev'
# If provided, deploy SSH keys
if [[ -z $ssh_keys ]]; then
echo "skip copying the ssh keys"
else
rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
ssh -n -f "$remote_user@$ip_addr" 'chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa'
fi
# Stop current nodes
ssh "$remote_user@$ip_addr" 'pkill -9 solana-'
if [[ -n $leader ]]; then
echo "Adding known hosts for $ip_addr"
ssh -n -f "$remote_user@$ip_addr" "ssh-keygen -R $leader"
ssh -n -f "$remote_user@$ip_addr" "ssh-keyscan $leader >> ~/.ssh/known_hosts"
ssh -n -f "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader"":~/.cargo/bin/solana* ~/.cargo/bin/"
ssh -n -f "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader"":~/solana/multinode-demo ~/solana/"
ssh -n -f "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader"":~/solana/fetch-perf-libs.sh ~/solana/"
else
# Deploy build and scripts to remote node
rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/
rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
fi
# Run setup
ssh "$remote_user@$ip_addr" "$ssh_command_prefix"' ./multinode-demo/setup.sh -p "$ip_addr"'
if ((!count)); then
# Start the leader on the first node
echo "Starting leader node $ip_addr"
ssh -n -f "$remote_user@$ip_addr" 'cd solana; ./fetch-perf-libs.sh'
ssh -n -f "$remote_user@$ip_addr" "$ssh_command_prefix"' SOLANA_CUDA=1 ./multinode-demo/leader.sh > leader.log 2>&1'
ssh -n -f "$remote_user@$ip_addr" "$ssh_command_prefix"' ./multinode-demo/drone.sh > drone.log 2>&1'
leader=${ip_addr_array[0]}
else
# Start validator on all other nodes
echo "Starting validator node $ip_addr"
ssh -n -f "$remote_user@$ip_addr" "$ssh_command_prefix"" ./multinode-demo/validator.sh $remote_user@$leader:~/solana $leader > validator.log 2>&1"
fi
((count++))
done
echo "Deployment finished at $(date)"
echo "Deployment took $SECONDS seconds"

View File

@@ -6,7 +6,13 @@
here=$(dirname "$0")
cd "$here"
wallet="../wallet.sh $1"
if [[ -n "$USE_SNAP" ]]; then
# TODO: Merge wallet.sh functionality into solana-wallet proper and
# remove this USE_SNAP case
wallet="solana.wallet $1"
else
wallet="../wallet.sh $1"
fi
# Tokens transferred to this address are lost forever...
garbage_address=vS3ngn1TfQmpsW1Z4NkLuqNAQFF3dYQw8UZ6TCx9bmq

View File

@@ -65,13 +65,12 @@ fi
rsync_leader_url=$(rsync_url "$leader")
set -ex
tune_networking
SOLANA_LEADER_CONFIG_DIR="$SOLANA_CONFIG_DIR"/leader-config
rm -rf "$SOLANA_LEADER_CONFIG_DIR"
set -ex
$rsync -vPrz "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR"
ls -lh "$SOLANA_LEADER_CONFIG_DIR"
tune_networking
# migrate from old ledger format? why not...
if [[ ! -f "$SOLANA_LEADER_CONFIG_DIR"/ledger.log &&
@@ -85,12 +84,13 @@ fi
# TODO: Remove this workaround
while ! $solana_wallet \
-l "$SOLANA_LEADER_CONFIG_DIR"/leader.json \
-k "$SOLANA_CONFIG_PRIVATE_DIR"/id.json airdrop --tokens 1; do
-k "$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json airdrop --tokens 1; do
sleep 1
done
# shellcheck disable=SC2086 # $program should not be quoted
exec $program \
set -o pipefail
$program \
--identity "$SOLANA_CONFIG_DIR"/validator.json \
--testnet "$leader_address:$leader_port" \
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log \
2>&1 | $validator_logger

View File

@@ -30,18 +30,16 @@ rsync_leader_url=$(rsync_url "$leader")
set -e
mkdir -p "$SOLANA_CONFIG_CLIENT_DIR"
if [[ ! -r "$SOLANA_CONFIG_CLIENT_DIR"/leader.json ]]; then
(
set -x
$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/
)
echo "Fetching leader configuration from $rsync_leader_url"
$rsync -Pz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/
fi
client_id_path="$SOLANA_CONFIG_CLIENT_DIR"/id.json
if [[ ! -r $client_id_path ]]; then
echo "Generating client identity: $client_id_path"
$solana_keygen -o "$client_id_path"
fi
set -x
# shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" "$@"

View File

@@ -45,7 +45,9 @@ apps:
plugs:
- home
client-demo:
command: solana-client-demo
# TODO: Merge client.sh functionality into solana-client-demo proper
command: client.sh
#command: solana-client-demo
plugs:
- network
- network-bind
@@ -57,30 +59,43 @@ apps:
plugs:
- network
- home
daemon-validator:
daemon: simple
command: validator.sh
plugs:
- network
- network-bind
daemon-leader:
daemon: simple
command: leader.sh
plugs:
- network
- network-bind
daemon-drone:
daemon: simple
command: drone.sh
plugs:
- network
- network-bind
parts:
solana:
plugin: nil
prime:
- bin
- usr/lib/libgf_complete.so.1
- usr/lib/libJerasure.so.2
- usr/lib
override-build: |
# Install CUDA 9.2 runtime
mkdir -p $SNAPCRAFT_PART_INSTALL/usr/
cp -rav /usr/local/cuda-9.2/targets/x86_64-linux/lib/ $SNAPCRAFT_PART_INSTALL/usr/lib
mkdir -p $SNAPCRAFT_PART_INSTALL/usr/lib/x86_64-linux-gnu/
cp -rav /usr/lib/x86_64-linux-gnu/libcuda.* $SNAPCRAFT_PART_INSTALL/usr/lib/x86_64-linux-gnu/
mkdir -p $SNAPCRAFT_PART_INSTALL/usr/lib/nvidia-396/
cp -v /usr/lib/nvidia-396/libnvidia-fatbinaryloader.so* $SNAPCRAFT_PART_INSTALL/usr/lib/nvidia-396/
# Build/install solana-fullnode-cuda
./fetch-perf-libs.sh
cargo install --features=cuda,erasure --root $SNAPCRAFT_PART_INSTALL --bin solana-fullnode
cargo install --features=cuda --root $SNAPCRAFT_PART_INSTALL --bin solana-fullnode
mv $SNAPCRAFT_PART_INSTALL/bin/solana-fullnode $SNAPCRAFT_PART_INSTALL
rm -rf $SNAPCRAFT_PART_INSTALL/bin/*
mv $SNAPCRAFT_PART_INSTALL/solana-fullnode $SNAPCRAFT_PART_INSTALL/bin/solana-fullnode-cuda
@@ -95,8 +110,9 @@ parts:
mkdir -p $SNAPCRAFT_PART_INSTALL/bin
cp -av multinode-demo/* $SNAPCRAFT_PART_INSTALL/bin/
# TODO: build rsync from source instead of sneaking it in from the host
# TODO: build rsync/multilog from source instead of sneaking it in from the host
# system...
set -x
mkdir -p $SNAPCRAFT_PART_INSTALL/bin
cp -av /usr/bin/rsync $SNAPCRAFT_PART_INSTALL/bin/
cp -av /usr/bin/multilog $SNAPCRAFT_PART_INSTALL/bin/

View File

@@ -6,6 +6,7 @@
extern crate libc;
use chrono::prelude::*;
use counter::Counter;
use entry::Entry;
use hash::Hash;
use itertools::Itertools;
@@ -202,6 +203,11 @@ impl Bank {
{
let option = bals.get_mut(&tx.from);
if option.is_none() {
if let Instruction::NewVote(_) = &tx.instruction {
inc_new_counter!("bank-appy_debits-vote_account_not_found", 1);
} else {
inc_new_counter!("bank-appy_debits-generic_account_not_found", 1);
}
return Err(BankError::AccountNotFound(tx.from));
}
let bal = option.unwrap();

View File

@@ -89,7 +89,6 @@ impl BankingStage {
mms.len(),
);
let count = mms.iter().map(|x| x.1.len()).sum();
static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
@@ -125,7 +124,7 @@ impl BankingStage {
reqs_len,
(reqs_len as f32) / (total_time_s)
);
inc_counter!(COUNTER, count);
inc_new_counter!("banking_stage-process_packets", count);
Ok(())
}
}

6
src/bin/client-demo.rs Normal file → Executable file
View File

@@ -211,7 +211,7 @@ fn main() {
threads = t.to_string().parse().expect("integer");
}
if let Some(n) = matches.value_of("nodes") {
if let Some(n) = matches.value_of("num_nodes") {
num_nodes = n.to_string().parse().expect("integer");
}
@@ -365,6 +365,8 @@ fn spy_node() -> (NodeInfo, UdpSocket) {
let gossip_socket_pair = udp_public_bind("gossip", 8000, 10000);
let pubkey = KeyPair::new().pubkey();
let daddr = "0.0.0.0:0".parse().unwrap();
assert!(!gossip_socket_pair.addr.ip().is_unspecified());
assert!(!gossip_socket_pair.addr.ip().is_multicast());
let node = NodeInfo::new(
pubkey,
//gossip.local_addr().unwrap(),
@@ -386,7 +388,7 @@ fn converge(
//lets spy on the network
let daddr = "0.0.0.0:0".parse().unwrap();
let (spy, spy_gossip) = spy_node();
let mut spy_crdt = Crdt::new(spy);
let mut spy_crdt = Crdt::new(spy).expect("Crdt::new");
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));

View File

@@ -12,6 +12,7 @@ use clap::{App, Arg};
use solana::crdt::NodeInfo;
use solana::drone::{Drone, DroneRequest};
use solana::fullnode::Config;
use solana::metrics::set_panic_hook;
use solana::signature::read_keypair;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -23,7 +24,8 @@ use tokio_codec::{BytesCodec, Decoder};
fn main() {
env_logger::init();
let matches = App::new("solana-client-demo")
set_panic_hook("drone");
let matches = App::new("drone")
.arg(
Arg::with_name("leader")
.short("l")

View File

@@ -8,6 +8,7 @@ extern crate solana;
use clap::{App, Arg};
use solana::crdt::{NodeInfo, TestNode};
use solana::fullnode::{Config, FullNode, LedgerFile};
use solana::metrics::set_panic_hook;
use solana::service::Service;
use solana::signature::{KeyPair, KeyPairUtil};
use std::fs::File;
@@ -17,6 +18,7 @@ use std::process::exit;
fn main() -> () {
env_logger::init();
set_panic_hook("fullnode");
let matches = App::new("fullnode")
.arg(
Arg::with_name("identity")

View File

@@ -142,7 +142,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") {
leader = read_leader(l).node_info;
leader = read_leader(l)?.node_info;
} else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
@@ -249,15 +249,28 @@ fn process_command(
// Request an airdrop from Solana Drone;
// Request amount is set in request_airdrop function
WalletCommand::AirDrop(tokens) => {
println!("Airdrop requested...");
println!("Airdropping {:?} tokens", tokens);
request_airdrop(&config.drone_addr, &config.id, tokens as u64)?;
// TODO: return airdrop Result from Drone
sleep(Duration::from_millis(100));
println!(
"Your balance is: {:?}",
client.poll_get_balance(&config.id.pubkey()).unwrap()
"Requesting airdrop of {:?} tokens from {}",
tokens, config.drone_addr
);
let previous_balance = client.poll_get_balance(&config.id.pubkey())?;
request_airdrop(&config.drone_addr, &config.id, tokens as u64)?;
// TODO: return airdrop Result from Drone instead of polling the
// network
let mut current_balance = previous_balance;
for _ in 0..20 {
sleep(Duration::from_millis(500));
current_balance = client.poll_get_balance(&config.id.pubkey())?;
if previous_balance != current_balance {
break;
}
println!(".");
}
println!("Your balance is: {:?}", current_balance);
if current_balance - previous_balance != tokens {
Err("Airdrop failed!")?;
}
}
// If client has positive balance, spend tokens in {balance} number of transactions
WalletCommand::Pay(tokens, to) => {
@@ -288,9 +301,20 @@ fn display_actions() {
println!();
}
fn read_leader(path: &str) -> Config {
let file = File::open(path.to_string()).unwrap_or_else(|_| panic!("file not found: {}", path));
serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path))
fn read_leader(path: &str) -> Result<Config, WalletError> {
let file = File::open(path.to_string()).or_else(|err| {
Err(WalletError::BadParameter(format!(
"{}: Unable to open leader file: {}",
err, path
)))
})?;
serde_json::from_reader(file).or_else(|err| {
Err(WalletError::BadParameter(format!(
"{}: Failed to parse leader file: {}",
err, path
)))
})
}
fn mk_client(r: &NodeInfo) -> io::Result<ThinClient> {

View File

@@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() {
Err(CrdtError::TooSmall)?;
Err(CrdtError::NoPeers)?;
}
let n = ((self.random)() as usize) % options.len();
@@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() {
Err(CrdtError::TooSmall)?;
Err(CrdtError::NoPeers)?;
}
let mut weighted_peers = vec![];

View File

@@ -1,9 +1,10 @@
use influx_db_client as influxdb;
use metrics;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use timing;
const INFLUX_RATE: usize = 100;
const DEFAULT_METRICS_RATE: usize = 100;
pub struct Counter {
pub name: &'static str,
@@ -12,7 +13,7 @@ pub struct Counter {
pub times: AtomicUsize,
/// last accumulated value logged
pub lastlog: AtomicUsize,
pub lograte: usize,
pub lograte: AtomicUsize,
}
macro_rules! create_counter {
@@ -22,7 +23,7 @@ macro_rules! create_counter {
counts: AtomicUsize::new(0),
times: AtomicUsize::new(0),
lastlog: AtomicUsize::new(0),
lograte: $lograte,
lograte: AtomicUsize::new($lograte),
}
};
}
@@ -33,12 +34,38 @@ macro_rules! inc_counter {
};
}
macro_rules! inc_new_counter {
($name:expr, $count:expr) => {{
static mut INC_NEW_COUNTER: Counter = create_counter!($name, 0);
inc_counter!(INC_NEW_COUNTER, $count);
}};
($name:expr, $count:expr, $lograte:expr) => {{
static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte);
inc_counter!(INC_NEW_COUNTER, $count);
}};
}
impl Counter {
fn default_log_rate() -> usize {
let v = env::var("SOLANA_DEFAULT_METRICS_RATE")
.map(|x| x.parse().unwrap_or(DEFAULT_METRICS_RATE))
.unwrap_or(DEFAULT_METRICS_RATE);
if v == 0 {
DEFAULT_METRICS_RATE
} else {
v
}
}
pub fn inc(&mut self, events: usize) {
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed);
let lastlog = self.lastlog.load(Ordering::Relaxed);
if times % self.lograte == 0 && times > 0 {
let mut lograte = self.lograte.load(Ordering::Relaxed);
if lograte == 0 {
lograte = Counter::default_log_rate();
self.lograte.store(lograte, Ordering::Relaxed);
}
if times % lograte == 0 && times > 0 {
let lastlog = self.lastlog.load(Ordering::Relaxed);
info!(
"COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}}}",
self.name,
@@ -46,10 +73,8 @@ impl Counter {
times,
timing::timestamp(),
);
}
if times % INFLUX_RATE == 0 && times > 0 {
metrics::submit(
influxdb::Point::new(&format!("counter_{}", self.name))
influxdb::Point::new(&format!("counter-{}", self.name))
.add_field(
"count",
influxdb::Value::Integer(counts as i64 - lastlog as i64),
@@ -63,7 +88,8 @@ impl Counter {
}
#[cfg(test)]
mod tests {
use counter::Counter;
use counter::{Counter, DEFAULT_METRICS_RATE};
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_counter() {
@@ -73,7 +99,7 @@ mod tests {
unsafe {
assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.lograte, 100);
assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), 100);
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.name, "test");
}
@@ -88,4 +114,42 @@ mod tests {
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399);
}
}
#[test]
fn test_inc_new_counter() {
//make sure that macros are syntactically correct
//the variable is internal to the macro scope so there is no way to introspect it
inc_new_counter!("counter-1", 1);
inc_new_counter!("counter-2", 1, 2);
}
#[test]
fn test_lograte() {
static mut COUNTER: Counter = create_counter!("test_lograte", 0);
inc_counter!(COUNTER, 2);
unsafe {
assert_eq!(
COUNTER.lograte.load(Ordering::Relaxed),
DEFAULT_METRICS_RATE
);
}
}
#[test]
fn test_lograte_env() {
assert_ne!(DEFAULT_METRICS_RATE, 0);
static mut COUNTER: Counter = create_counter!("test_lograte_env", 0);
env::set_var("SOLANA_DEFAULT_METRICS_RATE", "50");
inc_counter!(COUNTER, 2);
unsafe {
assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), 50);
}
static mut COUNTER2: Counter = create_counter!("test_lograte_env", 0);
env::set_var("SOLANA_DEFAULT_METRICS_RATE", "0");
inc_counter!(COUNTER2, 2);
unsafe {
assert_eq!(
COUNTER2.lograte.load(Ordering::Relaxed),
DEFAULT_METRICS_RATE
);
}
}
}

View File

@@ -37,8 +37,6 @@ use streamer::{BlobReceiver, BlobSender, Window};
use timing::timestamp;
use transaction::Vote;
const LOG_RATE: usize = 10;
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
const GOSSIP_PURGE_MILLIS: u64 = 15000;
@@ -48,8 +46,11 @@ const MIN_TABLE_SIZE: usize = 2;
#[derive(Debug, PartialEq, Eq)]
pub enum CrdtError {
TooSmall,
NoPeers,
NoLeader,
BadContactInfo,
BadNodeInfo,
BadGossipAddress,
}
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
@@ -150,7 +151,7 @@ impl NodeInfo {
rpu: SocketAddr,
tpu: SocketAddr,
tvu_window: SocketAddr,
) -> NodeInfo {
) -> Self {
NodeInfo {
id,
version: 0,
@@ -169,6 +170,35 @@ impl NodeInfo {
},
}
}
#[cfg(test)]
/// NodeInfo with unspecified addresses for adversarial testing.
pub fn new_unspecified() -> Self {
let addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
assert!(addr.ip().is_unspecified());
Self::new(
KeyPair::new().pubkey(),
addr.clone(),
addr.clone(),
addr.clone(),
addr.clone(),
addr.clone(),
)
}
#[cfg(test)]
/// NodeInfo with multicast addresses for adversarial testing.
pub fn new_multicast() -> Self {
let addr: SocketAddr = "224.0.1.255:1000".parse().unwrap();
assert!(addr.ip().is_multicast());
Self::new(
KeyPair::new().pubkey(),
addr.clone(),
addr.clone(),
addr.clone(),
addr.clone(),
addr.clone(),
)
}
pub fn debug_id(&self) -> u64 {
make_debug_id(&self.id)
}
@@ -256,8 +286,31 @@ enum Protocol {
}
impl Crdt {
pub fn new(me: NodeInfo) -> Crdt {
assert_eq!(me.version, 0);
pub fn new(me: NodeInfo) -> Result<Crdt> {
if me.version != 0 {
return Err(Error::CrdtError(CrdtError::BadNodeInfo));
}
if me.contact_info.ncp.ip().is_unspecified()
|| me.contact_info.ncp.port() == 0
|| me.contact_info.ncp.ip().is_multicast()
{
return Err(Error::CrdtError(CrdtError::BadGossipAddress));
}
for addr in &[
me.contact_info.tvu,
me.contact_info.rpu,
me.contact_info.tpu,
me.contact_info.tvu_window,
] {
//dummy address is allowed, services will filter them
if addr.ip().is_unspecified() && addr.port() == 0 {
continue;
}
//if addr is not a dummy address, than it must be valid
if addr.ip().is_unspecified() || addr.port() == 0 || addr.ip().is_multicast() {
return Err(Error::CrdtError(CrdtError::BadContactInfo));
}
}
let mut g = Crdt {
table: HashMap::new(),
local: HashMap::new(),
@@ -269,7 +322,7 @@ impl Crdt {
};
g.local.insert(me.id, g.update_index);
g.table.insert(me.id, me);
g
Ok(g)
}
pub fn debug_id(&self) -> u64 {
make_debug_id(&self.me)
@@ -348,8 +401,7 @@ impl Crdt {
}
}
pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) {
static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE);
inc_counter!(COUNTER_VOTE, votes.len());
inc_new_counter!("crdt-vote-count", votes.len());
if !votes.is_empty() {
info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
}
@@ -373,8 +425,7 @@ impl Crdt {
self.update_index += 1;
let _ = self.table.insert(v.id, v.clone());
let _ = self.local.insert(v.id, self.update_index);
static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE);
inc_counter!(COUNTER_UPDATE, 1);
inc_new_counter!("crdt-update-count", 1);
} else {
trace!(
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
@@ -446,8 +497,7 @@ impl Crdt {
})
.collect();
static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE);
inc_counter!(COUNTER_PURGE, dead_ids.len());
inc_new_counter!("crdt-purge-count", dead_ids.len());
for id in &dead_ids {
self.alive.remove(id);
@@ -522,7 +572,8 @@ impl Crdt {
) -> Result<()> {
if broadcast_table.is_empty() {
warn!("{:x}:not enough peers in crdt table", me.debug_id());
Err(CrdtError::TooSmall)?;
inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1, 1);
Err(CrdtError::NoPeers)?;
}
trace!("broadcast nodes {}", broadcast_table.len());
@@ -628,7 +679,8 @@ impl Crdt {
.collect();
for e in errs {
if let Err(e) = &e {
error!("broadcast result {:?}", e);
inc_new_counter!("crdt-retransmit-send_to_error", 1, 1);
error!("retransmit result {:?}", e);
}
e?;
}
@@ -669,7 +721,7 @@ impl Crdt {
.filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr)
.collect();
if valid.is_empty() {
Err(CrdtError::TooSmall)?;
Err(CrdtError::NoPeers)?;
}
let n = (Self::random() as usize) % valid.len();
let addr = valid[n].contact_info.ncp;
@@ -684,7 +736,14 @@ impl Crdt {
/// * A - Address to send to
/// * B - RequestUpdates protocol message
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
let options: Vec<_> = self.table
.values()
.filter(|v| {
v.id != self.me
&& !v.contact_info.ncp.ip().is_unspecified()
&& !v.contact_info.ncp.ip().is_multicast()
})
.collect();
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
&self.remote,
@@ -694,7 +753,7 @@ impl Crdt {
let choose_peer_result = choose_peer_strategy.choose_peer(options);
if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result {
if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result {
trace!(
"crdt too small for gossip {:x} {}",
self.debug_id(),
@@ -898,24 +957,18 @@ impl Crdt {
outblob.meta.set_addr(&from.contact_info.tvu_window);
outblob.set_id(sender_id).expect("blob set_id");
}
static mut COUNTER_REQ_WINDOW_PASS: Counter =
create_counter!("crdt-window-request-pass", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_PASS, 1);
inc_new_counter!("crdt-window-request-pass", 1);
return Some(out);
} else {
static mut COUNTER_REQ_WINDOW_OUTSIDE: Counter =
create_counter!("crdt-window-request-outside", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_OUTSIDE, 1);
inc_new_counter!("crdt-window-request-outside", 1);
info!(
"requested ix {} != blob_ix {}, outside window!",
ix, blob_ix
);
}
} else {
static mut COUNTER_REQ_WINDOW_FAIL: Counter =
create_counter!("crdt-window-request-fail", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_FAIL, 1);
inc_new_counter!("crdt-window-request-fail", 1);
assert!(window.read().unwrap()[pos].is_none());
info!(
"{:x}: failed RequestWindowIndex {:x} {} {}",
@@ -991,16 +1044,23 @@ impl Crdt {
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
static mut COUNTER_REQ_WINDOW: Counter =
create_counter!("crdt-window-request-recv", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW, 1);
inc_new_counter!("crdt-window-request-recv", 1);
trace!(
"{:x}:received RequestWindowIndex {:x} {} ",
me.debug_id(),
from.debug_id(),
ix,
);
assert_ne!(from.contact_info.tvu_window, me.contact_info.tvu_window);
if from.contact_info.tvu_window == me.contact_info.tvu_window {
warn!(
"Ignored {:x}:received RequestWindowIndex from ME {:x} {} ",
me.debug_id(),
from.debug_id(),
ix,
);
inc_new_counter!("crdt-window-request-address-eq", 1);
return None;
}
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
}
Err(_) => {
@@ -1084,23 +1144,18 @@ pub struct TestNode {
pub sockets: Sockets,
}
impl Default for TestNode {
fn default() -> Self {
Self::new()
}
}
impl TestNode {
pub fn new() -> Self {
pub fn new_localhost() -> Self {
let pubkey = KeyPair::new().pubkey();
Self::new_with_pubkey(pubkey)
Self::new_localhost_with_pubkey(pubkey)
}
pub fn new_with_pubkey(pubkey: PublicKey) -> Self {
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let repair = UdpSocket::bind("0.0.0.0:0").unwrap();
pub fn new_localhost_with_pubkey(pubkey: PublicKey) -> Self {
let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let requests = UdpSocket::bind("127.0.0.1:0").unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
@@ -1203,6 +1258,107 @@ mod tests {
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}
#[test]
fn test_bad_address() {
let d1 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.0:1234".parse().unwrap(),
"0.0.0.0:1235".parse().unwrap(),
"0.0.0.0:1236".parse().unwrap(),
"0.0.0.0:1237".parse().unwrap(),
"0.0.0.0:1238".parse().unwrap(),
);
assert_matches!(
Crdt::new(d1).err(),
Some(Error::CrdtError(CrdtError::BadGossipAddress))
);
let d1_1 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.1:1234".parse().unwrap(),
"0.0.0.0:1235".parse().unwrap(),
"0.0.0.0:1236".parse().unwrap(),
"0.0.0.0:1237".parse().unwrap(),
"0.0.0.0:1238".parse().unwrap(),
);
assert_matches!(
Crdt::new(d1_1).err(),
Some(Error::CrdtError(CrdtError::BadContactInfo))
);
let d2 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
);
assert_matches!(
Crdt::new(d2).err(),
Some(Error::CrdtError(CrdtError::BadGossipAddress))
);
let d2_1 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.1:1234".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.1:0".parse().unwrap(),
);
assert_matches!(
Crdt::new(d2_1).err(),
Some(Error::CrdtError(CrdtError::BadContactInfo))
);
let d3 = NodeInfo::new_unspecified();
assert_matches!(
Crdt::new(d3).err(),
Some(Error::CrdtError(CrdtError::BadGossipAddress))
);
let d4 = NodeInfo::new_multicast();
assert_matches!(
Crdt::new(d4).err(),
Some(Error::CrdtError(CrdtError::BadGossipAddress))
);
let mut d5 = NodeInfo::new_multicast();
d5.version = 1;
assert_matches!(
Crdt::new(d5).err(),
Some(Error::CrdtError(CrdtError::BadNodeInfo))
);
let d6 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.0:1234".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
);
assert_matches!(
Crdt::new(d6).err(),
Some(Error::CrdtError(CrdtError::BadGossipAddress))
);
let d7 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.1:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
);
assert_matches!(
Crdt::new(d7).err(),
Some(Error::CrdtError(CrdtError::BadGossipAddress))
);
let d8 = NodeInfo::new(
KeyPair::new().pubkey(),
"0.0.0.1:1234".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
);
assert_eq!(Crdt::new(d8).is_ok(), true);
}
#[test]
fn insert_test() {
let mut d = NodeInfo::new(
@@ -1214,7 +1370,7 @@ mod tests {
"127.0.0.1:1238".parse().unwrap(),
);
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
d.version = 2;
crdt.insert(&d);
@@ -1227,7 +1383,7 @@ mod tests {
fn test_new_vote() {
let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
assert_ne!(d.id, leader.id);
@@ -1254,7 +1410,7 @@ mod tests {
fn test_insert_vote() {
let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
let vote_same_version = Vote {
version: d.version,
@@ -1286,7 +1442,7 @@ mod tests {
// TODO: remove this test once leaders vote
let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
let mut crdt = Crdt::new(d.clone()).unwrap();
let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
assert_ne!(d.id, leader.id);
crdt.insert(&leader);
@@ -1355,7 +1511,7 @@ mod tests {
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(d1.clone());
let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new");
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 1);
@@ -1376,7 +1532,7 @@ mod tests {
sorted(&ups),
sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
);
let mut crdt2 = Crdt::new(d2.clone());
let mut crdt2 = Crdt::new(d2.clone()).expect("Crdt::new");
crdt2.apply_updates(key, ix, &ups, &vec![]);
assert_eq!(crdt2.table.values().len(), 3);
assert_eq!(
@@ -1398,9 +1554,9 @@ mod tests {
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(me.clone());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
let nxt = NodeInfo::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
@@ -1411,7 +1567,7 @@ mod tests {
);
crdt.insert(&nxt);
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
let nxt = NodeInfo::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
@@ -1449,6 +1605,30 @@ mod tests {
assert!(one && two);
}
#[test]
fn gossip_request_bad_addr() {
let me = NodeInfo::new(
KeyPair::new().pubkey(),
"127.0.0.1:127".parse().unwrap(),
"127.0.0.1:127".parse().unwrap(),
"127.0.0.1:127".parse().unwrap(),
"127.0.0.1:127".parse().unwrap(),
"127.0.0.1:127".parse().unwrap(),
);
let mut crdt = Crdt::new(me).expect("Crdt::new");
let nxt1 = NodeInfo::new_unspecified();
// Filter out unspecified addresses
crdt.insert(&nxt1); //<--- attack!
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
let nxt2 = NodeInfo::new_multicast();
// Filter out multicast addresses
crdt.insert(&nxt2); //<--- attack!
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
}
/// test that gossip requests are eventually generated for all nodes
#[test]
fn gossip_request() {
@@ -1460,9 +1640,9 @@ mod tests {
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(me.clone());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
let nxt1 = NodeInfo::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
@@ -1519,7 +1699,7 @@ mod tests {
fn purge_test() {
logger::setup();
let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt.id);
crdt.set_leader(me.id);
@@ -1630,7 +1810,7 @@ mod tests {
let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
assert_eq!(crdt.top_leader(), None);
crdt.set_leader(leader0.id);
assert_eq!(crdt.top_leader().unwrap(), leader0.id);

View File

@@ -112,6 +112,10 @@ impl Drone {
airdrop_request_amount,
client_public_key,
} => {
info!(
"Requesting airdrop of {} to {:?}",
airdrop_request_amount, client_public_key
);
request_amount = airdrop_request_amount;
tx = Transaction::new(
&self.mint_keypair,
@@ -261,7 +265,7 @@ mod tests {
const TPS_BATCH: i64 = 5_000_000;
logger::setup();
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let alice = Mint::new(10_000_000);
let bank = Bank::new(&alice);

View File

@@ -203,7 +203,7 @@ impl FullNode {
thread_hdls.extend(rpu.thread_hdls());
let blob_recycler = BlobRecycler::default();
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let (tpu, blob_receiver) = Tpu::new(
&bank.clone(),
&crdt.clone(),
@@ -285,7 +285,7 @@ impl FullNode {
);
thread_hdls.extend(rpu.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&entry_point);
@@ -318,8 +318,12 @@ impl FullNode {
FullNode { exit, thread_hdls }
}
pub fn close(self) -> Result<()> {
//used for notifying many nodes in parallel to exit
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> Result<()> {
self.exit();
self.join()
}
}
@@ -343,18 +347,39 @@ mod tests {
use crdt::TestNode;
use fullnode::FullNode;
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
#[test]
fn validator_exit() {
let kp = KeyPair::new();
let tn = TestNode::new_with_pubkey(kp.pubkey());
let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit);
v.close().unwrap();
v.exit();
v.join().unwrap();
}
#[test]
fn validator_parallel_exit() {
let vals: Vec<FullNode> = (0..2)
.map(|_| {
let kp = KeyPair::new();
let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit)
})
.collect();
//each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the
//validators to exit from all their threads
vals.into_iter().for_each(|v| v.join().unwrap());
}
}

View File

@@ -57,7 +57,7 @@ impl InfluxDbMetricsWriter {
impl MetricsWriter for InfluxDbMetricsWriter {
fn write(&self, points: Vec<influxdb::Point>) {
if let Some(ref client) = self.client {
info!("submitting {} points", points.len());
debug!("submitting {} points", points.len());
if let Err(err) = client.write_points(
influxdb::Points { point: points },
Some(influxdb::Precision::Milliseconds),
@@ -184,6 +184,50 @@ pub fn flush() {
agent.flush();
}
/// Hook the panic handler to generate a data point on each panic
pub fn set_panic_hook(program: &'static str) {
use std::panic;
use std::sync::{Once, ONCE_INIT};
static SET_HOOK: Once = ONCE_INIT;
SET_HOOK.call_once(|| {
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |ono| {
default_hook(ono);
submit(
influxdb::Point::new("panic")
.add_tag("program", influxdb::Value::String(program.to_string()))
.add_tag(
"thread",
influxdb::Value::String(
thread::current().name().unwrap_or("?").to_string(),
),
)
// The 'one' field exists to give Kapacitor Alerts a numerical value
// to filter on
.add_field("one", influxdb::Value::Integer(1))
.add_field(
"message",
influxdb::Value::String(
// TODO: use ono.message() when it becomes stable
ono.to_string(),
),
)
.add_field(
"location",
influxdb::Value::String(match ono.location() {
Some(location) => location.to_string(),
None => "?".to_string(),
}),
)
.to_owned(),
);
// Flush metrics immediately in case the process exits immediately
// upon return
flush();
}));
});
}
#[cfg(test)]
mod test {
use super::*;

8
src/nat.rs Normal file → Executable file
View File

@@ -106,9 +106,13 @@ pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPa
Err(_) => {
let sender = udp_random_bind(startport, endport, 5).unwrap();
let local_addr = sender.local_addr().unwrap();
info!("Using local address {} for {}", local_addr, label);
let pub_ip = get_public_ip_addr().unwrap();
let pub_addr = SocketAddr::new(pub_ip, local_addr.port());
info!("Using source address {} for {}", pub_addr, label);
UdpSocketPair {
addr: private_addr,
addr: pub_addr,
receiver: sender.try_clone().unwrap(),
sender,
}

View File

@@ -88,8 +88,8 @@ mod tests {
// test that stage will exit when flag is set
fn test_exit() {
let exit = Arc::new(AtomicBool::new(false));
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let tn = TestNode::new_localhost();
let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(

View File

@@ -19,7 +19,6 @@ pub type SharedBlobs = VecDeque<SharedBlob>;
pub type PacketRecycler = Recycler<Packets>;
pub type BlobRecycler = Recycler<Blob>;
const LOG_RATE: usize = 10;
pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = 64 * 1024;
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
@@ -188,7 +187,6 @@ impl<T: Default> Recycler<T> {
impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
static mut COUNTER: Counter = create_counter!("packets", LOG_RATE);
self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
@@ -203,7 +201,7 @@ impl Packets {
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
inc_counter!(COUNTER, i);
inc_new_counter!("packets-recv_count", 1);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
@@ -423,7 +421,7 @@ impl Blob {
let p = r.read().expect("'r' read lock in pub fn send_to");
let a = p.meta.addr();
if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) {
info!(
warn!(
"error sending {} byte packet to {:?}: {:?}",
p.meta.size, a, e
);

View File

@@ -27,7 +27,6 @@ pub struct ReplicateStage {
}
const VOTE_TIMEOUT_MS: u64 = 1000;
const LOG_RATE: usize = 10;
impl ReplicateStage {
/// Process entry blobs, already in order
@@ -48,11 +47,13 @@ impl ReplicateStage {
}
let blobs_len = blobs.len();
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
let votes = entries_to_votes(&entries);
static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE);
inc_counter!(
COUNTER_REPLICATE,
{
let votes = entries_to_votes(&entries);
let mut wcrdt = crdt.write().unwrap();
wcrdt.insert_votes(&votes);
};
inc_new_counter!(
"replicate-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
let res = bank.process_entries(entries);
@@ -66,7 +67,6 @@ impl ReplicateStage {
let shared_blob = blob_recycler.allocate();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
wcrdt.insert_votes(&votes);
//TODO: doesn't seem like there is a synchronous call to get height and id
info!("replicate_stage {} {:?}", height, &last_id[..8]);
wcrdt.new_vote(height, last_id)
@@ -80,7 +80,9 @@ impl ReplicateStage {
blob.meta.set_addr(&addr);
blob.meta.size = len;
}
inc_new_counter!("replicate-vote_sent", 1);
*last_vote = now;
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
}
while let Some(blob) = blobs.pop_front() {

View File

@@ -22,6 +22,8 @@ struct Elems {
#[cfg(feature = "cuda")]
#[link(name = "cuda_verify_ed25519")]
extern "C" {
fn ed25519_init() -> bool;
fn ed25519_set_verbose(val: bool);
fn ed25519_verify_many(
vecs: *const Elems,
num: u32, //number of vecs
@@ -34,6 +36,11 @@ extern "C" {
) -> u32;
}
#[cfg(not(feature = "cuda"))]
pub fn init() {
// stub
}
#[cfg(not(feature = "cuda"))]
fn verify_packet(packet: &Packet) -> u8 {
use ring::signature;
@@ -70,7 +77,6 @@ fn batch_size(batches: &[SharedPackets]) -> usize {
#[cfg(not(feature = "cuda"))]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*;
static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches));
let rv = batches
@@ -84,14 +90,24 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.collect()
})
.collect();
inc_counter!(COUNTER, count);
inc_new_counter!("ed25519_verify", count);
rv
}
#[cfg(feature = "cuda")]
pub fn init() {
unsafe {
ed25519_set_verbose(true);
if !ed25519_init() {
panic!("ed25519_init() failed");
}
ed25519_set_verbose(false);
}
}
#[cfg(feature = "cuda")]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use packet::PACKET_DATA_SIZE;
static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
let count = batch_size(batches);
info!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new();
@@ -151,7 +167,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
num += 1;
}
}
inc_counter!(COUNTER, count);
inc_new_counter!("ed25519_verify", count);
rvs
}

View File

@@ -25,6 +25,7 @@ pub struct SigVerifyStage {
impl SigVerifyStage {
pub fn new(packet_receiver: Receiver<SharedPackets>) -> (Self, Receiver<VerifiedPackets>) {
sigverify::init();
let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(packet_receiver, verified_sender);
(SigVerifyStage { thread_hdls }, verified_receiver)

View File

@@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
const LOG_RATE: usize = 10;
pub const WINDOW_SIZE: u64 = 2 * 1024;
pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>;
@@ -117,7 +116,7 @@ pub fn responder(
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{} responder error: {:?}", name, e),
_ => warn!("{} responder error: {:?}", name, e),
}
}
})
@@ -224,9 +223,7 @@ fn repair_window(
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if !reqs.is_empty() {
static mut COUNTER_REPAIR: Counter =
create_counter!("streamer-repair_window-repair", LOG_RATE);
inc_counter!(COUNTER_REPAIR, reqs.len());
inc_new_counter!("streamer-repair_window-repair", reqs.len());
debug!(
"{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
debug_id,
@@ -301,9 +298,7 @@ fn retransmit_all_leader_blocks(
*received,
retransmit_queue.len(),
);
static mut COUNTER_RETRANSMIT: Counter =
create_counter!("streamer-recv_window-retransmit", LOG_RATE);
inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len());
inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len());
retransmit.send(retransmit_queue)?;
}
Ok(())
@@ -413,8 +408,7 @@ fn recv_window(
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
static mut COUNTER_RECV: Counter = create_counter!("streamer-recv_window-recv", LOG_RATE);
inc_counter!(COUNTER_RECV, dq.len());
inc_new_counter!("streamer-recv_window-recv", dq.len());
debug!(
"{:x}: RECV_WINDOW {} {}: got packets {}",
debug_id,
@@ -480,9 +474,7 @@ fn recv_window(
consume_queue.len(),
);
trace!("sending consume_queue.len: {}", consume_queue.len());
static mut COUNTER_CONSUME: Counter =
create_counter!("streamer-recv_window-consume", LOG_RATE);
inc_counter!(COUNTER_CONSUME, consume_queue.len());
inc_new_counter!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?;
}
Ok(())
@@ -591,7 +583,10 @@ pub fn window(
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("window error: {:?}", e),
_ => {
inc_new_counter!("streamer-window-error", 1, 1);
error!("window error: {:?}", e);
}
}
}
let _ = repair_window(
@@ -647,9 +642,7 @@ fn broadcast(
// Index the blobs
Crdt::index_blobs(&me, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast
static mut COUNTER_BROADCAST: Counter =
create_counter!("streamer-broadcast-sent", LOG_RATE);
inc_counter!(COUNTER_BROADCAST, blobs.len());
inc_new_counter!("streamer-broadcast-sent", blobs.len());
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
@@ -738,8 +731,11 @@ pub fn broadcaster(
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => error!("broadcaster error: {:?}", e),
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => {
inc_new_counter!("streamer-broadcaster-error", 1, 1);
error!("broadcaster error: {:?}", e);
}
}
}
}
@@ -792,7 +788,10 @@ pub fn retransmitter(
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("retransmitter error: {:?}", e),
_ => {
inc_new_counter!("streamer-retransmit-error", 1, 1);
error!("retransmitter error: {:?}", e);
}
}
}
}
@@ -899,9 +898,9 @@ mod test {
#[test]
pub fn window_send_test() {
logger::setup();
let tn = TestNode::new();
let tn = TestNode::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.data.clone());
let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me));

View File

@@ -274,7 +274,7 @@ mod tests {
#[test]
fn test_thin_client() {
logger::setup();
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let leader_data = leader.data.clone();
let alice = Mint::new(10_000);
@@ -317,7 +317,7 @@ mod tests {
#[ignore]
fn test_bad_sig() {
logger::setup();
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
@@ -371,7 +371,7 @@ mod tests {
#[test]
fn test_client_check_signature() {
logger::setup();
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();

View File

@@ -168,21 +168,21 @@ pub mod tests {
#[test]
fn test_replicate() {
logger::setup();
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let target1_kp = KeyPair::new();
let target1 = TestNode::new_with_pubkey(target1_kp.pubkey());
let target2 = TestNode::new();
let target1 = TestNode::new_localhost_with_pubkey(target1_kp.pubkey());
let target2 = TestNode::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
//start crdt_leader
let mut crdt_l = Crdt::new(leader.data.clone());
let mut crdt_l = Crdt::new(leader.data.clone()).expect("Crdt::new");
crdt_l.set_leader(leader.data.id);
let cref_l = Arc::new(RwLock::new(crdt_l));
let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
//start crdt2
let mut crdt2 = Crdt::new(target2.data.clone());
let mut crdt2 = Crdt::new(target2.data.clone()).expect("Crdt::new");
crdt2.insert(&leader.data);
crdt2.set_leader(leader.data.id);
let leader_id = leader.data.id;
@@ -217,7 +217,7 @@ pub mod tests {
let bank = Arc::new(Bank::new(&mint));
//start crdt1
let mut crdt1 = Crdt::new(target1.data.clone());
let mut crdt1 = Crdt::new(target1.data.clone()).expect("Crdt::new");
crdt1.insert(&leader.data);
crdt1.set_leader(leader.data.id);
let cref1 = Arc::new(RwLock::new(crdt1));

View File

@@ -3,6 +3,7 @@
//! stdout, and then sends the Entry to its output channel.
use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use entry::Entry;
use entry_writer::EntryWriter;
@@ -12,6 +13,7 @@ use result::{Error, Result};
use service::Service;
use std::collections::VecDeque;
use std::io::Write;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
@@ -41,6 +43,8 @@ impl WriteStage {
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {
inc_new_counter!("write_stage-broadcast_vote-count", votes.len());
inc_new_counter!("write_stage-broadcast_blobs-count", blobs.len());
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
@@ -71,7 +75,10 @@ impl WriteStage {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
_ => {
inc_new_counter!("write_stage-error", 1);
error!("{:?}", e);
}
}
};
}

View File

@@ -16,8 +16,8 @@ use std::thread::sleep;
use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let tn = TestNode::new_localhost();
let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(

View File

@@ -11,6 +11,7 @@ use solana::fullnode::{FullNode, LedgerFile};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
@@ -24,12 +25,12 @@ use std::time::Duration;
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
//lets spy on the network
let exit = Arc::new(AtomicBool::new(false));
let mut spy = TestNode::new();
let mut spy = TestNode::new_localhost();
let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.data.id.clone();
spy.data.contact_info.tvu = daddr;
spy.data.contact_info.rpu = daddr;
let mut spy_crdt = Crdt::new(spy.data);
let mut spy_crdt = Crdt::new(spy.data).expect("Crdt::new");
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
@@ -86,7 +87,7 @@ fn test_multi_node_validator_catchup_from_zero() {
logger::setup();
const N: usize = 5;
trace!("test_multi_node_validator_catchup_from_zero");
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
@@ -101,7 +102,7 @@ fn test_multi_node_validator_catchup_from_zero() {
let mut nodes = vec![server];
for _ in 0..N {
let keypair = KeyPair::new();
let validator = TestNode::new_with_pubkey(keypair.pubkey());
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let mut val = FullNode::new(
validator,
false,
@@ -135,7 +136,7 @@ fn test_multi_node_validator_catchup_from_zero() {
success = 0;
// start up another validator, converge and then check everyone's balances
let keypair = KeyPair::new();
let validator = TestNode::new_with_pubkey(keypair.pubkey());
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let val = FullNode::new(
validator,
false,
@@ -186,7 +187,7 @@ fn test_multi_node_basic() {
logger::setup();
const N: usize = 5;
trace!("test_multi_node_basic");
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(10_000);
@@ -200,7 +201,7 @@ fn test_multi_node_basic() {
let mut nodes = vec![server];
for _ in 0..N {
let keypair = KeyPair::new();
let validator = TestNode::new_with_pubkey(keypair.pubkey());
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let val = FullNode::new(
validator,
false,
@@ -239,7 +240,7 @@ fn test_multi_node_basic() {
#[test]
fn test_boot_validator_from_file() {
logger::setup();
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(100_000);
let leader_data = leader.data.clone();
@@ -258,7 +259,7 @@ fn test_boot_validator_from_file() {
assert_eq!(leader_balance, 1000);
let keypair = KeyPair::new();
let validator = TestNode::new_with_pubkey(keypair.pubkey());
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let val_fullnode = FullNode::new(
validator,
@@ -277,7 +278,7 @@ fn test_boot_validator_from_file() {
}
fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(
leader,
@@ -328,7 +329,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
// start validator from old ledger
let keypair = KeyPair::new();
let validator = TestNode::new_with_pubkey(keypair.pubkey());
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let val_fullnode = FullNode::new(
validator,
@@ -369,7 +370,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
fn test_multi_node_dynamic_network() {
logger::setup();
const N: usize = 60;
let leader = TestNode::new();
let leader = TestNode::new_localhost();
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(100_000);
let leader_data = leader.data.clone();
@@ -392,7 +393,7 @@ fn test_multi_node_dynamic_network() {
.into_iter()
.map(|n| {
let keypair = KeyPair::new();
let validator = TestNode::new_with_pubkey(keypair.pubkey());
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let rd = validator.data.clone();
//send some tokens to the new validator
let bal =
@@ -410,6 +411,7 @@ fn test_multi_node_dynamic_network() {
})
.collect();
let mut consecutive_success = 0;
for i in 0..N {
//verify leader can do transfer
let expected = ((i + 3) * 500) as i64;
@@ -452,13 +454,25 @@ fn test_multi_node_dynamic_network() {
validators.len(),
distance
);
//assert_eq!(success, validators.len());
if success == validators.len() && distance == 0 {
consecutive_success += 1;
} else {
consecutive_success = 0;
}
if consecutive_success == 10 {
break;
}
}
}
for (_, node) in validators {
node.close().unwrap();
assert_eq!(consecutive_success, 10);
for (_, node) in &validators {
node.exit();
}
server.close().unwrap();
server.exit();
for (_, node) in validators {
node.join().unwrap();
}
server.join().unwrap();
std::fs::remove_file(ledger_path).unwrap();
}