Compare commits

...

56 Commits
v0.7.1 ... v0.7

SHA1 Message Date
6f3beb915c Update fetch-perf-libs.sh 2018-09-17 11:54:32 -07:00
f399172f51 Pin docker images 2018-09-07 17:13:39 -07:00
15f3b97492 Send deploy metrics to the testnet-specific database 2018-09-06 08:32:07 -07:00
de284466ff Fetch install-earlyoom.sh from v0.7 branch 2018-09-05 19:12:56 -07:00
cb20ebc583 Don't block on large network test 2018-08-21 20:52:45 -07:00
ceb5686175 clippy: remove identity conversion 2018-08-21 20:51:46 -07:00
55d59b1ac6 echo commands, use PID (good form) 2018-08-21 20:42:35 -07:00
2b60b4e23a files have to appear in the snap 2018-08-21 20:42:21 -07:00
9bdc1b9727 fixups 2018-08-21 20:42:13 -07:00
87efdabcb3 make a copy of the ledger for sanity check
we can't verify a live ledger, unfortunately, fixes #985
2018-08-21 20:42:05 -07:00
7d5bb28128 Make SNAP_CHANNEL more visible in build log 2018-08-17 21:40:24 -07:00
ae433d6a34 Invert logic 2018-08-17 21:17:31 -07:00
e3c668acff Keep v0.7 snap off the edge channel 2018-08-17 21:12:31 -07:00
5825501c79 Log expansion directive must be on its own line 2018-08-17 20:58:00 -07:00
7e84bb7a60 Add option to skip ledger verification 2018-08-17 20:41:55 -07:00
da1fd96d50 0.7.2 2018-08-17 17:56:04 -07:00
141e1e974d Add some wget retries 2018-08-17 16:11:18 -07:00
fc0d7f5982 updated nightly versions 2018-08-16 13:17:29 -07:00
f697632edb update clippy install instructions, from here:
https://github.com/rust-lang-nursery/rust-clippy

fixes #947 ?
2018-08-16 13:17:29 -07:00
73797c789b back to 4 TX threads 2018-08-16 12:02:11 -07:00
036fcced31 test -t nproc 2018-08-16 12:02:11 -07:00
1d3157fb80 fixups 2018-08-16 12:02:11 -07:00
0b11c2e119 restart testnet clients in case airdrop fails 2018-08-16 12:02:11 -07:00
96af892d95 Add docs about the testnet 2018-08-16 07:39:17 -07:00
c2983f824e Refactored large network test to use finality to assert success (#978) 2018-08-15 20:05:43 -07:00
88d6fea999 Revert "Accounts with state (#954)"
This reverts commit c23fa289c3.
2018-08-15 19:44:39 -07:00
c23fa289c3 Accounts with state (#954)
* Account type with state

* fixed test according to @rob-solana
2018-08-15 14:32:11 -07:00
db35f220f7 Run multinode test for enough iterations for a small node count test (#971) 2018-08-15 10:44:14 -07:00
982afa87a6 Retransmit blobs from leader from window (#975)
- Some nodes don't have leader information while the leader is broadcasting
  blobs to those nodes. Such blobs are not retransmitted. This change
  retransmits the blobs once the leader's identity is known.
2018-08-14 21:51:37 -07:00
dccae18b53 cfg=erasure fixes, use return value of align!() 2018-08-14 12:14:59 -07:00
53e86f2fa2 use align! 2018-08-14 12:14:59 -07:00
757dfd36a3 Report errors better in build log 2018-08-14 11:44:26 -07:00
708add0e64 fixups 2018-08-14 10:16:34 -07:00
d8991ae2ca fix UPnP backout, fixes #969 2018-08-14 10:16:34 -07:00
5f6cbe0cf8 fixups 2018-08-13 21:07:26 -07:00
f167b0c2c5 fixups 2018-08-13 21:07:26 -07:00
f784500fbb fixups
fixes #907
2018-08-13 21:07:26 -07:00
83df47323a initialize recycled data 2018-08-13 21:07:26 -07:00
c75d4abb0b Tuck away PoH duration 2018-08-13 20:17:16 -06:00
5216a723b1 Pacify clippy 2018-08-13 20:17:16 -06:00
b801ca477d Declare fullnode a word 2018-08-13 20:17:16 -06:00
c830c604f4 Make BroadcastStage an actual stage
TODO: Why isn't BroadcastStage/RetransmitStage managed by the NCP?
2018-08-13 20:17:16 -06:00
0e66606c7f Rename broadcaster to broadcast_stage
And move retransmitter code into retransmit_stage.

TODO: Add a BroadcastStage service
2018-08-13 20:17:16 -06:00
8707abe091 Fix erasure build 2018-08-13 20:17:16 -06:00
dc2a840985 Move FullNode::new_window into window module 2018-08-13 20:17:16 -06:00
2727067b94 Move window into its own module 2018-08-13 20:17:16 -06:00
6a8a494f5d Rename WindowStage to RetransmitStage
The window is used for both broadcasting from leader to validator
and retransmitting between validators.
2018-08-13 20:17:16 -06:00
a09d2e252a Move window dependencies out of streamer
No tests!?
2018-08-13 20:17:16 -06:00
3e9c463ff1 Offer only 1 way to create a fullnode with an empty window 2018-08-13 20:17:16 -06:00
46d50f5bde Remove p2p crate (and uPnP support) 2018-08-13 18:22:58 -07:00
e8da903c6c move tmp_ledger back to target dir 2018-08-13 16:52:53 -07:00
ab10b7676a use stable cache 2018-08-13 16:23:30 -07:00
fa44a71d3e move bench to a separate, parallel step 2018-08-13 16:23:30 -07:00
c86e9e8568 pad max_data_size to jerasure's alignment requirements 2018-08-13 16:10:51 -07:00
9e22e23ce6 increase stable timeout until tomorrow 2018-08-13 15:45:50 -07:00
835f29a178 off by 2 2018-08-13 15:12:12 -07:00
35 changed files with 1827 additions and 1408 deletions

View File

@ -45,7 +45,8 @@ understood. Avoid introducing new 3-letter terms, which can be confused with 3-l
Some terms we currently use regularly in the codebase:
* hash: n. A SHA-256 Hash
* fullnode: n. A fully participating network node.
* hash: n. A SHA-256 Hash.
* keypair: n. An Ed25519 key-pair, containing a public and private key.
* pubkey: n. The public key of an Ed25519 key-pair.
* sigverify: v. To verify an Ed25519 digital signature.
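
For readers unfamiliar with these terms, here is a minimal standalone sketch (an editor's illustration using the ed25519-dalek 1.x crate, not this repo's own `signature` module) tying keypair, pubkey, and sigverify together:

```rust
use ed25519_dalek::{Keypair, Signature, Signer, Verifier};
use rand::rngs::OsRng; // rand 0.7, as required by ed25519-dalek 1.x

fn main() {
    // keypair: an Ed25519 key-pair, containing a public and private key.
    let keypair: Keypair = Keypair::generate(&mut OsRng);

    // pubkey: the public key half of the key-pair.
    let pubkey = keypair.public;

    // sigverify: verify an Ed25519 digital signature over a message.
    let message = b"transaction bytes";
    let signature: Signature = keypair.sign(message);
    assert!(pubkey.verify(message, &signature).is_ok());
}
```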

View File

@ -1,7 +1,7 @@
[package]
name = "solana"
description = "Blockchain, Rebuilt for Scale"
version = "0.7.1"
version = "0.7.2"
documentation = "https://docs.rs/solana"
homepage = "http://solana.com/"
readme = "README.md"
@ -79,7 +79,6 @@ itertools = "0.7.8"
libc = "0.2.1"
log = "0.4.2"
matches = "0.1.6"
p2p = "0.5.2"
pnet_datalink = "0.21.0"
rand = "0.5.1"
rayon = "1.0.0"

View File

@ -1,13 +1,18 @@
steps:
- command: "ci/docker-run.sh solanalabs/rust ci/test-stable.sh"
- command: "ci/docker-run.sh solanalabs/rust:1.28.0 ci/test-stable.sh"
name: "stable [public]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
timeout_in_minutes: 30
- command: "ci/docker-run.sh solanalabs/rust:1.28.0 ci/test-bench.sh"
name: "bench [public]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
timeout_in_minutes: 30
- command: "ci/shellcheck.sh"
name: "shellcheck [public]"
timeout_in_minutes: 20
- command: "ci/docker-run.sh solanalabs/rust-nightly ci/test-nightly.sh"
- command: "ci/docker-run.sh solanalabs/rust-nightly:2018-08-14 ci/test-nightly.sh"
name: "nightly [public]"
env:
CARGO_TARGET_CACHE_NAME: "nightly"
@ -19,8 +24,8 @@ steps:
timeout_in_minutes: 20
agents:
- "queue=cuda"
- command: "ci/test-large-network.sh"
name: "large-network [public]"
- command: "ci/test-large-network.sh || true"
name: "large-network [public] [ignored]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
timeout_in_minutes: 20

View File

@ -1,6 +1,6 @@
FROM rustlang/rust:nightly
RUN cargo install --force clippy cargo-cov && \
RUN rustup component add clippy-preview --toolchain=nightly && \
echo deb http://ftp.debian.org/debian stretch-backports main >> /etc/apt/sources.list && \
apt update && \
apt install -y \

View File

@ -55,21 +55,31 @@ trap shutdown EXIT INT
set -e
flag_error() {
echo Failed
echo "^^^ +++"
exit 1
}
echo "--- Wallet sanity"
(
set -x
multinode-demo/test/wallet-sanity.sh
)
) || flag_error
echo "--- Node count"
(
set -x
./multinode-demo/client.sh "$PWD" 3 -c --addr 127.0.0.1
)
) || flag_error
killBackgroundCommands
echo "--- Ledger verification"
killBackgroundCommands
$solana_ledger_tool --ledger "$SOLANA_CONFIG_DIR"/ledger verify
(
set -x
$solana_ledger_tool --ledger "$SOLANA_CONFIG_DIR"/ledger verify
) || flag_error
echo +++
echo Ok

View File

@ -11,7 +11,9 @@ fi
# when this script is run from a triggered pipeline, TRIGGERED_BUILDKITE_TAG is
# used instead of BUILDKITE_TAG (due to Buildkite limitations that prevents
# BUILDKITE_TAG from propagating through to triggered pipelines)
if [[ -z "$BUILDKITE_TAG" && -z "$TRIGGERED_BUILDKITE_TAG" ]]; then
if [[ -n "$BUILDKITE_TAG" || -n "$TRIGGERED_BUILDKITE_TAG" ]]; then
SNAP_CHANNEL=stable
elif [[ $BUILDKITE_BRANCH = master ]]; then
SNAP_CHANNEL=edge
else
SNAP_CHANNEL=beta
@ -43,11 +45,11 @@ if [[ ! -x /usr/bin/multilog ]]; then
exit 1
fi
echo --- build
echo --- build: $SNAP_CHANNEL channel
snapcraft
source ci/upload_ci_artifact.sh
upload_ci_artifact solana_*.snap
echo --- publish
echo --- publish: $SNAP_CHANNEL channel
$DRYRUN snapcraft push solana_*.snap --release $SNAP_CHANNEL

13
ci/test-bench.sh Executable file
View File

@ -0,0 +1,13 @@
#!/bin/bash -e
cd "$(dirname "$0")/.."
ci/version-check.sh stable
export RUST_BACKTRACE=1
_() {
echo "--- $*"
"$@"
}
_ cargo bench --verbose

View File

@ -13,7 +13,6 @@ _() {
_ cargo fmt -- --check
_ cargo build --verbose
_ cargo test --verbose
_ cargo bench --verbose
echo --- ci/localnet-sanity.sh
(

View File

@ -17,6 +17,51 @@ if [[ -z $SOLANA_METRICS_CONFIG ]]; then
exit 1
fi
# The SOLANA_METRICS_CONFIG environment variable is formatted as a
# comma-delimited list of parameters. All parameters are optional.
#
# Example:
# export SOLANA_METRICS_CONFIG="host=<metrics host>,db=<database name>,u=<username>,p=<password>"
#
configure_metrics() {
[[ -n $SOLANA_METRICS_CONFIG ]] || return 0
declare metrics_params
IFS=',' read -r -a metrics_params <<< "$SOLANA_METRICS_CONFIG"
for param in "${metrics_params[@]}"; do
IFS='=' read -r -a pair <<< "$param"
if [[ ${#pair[@]} != 2 ]]; then
echo Error: invalid metrics parameter: "$param" >&2
else
declare name="${pair[0]}"
declare value="${pair[1]}"
case "$name" in
host)
export INFLUX_HOST="$value"
echo INFLUX_HOST="$INFLUX_HOST" >&2
;;
db)
export INFLUX_DATABASE="$value"
echo INFLUX_DATABASE="$INFLUX_DATABASE" >&2
;;
u)
export INFLUX_USERNAME="$value"
echo INFLUX_USERNAME="$INFLUX_USERNAME" >&2
;;
p)
export INFLUX_PASSWORD="$value"
echo INFLUX_PASSWORD="********" >&2
;;
*)
echo Error: Unknown metrics parameter name: "$name" >&2
;;
esac
fi
done
}
configure_metrics
# Default to edge channel. To select the beta channel:
# export SOLANA_SNAP_CHANNEL=beta
if [[ -z $SOLANA_SNAP_CHANNEL ]]; then
@ -70,7 +115,10 @@ fi
SNAP_INSTALL_CMD="sudo snap remove solana; $SNAP_INSTALL_CMD"
EARLYOOM_INSTALL_CMD="\
wget -O install-earlyoom.sh https://raw.githubusercontent.com/solana-labs/solana/master/ci/install-earlyoom.sh; \
wget --retry-connrefused --waitretry=1 \
--read-timeout=20 --timeout=15 --tries=5 \
-O install-earlyoom.sh \
https://raw.githubusercontent.com/solana-labs/solana/v0.7/ci/install-earlyoom.sh; \
bash install-earlyoom.sh \
"
SNAP_INSTALL_CMD="$EARLYOOM_INSTALL_CMD; $SNAP_INSTALL_CMD"
@ -276,10 +324,12 @@ client_start() {
tmux new -s solana -d \" \
set -x; \
sudo rm /tmp/solana.log; \
/snap/bin/solana.bench-tps $SOLANA_NET_ENTRYPOINT $fullnode_count --loop -s 600 --sustained -t \$threadCount 2>&1 | tee /tmp/solana.log; \
echo 'https://metrics.solana.com:8086/write?db=${INFLUX_DATABASE}&u=${INFLUX_USERNAME}&p=${INFLUX_PASSWORD}' \
| xargs curl --max-time 5 -XPOST --data-binary 'testnet-deploy,name=$netBasename clientexit=1'; \
echo Error: bench-tps should never exit | tee -a /tmp/solana.log; \
while : ; do \
/snap/bin/solana.bench-tps $SOLANA_NET_ENTRYPOINT $fullnode_count --loop -s 600 --sustained -t \$threadCount 2>&1 | tee -a /tmp/solana.log; \
echo 'https://metrics.solana.com:8086/write?db=${INFLUX_DATABASE}&u=${INFLUX_USERNAME}&p=${INFLUX_PASSWORD}' \
| xargs curl --max-time 5 -XPOST --data-binary 'testnet-deploy,name=$netBasename clientexit=1'; \
echo Error: bench-tps should never exit | tee -a /tmp/solana.log; \
done; \
bash \
\"; \
sleep 2; \
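
For illustration, the comma-delimited `key=value` parsing that `configure_metrics` performs above can be sketched in Rust as follows (an editor's sketch, not code from this repo):

```rust
use std::env;

fn main() {
    // Same format as the script above:
    // SOLANA_METRICS_CONFIG="host=<metrics host>,db=<database name>,u=<username>,p=<password>"
    let config = env::var("SOLANA_METRICS_CONFIG").unwrap_or_default();
    for param in config.split(',').filter(|s| !s.is_empty()) {
        match param.split_once('=') {
            Some(("host", v)) => println!("INFLUX_HOST={}", v),
            Some(("db", v)) => println!("INFLUX_DATABASE={}", v),
            Some(("u", v)) => println!("INFLUX_USERNAME={}", v),
            Some(("p", _)) => println!("INFLUX_PASSWORD=********"),
            Some((name, _)) => eprintln!("Error: unknown metrics parameter name: {}", name),
            None => eprintln!("Error: invalid metrics parameter: {}", param),
        }
    }
}
```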

View File

@ -17,12 +17,21 @@ if [[ -z $EXPECTED_NODE_COUNT ]]; then
fi
echo "--- $NET_URL: verify ledger"
if [[ -d /var/snap/solana/current/config/ledger ]]; then
# Note: here we assume this script is actually running on the leader node...
sudo solana.ledger-tool --ledger /var/snap/solana/current/config/ledger verify
if [[ -z $NO_LEDGER_VERIFY ]]; then
if [[ -d /var/snap/solana/current/config/ledger ]]; then
# Note: here we assume this script is actually running on the leader node...
(
set -x
sudo cp -r /var/snap/solana/current/config/ledger /var/snap/solana/current/config/ledger-verify-$$
sudo solana.ledger-tool --ledger /var/snap/solana/current/config/ledger-verify-$$ verify
)
else
echo "^^^ +++"
echo "Ledger verify skipped"
fi
else
echo "^^^ +++"
echo "Ledger verify skipped"
echo "Ledger verify skipped (NO_LEDGER_VERIFY defined)"
fi
echo "--- $NET_URL: wallet sanity"
@ -54,13 +63,15 @@ if [[ -z $NO_VALIDATOR_SANITY ]]; then
)
wc -l validator.log
if grep -C100 panic validator.log; then
echo "^^^ +++ Panic observed"
echo "^^^ +++"
echo "Panic observed"
exit 1
else
echo "Validator log looks ok"
fi
else
echo "^^^ +++ Validator sanity disabled (NO_VALIDATOR_SANITY defined)"
echo "^^^ +++"
echo "Validator sanity disabled (NO_VALIDATOR_SANITY defined)"
fi
exit 0

View File

@ -19,7 +19,7 @@ require() {
case ${1:-stable} in
nightly)
require rustc 1.29.[0-9]+-nightly
require rustc 1.30.[0-9]+-nightly
require cargo 1.29.[0-9]+-nightly
;;
stable)

35
doc/testnet.md Normal file
View File

@ -0,0 +1,35 @@
# TestNet debugging info
Currently we have two testnets, 'perf' and 'master', both running the master branch of the solana repo. Deploys happen
at the top of every hour with the latest code. 'perf' gives the client machine more cores so it can flood the network
with transactions until failure.
## Deploy process
They are deployed with the `ci/testnet-deploy.sh` script. There is a scheduled buildkite job which runs the deploy;
look at `testnet-deploy` to see the agent which ran it and the logs. There is also a job for triggering the deploy manually.
Validators are selected based on their machine names, and every node gets its binaries installed from the snap.
## Where are the testnet logs?
For the client they are put in `/tmp/solana`; for validators and leaders they are in `/var/snap/solana/current/`.
You can also see the backtrace of the client by ssh'ing into the client node and doing:
```bash
$ sudo -u testnet-deploy
$ tmux attach -t solana
```
## How do I reset the testnet?
Through buildkite.
## How can I scale the tx generation rate?
Increase the TX rate by increasing the number of cores on the client machine that runs
`bench-tps`, or run multiple clients. Decrease it by lowering the core count or by setting the rayon environment
variable `RAYON_NUM_THREADS=<xx>`.
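
Rayon's global thread pool honors `RAYON_NUM_THREADS` at startup, so a quick way to confirm the cap took effect is a one-line check (editor's sketch, assuming the rayon crate):

```rust
fn main() {
    // Run as: RAYON_NUM_THREADS=2 ./this-binary
    println!("rayon threads: {}", rayon::current_num_threads());
}
```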
## How can I test a change on the testnet?
Currently, a merged PR is the only way to test a change on the testnet.

View File

@ -13,7 +13,7 @@ fi
(
set -x
curl -o solana-perf.tgz \
https://solana-perf.s3.amazonaws.com/master/x86_64-unknown-linux-gnu/solana-perf.tgz
https://solana-perf.s3.amazonaws.com/v0.8.0/x86_64-unknown-linux-gnu/solana-perf.tgz
tar zxvf solana-perf.tgz
)

View File

@ -22,9 +22,9 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use streamer::WINDOW_SIZE;
use timing::{duration_as_us, timestamp};
use transaction::{Instruction, Plan, Transaction};
use window::WINDOW_SIZE;
/// The number of most recent `last_id` values that the bank will track the signatures
/// of. Once the bank discards a `last_id`, it will reject any transactions that use

View File

@ -16,15 +16,15 @@ use solana::fullnode::Config;
use solana::hash::Hash;
use solana::logger;
use solana::metrics;
use solana::nat::{udp_public_bind, udp_random_bind, UdpSocketPair};
use solana::nat::{get_public_ip_addr, udp_random_bind};
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use solana::wallet::request_airdrop;
use solana::window::default_window;
use std::collections::VecDeque;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@ -373,7 +373,6 @@ fn main() {
let mut threads = 4usize;
let mut num_nodes = 1usize;
let mut time_sec = 90;
let mut addr = None;
let mut sustained = false;
let mut tx_count = 500_000;
@ -429,7 +428,7 @@ fn main() {
Arg::with_name("addr")
.short("a")
.long("addr")
.value_name("PATH")
.value_name("IPADDR")
.takes_value(true)
.help("address to advertise to the network"),
)
@ -469,9 +468,17 @@ fn main() {
time_sec = s.to_string().parse().expect("integer");
}
if let Some(s) = matches.value_of("addr") {
addr = Some(s.to_string());
}
let addr = if let Some(s) = matches.value_of("addr") {
s.to_string().parse().unwrap_or_else(|e| {
eprintln!("failed to parse {} as IP address error: {:?}", s, e);
exit(1);
})
} else {
get_public_ip_addr().unwrap_or_else(|e| {
eprintln!("failed to get public IP, try --addr? error: {:?}", e);
exit(1);
})
};
if let Some(s) = matches.value_of("tx_count") {
tx_count = s.to_string().parse().expect("integer");
@ -638,37 +645,17 @@ fn main() {
}
}
fn spy_node(addr: Option<String>) -> (NodeInfo, UdpSocket) {
let gossip_socket_pair;
if let Some(a) = addr {
let gossip_socket = udp_random_bind(8000, 10000, 5).unwrap();
let gossip_addr = SocketAddr::new(
a.parse().unwrap(),
gossip_socket.local_addr().unwrap().port(),
);
gossip_socket_pair = UdpSocketPair {
addr: gossip_addr,
receiver: gossip_socket.try_clone().unwrap(),
sender: gossip_socket,
};
} else {
gossip_socket_pair = udp_public_bind("gossip", 8000, 10000);
}
fn spy_node(addr: IpAddr) -> (NodeInfo, UdpSocket) {
let gossip_socket = udp_random_bind(8000, 10000, 5).unwrap();
let gossip_addr = SocketAddr::new(addr, gossip_socket.local_addr().unwrap().port());
let pubkey = Keypair::new().pubkey();
let daddr = "0.0.0.0:0".parse().unwrap();
assert!(!gossip_socket_pair.addr.ip().is_unspecified());
assert!(!gossip_socket_pair.addr.ip().is_multicast());
let node = NodeInfo::new(
pubkey,
//gossip.local_addr().unwrap(),
gossip_socket_pair.addr,
daddr,
daddr,
daddr,
daddr,
);
(node, gossip_socket_pair.receiver)
assert!(!gossip_addr.ip().is_unspecified());
assert!(!gossip_addr.ip().is_multicast());
let node = NodeInfo::new(pubkey, gossip_addr, daddr, daddr, daddr, daddr);
(node, gossip_socket)
}
fn converge(
@ -676,7 +663,7 @@ fn converge(
exit_signal: &Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
addr: Option<String>,
addr: IpAddr,
) -> Vec<NodeInfo> {
//lets spy on the network
let (spy, spy_gossip) = spy_node(addr);
@ -702,7 +689,6 @@ fn converge(
.unwrap()
.table
.values()
.into_iter()
.filter(|x| Crdt::is_valid_address(x.contact_info.rpu))
.cloned()
.collect();

View File

@ -9,7 +9,7 @@ use clap::{App, Arg};
use solana::client::mk_client;
use solana::crdt::{NodeInfo, TestNode};
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, FullNode};
use solana::fullnode::{Config, Fullnode};
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::service::Service;
@ -83,11 +83,11 @@ fn main() -> () {
let testnet_addr: SocketAddr = testnet_address_string.parse().unwrap();
drone_addr.set_ip(testnet_addr.ip());
FullNode::new(node, false, ledger_path, keypair, Some(testnet_addr))
Fullnode::new(node, false, ledger_path, keypair, Some(testnet_addr))
} else {
node.data.leader_id = node.data.id;
FullNode::new(node, true, ledger_path, keypair, None)
Fullnode::new(node, true, ledger_path, keypair, None)
};
let mut client = mk_client(&repl_clone);

View File

@ -117,15 +117,14 @@ fn main() {
}
let entries = entries.map(|e| e.unwrap());
for (i, entry) in entries.enumerate() {
let head = head - 2;
for (i, entry) in entries.skip(2).enumerate() {
if i >= head {
break;
}
if i >= 2 {
if let Err(e) = bank.process_entry(entry) {
eprintln!("verify failed at entry[{}], err: {:?}", i, e);
exit(1);
}
if let Err(e) = bank.process_entry(entry) {
eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e);
exit(1);
}
}
}
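
The effect of the hunk above is easier to see in isolation: the first two entries are skipped up front, the loop bound is reduced by two, and the index reported on failure is shifted back by two. A standalone sketch (editor's illustration with hypothetical entry values):

```rust
fn main() {
    let entries = vec!["e0", "e1", "e2", "e3", "e4"];
    let head = 5;
    // Skip the first two entries, as in the diff above.
    let head = head - 2;
    for (i, entry) in entries.iter().skip(2).enumerate() {
        if i >= head {
            break;
        }
        // Report indices relative to the original ledger, hence `i + 2`.
        println!("verifying entry[{}] = {}", i + 2, entry);
    }
}
```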

203
src/broadcast_stage.rs Normal file
View File

@ -0,0 +1,203 @@
//! The `broadcast_stage` broadcasts data from a leader node to validators
//!
use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo};
#[cfg(feature = "erasure")]
use erasure;
use log::Level;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use std::mem;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::{self, SharedWindow, WindowIndex, WINDOW_SIZE};
fn broadcast(
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
recycler: &BlobRecycler,
receiver: &BlobReceiver,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
) -> Result<()> {
let debug_id = node_info.debug_id();
let timer = Duration::new(1, 0);
let mut dq = receiver.recv_timeout(timer)?;
while let Ok(mut nq) = receiver.try_recv() {
dq.append(&mut nq);
}
// flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
if log_enabled!(Level::Trace) {
trace!("{}", window::print_window(debug_id, window, *receive_index));
}
for mut blobs in blobs_chunked {
let blobs_len = blobs.len();
trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len);
// Index the blobs
window::index_blobs(node_info, &blobs, receive_index)
.expect("index blobs for initial window");
// keep the cache of blobs that are broadcast
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = mem::replace(&mut win[pos].data, None) {
trace!(
"{:x} popped {} at {}",
debug_id,
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
}
if let Some(x) = mem::replace(&mut win[pos].coding, None) {
trace!(
"{:x} popped {} at {}",
debug_id,
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
}
trace!("{:x} null {}", debug_id, pos);
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{:x} caching {} at {}", debug_id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b);
}
}
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
erasure::generate_coding(
debug_id,
&mut window.write().unwrap(),
recycler,
*receive_index,
blobs_len,
&mut transmit_index.coding,
)?;
}
*receive_index += blobs_len as u64;
// Send blobs out from the window
Crdt::broadcast(
&node_info,
&broadcast_table,
&window,
&sock,
transmit_index,
*receive_index,
)?;
}
Ok(())
}
pub struct BroadcastStage {
thread_hdl: JoinHandle<()>,
}
impl BroadcastStage {
fn run(
sock: &UdpSocket,
crdt: &Arc<RwLock<Crdt>>,
window: &SharedWindow,
entry_height: u64,
recycler: &BlobRecycler,
receiver: &BlobReceiver,
) {
let mut transmit_index = WindowIndex {
data: entry_height,
coding: entry_height,
};
let mut receive_index = entry_height;
let me = crdt.read().unwrap().my_data().clone();
loop {
let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
&me,
&broadcast_table,
&window,
&recycler,
&receiver,
&sock,
&mut transmit_index,
&mut receive_index,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => {
inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
error!("broadcaster error: {:?}", e);
}
}
}
}
}
/// Service to broadcast messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - CRDT structure
/// * `window` - Cache of blobs that we have broadcast
/// * `recycler` - Blob recycler.
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn new(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
recycler: BlobRecycler,
receiver: BlobReceiver,
) -> Self {
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
})
.unwrap();
BroadcastStage { thread_hdl }
}
}
impl Service for BroadcastStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -35,9 +35,10 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::{Duration, Instant};
use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex};
use streamer::{BlobReceiver, BlobSender};
use timing::{duration_as_ms, timestamp};
use transaction::Vote;
use window::{SharedWindow, WindowIndex};
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
@ -1381,8 +1382,8 @@ mod tests {
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::default_window;
use transaction::Vote;
use window::default_window;
#[test]
fn test_parse_port_or_addr() {
@ -1910,9 +1911,11 @@ mod tests {
assert!(rv.is_none());
fn tmp_ledger(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();
let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey());
let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());
let mut writer = LedgerWriter::open(&path, true).unwrap();
let zero = Hash::default();

View File

@ -159,7 +159,7 @@ mod tests {
use bank::Bank;
use crdt::{get_ip_addr, TestNode};
use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE};
use fullnode::FullNode;
use fullnode::Fullnode;
use logger;
use mint::Mint;
use service::Service;
@ -260,9 +260,11 @@ mod tests {
}
fn tmp_ledger_path(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();
format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
}
#[test]
@ -283,12 +285,11 @@ mod tests {
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("send_airdrop");
let server = FullNode::new_leader(
let server = Fullnode::new_leader(
leader_keypair,
bank,
0,
None,
Some(Duration::from_millis(30)),
&[],
leader,
exit.clone(),
&ledger_path,

View File

@ -3,13 +3,21 @@ use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
use std::cmp;
use std::mem;
use std::result;
use streamer::WindowSlot;
use window::WindowSlot;
//TODO(sakridge) pick these values
pub const NUM_DATA: usize = 16; // number of data blobs
pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs
pub const JERASURE_ALIGN: usize = 4; // data size has to be a multiple of 4 bytes
macro_rules! align {
($x:expr, $align:expr) => {
$x + ($align - 1) & !($align - 1)
};
}
#[derive(Debug, PartialEq, Eq)]
pub enum ErasureError {
NotEnoughBlocksToDecode,
@ -245,6 +253,9 @@ pub fn generate_coding(
}
}
// round up to the nearest jerasure alignment
max_data_size = align!(max_data_size, JERASURE_ALIGN);
trace!("{:x} max_data_size: {}", debug_id, max_data_size);
let mut data_blobs = Vec::with_capacity(NUM_DATA);
@ -262,8 +273,9 @@ pub fn generate_coding(
}
}
// getting ready to do erasure coding, means that we're potentially going back in time,
// tell our caller we've inserted coding blocks starting at coding_index_start
// getting ready to do erasure coding, means that we're potentially
// going back in time, tell our caller we've inserted coding blocks
// starting at coding_index_start
*transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
@ -602,7 +614,7 @@ mod test {
use signature::Keypair;
use signature::KeypairUtil;
// use std::sync::{Arc, RwLock};
use streamer::{index_blobs, WindowSlot};
use window::{index_blobs, WindowSlot};
#[test]
pub fn test_coding() {
@ -701,7 +713,8 @@ mod test {
let mut window = vec![
WindowSlot {
data: None,
coding: None
coding: None,
leader_unknown: false,
};
WINDOW_SIZE
];
@ -711,7 +724,7 @@ mod test {
let b_ = b.clone();
let mut w = b.write().unwrap();
// generate a random length, multiple of 4 between 8 and 32
let data_len = thread_rng().gen_range(2, 8) * 4;
let data_len = (thread_rng().gen_range(2, 8) * 4) + 1;
eprintln!("data_len of {} is {}", i, data_len);
w.set_size(data_len);

View File

@ -1,27 +1,25 @@
//! The `fullnode` module hosts all the fullnode microservices.
use bank::Bank;
use broadcast_stage::BroadcastStage;
use crdt::{Crdt, NodeInfo, TestNode};
use entry::Entry;
use ledger::{read_ledger, Block};
use ledger::read_ledger;
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{JoinHandle, Result};
use std::time::Duration;
use streamer;
use tpu::Tpu;
use tvu::Tvu;
use untrusted::Input;
use window;
//use std::time::Duration;
pub struct FullNode {
pub struct Fullnode {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>,
}
@ -48,7 +46,7 @@ impl Config {
}
}
impl FullNode {
impl Fullnode {
fn new_internal(
mut node: TestNode,
leader: bool,
@ -56,7 +54,7 @@ impl FullNode {
keypair: Keypair,
network_entry_for_validator: Option<SocketAddr>,
sigverify_disabled: bool,
) -> FullNode {
) -> Self {
info!("creating bank...");
let bank = Bank::new_default(leader);
@ -84,11 +82,11 @@ impl FullNode {
let testnet_addr = network_entry_for_validator.expect("validator requires entry");
let network_entry_point = NodeInfo::new_entry_point(testnet_addr);
let server = FullNode::new_validator(
let server = Self::new_validator(
keypair,
bank,
entry_height,
Some(ledger_tail),
&ledger_tail,
node,
&network_entry_point,
exit.clone(),
@ -103,13 +101,11 @@ impl FullNode {
} else {
node.data.leader_id = node.data.id;
let server = FullNode::new_leader(
let server = Self::new_leader(
keypair,
bank,
entry_height,
Some(ledger_tail),
//Some(Duration::from_millis(1000)),
None,
&ledger_tail,
node,
exit.clone(),
ledger_path,
@ -129,8 +125,8 @@ impl FullNode {
ledger: &str,
keypair: Keypair,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
FullNode::new_internal(
) -> Self {
Self::new_internal(
node,
leader,
ledger,
@ -146,8 +142,8 @@ impl FullNode {
ledger_path: &str,
keypair: Keypair,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
FullNode::new_internal(
) -> Self {
Self::new_internal(
node,
leader,
ledger_path,
@ -157,26 +153,6 @@ impl FullNode {
)
}
fn new_window(
ledger_tail: Option<Vec<Entry>>,
entry_height: u64,
node_info: &NodeInfo,
blob_recycler: &BlobRecycler,
) -> streamer::SharedWindow {
match ledger_tail {
Some(ledger_tail) => {
// convert to blobs
let mut blobs = VecDeque::new();
ledger_tail.to_blobs(&blob_recycler, &mut blobs);
// flatten deque to vec
let blobs: Vec<_> = blobs.into_iter().collect();
streamer::initialized_window(&node_info, blobs, entry_height)
}
None => streamer::default_window(),
}
}
/// Create a server instance acting as a leader.
///
/// ```text
@ -205,13 +181,16 @@ impl FullNode {
keypair: Keypair,
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
tick_duration: Option<Duration>,
ledger_tail: &[Entry],
node: TestNode,
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
) -> Self {
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let bank = Arc::new(bank);
let mut thread_hdls = vec![];
let rpu = Rpu::new(
@ -223,7 +202,8 @@ impl FullNode {
thread_hdls.extend(rpu.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler);
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
@ -249,7 +229,7 @@ impl FullNode {
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
let t_broadcast = streamer::broadcaster(
let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast,
crdt,
window,
@ -257,9 +237,9 @@ impl FullNode {
blob_recycler.clone(),
blob_receiver,
);
thread_hdls.extend(vec![t_broadcast]);
thread_hdls.extend(broadcast_stage.thread_hdls());
FullNode { exit, thread_hdls }
Fullnode { exit, thread_hdls }
}
/// Create a server instance acting as a validator.
@ -295,7 +275,7 @@ impl FullNode {
keypair: Keypair,
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
ledger_tail: &[Entry],
node: TestNode,
entry_point: &NodeInfo,
exit: Arc<AtomicBool>,
@ -313,7 +293,8 @@ impl FullNode {
thread_hdls.extend(rpu.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler);
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
crdt.write()
@ -343,7 +324,7 @@ impl FullNode {
);
thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls());
FullNode { exit, thread_hdls }
Fullnode { exit, thread_hdls }
}
//used for notifying many nodes in parallel to exit
@ -356,7 +337,7 @@ impl FullNode {
}
}
impl Service for FullNode {
impl Service for Fullnode {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
@ -373,7 +354,7 @@ impl Service for FullNode {
mod tests {
use bank::Bank;
use crdt::TestNode;
use fullnode::FullNode;
use fullnode::Fullnode;
use mint::Mint;
use service::Service;
use signature::{Keypair, KeypairUtil};
@ -388,13 +369,13 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(keypair, bank, 0, None, tn, &entry, exit, None, false);
let v = Fullnode::new_validator(keypair, bank, 0, &[], tn, &entry, exit, None, false);
v.exit();
v.join().unwrap();
}
#[test]
fn validator_parallel_exit() {
let vals: Vec<FullNode> = (0..2)
let vals: Vec<Fullnode> = (0..2)
.map(|_| {
let keypair = Keypair::new();
let tn = TestNode::new_localhost_with_pubkey(keypair.pubkey());
@ -402,7 +383,7 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
FullNode::new_validator(keypair, bank, 0, None, tn, &entry, exit, None, false)
Fullnode::new_validator(keypair, bank, 0, &[], tn, &entry, exit, None, false)
})
.collect();
//each validator can exit in parallel to speed many sequential calls to `join`

View File

@ -15,8 +15,8 @@ use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
use std::mem::size_of;
use std::path::Path;
use streamer::WINDOW_SIZE;
use transaction::Transaction;
use window::WINDOW_SIZE;
//
// A persistent ledger is 2 files:
@ -549,9 +549,11 @@ mod tests {
use transaction::{Transaction, Vote};
fn tmp_ledger_path(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();
format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
}
#[test]

View File

@ -12,6 +12,7 @@ pub mod counter;
pub mod bank;
pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod broadcast_stage;
pub mod budget;
pub mod choose_gossip_peer_strategy;
pub mod client;
@ -39,6 +40,7 @@ pub mod request;
pub mod request_processor;
pub mod request_stage;
pub mod result;
pub mod retransmit_stage;
pub mod rpu;
pub mod service;
pub mod signature;
@ -53,7 +55,7 @@ pub mod tvu;
pub mod vote_stage;
pub mod voting;
pub mod wallet;
pub mod window_stage;
pub mod window;
pub mod write_stage;
extern crate bincode;
extern crate bs58;

View File

@ -1,19 +1,14 @@
//! The `nat` module assists with NAT traversal
extern crate futures;
extern crate p2p;
extern crate rand;
extern crate reqwest;
extern crate tokio_core;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use self::futures::Future;
use self::p2p::UdpSocketExt;
use rand::{thread_rng, Rng};
use std::env;
use std::io;
use std::str;
/// A data type representing a public Udp socket
pub struct UdpSocketPair {
@ -51,71 +46,3 @@ pub fn udp_random_bind(start: u16, end: u16, tries: u32) -> io::Result<UdpSocket
}
}
}
/// Binds a private Udp address to a public address using UPnP if possible
pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPair {
let private_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let mc = p2p::P2p::default();
let res = core.run({
tokio_core::net::UdpSocket::bind_public(&private_addr, &handle, &mc)
.map_err(|e| {
info!("Failed to bind public socket for {}: {}", label, e);
})
.and_then(|(socket, public_addr)| Ok((public_addr, socket.local_addr().unwrap())))
});
match res {
Ok((public_addr, local_addr)) => {
info!(
"Using local address {} mapped to UPnP public address {} for {}",
local_addr, public_addr, label
);
// NAT should now be forwarding inbound packets directed at
// |public_addr| to the local |receiver| socket...
let receiver = UdpSocket::bind(local_addr).unwrap();
// TODO: try to autodetect a broken NAT (issue #496)
let sender = if env::var("BROKEN_NAT").is_err() {
receiver.try_clone().unwrap()
} else {
// ... however for outbound packets, some NATs *will not* rewrite the
// source port from |receiver.local_addr().port()| to |public_addr.port()|.
// This is currently a problem when talking with a fullnode as it
// assumes it can send UDP packets back at the source. This hits the
// NAT as a datagram for |receiver.local_addr().port()| on the NAT's public
// IP, which the NAT promptly discards. As a short term hack, create a
// local UDP socket, |sender|, with the same port as |public_addr.port()|.
//
// TODO: Remove the |sender| socket and deal with the downstream changes to
// the UDP signalling
let mut local_addr_sender = local_addr;
local_addr_sender.set_port(public_addr.port());
UdpSocket::bind(local_addr_sender).unwrap()
};
UdpSocketPair {
addr: public_addr,
receiver,
sender,
}
}
Err(_) => {
let sender = udp_random_bind(startport, endport, 5).unwrap();
let local_addr = sender.local_addr().unwrap();
let pub_ip = get_public_ip_addr().unwrap();
let pub_addr = SocketAddr::new(pub_ip, local_addr.port());
info!("Using source address {} for {}", pub_addr, label);
UdpSocketPair {
addr: pub_addr,
receiver: sender.try_clone().unwrap(),
sender,
}
}
}
}

View File

@ -10,6 +10,7 @@ use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use streamer;
use window::SharedWindow;
pub struct Ncp {
exit: Arc<AtomicBool>,
@ -19,7 +20,7 @@ pub struct Ncp {
impl Ncp {
pub fn new(
crdt: &Arc<RwLock<Crdt>>,
window: streamer::SharedWindow,
window: SharedWindow,
ledger_path: Option<&str>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,

View File

@ -26,7 +26,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
pub const PACKET_DATA_SIZE: usize = 256;
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
#[derive(Clone, Default, Debug)]
#[derive(Clone, Default, Debug, PartialEq)]
#[repr(C)]
pub struct Meta {
pub size: usize,
@ -63,6 +63,19 @@ impl Default for Packet {
}
}
pub trait Reset {
// Reset trait is an object that can re-initialize important parts
// of itself, similar to Default, but not necessarily a full clear
// also, we do it in-place.
fn reset(&mut self);
}
impl Reset for Packet {
fn reset(&mut self) {
self.meta = Meta::default();
}
}
impl Meta {
pub fn addr(&self) -> SocketAddr {
if !self.v6 {
@ -113,6 +126,14 @@ impl Default for Packets {
}
}
impl Reset for Packets {
fn reset(&mut self) {
for i in 0..self.packets.len() {
self.packets[i].reset();
}
}
}
#[derive(Clone)]
pub struct Blob {
pub data: [u8; BLOB_SIZE],
@ -140,6 +161,13 @@ impl Default for Blob {
}
}
impl Reset for Blob {
fn reset(&mut self) {
self.meta = Meta::default();
self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]);
}
}
#[derive(Debug)]
pub enum BlobError {
/// the Blob's meta and data are not self-consistent
@ -166,25 +194,35 @@ impl<T: Default> Clone for Recycler<T> {
}
}
impl<T: Default> Recycler<T> {
impl<T: Default + Reset> Recycler<T> {
pub fn allocate(&self) -> Arc<RwLock<T>> {
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
let x = gc
.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())));
// Only return the item if this recycler is the last reference to it.
// Remove this check once `T` holds a Weak reference back to this
// recycler and implements `Drop`. At the time of this writing, Weak can't
// be passed across threads ('alloc' is a nightly-only API), and so our
// reference-counted recyclables are awkwardly being recycled by hand,
// which allows this race condition to exist.
if Arc::strong_count(&x) > 1 {
warn!("Recycled item still in use. Booting it.");
drop(gc);
self.allocate()
} else {
x
loop {
if let Some(x) = gc.pop() {
// Only return the item if this recycler is the last reference to it.
// Remove this check once `T` holds a Weak reference back to this
// recycler and implements `Drop`. At the time of this writing, Weak can't
// be passed across threads ('alloc' is a nightly-only API), and so our
// reference-counted recyclables are awkwardly being recycled by hand,
// which allows this race condition to exist.
if Arc::strong_count(&x) >= 1 {
// This message is commented out because it is annoying for the known use case of
// a validator hanging onto a blob in the window while also sending it over
// to retransmit_request
//
// warn!("Recycled item still in use. Booting it.");
continue;
}
{
let mut w = x.write().unwrap();
w.reset();
}
return x;
} else {
return Arc::new(RwLock::new(Default::default()));
}
}
}
pub fn recycle(&self, x: Arc<RwLock<T>>) {
@ -455,7 +493,8 @@ impl Blob {
#[cfg(test)]
mod tests {
use packet::{
to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, Recycler, NUM_PACKETS,
to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset,
BLOB_HEADER_SIZE, NUM_PACKETS,
};
use request::Request;
use std::collections::VecDeque;
@ -474,6 +513,12 @@ mod tests {
assert_eq!(r.gc.lock().unwrap().len(), 0);
}
impl Reset for u8 {
fn reset(&mut self) {
*self = Default::default();
}
}
#[test]
pub fn test_leaked_recyclable() {
// Ensure that the recycler won't return an item
@ -611,6 +656,9 @@ mod tests {
b.data_mut()[0] = 1;
assert_eq!(b.data()[0], 1);
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
b.reset();
assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE]));
assert_eq!(b.meta, Meta::default());
}
}
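
To make the `Reset`/`Recycler` interaction above concrete, here is a trimmed standalone sketch of the pattern (an editor's illustration, not the repo's exact code): recycled items are handed back only when no one else holds them, and are re-initialized in place via `reset()` before reuse.

```rust
use std::sync::{Arc, Mutex, RwLock};

pub trait Reset {
    fn reset(&mut self);
}

#[derive(Default)]
pub struct Recycler<T> {
    gc: Arc<Mutex<Vec<Arc<RwLock<T>>>>>,
}

impl<T: Default + Reset> Recycler<T> {
    pub fn allocate(&self) -> Arc<RwLock<T>> {
        let mut gc = self.gc.lock().unwrap();
        while let Some(x) = gc.pop() {
            // Boot items that are still referenced elsewhere.
            if Arc::strong_count(&x) > 1 {
                continue;
            }
            // Re-initialize in place before handing it out.
            x.write().unwrap().reset();
            return x;
        }
        Arc::new(RwLock::new(T::default()))
    }

    pub fn recycle(&self, x: Arc<RwLock<T>>) {
        self.gc.lock().unwrap().push(x);
    }
}

#[derive(Default)]
struct Counter(u64);

impl Reset for Counter {
    fn reset(&mut self) {
        self.0 = 0;
    }
}

fn main() {
    let recycler: Recycler<Counter> = Recycler::default();
    let item = recycler.allocate();
    item.write().unwrap().0 = 42;
    recycler.recycle(item);
    // The recycled item comes back zeroed by reset().
    assert_eq!(recycler.allocate().read().unwrap().0, 0);
}
```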

View File

@ -9,7 +9,7 @@ use packet;
use serde_json;
use std;
use std::any::Any;
use streamer;
use window;
#[derive(Debug)]
pub enum Error {
@ -22,7 +22,7 @@ pub enum Error {
Serialize(std::boxed::Box<bincode::ErrorKind>),
BankError(bank::BankError),
CrdtError(crdt::CrdtError),
WindowError(streamer::WindowError),
WindowError(window::WindowError),
BlobError(packet::BlobError),
#[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError),
@ -51,8 +51,8 @@ impl std::convert::From<crdt::CrdtError> for Error {
Error::CrdtError(e)
}
}
impl std::convert::From<streamer::WindowError> for Error {
fn from(e: streamer::WindowError) -> Error {
impl std::convert::From<window::WindowError> for Error {
fn from(e: window::WindowError) -> Error {
Error::WindowError(e)
}
}

123
src/retransmit_stage.rs Normal file
View File

@ -0,0 +1,123 @@
//! The `retransmit_stage` retransmits blobs between validators
use counter::Counter;
use crdt::Crdt;
use log::Level;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::{self, SharedWindow};
fn retransmit(
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
for b in &dq {
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
fn retransmitter(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
}
}
}
}
trace!("exiting retransmitter");
})
.unwrap()
}
pub struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl RetransmitStage {
pub fn new(
crdt: &Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
retransmit_socket: UdpSocket,
blob_recycler: &BlobRecycler,
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(
retransmit_socket,
crdt.clone(),
blob_recycler.clone(),
retransmit_receiver,
);
let (blob_sender, blob_receiver) = channel();
let t_window = window::window(
crdt.clone(),
window,
entry_height,
blob_recycler.clone(),
fetch_stage_receiver,
blob_sender,
retransmit_sender,
);
let thread_hdls = vec![t_retransmit, t_window];
(RetransmitStage { thread_hdls }, blob_receiver)
}
}
impl Service for RetransmitStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -1,50 +1,19 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo};
#[cfg(feature = "erasure")]
use erasure;
use log::Level;
use packet::{
Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
};
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets};
use result::{Error, Result};
use std::cmp;
use std::collections::VecDeque;
use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use timing::duration_as_ms;
use std::time::Duration;
pub const WINDOW_SIZE: u64 = 2 * 1024;
pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>;
#[derive(Clone, Default)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
}
pub type SharedWindow = Arc<RwLock<Vec<WindowSlot>>>;
#[derive(Debug, PartialEq, Eq)]
pub enum WindowError {
GenericError,
}
#[derive(Debug)]
pub struct WindowIndex {
pub data: u64,
pub coding: u64,
}
fn recv_loop(
sock: &UdpSocket,
exit: &Arc<AtomicBool>,
@ -172,782 +141,8 @@ pub fn blob_receiver(
Ok(t)
}
fn find_next_missing(
window: &SharedWindow,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: u64,
received: u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
if received <= consumed {
Err(WindowError::GenericError)?;
}
let mut window = window.write().unwrap();
let reqs: Vec<_> = (consumed..received)
.filter_map(|pix| {
let i = (pix % WINDOW_SIZE) as usize;
if let Some(blob) = mem::replace(&mut window[i].data, None) {
let blob_idx = blob.read().unwrap().get_index().unwrap();
if blob_idx == pix {
mem::replace(&mut window[i].data, Some(blob));
} else {
recycler.recycle(blob);
}
}
if window[i].data.is_none() {
let val = crdt.read().unwrap().window_index_request(pix as u64);
if let Ok((to, req)) = val {
return Some((to, req));
}
}
None
})
.collect();
Ok(reqs)
}
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
// Calculate the highest blob index that this node should have already received
// via avalanche. The avalanche splits the data stream across nodes, and each node retransmits
// the data to its peer nodes. So there's a possibility that a blob (with an index lower
// than the current received index) is still being retransmitted by a peer node.
let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers));
// This check prevents repairing a blob that would cause the window to roll over. Even if
// the highest_lost blob is actually missing, asking to repair it might cause our
// current window to move past other missing blobs
cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
}
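// Editor's worked example (not part of the diff), with WINDOW_SIZE = 2048:
// num_peers = 10, consumed = 100, received = 105 gives
// max(100, 105 - 10) = 100, clamped to min(100 + 2047, 100) = 100;
// with received = 5000 instead, max(100, 4990) = 4990 is clamped to
// min(100 + 2047, 4990) = 2147, so repair never asks past the window's end.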
fn repair_window(
debug_id: u64,
window: &SharedWindow,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
last: &mut u64,
times: &mut usize,
consumed: u64,
received: u64,
) -> Result<()> {
//exponential backoff
if *last != consumed {
*times = 0;
}
*last = consumed;
*times += 1;
//if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok
if *times & (*times - 1) != 0 {
trace!("repair_window counter {} {} {}", *times, consumed, received);
return Ok(());
}
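// Editor's note (not part of the diff): `*times & (*times - 1)` is zero
// exactly when *times is a power of two, so repair requests go out on
// attempts 1, 2, 4, 8, 16, ... and every other attempt returns early above.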
let highest_lost = calculate_highest_lost_blob_index(
crdt.read().unwrap().table.len() as u64,
consumed,
received,
);
let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?;
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if !reqs.is_empty() {
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
info!(
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
debug_id,
*times,
consumed,
highest_lost,
reqs.len()
);
}
let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs {
//todo cache socket
debug!(
"{:x}: repair_window request {} {} {}",
debug_id, consumed, highest_lost, to
);
assert!(req.len() <= BLOB_SIZE);
sock.send_to(&req, to)?;
}
Ok(())
}
fn retransmit_all_leader_blocks(
maybe_leader: Option<NodeInfo>,
dq: &mut SharedBlobs,
debug_id: u64,
recycler: &BlobRecycler,
consumed: u64,
received: u64,
retransmit: &BlobSender,
) -> Result<()> {
let mut retransmit_queue = VecDeque::new();
if let Some(leader) = maybe_leader {
for b in dq {
let p = b.read().expect("'b' read lock in fn recv_window");
//TODO this check isn't safe against adversarial packets
//we need to maintain a sequence window
let leader_id = leader.id;
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index().expect("get_index in fn recv_window"),
p.get_id().expect("get_id in trace! fn recv_window"),
p.meta.addr(),
leader_id
);
if p.get_id().expect("get_id in fn recv_window") == leader_id {
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better abstraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
retransmit_queue.push_back(nv);
}
}
} else {
warn!("{:x}: no leader to retransmit from", debug_id);
}
if !retransmit_queue.is_empty() {
debug!(
"{:x}: RECV_WINDOW {} {}: retransmit {}",
debug_id,
consumed,
received,
retransmit_queue.len(),
);
inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
retransmit.send(retransmit_queue)?;
}
Ok(())
}
/// process a blob: Add blob to the window. If a continuous set of blobs
/// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
///
/// * `debug_id` - this node's id in a useful-for-debug format
/// * `blob` - the blob to be processed into the window and rebroadcast
/// * `pix` - the index of the blob, corresponds to
/// the entry height of this blob
/// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `window` - the window we're operating on
/// * `recycler` - where to return the blob once processed, also where
/// to return old blobs from the window
/// * `consumed` - input/output, the entry-height to which this
/// node has populated and rebroadcast entries
fn process_blob(
debug_id: u64,
blob: SharedBlob,
pix: u64,
consume_queue: &mut SharedBlobs,
window: &SharedWindow,
recycler: &BlobRecycler,
consumed: &mut u64,
) {
let mut window = window.write().unwrap();
let w = (pix % WINDOW_SIZE) as usize;
let is_coding = {
let blob_r = blob
.read()
.expect("blob read lock for flogs streamer::window");
blob_r.is_coding()
};
// insert a newly received blob into a window slot, clearing out and recycling any previous
// blob unless the incoming blob is a duplicate (based on idx)
// returns whether the incoming is a duplicate blob
fn insert_blob_is_dup(
debug_id: u64,
blob: SharedBlob,
pix: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().unwrap().get_index().unwrap() == pix;
recycler.recycle(old);
trace!(
"{:x}: occupied {} window slot {:}, is_dup: {}",
debug_id,
c_or_d,
pix,
is_dup
);
is_dup
} else {
trace!("{:x}: empty {} window slot {:}", debug_id, c_or_d, pix);
false
}
}
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding {
insert_blob_is_dup(
debug_id,
blob,
pix,
&mut window[w].coding,
recycler,
"coding",
)
} else {
insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data")
};
if is_duplicate {
return;
}
#[cfg(feature = "erasure")]
{
if erasure::recover(
debug_id,
recycler,
&mut window,
*consumed,
(*consumed % WINDOW_SIZE) as usize,
).is_err()
{
trace!("{:x}: erasure::recover failed", debug_id);
}
}
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{:x}: k: {} consumed: {}", debug_id, k, *consumed,);
if let Some(blob) = &window[k].data {
if blob.read().unwrap().get_index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
} else {
// window[k].data is None, end of received
break;
}
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
}
}
fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool {
// Prevent receive window from running over
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
trace!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id,
pix,
consumed
);
false
} else {
// received always has to be updated even if we don't accept the packet into
// the window. The worst case here is the server *starts* outside
// the window, none of the packets it receives fits in the window
// and repair requests (which are based on received) are never generated
*received = cmp::max(pix, *received);
if pix >= consumed + WINDOW_SIZE {
trace!(
"{:x}: received: {} will overrun window: {} skipping..",
debug_id,
pix,
consumed + WINDOW_SIZE
);
false
} else {
true
}
}
}
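// Editor's worked example (not part of the diff), with consumed = 100 and
// WINDOW_SIZE = 2048: pix = 50 returns false without touching `received`;
// pix = 3000 first bumps `received` to 3000, then returns false because
// 3000 >= 100 + 2048; pix = 500 bumps `received` and returns true.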
fn recv_window(
debug_id: u64,
window: &SharedWindow,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: &mut u64,
received: &mut u64,
r: &BlobReceiver,
s: &BlobSender,
retransmit: &BlobSender,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
let maybe_leader: Option<NodeInfo> = crdt
.read()
.expect("'crdt' read lock in fn recv_window")
.leader_data()
.cloned();
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
let now = Instant::now();
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
debug!(
"{:x}: RECV_WINDOW {} {}: got packets {}",
debug_id,
*consumed,
*received,
dq.len(),
);
retransmit_all_leader_blocks(
maybe_leader,
&mut dq,
debug_id,
recycler,
*consumed,
*received,
retransmit,
)?;
let mut pixs = Vec::new();
//send a contiguous set of blocks
let mut consume_queue = VecDeque::new();
while let Some(b) = dq.pop_front() {
let (pix, meta_size) = {
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()?, p.meta.size)
};
pixs.push(pix);
if !blob_idx_in_window(debug_id, pix, *consumed, received) {
recycler.recycle(b);
continue;
}
trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
process_blob(
debug_id,
b,
pix,
&mut consume_queue,
window,
recycler,
consumed,
);
}
if log_enabled!(Level::Trace) {
trace!("{}", print_window(debug_id, window, *consumed));
}
info!(
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
debug_id,
*consumed,
*received,
consume_queue.len(),
pixs,
duration_as_ms(&now.elapsed())
);
if !consume_queue.is_empty() {
debug!(
"{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
debug_id,
*consumed,
*received,
consume_queue.len(),
);
trace!(
"{:x}: sending consume_queue.len: {}",
debug_id,
consume_queue.len()
);
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?;
}
Ok(())
}
fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String {
let pointer: Vec<_> = window
.read()
.unwrap()
.iter()
.enumerate()
.map(|(i, _v)| {
if i == (consumed % WINDOW_SIZE) as usize {
"V"
} else {
" "
}
})
.collect();
let buf: Vec<_> = window
.read()
.unwrap()
.iter()
.map(|v| {
if v.data.is_none() && v.coding.is_none() {
"O"
} else if v.data.is_some() && v.coding.is_some() {
"D"
} else if v.data.is_some() {
// coding.is_none()
"d"
} else {
// data.is_none()
"c"
}
})
.collect();
format!(
"\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}",
debug_id,
consumed,
pointer.join(""),
debug_id,
consumed,
buf.join("")
)
}
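// Sample output (an illustrative sketch, assuming debug_id 0x1234, the
// consumed pointer at slot 2, data+coding in slot 2, data only in slot 3,
// and all other slots empty):
//
//   1234: WINDOW (2):   V
//   1234: WINDOW (2): OODdOOO...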
pub fn default_window() -> SharedWindow {
Arc::new(RwLock::new(vec![
WindowSlot::default();
WINDOW_SIZE as usize
]))
}
pub fn index_blobs(
node_info: &NodeInfo,
blobs: &[SharedBlob],
receive_index: &mut u64,
) -> Result<()> {
// enumerate all the blobs, those are the indices
trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len());
for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
blob.set_id(node_info.id)
.expect("set_id in pub fn broadcast");
blob.set_index(*receive_index + i as u64)
.expect("set_index in pub fn broadcast");
blob.set_flags(0).unwrap();
}
Ok(())
}
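// For example (illustrative): with *receive_index == 100 and three blobs,
// index_blobs tags them 100, 101, 102. Note that receive_index itself is not
// advanced here; the caller does that (as broadcast does below).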
/// Initialize a rebroadcast window with most recent Entry blobs
/// * `node_info` - this node's info, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
node_info: &NodeInfo,
blobs: Vec<SharedBlob>,
entry_height: u64,
) -> SharedWindow {
let window = default_window();
let debug_id = node_info.debug_id();
{
let mut win = window.write().unwrap();
trace!(
"{:x} initialized window entry_height:{} blobs_len:{}",
debug_id,
entry_height,
blobs.len()
);
// Index the blobs
let mut received = entry_height - blobs.len() as u64;
index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window");
// populate the window, offset by implied index
let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize;
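// (e.g., if two more blobs than window slots were supplied, the two
// oldest -- lowest-index -- blobs would be skipped here)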
for b in blobs.into_iter().skip(diff) {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{:x} caching {} at {}", debug_id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b);
}
}
window
}
pub fn window(
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
retransmit: BlobSender,
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut consumed = entry_height;
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
let debug_id = crdt.read().unwrap().debug_id();
trace!("{:x}: RECV_WINDOW started", debug_id);
loop {
if let Err(e) = recv_window(
debug_id,
&window,
&crdt,
&recycler,
&mut consumed,
&mut received,
&r,
&s,
&retransmit,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-window-error", 1, 1);
error!("window error: {:?}", e);
}
}
}
let _ = repair_window(
debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
);
}
})
.unwrap()
}
fn broadcast(
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
) -> Result<()> {
let debug_id = node_info.debug_id();
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
// flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
if log_enabled!(Level::Trace) {
trace!("{}", print_window(debug_id, window, *receive_index));
}
for mut blobs in blobs_chunked {
let blobs_len = blobs.len();
trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len);
// Index the blobs
index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window");
// keep the cache of blobs that are broadcast
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = mem::replace(&mut win[pos].data, None) {
trace!(
"{:x} popped {} at {}",
debug_id,
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
}
if let Some(x) = mem::replace(&mut win[pos].coding, None) {
trace!(
"{:x} popped {} at {}",
debug_id,
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
}
trace!("{:x} null {}", debug_id, pos);
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{:x} caching {} at {}", debug_id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b);
}
}
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
erasure::generate_coding(
debug_id,
&mut window.write().unwrap(),
recycler,
*receive_index,
blobs_len,
&mut transmit_index.coding,
)?;
}
*receive_index += blobs_len as u64;
// Send blobs out from the window
Crdt::broadcast(
&node_info,
&broadcast_table,
&window,
&sock,
transmit_index,
*receive_index,
)?;
}
Ok(())
}
/// Service to broadcast messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send from.
/// * `crdt` - CRDT structure
/// * `window` - Cache of blobs that we have broadcast
/// * `entry_height` - Initial entry height, seeds the transmit and receive indices.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be broadcast to all the layer 1 nodes.
pub fn broadcaster(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let mut transmit_index = WindowIndex {
data: entry_height,
coding: entry_height,
};
let mut receive_index = entry_height;
let me = crdt.read().unwrap().my_data().clone();
loop {
let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
&me,
&broadcast_table,
&window,
&recycler,
&r,
&sock,
&mut transmit_index,
&mut receive_index,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => {
inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
error!("broadcaster error: {:?}", e);
}
}
}
}
})
.unwrap()
}
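// A minimal usage sketch (illustrative only; `broadcast_socket`, `crdt`,
// `window`, `entry_height`, and `blob_recycler` are assumed to be set up
// elsewhere, e.g. during leader initialization):
//
//   let (blob_sender, blob_receiver) = channel();
//   let t_broadcast = broadcaster(
//       broadcast_socket,
//       crdt.clone(),
//       window.clone(),
//       entry_height,
//       blob_recycler.clone(),
//       blob_receiver,
//   );
//   // blobs sent through blob_sender are broadcast to the network;
//   // the thread exits once every blob_sender clone is dropped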
fn retransmit(
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
for b in &dq {
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send retransmitted blobs from. The receive channel is polled with a 1-second timeout.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
error!("retransmitter error: {:?}", e);
}
}
}
}
trace!("exiting retransmitter");
})
.unwrap()
}
#[cfg(test)]
mod test {
use crdt::{Crdt, TestNode};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use std::collections::VecDeque;
use std::io;
@@ -955,12 +150,10 @@ mod test {
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use streamer::blob_idx_in_window;
use streamer::calculate_highest_lost_blob_index;
use streamer::{blob_receiver, receiver, responder, window};
use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};
use streamer::PacketReceiver;
use streamer::{receiver, responder};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
@@ -1022,144 +215,4 @@ mod test {
t_receiver.join().expect("join");
t_responder.join().expect("join");
}
fn get_blobs(r: BlobReceiver, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => {
for (i, v) in m.iter().enumerate() {
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
}
*num += m.len();
}
e => info!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
#[test]
pub fn window_send_test() {
logger::setup();
let tn = TestNode::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = default_window();
let t_window = window(
subs,
win,
0,
resp_recycler.clone(),
r_reader,
s_window,
s_retransmit,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"window_send_test",
tn.sockets.replicate,
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
}
msgs.push_back(b);
}
s_responder.send(msgs).expect("send");
t_responder
};
let mut num = 0;
get_blobs(r_window, &mut num);
assert_eq!(num, 10);
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
q.append(&mut nq);
}
assert_eq!(q.len(), 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}
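// Note: calculate_highest_lost_blob_index itself is not shown in this diff
// (the src/window.rs diff is suppressed). The assertions below are
// consistent with it computing (an inferred sketch, not confirmed source):
//
//   min(consumed + WINDOW_SIZE - 1,
//       max(consumed, received.saturating_sub(num_peers)))
//
// for arguments (num_peers, consumed, received).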
#[test]
pub fn calculate_highest_lost_blob_index_test() {
assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
assert_eq!(calculate_highest_lost_blob_index(15, 10, 90), 75);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 90), 10);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE),
WINDOW_SIZE + 5
);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE),
WINDOW_SIZE + 9
);
}
fn wrap_blob_idx_in_window(
debug_id: u64,
pix: u64,
consumed: u64,
received: u64,
) -> (bool, u64) {
let mut received = received;
let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received);
(is_in_window, received)
}
#[test]
pub fn blob_idx_in_window_test() {
assert_eq!(
wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100),
(false, 90 + WINDOW_SIZE)
);
assert_eq!(
wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100),
(false, 91 + WINDOW_SIZE)
);
assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100));
assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
}
}

src/thin_client.rs

@@ -319,7 +319,7 @@ mod tests {
use bank::Bank;
use budget::Budget;
use crdt::TestNode;
use fullnode::FullNode;
use fullnode::Fullnode;
use ledger::LedgerWriter;
use logger;
use mint::Mint;
@@ -331,9 +331,11 @@ mod tests {
use transaction::{Instruction, Plan};
fn tmp_ledger(name: &str, mint: &Mint) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();
let path = format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey());
let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
@@ -354,12 +356,11 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let ledger_path = tmp_ledger("thin_client", &alice);
let server = FullNode::new_leader(
let server = Fullnode::new_leader(
leader_keypair,
bank,
0,
None,
Some(Duration::from_millis(30)),
&[],
leader,
exit.clone(),
&ledger_path,
@@ -402,12 +403,11 @@ mod tests {
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger("bad_sig", &alice);
let server = FullNode::new_leader(
let server = Fullnode::new_leader(
leader_keypair,
bank,
0,
None,
Some(Duration::from_millis(30)),
&[],
leader,
exit.clone(),
&ledger_path,
@@ -462,12 +462,11 @@ mod tests {
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger("client_check_signature", &alice);
let server = FullNode::new_leader(
let server = Fullnode::new_leader(
leader_keypair,
bank,
0,
None,
Some(Duration::from_millis(30)),
&[],
leader,
exit.clone(),
&ledger_path,

src/tvu.rs

@@ -2,29 +2,29 @@
//! 3-stage transaction validation pipeline in software.
//!
//! ```text
//! .--------------------------------------------.
//! | |
//! | .--------------------------------+------------.
//! | | TVU | |
//! | | | |
//! | | | | .------------.
//! | | .------------+-------------->| Validators |
//! v | .-------. | | | `------------`
//! .----+---. | | | .----+---. .----+---------. |
//! | Leader |--------->| Blob | | Window | | Replicate | |
//! `--------` | | Fetch |-->| Stage |-->| Stage / | |
//! .------------. | | Stage | | | | Vote Stage | |
//! | Validators |----->| | `--------` `----+---------` |
//! `------------` | `-------` | |
//! | | |
//! | | |
//! | | |
//! `--------------------------------|------------`
//! |
//! v
//! .------.
//! | Bank |
//! `------`
//! .------------------------------------------------.
//! | |
//! | .------------------------------------+------------.
//! | | TVU | |
//! | | | |
//! | | | | .------------.
//! | | .----------------+-------------->| Validators |
//! v | .-------. | | | `------------`
//! .----+---. | | | .----+-------. .----+---------. |
//! | Leader |--------->| Blob | | Retransmit | | Replicate | |
//! `--------` | | Fetch |-->| Stage |-->| Stage / | |
//! .------------. | | Stage | | | | Vote Stage | |
//! | Validators |----->| | `------------` `----+---------` |
//! `------------` | `-------` | |
//! | | |
//! | | |
//! | | |
//! `------------------------------------|------------`
//! |
//! v
//! .------.
//! | Bank |
//! `------`
//! ```
//!
//! 1. Fetch Stage
@@ -41,19 +41,19 @@ use blob_fetch_stage::BlobFetchStage;
use crdt::Crdt;
use packet::BlobRecycler;
use replicate_stage::ReplicateStage;
use retransmit_stage::RetransmitStage;
use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use streamer::SharedWindow;
use window_stage::WindowStage;
use window::SharedWindow;
pub struct Tvu {
replicate_stage: ReplicateStage,
fetch_stage: BlobFetchStage,
window_stage: WindowStage,
retransmit_stage: RetransmitStage,
}
impl Tvu {
@@ -90,7 +90,7 @@ impl Tvu {
//TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let (window_stage, blob_window_receiver) = WindowStage::new(
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
&crdt,
window,
entry_height,
@@ -112,7 +112,7 @@ impl Tvu {
Tvu {
replicate_stage,
fetch_stage,
window_stage,
retransmit_stage,
}
}
@@ -127,7 +127,7 @@ impl Service for Tvu {
let mut thread_hdls = vec![];
thread_hdls.extend(self.replicate_stage.thread_hdls().into_iter());
thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter());
thread_hdls.extend(self.window_stage.thread_hdls().into_iter());
thread_hdls.extend(self.retransmit_stage.thread_hdls().into_iter());
thread_hdls
}
@@ -159,16 +159,17 @@ pub mod tests {
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{self, SharedWindow};
use streamer;
use transaction::Transaction;
use tvu::Tvu;
use window::{self, SharedWindow};
fn new_ncp(
crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<(Ncp, SharedWindow)> {
let window = streamer::default_window();
let window = window::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let ncp = Ncp::new(&crdt, window.clone(), None, listen, send_sock, exit)?;
Ok((ncp, window))

src/window.rs (new file, 1043 lines)

File diff suppressed because it is too large

src/window_stage.rs (deleted)

@@ -1,60 +0,0 @@
//! The `window_stage` maintains the blob window
use crdt::Crdt;
use packet::BlobRecycler;
use service::Service;
use std::net::UdpSocket;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use streamer::{self, BlobReceiver, SharedWindow};
pub struct WindowStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl WindowStage {
pub fn new(
crdt: &Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
retransmit_socket: UdpSocket,
blob_recycler: &BlobRecycler,
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = streamer::retransmitter(
retransmit_socket,
crdt.clone(),
blob_recycler.clone(),
retransmit_receiver,
);
let (blob_sender, blob_receiver) = channel();
let t_window = streamer::window(
crdt.clone(),
window,
entry_height,
blob_recycler.clone(),
fetch_stage_receiver,
blob_sender,
retransmit_sender,
);
let thread_hdls = vec![t_retransmit, t_window];
(WindowStage { thread_hdls }, blob_receiver)
}
}
impl Service for WindowStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

tests/multinode.rs (189 lines changed; mode Executable file → Normal file)

@@ -7,7 +7,7 @@ extern crate solana;
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::entry::Entry;
use solana::fullnode::FullNode;
use solana::fullnode::Fullnode;
use solana::hash::Hash;
use solana::ledger::LedgerWriter;
use solana::logger;
@@ -16,10 +16,9 @@ use solana::ncp::Ncp;
use solana::result;
use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil, Pubkey};
use solana::streamer::{default_window, WINDOW_SIZE};
use solana::thin_client::ThinClient;
use solana::timing::duration_as_s;
use std::cmp::max;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::window::{default_window, WINDOW_SIZE};
use std::env;
use std::fs::{copy, create_dir_all, remove_dir_all};
use std::net::UdpSocket;
@@ -146,7 +145,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
writer.write_entries(entries).unwrap();
}
let leader = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let leader = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance =
@@ -158,7 +157,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let validator = FullNode::new(
let validator = Fullnode::new(
validator,
false,
&zero_ledger_path,
@@ -219,7 +218,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
);
ledger_paths.push(zero_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let server = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance =
@@ -236,7 +235,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
);
ledger_paths.push(ledger_path.clone());
let mut val = FullNode::new(
let mut val = Fullnode::new(
validator,
false,
&ledger_path,
@@ -271,7 +270,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
// balances
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let val = FullNode::new(
let val = Fullnode::new(
validator,
false,
&zero_ledger_path,
@@ -336,7 +335,7 @@ fn test_multi_node_basic() {
let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let server = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance =
@@ -349,7 +348,7 @@ fn test_multi_node_basic() {
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic");
ledger_paths.push(ledger_path.clone());
let val = FullNode::new(
let val = Fullnode::new(
validator,
false,
&ledger_path,
@@ -397,7 +396,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
ledger_paths.push(leader_ledger_path.clone());
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let leader_fullnode = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
@@ -410,7 +409,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
let validator_data = validator.data.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file");
ledger_paths.push(ledger_path.clone());
let val_fullnode = FullNode::new(
let val_fullnode = Fullnode::new(
validator,
false,
&ledger_path,
@@ -430,11 +429,11 @@ fn test_boot_validator_from_file() -> result::Result<()> {
Ok(())
}
fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) {
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(leader, true, &ledger_path, leader_keypair, None);
let leader_fullnode = Fullnode::new(leader, true, &ledger_path, leader_keypair, None);
(leader_data, leader_fullnode)
}
@@ -479,7 +478,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let val_fullnode = FullNode::new(
let val_fullnode = Fullnode::new(
validator,
false,
&stale_ledger_path,
@@ -524,15 +523,7 @@ fn test_multi_node_dynamic_network() {
Ok(val) => val
.parse()
.expect(&format!("env var {} is not parse-able as usize", key)),
Err(_) => 100,
};
let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";
let purge_lag: usize = match env::var(purge_key) {
Ok(val) => val
.parse()
.expect(&format!("env var {} is not parse-able as usize", purge_key)),
Err(_) => std::usize::MAX,
Err(_) => 200,
};
let leader_keypair = Keypair::new();
@@ -548,7 +539,7 @@ fn test_multi_node_dynamic_network() {
let leader_data = leader.data.clone();
let server =
FullNode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None);
Fullnode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance = send_tx_and_retry_get_balance(
@@ -616,7 +607,7 @@ fn test_multi_node_dynamic_network() {
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let rd = validator.data.clone();
info!("starting {} {:x}", keypair.pubkey(), rd.debug_id());
let val = FullNode::new_without_sigverify(
let val = Fullnode::new_without_sigverify(
validator,
false,
&ledger_path,
@@ -631,94 +622,78 @@ fn test_multi_node_dynamic_network() {
let mut validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect();
let now = Instant::now();
let mut client = mk_client(&leader_data);
let mut last_finality = client.get_finality();
info!("Last finality {}", last_finality);
let start = Instant::now();
let mut consecutive_success = 0;
let mut failures = 0;
let mut max_distance_increase = 0i64;
for i in 0..num_nodes {
//verify leader can do transfer
let expected = ((i + 3) * 500) as i64;
let leader_balance = retry_send_tx_and_retry_get_balance(
&leader_data,
&alice_arc.read().unwrap(),
&bob_pubkey,
Some(expected),
).unwrap();
if leader_balance != expected {
info!(
"leader dropped transaction {} {:?} {:?}",
i, leader_balance, expected
);
let mut expected_balance = leader_balance;
for i in 0..std::cmp::max(20, num_nodes) {
trace!("getting leader last_id");
let last_id = client.get_last_id();
trace!("executing leader transfer");
let sig = client
.transfer(
500,
&alice_arc.read().unwrap().keypair(),
bob_pubkey,
&last_id,
)
.unwrap();
expected_balance += 500;
assert!(client.poll_for_signature(&sig).is_ok());
let now = Instant::now();
let mut finality = client.get_finality();
// Need this to make sure the finality is updated
// (i.e. the node is not returning stale value)
while last_finality == finality {
finality = client.get_finality();
}
//verify all validators have the same balance
{
let mut success = 0usize;
let mut max_distance = 0i64;
let mut total_distance = 0i64;
while duration_as_ms(&now.elapsed()) < finality as u64 {
sleep(Duration::from_millis(100));
finality = client.get_finality()
}
last_finality = finality;
let balance = retry_get_balance(&mut client, &bob_pubkey, Some(expected_balance));
assert_eq!(balance, Some(expected_balance));
consecutive_success += 1;
info!(
"SUCCESS[{}] balance: {}, finality: {} ms",
i, expected_balance, last_finality,
);
if consecutive_success == 10 {
info!("Took {} s to converge", duration_as_s(&start.elapsed()),);
info!("Verifying signature of the last transaction in the validators");
let mut num_nodes_behind = 0i64;
validators.retain(|server| {
let mut retain_me = true;
let mut client = mk_client(&server.0);
trace!("{:x} {} get_balance start", server.0.debug_id(), i);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
trace!(
"{:x} {} get_balance: {:?} leader_balance: {}",
server.0.debug_id(),
i,
getbal,
leader_balance
);
let bal = getbal.unwrap_or(0);
let distance = (leader_balance - bal) / 500;
max_distance = max(distance, max_distance);
total_distance += distance;
if distance > max_distance_increase {
info!("Node {:x} is behind by {}", server.0.debug_id(), distance);
max_distance_increase = distance;
if max_distance_increase as u64 > purge_lag as u64 {
server.1.exit();
info!("Node {:x} is exiting", server.0.debug_id());
retain_me = false;
}
}
if distance > 0 {
num_nodes_behind += 1;
}
if let Some(bal) = getbal {
if bal == leader_balance {
success += 1;
}
}
retain_me
trace!("{:x} polling for signature", server.0.debug_id());
num_nodes_behind += match client.poll_for_signature(&sig) {
Ok(_) => 0,
Err(_) => 1,
};
true
});
if num_nodes_behind != 0 {
info!("{} nodes are lagging behind leader", num_nodes_behind);
}
info!(
"SUCCESS[{}] {} out of {} distance: {} max_distance: {} finality: {}",
i,
success,
"Validators lagging: {}/{}",
num_nodes_behind,
validators.len(),
total_distance,
max_distance,
get_finality(&leader_data)
);
if success == validators.len() && total_distance == 0 {
consecutive_success += 1;
} else {
consecutive_success = 0;
failures += 1;
}
if consecutive_success == 10 {
break;
}
break;
}
}
info!(
"Took {} s to converge total failures: {}",
duration_as_s(&now.elapsed()),
failures
);
assert_eq!(consecutive_success, 10);
for (_, node) in &validators {
node.exit();
@@ -821,9 +796,3 @@ fn retry_send_tx_and_retry_get_balance(
}
None
}
fn get_finality(leader: &NodeInfo) -> usize {
let mut client = mk_client(leader);
trace!("getting leader finality");
client.get_finality()
}