Compare commits
46 Commits
SHA1 (author and date columns not captured):

7d5bb28128
ae433d6a34
e3c668acff
5825501c79
7e84bb7a60
da1fd96d50
141e1e974d
fc0d7f5982
f697632edb
73797c789b
036fcced31
1d3157fb80
0b11c2e119
96af892d95
c2983f824e
88d6fea999
c23fa289c3
db35f220f7
982afa87a6
dccae18b53
53e86f2fa2
757dfd36a3
708add0e64
d8991ae2ca
5f6cbe0cf8
f167b0c2c5
f784500fbb
83df47323a
c75d4abb0b
5216a723b1
b801ca477d
c830c604f4
0e66606c7f
8707abe091
dc2a840985
2727067b94
6a8a494f5d
a09d2e252a
3e9c463ff1
46d50f5bde
e8da903c6c
ab10b7676a
fa44a71d3e
c86e9e8568
9e22e23ce6
835f29a178
@@ -45,7 +45,8 @@ understood. Avoid introducing new 3-letter terms, which can be confused with 3-l

 Some terms we currently use regularly in the codebase:

-* hash: n. A SHA-256 Hash
+* fullnode: n. A fully participating network node.
+* hash: n. A SHA-256 Hash.
 * keypair: n. A Ed25519 key-pair, containing a public and private key.
 * pubkey: n. The public key of a Ed25519 key-pair.
 * sigverify: v. To verify a Ed25519 digital signature.
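For concreteness, a minimal sketch tying the keypair/pubkey terms above to the `signature` API used elsewhere in this changeset (`Keypair::new`, `KeypairUtil::pubkey`); illustrative only, not part of the diff:

```rust
extern crate solana;

// The same calls appear below in spy_node and the fullnode tests.
use solana::signature::{Keypair, KeypairUtil};

fn main() {
    let keypair = Keypair::new();  // keypair: an Ed25519 key-pair
    let pubkey = keypair.pubkey(); // pubkey: the public half, used as a node id
    println!("pubkey: {:?}", pubkey);
}
```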
@@ -1,7 +1,7 @@
 [package]
 name = "solana"
 description = "Blockchain, Rebuilt for Scale"
-version = "0.7.1"
+version = "0.7.2"
 documentation = "https://docs.rs/solana"
 homepage = "http://solana.com/"
 readme = "README.md"
@@ -79,7 +79,6 @@ itertools = "0.7.8"
 libc = "0.2.1"
 log = "0.4.2"
 matches = "0.1.6"
-p2p = "0.5.2"
 pnet_datalink = "0.21.0"
 rand = "0.5.1"
 rayon = "1.0.0"
@@ -4,6 +4,11 @@ steps:
     env:
       CARGO_TARGET_CACHE_NAME: "stable"
     timeout_in_minutes: 30
+  - command: "ci/docker-run.sh solanalabs/rust ci/test-bench.sh"
+    name: "bench [public]"
+    env:
+      CARGO_TARGET_CACHE_NAME: "stable"
+    timeout_in_minutes: 30
   - command: "ci/shellcheck.sh"
     name: "shellcheck [public]"
     timeout_in_minutes: 20
@@ -1,6 +1,6 @@
 FROM rustlang/rust:nightly

-RUN cargo install --force clippy cargo-cov && \
+RUN rustup component add clippy-preview --toolchain=nightly && \
    echo deb http://ftp.debian.org/debian stretch-backports main >> /etc/apt/sources.list && \
    apt update && \
    apt install -y \
@@ -55,21 +55,31 @@ trap shutdown EXIT INT

 set -e

+flag_error() {
+  echo Failed
+  echo "^^^ +++"
+  exit 1
+}
+
 echo "--- Wallet sanity"
 (
   set -x
   multinode-demo/test/wallet-sanity.sh
-)
+) || flag_error

 echo "--- Node count"
 (
   set -x
   ./multinode-demo/client.sh "$PWD" 3 -c --addr 127.0.0.1
-)
+) || flag_error

+killBackgroundCommands
+
 echo "--- Ledger verification"
-killBackgroundCommands
-$solana_ledger_tool --ledger "$SOLANA_CONFIG_DIR"/ledger verify
+(
+  set -x
+  $solana_ledger_tool --ledger "$SOLANA_CONFIG_DIR"/ledger verify
+) || flag_error

 echo +++
 echo Ok
@@ -11,7 +11,9 @@ fi
 # when this script is run from a triggered pipeline, TRIGGERED_BUILDKITE_TAG is
 # used instead of BUILDKITE_TAG (due to Buildkite limitations that prevents
 # BUILDKITE_TAG from propagating through to triggered pipelines)
-if [[ -z "$BUILDKITE_TAG" && -z "$TRIGGERED_BUILDKITE_TAG" ]]; then
+if [[ -n "$BUILDKITE_TAG" || -n "$TRIGGERED_BUILDKITE_TAG" ]]; then
+  SNAP_CHANNEL=stable
+elif [[ $BUILDKITE_BRANCH = master ]]; then
   SNAP_CHANNEL=edge
 else
   SNAP_CHANNEL=beta
@@ -43,11 +45,11 @@ if [[ ! -x /usr/bin/multilog ]]; then
   exit 1
 fi

-echo --- build
+echo --- build: $SNAP_CHANNEL channel
 snapcraft

 source ci/upload_ci_artifact.sh
 upload_ci_artifact solana_*.snap

-echo --- publish
+echo --- publish: $SNAP_CHANNEL channel
 $DRYRUN snapcraft push solana_*.snap --release $SNAP_CHANNEL
ci/test-bench.sh (new executable file)
@@ -0,0 +1,13 @@
+#!/bin/bash -e
+
+cd "$(dirname "$0")/.."
+
+ci/version-check.sh stable
+export RUST_BACKTRACE=1
+
+_() {
+  echo "--- $*"
+  "$@"
+}
+
+_ cargo bench --verbose
@@ -13,7 +13,6 @@ _() {
 _ cargo fmt -- --check
 _ cargo build --verbose
 _ cargo test --verbose
-_ cargo bench --verbose

 echo --- ci/localnet-sanity.sh
 (
@@ -70,7 +70,10 @@ fi
 SNAP_INSTALL_CMD="sudo snap remove solana; $SNAP_INSTALL_CMD"

 EARLYOOM_INSTALL_CMD="\
-  wget -O install-earlyoom.sh https://raw.githubusercontent.com/solana-labs/solana/master/ci/install-earlyoom.sh; \
+  wget --retry-connrefused --waitretry=1 \
+    --read-timeout=20 --timeout=15 --tries=5 \
+    -O install-earlyoom.sh \
+    https://raw.githubusercontent.com/solana-labs/solana/master/ci/install-earlyoom.sh; \
   bash install-earlyoom.sh \
 "
 SNAP_INSTALL_CMD="$EARLYOOM_INSTALL_CMD; $SNAP_INSTALL_CMD"
@@ -276,10 +279,12 @@ client_start() {
       tmux new -s solana -d \" \
         set -x; \
         sudo rm /tmp/solana.log; \
-        /snap/bin/solana.bench-tps $SOLANA_NET_ENTRYPOINT $fullnode_count --loop -s 600 --sustained -t \$threadCount 2>&1 | tee /tmp/solana.log; \
-        echo 'https://metrics.solana.com:8086/write?db=${INFLUX_DATABASE}&u=${INFLUX_USERNAME}&p=${INFLUX_PASSWORD}' \
-          | xargs curl --max-time 5 -XPOST --data-binary 'testnet-deploy,name=$netBasename clientexit=1'; \
-        echo Error: bench-tps should never exit | tee -a /tmp/solana.log; \
+        while : ; do \
+          /snap/bin/solana.bench-tps $SOLANA_NET_ENTRYPOINT $fullnode_count --loop -s 600 --sustained -t \$threadCount 2>&1 | tee -a /tmp/solana.log; \
+          echo 'https://metrics.solana.com:8086/write?db=${INFLUX_DATABASE}&u=${INFLUX_USERNAME}&p=${INFLUX_PASSWORD}' \
+            | xargs curl --max-time 5 -XPOST --data-binary 'testnet-deploy,name=$netBasename clientexit=1'; \
+          echo Error: bench-tps should never exit | tee -a /tmp/solana.log; \
+        done; \
         bash \
       \"; \
     sleep 2; \
@@ -17,12 +17,17 @@ if [[ -z $EXPECTED_NODE_COUNT ]]; then
 fi

 echo "--- $NET_URL: verify ledger"
-if [[ -d /var/snap/solana/current/config/ledger ]]; then
-  # Note: here we assume this script is actually running on the leader node...
-  sudo solana.ledger-tool --ledger /var/snap/solana/current/config/ledger verify
+if [[ -z $NO_LEDGER_VERIFY ]]; then
+  if [[ -d /var/snap/solana/current/config/ledger ]]; then
+    # Note: here we assume this script is actually running on the leader node...
+    sudo solana.ledger-tool --ledger /var/snap/solana/current/config/ledger verify
+  else
+    echo "^^^ +++"
+    echo "Ledger verify skipped"
+  fi
 else
   echo "^^^ +++"
-  echo "Ledger verify skipped"
+  echo "Ledger verify skipped (NO_LEDGER_VERIFY defined)"
 fi

 echo "--- $NET_URL: wallet sanity"
@@ -54,13 +59,15 @@ if [[ -z $NO_VALIDATOR_SANITY ]]; then
   )
   wc -l validator.log
   if grep -C100 panic validator.log; then
-    echo "^^^ +++ Panic observed"
+    echo "^^^ +++"
+    echo "Panic observed"
     exit 1
   else
     echo "Validator log looks ok"
   fi
 else
-  echo "^^^ +++ Validator sanity disabled (NO_VALIDATOR_SANITY defined)"
+  echo "^^^ +++"
+  echo "Validator sanity disabled (NO_VALIDATOR_SANITY defined)"
 fi

 exit 0
@@ -19,7 +19,7 @@ require() {

 case ${1:-stable} in
 nightly)
-  require rustc 1.29.[0-9]+-nightly
+  require rustc 1.30.[0-9]+-nightly
   require cargo 1.29.[0-9]+-nightly
   ;;
 stable)
doc/testnet.md (new file)
@@ -0,0 +1,35 @@
+# TestNet debugging info
+
+Currently we have two testnets, 'perf' and 'master', both on the master branch of the solana repo. Deploys happen
+at the top of every hour with the latest code. 'perf' has more cores for the client machine to flood the network
+with transactions until failure.
+
+## Deploy process
+
+They are deployed with the `ci/testnet-deploy.sh` script. There is a scheduled buildkite job which runs to do the deploy;
+look at `testnet-deploy` to see the agent which ran it and the logs. There is also a job to trigger the deploy manually.
+Validators are selected based on their machine name and everyone gets the binaries installed from snap.
+
+## Where are the testnet logs?
+
+For the client they are put in `/tmp/solana`; for validators and leaders they are in `/var/snap/solana/current/`.
+You can also see the backtrace of the client by ssh'ing into the client node and doing:
+
+```bash
+$ sudo -u testnet-deploy
+$ tmux attach -t solana
+```
+
+## How do I reset the testnet?
+
+Through buildkite.
+
+## How can I scale the tx generation rate?
+
+Increase the TX rate by increasing the number of cores on the client machine which is running
+`bench-tps`, or run multiple clients. Decrease it by lowering cores or by setting the rayon env
+variable `RAYON_NUM_THREADS=<xx>`.
+
+## How can I test a change on the testnet?
+
+Currently, a merged PR is the only way to test a change on the testnet.
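As a sketch of the `RAYON_NUM_THREADS` knob mentioned above: rayon sizes its global pool from that variable at startup, and the same limit can be set in code. The names below assume the rayon 1.0 `ThreadPoolBuilder` API (the version pinned in Cargo.toml above):

```rust
extern crate rayon;

fn main() {
    // Equivalent to launching the client with RAYON_NUM_THREADS=4.
    rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build_global()
        .unwrap();
    println!("rayon worker threads: {}", rayon::current_num_threads());
}
```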
@@ -22,9 +22,9 @@ use std::result;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::RwLock;
 use std::time::Instant;
-use streamer::WINDOW_SIZE;
 use timing::{duration_as_us, timestamp};
 use transaction::{Instruction, Plan, Transaction};
+use window::WINDOW_SIZE;

 /// The number of most recent `last_id` values that the bank will track the signatures
 /// of. Once the bank discards a `last_id`, it will reject any transactions that use
@@ -16,15 +16,15 @@ use solana::fullnode::Config;
 use solana::hash::Hash;
 use solana::logger;
 use solana::metrics;
-use solana::nat::{udp_public_bind, udp_random_bind, UdpSocketPair};
+use solana::nat::{get_public_ip_addr, udp_random_bind};
 use solana::ncp::Ncp;
 use solana::service::Service;
 use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
-use solana::streamer::default_window;
 use solana::thin_client::ThinClient;
 use solana::timing::{duration_as_ms, duration_as_s};
 use solana::transaction::Transaction;
 use solana::wallet::request_airdrop;
+use solana::window::default_window;
 use std::collections::VecDeque;
 use std::fs::File;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@@ -373,7 +373,6 @@ fn main() {
     let mut threads = 4usize;
     let mut num_nodes = 1usize;
     let mut time_sec = 90;
-    let mut addr = None;
     let mut sustained = false;
     let mut tx_count = 500_000;

@@ -429,7 +428,7 @@ fn main() {
             Arg::with_name("addr")
                 .short("a")
                 .long("addr")
-                .value_name("PATH")
+                .value_name("IPADDR")
                 .takes_value(true)
                 .help("address to advertise to the network"),
         )
@@ -469,9 +468,17 @@ fn main() {
         time_sec = s.to_string().parse().expect("integer");
     }

-    if let Some(s) = matches.value_of("addr") {
-        addr = Some(s.to_string());
-    }
+    let addr = if let Some(s) = matches.value_of("addr") {
+        s.to_string().parse().unwrap_or_else(|e| {
+            eprintln!("failed to parse {} as IP address error: {:?}", s, e);
+            exit(1);
+        })
+    } else {
+        get_public_ip_addr().unwrap_or_else(|e| {
+            eprintln!("failed to get public IP, try --addr? error: {:?}", e);
+            exit(1);
+        })
+    };

     if let Some(s) = matches.value_of("tx_count") {
         tx_count = s.to_string().parse().expect("integer");
@@ -638,37 +645,17 @@ fn main() {
     }
 }

-fn spy_node(addr: Option<String>) -> (NodeInfo, UdpSocket) {
-    let gossip_socket_pair;
-    if let Some(a) = addr {
-        let gossip_socket = udp_random_bind(8000, 10000, 5).unwrap();
-        let gossip_addr = SocketAddr::new(
-            a.parse().unwrap(),
-            gossip_socket.local_addr().unwrap().port(),
-        );
-        gossip_socket_pair = UdpSocketPair {
-            addr: gossip_addr,
-            receiver: gossip_socket.try_clone().unwrap(),
-            sender: gossip_socket,
-        };
-    } else {
-        gossip_socket_pair = udp_public_bind("gossip", 8000, 10000);
-    }
+fn spy_node(addr: IpAddr) -> (NodeInfo, UdpSocket) {
+    let gossip_socket = udp_random_bind(8000, 10000, 5).unwrap();
+
+    let gossip_addr = SocketAddr::new(addr, gossip_socket.local_addr().unwrap().port());

     let pubkey = Keypair::new().pubkey();
     let daddr = "0.0.0.0:0".parse().unwrap();
-    assert!(!gossip_socket_pair.addr.ip().is_unspecified());
-    assert!(!gossip_socket_pair.addr.ip().is_multicast());
-    let node = NodeInfo::new(
-        pubkey,
-        //gossip.local_addr().unwrap(),
-        gossip_socket_pair.addr,
-        daddr,
-        daddr,
-        daddr,
-        daddr,
-    );
-    (node, gossip_socket_pair.receiver)
+    assert!(!gossip_addr.ip().is_unspecified());
+    assert!(!gossip_addr.ip().is_multicast());
+    let node = NodeInfo::new(pubkey, gossip_addr, daddr, daddr, daddr, daddr);
+    (node, gossip_socket)
 }

 fn converge(
@@ -676,7 +663,7 @@ fn converge(
     exit_signal: &Arc<AtomicBool>,
     num_nodes: usize,
     threads: &mut Vec<JoinHandle<()>>,
-    addr: Option<String>,
+    addr: IpAddr,
 ) -> Vec<NodeInfo> {
     //lets spy on the network
     let (spy, spy_gossip) = spy_node(addr);
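The hunk above replaces a stringly-typed `Option<String>` address with a parsed `IpAddr`, failing fast on bad input instead of panicking later. A self-contained sketch of the same pattern using only the standard library:

```rust
use std::net::IpAddr;
use std::process::exit;

// Parse a user-supplied --addr value, printing a readable error and
// exiting, as the new bench-tps code does.
fn parse_addr(s: &str) -> IpAddr {
    s.parse().unwrap_or_else(|e| {
        eprintln!("failed to parse {} as IP address error: {:?}", s, e);
        exit(1);
    })
}

fn main() {
    assert!(parse_addr("127.0.0.1").is_ipv4());
}
```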
@@ -9,7 +9,7 @@ use clap::{App, Arg};
 use solana::client::mk_client;
 use solana::crdt::{NodeInfo, TestNode};
 use solana::drone::DRONE_PORT;
-use solana::fullnode::{Config, FullNode};
+use solana::fullnode::{Config, Fullnode};
 use solana::logger;
 use solana::metrics::set_panic_hook;
 use solana::service::Service;
@@ -83,11 +83,11 @@ fn main() -> () {
         let testnet_addr: SocketAddr = testnet_address_string.parse().unwrap();
         drone_addr.set_ip(testnet_addr.ip());

-        FullNode::new(node, false, ledger_path, keypair, Some(testnet_addr))
+        Fullnode::new(node, false, ledger_path, keypair, Some(testnet_addr))
     } else {
         node.data.leader_id = node.data.id;

-        FullNode::new(node, true, ledger_path, keypair, None)
+        Fullnode::new(node, true, ledger_path, keypair, None)
     };

     let mut client = mk_client(&repl_clone);
@@ -117,15 +117,14 @@ fn main() {
     }
     let entries = entries.map(|e| e.unwrap());

-    for (i, entry) in entries.enumerate() {
+    let head = head - 2;
+    for (i, entry) in entries.skip(2).enumerate() {
         if i >= head {
             break;
         }
-        if i >= 2 {
-            if let Err(e) = bank.process_entry(entry) {
-                eprintln!("verify failed at entry[{}], err: {:?}", i, e);
-                exit(1);
-            }
+        if let Err(e) = bank.process_entry(entry) {
+            eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e);
+            exit(1);
         }
     }
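A quick standalone check of the index bookkeeping in that hunk: after `skip(2)`, `enumerate()` restarts at 0, so the original entry position is `i + 2`, and the `head` limit shrinks by the same 2:

```rust
fn main() {
    let entries = ["e0", "e1", "e2", "e3", "e4"];
    let head: usize = 5 - 2; // as in the hunk: two entries are skipped up front
    for (i, entry) in entries.iter().skip(2).enumerate() {
        if i >= head {
            break;
        }
        // e2 prints as entry[2], e3 as entry[3], ...
        println!("entry[{}] = {}", i + 2, entry);
    }
}
```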
src/broadcast_stage.rs (new file)
@@ -0,0 +1,203 @@
+//! The `broadcast_stage` broadcasts data from a leader node to validators
+//!
+use counter::Counter;
+use crdt::{Crdt, CrdtError, NodeInfo};
+#[cfg(feature = "erasure")]
+use erasure;
+use log::Level;
+use packet::BlobRecycler;
+use result::{Error, Result};
+use service::Service;
+use std::mem;
+use std::net::UdpSocket;
+use std::sync::atomic::AtomicUsize;
+use std::sync::mpsc::RecvTimeoutError;
+use std::sync::{Arc, RwLock};
+use std::thread::{self, Builder, JoinHandle};
+use std::time::Duration;
+use streamer::BlobReceiver;
+use window::{self, SharedWindow, WindowIndex, WINDOW_SIZE};
+
+fn broadcast(
+    node_info: &NodeInfo,
+    broadcast_table: &[NodeInfo],
+    window: &SharedWindow,
+    recycler: &BlobRecycler,
+    receiver: &BlobReceiver,
+    sock: &UdpSocket,
+    transmit_index: &mut WindowIndex,
+    receive_index: &mut u64,
+) -> Result<()> {
+    let debug_id = node_info.debug_id();
+    let timer = Duration::new(1, 0);
+    let mut dq = receiver.recv_timeout(timer)?;
+    while let Ok(mut nq) = receiver.try_recv() {
+        dq.append(&mut nq);
+    }
+
+    // flatten deque to vec
+    let blobs_vec: Vec<_> = dq.into_iter().collect();
+
+    // We could receive more blobs than window slots so
+    // break them up into window-sized chunks to process
+    let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
+
+    if log_enabled!(Level::Trace) {
+        trace!("{}", window::print_window(debug_id, window, *receive_index));
+    }
+
+    for mut blobs in blobs_chunked {
+        let blobs_len = blobs.len();
+        trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len);
+
+        // Index the blobs
+        window::index_blobs(node_info, &blobs, receive_index)
+            .expect("index blobs for initial window");
+
+        // keep the cache of blobs that are broadcast
+        inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
+        {
+            let mut win = window.write().unwrap();
+            assert!(blobs.len() <= win.len());
+            for b in &blobs {
+                let ix = b.read().unwrap().get_index().expect("blob index");
+                let pos = (ix % WINDOW_SIZE) as usize;
+                if let Some(x) = mem::replace(&mut win[pos].data, None) {
+                    trace!(
+                        "{:x} popped {} at {}",
+                        debug_id,
+                        x.read().unwrap().get_index().unwrap(),
+                        pos
+                    );
+                    recycler.recycle(x);
+                }
+                if let Some(x) = mem::replace(&mut win[pos].coding, None) {
+                    trace!(
+                        "{:x} popped {} at {}",
+                        debug_id,
+                        x.read().unwrap().get_index().unwrap(),
+                        pos
+                    );
+                    recycler.recycle(x);
+                }
+
+                trace!("{:x} null {}", debug_id, pos);
+            }
+            while let Some(b) = blobs.pop() {
+                let ix = b.read().unwrap().get_index().expect("blob index");
+                let pos = (ix % WINDOW_SIZE) as usize;
+                trace!("{:x} caching {} at {}", debug_id, ix, pos);
+                assert!(win[pos].data.is_none());
+                win[pos].data = Some(b);
+            }
+        }
+
+        // Fill in the coding blob data from the window data blobs
+        #[cfg(feature = "erasure")]
+        {
+            erasure::generate_coding(
+                debug_id,
+                &mut window.write().unwrap(),
+                recycler,
+                *receive_index,
+                blobs_len,
+                &mut transmit_index.coding,
+            )?;
+        }
+
+        *receive_index += blobs_len as u64;
+
+        // Send blobs out from the window
+        Crdt::broadcast(
+            &node_info,
+            &broadcast_table,
+            &window,
+            &sock,
+            transmit_index,
+            *receive_index,
+        )?;
+    }
+    Ok(())
+}
+
+pub struct BroadcastStage {
+    thread_hdl: JoinHandle<()>,
+}
+
+impl BroadcastStage {
+    fn run(
+        sock: &UdpSocket,
+        crdt: &Arc<RwLock<Crdt>>,
+        window: &SharedWindow,
+        entry_height: u64,
+        recycler: &BlobRecycler,
+        receiver: &BlobReceiver,
+    ) {
+        let mut transmit_index = WindowIndex {
+            data: entry_height,
+            coding: entry_height,
+        };
+        let mut receive_index = entry_height;
+        let me = crdt.read().unwrap().my_data().clone();
+        loop {
+            let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
+            if let Err(e) = broadcast(
+                &me,
+                &broadcast_table,
+                &window,
+                &recycler,
+                &receiver,
+                &sock,
+                &mut transmit_index,
+                &mut receive_index,
+            ) {
+                match e {
+                    Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                    Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                    Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
+                    _ => {
+                        inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
+                        error!("broadcaster error: {:?}", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Service to broadcast messages from the leader to layer 1 nodes.
+    /// See `crdt` for network layer definitions.
+    /// # Arguments
+    /// * `sock` - Socket to send from.
+    /// * `exit` - Boolean to signal system exit.
+    /// * `crdt` - CRDT structure
+    /// * `window` - Cache of blobs that we have broadcast
+    /// * `recycler` - Blob recycler.
+    /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
+    pub fn new(
+        sock: UdpSocket,
+        crdt: Arc<RwLock<Crdt>>,
+        window: SharedWindow,
+        entry_height: u64,
+        recycler: BlobRecycler,
+        receiver: BlobReceiver,
+    ) -> Self {
+        let thread_hdl = Builder::new()
+            .name("solana-broadcaster".to_string())
+            .spawn(move || {
+                Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
+            })
+            .unwrap();
+
+        BroadcastStage { thread_hdl }
+    }
+}
+
+impl Service for BroadcastStage {
+    fn thread_hdls(self) -> Vec<JoinHandle<()>> {
+        vec![self.thread_hdl]
+    }
+
+    fn join(self) -> thread::Result<()> {
+        self.thread_hdl.join()
+    }
+}
@@ -35,9 +35,10 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread::{sleep, Builder, JoinHandle};
 use std::time::{Duration, Instant};
-use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex};
+use streamer::{BlobReceiver, BlobSender};
 use timing::{duration_as_ms, timestamp};
 use transaction::Vote;
+use window::{SharedWindow, WindowIndex};

 /// milliseconds we sleep for between gossip requests
 const GOSSIP_SLEEP_MILLIS: u64 = 100;
@@ -1381,8 +1382,8 @@ mod tests {
     use std::sync::{Arc, RwLock};
     use std::thread::sleep;
     use std::time::Duration;
-    use streamer::default_window;
     use transaction::Vote;
+    use window::default_window;

     #[test]
     fn test_parse_port_or_addr() {
@@ -1910,9 +1911,11 @@ mod tests {
         assert!(rv.is_none());

     fn tmp_ledger(name: &str) -> String {
+        use std::env;
+        let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
         let keypair = Keypair::new();

-        let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey());
+        let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());

         let mut writer = LedgerWriter::open(&path, true).unwrap();
         let zero = Hash::default();
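The `tmp_ledger` change here (repeated in `src/drone.rs` and `src/ledger.rs` below) swaps hard-coded `/tmp` paths for a directory derived from Cargo's `OUT_DIR`, falling back to `target` when the variable is unset. The same fallback in isolation:

```rust
use std::env;

fn tmp_ledger_dir() -> String {
    // OUT_DIR is set by Cargo during builds; outside a build,
    // fall back to the target directory.
    env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string())
}

fn main() {
    println!("ledger scratch dir: {}", tmp_ledger_dir());
}
```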
src/drone.rs (11 lines changed)
@@ -159,7 +159,7 @@ mod tests {
     use bank::Bank;
     use crdt::{get_ip_addr, TestNode};
     use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE};
-    use fullnode::FullNode;
+    use fullnode::Fullnode;
     use logger;
     use mint::Mint;
     use service::Service;
@@ -260,9 +260,11 @@ mod tests {
     }

     fn tmp_ledger_path(name: &str) -> String {
+        use std::env;
+        let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
         let keypair = Keypair::new();

-        format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
+        format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
     }

     #[test]
@@ -283,12 +285,11 @@ mod tests {
         let leader_data = leader.data.clone();
         let ledger_path = tmp_ledger_path("send_airdrop");

-        let server = FullNode::new_leader(
+        let server = Fullnode::new_leader(
             leader_keypair,
             bank,
             0,
-            None,
-            Some(Duration::from_millis(30)),
+            &[],
             leader,
             exit.clone(),
             &ledger_path,
@@ -3,13 +3,21 @@ use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
 use std::cmp;
 use std::mem;
 use std::result;
-use streamer::WindowSlot;
+use window::WindowSlot;

 //TODO(sakridge) pick these values
 pub const NUM_DATA: usize = 16; // number of data blobs
 pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing
 pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs

+pub const JERASURE_ALIGN: usize = 4; // data size has to be a multiple of 4 bytes
+
+macro_rules! align {
+    ($x:expr, $align:expr) => {
+        $x + ($align - 1) & !($align - 1)
+    };
+}
+
 #[derive(Debug, PartialEq, Eq)]
 pub enum ErasureError {
     NotEnoughBlocksToDecode,
@@ -245,6 +253,9 @@ pub fn generate_coding(
         }
     }

+    // round up to the nearest jerasure alignment
+    max_data_size = align!(max_data_size, JERASURE_ALIGN);
+
     trace!("{:x} max_data_size: {}", debug_id, max_data_size);

     let mut data_blobs = Vec::with_capacity(NUM_DATA);
@@ -262,8 +273,9 @@ pub fn generate_coding(
         }
     }

-    // getting ready to do erasure coding, means that we're potentially going back in time,
-    // tell our caller we've inserted coding blocks starting at coding_index_start
+    // getting ready to do erasure coding, means that we're potentially
+    // going back in time, tell our caller we've inserted coding blocks
+    // starting at coding_index_start
     *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);

     let mut coding_blobs = Vec::with_capacity(NUM_CODING);
@@ -602,7 +614,7 @@ mod test {
     use signature::Keypair;
     use signature::KeypairUtil;
     // use std::sync::{Arc, RwLock};
-    use streamer::{index_blobs, WindowSlot};
+    use window::{index_blobs, WindowSlot};

     #[test]
     pub fn test_coding() {
@@ -701,7 +713,8 @@ mod test {
         let mut window = vec![
             WindowSlot {
                 data: None,
-                coding: None
+                coding: None,
+                leader_unknown: false,
             };
             WINDOW_SIZE
         ];
@@ -711,7 +724,7 @@ mod test {
         let b_ = b.clone();
         let mut w = b.write().unwrap();
         // generate a random length, multiple of 4 between 8 and 32
-        let data_len = thread_rng().gen_range(2, 8) * 4;
+        let data_len = (thread_rng().gen_range(2, 8) * 4) + 1;
         eprintln!("data_len of {} is {}", i, data_len);
         w.set_size(data_len);
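A worked check of the `align!` macro added above. Rust's `+` binds tighter than `&`, so the body parses as `($x + ($align - 1)) & !($align - 1)`, the usual round-up idiom for power-of-two alignments such as `JERASURE_ALIGN = 4`:

```rust
macro_rules! align {
    ($x:expr, $align:expr) => {
        $x + ($align - 1) & !($align - 1)
    };
}

fn main() {
    assert_eq!(align!(29usize, 4), 32); // rounds up to the next multiple of 4
    assert_eq!(align!(32usize, 4), 32); // already-aligned values are unchanged
}
```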
@@ -1,27 +1,25 @@
 //! The `fullnode` module hosts all the fullnode microservices.

 use bank::Bank;
+use broadcast_stage::BroadcastStage;
 use crdt::{Crdt, NodeInfo, TestNode};
 use entry::Entry;
-use ledger::{read_ledger, Block};
+use ledger::read_ledger;
 use ncp::Ncp;
 use packet::BlobRecycler;
 use rpu::Rpu;
 use service::Service;
 use signature::{Keypair, KeypairUtil};
-use std::collections::VecDeque;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread::{JoinHandle, Result};
-use std::time::Duration;
+use std::time::Duration;
 use streamer;
 use tpu::Tpu;
 use tvu::Tvu;
 use untrusted::Input;
+use window;

-//use std::time::Duration;
-pub struct FullNode {
+pub struct Fullnode {
     exit: Arc<AtomicBool>,
     thread_hdls: Vec<JoinHandle<()>>,
 }
@@ -48,7 +46,7 @@ impl Config {
     }
 }

-impl FullNode {
+impl Fullnode {
     fn new_internal(
         mut node: TestNode,
         leader: bool,
@@ -56,7 +54,7 @@ impl FullNode {
         keypair: Keypair,
         network_entry_for_validator: Option<SocketAddr>,
         sigverify_disabled: bool,
-    ) -> FullNode {
+    ) -> Self {
         info!("creating bank...");
         let bank = Bank::new_default(leader);

@@ -84,11 +82,11 @@ impl FullNode {
             let testnet_addr = network_entry_for_validator.expect("validator requires entry");

             let network_entry_point = NodeInfo::new_entry_point(testnet_addr);
-            let server = FullNode::new_validator(
+            let server = Self::new_validator(
                 keypair,
                 bank,
                 entry_height,
-                Some(ledger_tail),
+                &ledger_tail,
                 node,
                 &network_entry_point,
                 exit.clone(),
@@ -103,13 +101,11 @@ impl FullNode {
         } else {
             node.data.leader_id = node.data.id;

-            let server = FullNode::new_leader(
+            let server = Self::new_leader(
                 keypair,
                 bank,
                 entry_height,
-                Some(ledger_tail),
-                //Some(Duration::from_millis(1000)),
-                None,
+                &ledger_tail,
                 node,
                 exit.clone(),
                 ledger_path,
@@ -129,8 +125,8 @@ impl FullNode {
         ledger: &str,
         keypair: Keypair,
         network_entry_for_validator: Option<SocketAddr>,
-    ) -> FullNode {
-        FullNode::new_internal(
+    ) -> Self {
+        Self::new_internal(
             node,
             leader,
             ledger,
@@ -146,8 +142,8 @@ impl FullNode {
         ledger_path: &str,
         keypair: Keypair,
         network_entry_for_validator: Option<SocketAddr>,
-    ) -> FullNode {
-        FullNode::new_internal(
+    ) -> Self {
+        Self::new_internal(
             node,
             leader,
             ledger_path,
@@ -157,26 +153,6 @@ impl FullNode {
         )
     }

-    fn new_window(
-        ledger_tail: Option<Vec<Entry>>,
-        entry_height: u64,
-        node_info: &NodeInfo,
-        blob_recycler: &BlobRecycler,
-    ) -> streamer::SharedWindow {
-        match ledger_tail {
-            Some(ledger_tail) => {
-                // convert to blobs
-                let mut blobs = VecDeque::new();
-                ledger_tail.to_blobs(&blob_recycler, &mut blobs);
-
-                // flatten deque to vec
-                let blobs: Vec<_> = blobs.into_iter().collect();
-                streamer::initialized_window(&node_info, blobs, entry_height)
-            }
-            None => streamer::default_window(),
-        }
-    }
-
     /// Create a server instance acting as a leader.
     ///
     /// ```text
@@ -205,13 +181,16 @@ impl FullNode {
         keypair: Keypair,
         bank: Bank,
         entry_height: u64,
-        ledger_tail: Option<Vec<Entry>>,
-        tick_duration: Option<Duration>,
+        ledger_tail: &[Entry],
         node: TestNode,
         exit: Arc<AtomicBool>,
         ledger_path: &str,
         sigverify_disabled: bool,
     ) -> Self {
+        let tick_duration = None;
+        // TODO: To light up PoH, uncomment the following line:
+        //let tick_duration = Some(Duration::from_millis(1000));
+
         let bank = Arc::new(bank);
         let mut thread_hdls = vec![];
         let rpu = Rpu::new(
@@ -223,7 +202,8 @@ impl FullNode {
         thread_hdls.extend(rpu.thread_hdls());

         let blob_recycler = BlobRecycler::default();
-        let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler);
+        let window =
+            window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);

         let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));

@@ -249,7 +229,7 @@ impl FullNode {
         ).expect("Ncp::new");
         thread_hdls.extend(ncp.thread_hdls());

-        let t_broadcast = streamer::broadcaster(
+        let broadcast_stage = BroadcastStage::new(
             node.sockets.broadcast,
             crdt,
             window,
@@ -257,9 +237,9 @@ impl FullNode {
             blob_recycler.clone(),
             blob_receiver,
         );
-        thread_hdls.extend(vec![t_broadcast]);
+        thread_hdls.extend(broadcast_stage.thread_hdls());

-        FullNode { exit, thread_hdls }
+        Fullnode { exit, thread_hdls }
     }

     /// Create a server instance acting as a validator.
@@ -295,7 +275,7 @@ impl FullNode {
         keypair: Keypair,
         bank: Bank,
         entry_height: u64,
-        ledger_tail: Option<Vec<Entry>>,
+        ledger_tail: &[Entry],
         node: TestNode,
         entry_point: &NodeInfo,
         exit: Arc<AtomicBool>,
@@ -313,7 +293,8 @@ impl FullNode {
         thread_hdls.extend(rpu.thread_hdls());

         let blob_recycler = BlobRecycler::default();
-        let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler);
+        let window =
+            window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);

         let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
         crdt.write()
@@ -343,7 +324,7 @@ impl FullNode {
         );
         thread_hdls.extend(tvu.thread_hdls());
         thread_hdls.extend(ncp.thread_hdls());
-        FullNode { exit, thread_hdls }
+        Fullnode { exit, thread_hdls }
     }

     //used for notifying many nodes in parallel to exit
@@ -356,7 +337,7 @@ impl FullNode {
     }
 }

-impl Service for FullNode {
+impl Service for Fullnode {
     fn thread_hdls(self) -> Vec<JoinHandle<()>> {
         self.thread_hdls
     }
@@ -373,7 +354,7 @@ impl Service for FullNode {
 mod tests {
     use bank::Bank;
     use crdt::TestNode;
-    use fullnode::FullNode;
+    use fullnode::Fullnode;
     use mint::Mint;
     use service::Service;
     use signature::{Keypair, KeypairUtil};
@@ -388,13 +369,13 @@ mod tests {
         let bank = Bank::new(&alice);
         let exit = Arc::new(AtomicBool::new(false));
         let entry = tn.data.clone();
-        let v = FullNode::new_validator(keypair, bank, 0, None, tn, &entry, exit, None, false);
+        let v = Fullnode::new_validator(keypair, bank, 0, &[], tn, &entry, exit, None, false);
         v.exit();
         v.join().unwrap();
     }
     #[test]
     fn validator_parallel_exit() {
-        let vals: Vec<FullNode> = (0..2)
+        let vals: Vec<Fullnode> = (0..2)
             .map(|_| {
                 let keypair = Keypair::new();
                 let tn = TestNode::new_localhost_with_pubkey(keypair.pubkey());
@@ -402,7 +383,7 @@ mod tests {
                 let bank = Bank::new(&alice);
                 let exit = Arc::new(AtomicBool::new(false));
                 let entry = tn.data.clone();
-                FullNode::new_validator(keypair, bank, 0, None, tn, &entry, exit, None, false)
+                Fullnode::new_validator(keypair, bank, 0, &[], tn, &entry, exit, None, false)
             })
             .collect();
         //each validator can exit in parallel to speed many sequential calls to `join`
@@ -15,8 +15,8 @@ use std::io::prelude::*;
 use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
 use std::mem::size_of;
 use std::path::Path;
-use streamer::WINDOW_SIZE;
 use transaction::Transaction;
+use window::WINDOW_SIZE;

 //
 // A persistent ledger is 2 files:
@@ -549,9 +549,11 @@ mod tests {
     use transaction::{Transaction, Vote};

     fn tmp_ledger_path(name: &str) -> String {
+        use std::env;
+        let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
         let keypair = Keypair::new();

-        format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
+        format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
     }

     #[test]
@@ -12,6 +12,7 @@ pub mod counter;
 pub mod bank;
 pub mod banking_stage;
 pub mod blob_fetch_stage;
+pub mod broadcast_stage;
 pub mod budget;
 pub mod choose_gossip_peer_strategy;
 pub mod client;
@@ -39,6 +40,7 @@ pub mod request;
 pub mod request_processor;
 pub mod request_stage;
 pub mod result;
+pub mod retransmit_stage;
 pub mod rpu;
 pub mod service;
 pub mod signature;
@@ -53,7 +55,7 @@ pub mod tvu;
 pub mod vote_stage;
 pub mod voting;
 pub mod wallet;
-pub mod window_stage;
+pub mod window;
 pub mod write_stage;
 extern crate bincode;
 extern crate bs58;
src/nat.rs (73 lines changed)
@@ -1,19 +1,14 @@
 //! The `nat` module assists with NAT traversal

-extern crate futures;
-extern crate p2p;
 extern crate rand;
 extern crate reqwest;
-extern crate tokio_core;

-use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
-
-use self::futures::Future;
-use self::p2p::UdpSocketExt;
 use rand::{thread_rng, Rng};
-use std::env;
 use std::io;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
 use std::str;

 /// A data type representing a public Udp socket
 pub struct UdpSocketPair {
@@ -51,71 +46,3 @@ pub fn udp_random_bind(start: u16, end: u16, tries: u32) -> io::Result<UdpSocket
         }
     }
 }
-
-/// Binds a private Udp address to a public address using UPnP if possible
-pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPair {
-    let private_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
-
-    let mut core = tokio_core::reactor::Core::new().unwrap();
-    let handle = core.handle();
-    let mc = p2p::P2p::default();
-    let res = core.run({
-        tokio_core::net::UdpSocket::bind_public(&private_addr, &handle, &mc)
-            .map_err(|e| {
-                info!("Failed to bind public socket for {}: {}", label, e);
-            })
-            .and_then(|(socket, public_addr)| Ok((public_addr, socket.local_addr().unwrap())))
-    });
-
-    match res {
-        Ok((public_addr, local_addr)) => {
-            info!(
-                "Using local address {} mapped to UPnP public address {} for {}",
-                local_addr, public_addr, label
-            );
-
-            // NAT should now be forwarding inbound packets directed at
-            // |public_addr| to the local |receiver| socket...
-            let receiver = UdpSocket::bind(local_addr).unwrap();
-
-            // TODO: try to autodetect a broken NAT (issue #496)
-            let sender = if env::var("BROKEN_NAT").is_err() {
-                receiver.try_clone().unwrap()
-            } else {
-                // ... however for outbound packets, some NATs *will not* rewrite the
-                // source port from |receiver.local_addr().port()| to |public_addr.port()|.
-                // This is currently a problem when talking with a fullnode as it
-                // assumes it can send UDP packets back at the source. This hits the
-                // NAT as a datagram for |receiver.local_addr().port()| on the NAT's public
-                // IP, which the NAT promptly discards. As a short term hack, create a
-                // local UDP socket, |sender|, with the same port as |public_addr.port()|.
-                //
-                // TODO: Remove the |sender| socket and deal with the downstream changes to
-                // the UDP signalling
-                let mut local_addr_sender = local_addr;
-                local_addr_sender.set_port(public_addr.port());
-                UdpSocket::bind(local_addr_sender).unwrap()
-            };
-
-            UdpSocketPair {
-                addr: public_addr,
-                receiver,
-                sender,
-            }
-        }
-        Err(_) => {
-            let sender = udp_random_bind(startport, endport, 5).unwrap();
-            let local_addr = sender.local_addr().unwrap();
-
-            let pub_ip = get_public_ip_addr().unwrap();
-            let pub_addr = SocketAddr::new(pub_ip, local_addr.port());
-
-            info!("Using source address {} for {}", pub_addr, label);
-            UdpSocketPair {
-                addr: pub_addr,
-                receiver: sender.try_clone().unwrap(),
-                sender,
-            }
-        }
-    }
-}
@@ -10,6 +10,7 @@ use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread::{self, JoinHandle};
 use streamer;
+use window::SharedWindow;

 pub struct Ncp {
     exit: Arc<AtomicBool>,
@@ -19,7 +20,7 @@ pub struct Ncp {
 impl Ncp {
     pub fn new(
         crdt: &Arc<RwLock<Crdt>>,
-        window: streamer::SharedWindow,
+        window: SharedWindow,
         ledger_path: Option<&str>,
         gossip_listen_socket: UdpSocket,
         gossip_send_socket: UdpSocket,
@@ -26,7 +26,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
 pub const PACKET_DATA_SIZE: usize = 256;
 pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;

-#[derive(Clone, Default, Debug)]
+#[derive(Clone, Default, Debug, PartialEq)]
 #[repr(C)]
 pub struct Meta {
     pub size: usize,
@@ -63,6 +63,19 @@ impl Default for Packet {
     }
 }

+pub trait Reset {
+    // Reset trait is an object that can re-initialize important parts
+    // of itself, similar to Default, but not necessarily a full clear
+    // also, we do it in-place.
+    fn reset(&mut self);
+}
+
+impl Reset for Packet {
+    fn reset(&mut self) {
+        self.meta = Meta::default();
+    }
+}
+
 impl Meta {
     pub fn addr(&self) -> SocketAddr {
         if !self.v6 {
@@ -113,6 +126,14 @@ impl Default for Packets {
     }
 }

+impl Reset for Packets {
+    fn reset(&mut self) {
+        for i in 0..self.packets.len() {
+            self.packets[i].reset();
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct Blob {
     pub data: [u8; BLOB_SIZE],
@@ -140,6 +161,13 @@ impl Default for Blob {
     }
 }

+impl Reset for Blob {
+    fn reset(&mut self) {
+        self.meta = Meta::default();
+        self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]);
+    }
+}
+
 #[derive(Debug)]
 pub enum BlobError {
     /// the Blob's meta and data are not self-consistent
@@ -166,25 +194,35 @@ impl<T: Default> Clone for Recycler<T> {
     }
 }

-impl<T: Default> Recycler<T> {
+impl<T: Default + Reset> Recycler<T> {
     pub fn allocate(&self) -> Arc<RwLock<T>> {
         let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
-        let x = gc
-            .pop()
-            .unwrap_or_else(|| Arc::new(RwLock::new(Default::default())));
-
-        // Only return the item if this recycler is the last reference to it.
-        // Remove this check once `T` holds a Weak reference back to this
-        // recycler and implements `Drop`. At the time of this writing, Weak can't
-        // be passed across threads ('alloc' is a nightly-only API), and so our
-        // reference-counted recyclables are awkwardly being recycled by hand,
-        // which allows this race condition to exist.
-        if Arc::strong_count(&x) > 1 {
-            warn!("Recycled item still in use. Booting it.");
-            drop(gc);
-            self.allocate()
-        } else {
-            x
+        loop {
+            if let Some(x) = gc.pop() {
+                // Only return the item if this recycler is the last reference to it.
+                // Remove this check once `T` holds a Weak reference back to this
+                // recycler and implements `Drop`. At the time of this writing, Weak can't
+                // be passed across threads ('alloc' is a nightly-only API), and so our
+                // reference-counted recyclables are awkwardly being recycled by hand,
+                // which allows this race condition to exist.
+                if Arc::strong_count(&x) >= 1 {
+                    // Commenting out this message, is annoying for known use case of
+                    // validator hanging onto a blob in the window, but also sending it over
+                    // to retransmmit_request
+                    //
+                    // warn!("Recycled item still in use. Booting it.");
+                    continue;
+                }
+
+                {
+                    let mut w = x.write().unwrap();
+                    w.reset();
+                }
+                return x;
+            } else {
+                return Arc::new(RwLock::new(Default::default()));
+            }
         }
     }
     pub fn recycle(&self, x: Arc<RwLock<T>>) {
@@ -455,7 +493,8 @@ impl Blob {
 #[cfg(test)]
 mod tests {
     use packet::{
-        to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, Recycler, NUM_PACKETS,
+        to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset,
+        BLOB_HEADER_SIZE, NUM_PACKETS,
     };
     use request::Request;
     use std::collections::VecDeque;
@@ -474,6 +513,12 @@ mod tests {
         assert_eq!(r.gc.lock().unwrap().len(), 0);
     }

+    impl Reset for u8 {
+        fn reset(&mut self) {
+            *self = Default::default();
+        }
+    }
+
     #[test]
     pub fn test_leaked_recyclable() {
         // Ensure that the recycler won't return an item
@@ -611,6 +656,9 @@ mod tests {
         b.data_mut()[0] = 1;
         assert_eq!(b.data()[0], 1);
         assert_eq!(b.get_index().unwrap(), <u64>::max_value());
+        b.reset();
+        assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE]));
+        assert_eq!(b.meta, Meta::default());
     }

 }
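The `Reset` bound added to `Recycler<T>` above lets the pool re-initialize an object in place before handing it out again. A minimal self-contained sketch of that reset-on-reuse pattern (the `Slot` type is hypothetical, standing in for Packet/Blob):

```rust
trait Reset {
    // Re-initialize the important parts of a value in place, similar to
    // Default but without constructing a new object.
    fn reset(&mut self);
}

// Hypothetical pooled type.
struct Slot {
    len: usize,
}

impl Reset for Slot {
    fn reset(&mut self) {
        self.len = 0;
    }
}

fn main() {
    let mut s = Slot { len: 42 };
    s.reset(); // what Recycler::allocate now does before returning a recycled item
    assert_eq!(s.len, 0);
}
```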
@@ -9,7 +9,7 @@ use packet;
 use serde_json;
 use std;
 use std::any::Any;
-use streamer;
+use window;

 #[derive(Debug)]
 pub enum Error {
@@ -22,7 +22,7 @@ pub enum Error {
     Serialize(std::boxed::Box<bincode::ErrorKind>),
     BankError(bank::BankError),
     CrdtError(crdt::CrdtError),
-    WindowError(streamer::WindowError),
+    WindowError(window::WindowError),
     BlobError(packet::BlobError),
     #[cfg(feature = "erasure")]
     ErasureError(erasure::ErasureError),
@@ -51,8 +51,8 @@ impl std::convert::From<crdt::CrdtError> for Error {
         Error::CrdtError(e)
     }
 }
-impl std::convert::From<streamer::WindowError> for Error {
-    fn from(e: streamer::WindowError) -> Error {
+impl std::convert::From<window::WindowError> for Error {
+    fn from(e: window::WindowError) -> Error {
         Error::WindowError(e)
     }
 }
src/retransmit_stage.rs (new file)
@@ -0,0 +1,123 @@
+//! The `retransmit_stage` retransmits blobs between validators
+
+use counter::Counter;
+use crdt::Crdt;
+use log::Level;
+use packet::BlobRecycler;
+use result::{Error, Result};
+use service::Service;
+use std::net::UdpSocket;
+use std::sync::atomic::AtomicUsize;
+use std::sync::mpsc::channel;
+use std::sync::mpsc::RecvTimeoutError;
+use std::sync::{Arc, RwLock};
+use std::thread::{self, Builder, JoinHandle};
+use std::time::Duration;
+use streamer::BlobReceiver;
+use window::{self, SharedWindow};
+
+fn retransmit(
+    crdt: &Arc<RwLock<Crdt>>,
+    recycler: &BlobRecycler,
+    r: &BlobReceiver,
+    sock: &UdpSocket,
+) -> Result<()> {
+    let timer = Duration::new(1, 0);
+    let mut dq = r.recv_timeout(timer)?;
+    while let Ok(mut nq) = r.try_recv() {
+        dq.append(&mut nq);
+    }
+    {
+        for b in &dq {
+            Crdt::retransmit(&crdt, b, sock)?;
+        }
+    }
+    while let Some(b) = dq.pop_front() {
+        recycler.recycle(b);
+    }
+    Ok(())
+}
+
+/// Service to retransmit messages from the leader to layer 1 nodes.
+/// See `crdt` for network layer definitions.
+/// # Arguments
+/// * `sock` - Socket to read from. Read timeout is set to 1.
+/// * `exit` - Boolean to signal system exit.
+/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
+/// * `recycler` - Blob recycler.
+/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
+fn retransmitter(
+    sock: UdpSocket,
+    crdt: Arc<RwLock<Crdt>>,
+    recycler: BlobRecycler,
+    r: BlobReceiver,
+) -> JoinHandle<()> {
+    Builder::new()
+        .name("solana-retransmitter".to_string())
+        .spawn(move || {
+            trace!("retransmitter started");
+            loop {
+                if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
+                    match e {
+                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                        _ => {
+                            inc_new_counter_info!("streamer-retransmit-error", 1, 1);
+                        }
+                    }
+                }
+            }
+            trace!("exiting retransmitter");
+        })
+        .unwrap()
+}
+
+pub struct RetransmitStage {
+    thread_hdls: Vec<JoinHandle<()>>,
+}
+
+impl RetransmitStage {
+    pub fn new(
+        crdt: &Arc<RwLock<Crdt>>,
+        window: SharedWindow,
+        entry_height: u64,
+        retransmit_socket: UdpSocket,
+        blob_recycler: &BlobRecycler,
+        fetch_stage_receiver: BlobReceiver,
+    ) -> (Self, BlobReceiver) {
+        let (retransmit_sender, retransmit_receiver) = channel();
+
+        let t_retransmit = retransmitter(
+            retransmit_socket,
+            crdt.clone(),
+            blob_recycler.clone(),
+            retransmit_receiver,
+        );
+        let (blob_sender, blob_receiver) = channel();
+        let t_window = window::window(
+            crdt.clone(),
+            window,
+            entry_height,
+            blob_recycler.clone(),
+            fetch_stage_receiver,
+            blob_sender,
+            retransmit_sender,
+        );
+        let thread_hdls = vec![t_retransmit, t_window];
+
+        (RetransmitStage { thread_hdls }, blob_receiver)
+    }
+}
+
+impl Service for RetransmitStage {
+    fn thread_hdls(self) -> Vec<JoinHandle<()>> {
+        self.thread_hdls
+    }
+
+    fn join(self) -> thread::Result<()> {
+        for thread_hdl in self.thread_hdls() {
+            thread_hdl.join()?;
+        }
+        Ok(())
+    }
+}
src/streamer.rs (963 lines changed)
@@ -1,50 +1,19 @@
 //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
 //!
-use counter::Counter;
-use crdt::{Crdt, CrdtError, NodeInfo};
-#[cfg(feature = "erasure")]
-use erasure;
-use log::Level;
-use packet::{
-    Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
-};
+use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets};
 use result::{Error, Result};
-use std::cmp;
-use std::collections::VecDeque;
-use std::mem;
-use std::net::{SocketAddr, UdpSocket};
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::net::UdpSocket;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::{Builder, JoinHandle};
-use std::time::{Duration, Instant};
-use timing::duration_as_ms;
+use std::time::Duration;

-pub const WINDOW_SIZE: u64 = 2 * 1024;
 pub type PacketReceiver = Receiver<SharedPackets>;
 pub type PacketSender = Sender<SharedPackets>;
 pub type BlobSender = Sender<SharedBlobs>;
 pub type BlobReceiver = Receiver<SharedBlobs>;

-#[derive(Clone, Default)]
-pub struct WindowSlot {
-    pub data: Option<SharedBlob>,
-    pub coding: Option<SharedBlob>,
-}
-
-pub type SharedWindow = Arc<RwLock<Vec<WindowSlot>>>;
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum WindowError {
-    GenericError,
-}
-
-#[derive(Debug)]
-pub struct WindowIndex {
-    pub data: u64,
-    pub coding: u64,
-}
-
 fn recv_loop(
     sock: &UdpSocket,
     exit: &Arc<AtomicBool>,
@@ -172,782 +141,8 @@ pub fn blob_receiver(
     Ok(t)
 }

-fn find_next_missing(
-    window: &SharedWindow,
-    crdt: &Arc<RwLock<Crdt>>,
-    recycler: &BlobRecycler,
-    consumed: u64,
-    received: u64,
-) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
-    if received <= consumed {
-        Err(WindowError::GenericError)?;
-    }
-    let mut window = window.write().unwrap();
-    let reqs: Vec<_> = (consumed..received)
-        .filter_map(|pix| {
-            let i = (pix % WINDOW_SIZE) as usize;
-
-            if let Some(blob) = mem::replace(&mut window[i].data, None) {
-                let blob_idx = blob.read().unwrap().get_index().unwrap();
-                if blob_idx == pix {
-                    mem::replace(&mut window[i].data, Some(blob));
-                } else {
-                    recycler.recycle(blob);
-                }
-            }
-            if window[i].data.is_none() {
-                let val = crdt.read().unwrap().window_index_request(pix as u64);
-                if let Ok((to, req)) = val {
-                    return Some((to, req));
-                }
-            }
-            None
-        })
-        .collect();
-    Ok(reqs)
-}
-
-fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
-    // Calculate the highest blob index that this node should have already received
-    // via avalanche. The avalanche splits data stream into nodes and each node retransmits
-    // the data to their peer nodes. So there's a possibility that a blob (with index lower
-    // than current received index) is being retransmitted by a peer node.
-    let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers));
-
-    // This check prevents repairing a blob that will cause window to roll over. Even if
-    // the highes_lost blob is actually missing, asking to repair it might cause our
-    // current window to move past other missing blobs
-    cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
-}
-
-fn repair_window(
-    debug_id: u64,
-    window: &SharedWindow,
-    crdt: &Arc<RwLock<Crdt>>,
-    recycler: &BlobRecycler,
-    last: &mut u64,
-    times: &mut usize,
-    consumed: u64,
-    received: u64,
-) -> Result<()> {
-    //exponential backoff
-    if *last != consumed {
-        *times = 0;
-    }
-    *last = consumed;
-    *times += 1;
-    //if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok
-    if *times & (*times - 1) != 0 {
-        trace!("repair_window counter {} {} {}", *times, consumed, received);
-        return Ok(());
-    }
-
-    let highest_lost = calculate_highest_lost_blob_index(
-        crdt.read().unwrap().table.len() as u64,
-        consumed,
-        received,
-    );
-    let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?;
-    trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
-    if !reqs.is_empty() {
-        inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
-        info!(
-            "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
-            debug_id,
-            *times,
-            consumed,
-            highest_lost,
-            reqs.len()
-        );
-    }
-    let sock = UdpSocket::bind("0.0.0.0:0")?;
-    for (to, req) in reqs {
-        //todo cache socket
-        debug!(
-            "{:x}: repair_window request {} {} {}",
-            debug_id, consumed, highest_lost, to
-        );
-        assert!(req.len() <= BLOB_SIZE);
-        sock.send_to(&req, to)?;
-    }
-    Ok(())
-}
-
-fn retransmit_all_leader_blocks(
-    maybe_leader: Option<NodeInfo>,
-    dq: &mut SharedBlobs,
-    debug_id: u64,
-    recycler: &BlobRecycler,
-    consumed: u64,
-    received: u64,
-    retransmit: &BlobSender,
-) -> Result<()> {
-    let mut retransmit_queue = VecDeque::new();
-    if let Some(leader) = maybe_leader {
-        for b in dq {
-            let p = b.read().expect("'b' read lock in fn recv_window");
-            //TODO this check isn't safe against adverserial packets
-            //we need to maintain a sequence window
-            let leader_id = leader.id;
-            trace!(
-                "idx: {} addr: {:?} id: {:?} leader: {:?}",
-                p.get_index().expect("get_index in fn recv_window"),
-                p.get_id().expect("get_id in trace! fn recv_window"),
-                p.meta.addr(),
-                leader_id
-            );
-            if p.get_id().expect("get_id in fn recv_window") == leader_id {
-                //TODO
-                //need to copy the retransmitted blob
-                //otherwise we get into races with which thread
-                //should do the recycling
-                //
-                //a better abstraction would be to recycle when the blob
-                //is dropped via a weakref to the recycler
-                let nv = recycler.allocate();
-                {
-                    let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
-                    let sz = p.meta.size;
-                    mnv.meta.size = sz;
-                    mnv.data[..sz].copy_from_slice(&p.data[..sz]);
-                }
-                retransmit_queue.push_back(nv);
-            }
-        }
-    } else {
-        warn!("{:x}: no leader to retransmit from", debug_id);
-    }
-    if !retransmit_queue.is_empty() {
-        debug!(
-            "{:x}: RECV_WINDOW {} {}: retransmit {}",
-            debug_id,
-            consumed,
-            received,
-            retransmit_queue.len(),
-        );
-        inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
-        retransmit.send(retransmit_queue)?;
-    }
-    Ok(())
-}
-
-/// process a blob: Add blob to the window. If a continuous set of blobs
-/// starting from consumed is thereby formed, add that continuous
-/// range of blobs to a queue to be sent on to the next stage.
-///
-/// * `debug_id` - this node's id in a useful-for-debug format
-/// * `blob` - the blob to be processed into the window and rebroadcast
-/// * `pix` - the index of the blob, corresponds to
-///           the entry height of this blob
-/// * `consume_queue` - output, blobs to be rebroadcast are placed here
-/// * `window` - the window we're operating on
-/// * `recycler` - where to return the blob once processed, also where
-///                to return old blobs from the window
-/// * `consumed` - input/output, the entry-height to which this
-///                node has populated and rebroadcast entries
-fn process_blob(
-    debug_id: u64,
-    blob: SharedBlob,
-    pix: u64,
-    consume_queue: &mut SharedBlobs,
-    window: &SharedWindow,
-    recycler: &BlobRecycler,
-    consumed: &mut u64,
-) {
-    let mut window = window.write().unwrap();
-    let w = (pix % WINDOW_SIZE) as usize;
-
-    let is_coding = {
-        let blob_r = blob
-            .read()
-            .expect("blob read lock for flogs streamer::window");
-        blob_r.is_coding()
-    };
-
-    // insert a newly received blob into a window slot, clearing out and recycling any previous
-    // blob unless the incoming blob is a duplicate (based on idx)
-    // returns whether the incoming is a duplicate blob
-    fn insert_blob_is_dup(
-        debug_id: u64,
-        blob: SharedBlob,
-        pix: u64,
-        window_slot: &mut Option<SharedBlob>,
-        recycler: &BlobRecycler,
-        c_or_d: &str,
-    ) -> bool {
-        if let Some(old) = mem::replace(window_slot, Some(blob)) {
-            let is_dup = old.read().unwrap().get_index().unwrap() == pix;
-            recycler.recycle(old);
-            trace!(
-                "{:x}: occupied {} window slot {:}, is_dup: {}",
-                debug_id,
-                c_or_d,
-                pix,
-                is_dup
-            );
-            is_dup
-        } else {
-            trace!("{:x}: empty {} window slot {:}", debug_id, c_or_d, pix);
-            false
-        }
-    }
-
-    // insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding {
|
||||
insert_blob_is_dup(
|
||||
debug_id,
|
||||
blob,
|
||||
pix,
|
||||
&mut window[w].coding,
|
||||
recycler,
|
||||
"coding",
|
||||
)
|
||||
} else {
|
||||
insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data")
|
||||
};
|
||||
|
||||
if is_duplicate {
|
||||
return;
|
||||
}
|
||||
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
if erasure::recover(
|
||||
debug_id,
|
||||
recycler,
|
||||
&mut window,
|
||||
*consumed,
|
||||
(*consumed % WINDOW_SIZE) as usize,
|
||||
).is_err()
|
||||
{
|
||||
trace!("{:x}: erasure::recover failed", debug_id);
|
||||
}
|
||||
}
|
||||
|
||||
// push all contiguous blobs into consumed queue, increment consumed
|
||||
loop {
|
||||
let k = (*consumed % WINDOW_SIZE) as usize;
|
||||
trace!("{:x}: k: {} consumed: {}", debug_id, k, *consumed,);
|
||||
|
||||
if let Some(blob) = &window[k].data {
|
||||
if blob.read().unwrap().get_index().unwrap() < *consumed {
|
||||
// window wrap-around, end of received
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// window[k].data is None, end of received
|
||||
break;
|
||||
}
|
||||
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
|
||||
*consumed += 1;
|
||||
}
|
||||
}
|
||||
|
||||
fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool {
|
||||
// Prevent receive window from running over
|
||||
// Got a blob which has already been consumed, skip it
|
||||
// probably from a repair window request
|
||||
if pix < consumed {
|
||||
trace!(
|
||||
"{:x}: received: {} but older than consumed: {} skipping..",
|
||||
debug_id,
|
||||
pix,
|
||||
consumed
|
||||
);
|
||||
false
|
||||
} else {
|
||||
// received always has to be updated even if we don't accept the packet into
|
||||
// the window. The worst case here is the server *starts* outside
|
||||
// the window, none of the packets it receives fits in the window
|
||||
// and repair requests (which are based on received) are never generated
|
||||
*received = cmp::max(pix, *received);
|
||||
|
||||
if pix >= consumed + WINDOW_SIZE {
|
||||
trace!(
|
||||
"{:x}: received: {} will overrun window: {} skipping..",
|
||||
debug_id,
|
||||
pix,
|
||||
consumed + WINDOW_SIZE
|
||||
);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn recv_window(
|
||||
debug_id: u64,
|
||||
window: &SharedWindow,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: &mut u64,
|
||||
received: &mut u64,
|
||||
r: &BlobReceiver,
|
||||
s: &BlobSender,
|
||||
retransmit: &BlobSender,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(200);
|
||||
let mut dq = r.recv_timeout(timer)?;
|
||||
let maybe_leader: Option<NodeInfo> = crdt
|
||||
.read()
|
||||
.expect("'crdt' read lock in fn recv_window")
|
||||
.leader_data()
|
||||
.cloned();
|
||||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq)
|
||||
}
|
||||
let now = Instant::now();
|
||||
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
|
||||
debug!(
|
||||
"{:x}: RECV_WINDOW {} {}: got packets {}",
|
||||
debug_id,
|
||||
*consumed,
|
||||
*received,
|
||||
dq.len(),
|
||||
);
|
||||
|
||||
retransmit_all_leader_blocks(
|
||||
maybe_leader,
|
||||
&mut dq,
|
||||
debug_id,
|
||||
recycler,
|
||||
*consumed,
|
||||
*received,
|
||||
retransmit,
|
||||
)?;
|
||||
|
||||
let mut pixs = Vec::new();
|
||||
//send a contiguous set of blocks
|
||||
let mut consume_queue = VecDeque::new();
|
||||
while let Some(b) = dq.pop_front() {
|
||||
let (pix, meta_size) = {
|
||||
let p = b.write().expect("'b' write lock in fn recv_window");
|
||||
(p.get_index()?, p.meta.size)
|
||||
};
|
||||
pixs.push(pix);
|
||||
|
||||
if !blob_idx_in_window(debug_id, pix, *consumed, received) {
|
||||
recycler.recycle(b);
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
|
||||
|
||||
process_blob(
|
||||
debug_id,
|
||||
b,
|
||||
pix,
|
||||
&mut consume_queue,
|
||||
window,
|
||||
recycler,
|
||||
consumed,
|
||||
);
|
||||
}
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", print_window(debug_id, window, *consumed));
|
||||
}
|
||||
info!(
|
||||
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
|
||||
debug_id,
|
||||
*consumed,
|
||||
*received,
|
||||
consume_queue.len(),
|
||||
pixs,
|
||||
duration_as_ms(&now.elapsed())
|
||||
);
|
||||
if !consume_queue.is_empty() {
|
||||
debug!(
|
||||
"{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
|
||||
debug_id,
|
||||
*consumed,
|
||||
*received,
|
||||
consume_queue.len(),
|
||||
);
|
||||
trace!(
|
||||
"{:x}: sending consume_queue.len: {}",
|
||||
debug_id,
|
||||
consume_queue.len()
|
||||
);
|
||||
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
|
||||
s.send(consume_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String {
|
||||
let pointer: Vec<_> = window
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, _v)| {
|
||||
if i == (consumed % WINDOW_SIZE) as usize {
|
||||
"V"
|
||||
} else {
|
||||
" "
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let buf: Vec<_> = window
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|v| {
|
||||
if v.data.is_none() && v.coding.is_none() {
|
||||
"O"
|
||||
} else if v.data.is_some() && v.coding.is_some() {
|
||||
"D"
|
||||
} else if v.data.is_some() {
|
||||
// coding.is_none()
|
||||
"d"
|
||||
} else {
|
||||
// data.is_none()
|
||||
"c"
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
format!(
|
||||
"\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}",
|
||||
debug_id,
|
||||
consumed,
|
||||
pointer.join(""),
|
||||
debug_id,
|
||||
consumed,
|
||||
buf.join("")
|
||||
)
|
||||
}
|
||||
|
||||
pub fn default_window() -> SharedWindow {
|
||||
Arc::new(RwLock::new(vec![
|
||||
WindowSlot::default();
|
||||
WINDOW_SIZE as usize
|
||||
]))
|
||||
}
|
||||
|
||||
pub fn index_blobs(
|
||||
node_info: &NodeInfo,
|
||||
blobs: &[SharedBlob],
|
||||
receive_index: &mut u64,
|
||||
) -> Result<()> {
|
||||
// enumerate all the blobs, those are the indices
|
||||
trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len());
|
||||
for (i, b) in blobs.iter().enumerate() {
|
||||
// only leader should be broadcasting
|
||||
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
|
||||
blob.set_id(node_info.id)
|
||||
.expect("set_id in pub fn broadcast");
|
||||
blob.set_index(*receive_index + i as u64)
|
||||
.expect("set_index in pub fn broadcast");
|
||||
blob.set_flags(0).unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize a rebroadcast window with most recent Entry blobs
|
||||
/// * `crdt` - gossip instance, used to set blob ids
|
||||
/// * `blobs` - up to WINDOW_SIZE most recent blobs
|
||||
/// * `entry_height` - current entry height
|
||||
pub fn initialized_window(
|
||||
node_info: &NodeInfo,
|
||||
blobs: Vec<SharedBlob>,
|
||||
entry_height: u64,
|
||||
) -> SharedWindow {
|
||||
let window = default_window();
|
||||
let debug_id = node_info.debug_id();
|
||||
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
|
||||
trace!(
|
||||
"{:x} initialized window entry_height:{} blobs_len:{}",
|
||||
debug_id,
|
||||
entry_height,
|
||||
blobs.len()
|
||||
);
|
||||
|
||||
// Index the blobs
|
||||
let mut received = entry_height - blobs.len() as u64;
|
||||
index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window");
|
||||
|
||||
// populate the window, offset by implied index
|
||||
let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize;
|
||||
for b in blobs.into_iter().skip(diff) {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix % WINDOW_SIZE) as usize;
|
||||
trace!("{:x} caching {} at {}", debug_id, ix, pos);
|
||||
assert!(win[pos].data.is_none());
|
||||
win[pos].data = Some(b);
|
||||
}
|
||||
}
|
||||
|
||||
window
|
||||
}
|
||||
|
||||
pub fn window(
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
s: BlobSender,
|
||||
retransmit: BlobSender,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-window".to_string())
|
||||
.spawn(move || {
|
||||
let mut consumed = entry_height;
|
||||
let mut received = entry_height;
|
||||
let mut last = entry_height;
|
||||
let mut times = 0;
|
||||
let debug_id = crdt.read().unwrap().debug_id();
|
||||
trace!("{:x}: RECV_WINDOW started", debug_id);
|
||||
loop {
|
||||
if let Err(e) = recv_window(
|
||||
debug_id,
|
||||
&window,
|
||||
&crdt,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter_info!("streamer-window-error", 1, 1);
|
||||
error!("window error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = repair_window(
|
||||
debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
|
||||
);
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn broadcast(
|
||||
node_info: &NodeInfo,
|
||||
broadcast_table: &[NodeInfo],
|
||||
window: &SharedWindow,
|
||||
recycler: &BlobRecycler,
|
||||
r: &BlobReceiver,
|
||||
sock: &UdpSocket,
|
||||
transmit_index: &mut WindowIndex,
|
||||
receive_index: &mut u64,
|
||||
) -> Result<()> {
|
||||
let debug_id = node_info.debug_id();
|
||||
let timer = Duration::new(1, 0);
|
||||
let mut dq = r.recv_timeout(timer)?;
|
||||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq);
|
||||
}
|
||||
|
||||
// flatten deque to vec
|
||||
let blobs_vec: Vec<_> = dq.into_iter().collect();
|
||||
|
||||
// We could receive more blobs than window slots so
|
||||
// break them up into window-sized chunks to process
|
||||
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
|
||||
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", print_window(debug_id, window, *receive_index));
|
||||
}
|
||||
|
||||
for mut blobs in blobs_chunked {
|
||||
let blobs_len = blobs.len();
|
||||
trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len);
|
||||
|
||||
// Index the blobs
|
||||
index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window");
|
||||
|
||||
// keep the cache of blobs that are broadcast
|
||||
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
assert!(blobs.len() <= win.len());
|
||||
for b in &blobs {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix % WINDOW_SIZE) as usize;
|
||||
if let Some(x) = mem::replace(&mut win[pos].data, None) {
|
||||
trace!(
|
||||
"{:x} popped {} at {}",
|
||||
debug_id,
|
||||
x.read().unwrap().get_index().unwrap(),
|
||||
pos
|
||||
);
|
||||
recycler.recycle(x);
|
||||
}
|
||||
if let Some(x) = mem::replace(&mut win[pos].coding, None) {
|
||||
trace!(
|
||||
"{:x} popped {} at {}",
|
||||
debug_id,
|
||||
x.read().unwrap().get_index().unwrap(),
|
||||
pos
|
||||
);
|
||||
recycler.recycle(x);
|
||||
}
|
||||
|
||||
trace!("{:x} null {}", debug_id, pos);
|
||||
}
|
||||
while let Some(b) = blobs.pop() {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix % WINDOW_SIZE) as usize;
|
||||
trace!("{:x} caching {} at {}", debug_id, ix, pos);
|
||||
assert!(win[pos].data.is_none());
|
||||
win[pos].data = Some(b);
|
||||
}
|
||||
}
|
||||
|
||||
// Fill in the coding blob data from the window data blobs
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
erasure::generate_coding(
|
||||
debug_id,
|
||||
&mut window.write().unwrap(),
|
||||
recycler,
|
||||
*receive_index,
|
||||
blobs_len,
|
||||
&mut transmit_index.coding,
|
||||
)?;
|
||||
}
|
||||
|
||||
*receive_index += blobs_len as u64;
|
||||
|
||||
// Send blobs out from the window
|
||||
Crdt::broadcast(
|
||||
&node_info,
|
||||
&broadcast_table,
|
||||
&window,
|
||||
&sock,
|
||||
transmit_index,
|
||||
*receive_index,
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Service to broadcast messages from the leader to layer 1 nodes.
|
||||
/// See `crdt` for network layer definitions.
|
||||
/// # Arguments
|
||||
/// * `sock` - Socket to send from.
|
||||
/// * `exit` - Boolean to signal system exit.
|
||||
/// * `crdt` - CRDT structure
|
||||
/// * `window` - Cache of blobs that we have broadcast
|
||||
/// * `recycler` - Blob recycler.
|
||||
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
||||
pub fn broadcaster(
|
||||
sock: UdpSocket,
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-broadcaster".to_string())
|
||||
.spawn(move || {
|
||||
let mut transmit_index = WindowIndex {
|
||||
data: entry_height,
|
||||
coding: entry_height,
|
||||
};
|
||||
let mut receive_index = entry_height;
|
||||
let me = crdt.read().unwrap().my_data().clone();
|
||||
loop {
|
||||
let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
|
||||
if let Err(e) = broadcast(
|
||||
&me,
|
||||
&broadcast_table,
|
||||
&window,
|
||||
&recycler,
|
||||
&r,
|
||||
&sock,
|
||||
&mut transmit_index,
|
||||
&mut receive_index,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
||||
_ => {
|
||||
inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
|
||||
error!("broadcaster error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn retransmit(
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
r: &BlobReceiver,
|
||||
sock: &UdpSocket,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let mut dq = r.recv_timeout(timer)?;
|
||||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq);
|
||||
}
|
||||
{
|
||||
for b in &dq {
|
||||
Crdt::retransmit(&crdt, b, sock)?;
|
||||
}
|
||||
}
|
||||
while let Some(b) = dq.pop_front() {
|
||||
recycler.recycle(b);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Service to retransmit messages from the leader to layer 1 nodes.
|
||||
/// See `crdt` for network layer definitions.
|
||||
/// # Arguments
|
||||
/// * `sock` - Socket to read from. Read timeout is set to 1.
|
||||
/// * `exit` - Boolean to signal system exit.
|
||||
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
|
||||
/// * `recycler` - Blob recycler.
|
||||
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
||||
pub fn retransmitter(
|
||||
sock: UdpSocket,
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-retransmitter".to_string())
|
||||
.spawn(move || {
|
||||
trace!("retransmitter started");
|
||||
loop {
|
||||
if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
|
||||
error!("retransmitter error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("exiting retransmitter");
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crdt::{Crdt, TestNode};
|
||||
use logger;
|
||||
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
@ -955,12 +150,10 @@ mod test {
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use streamer::blob_idx_in_window;
|
||||
use streamer::calculate_highest_lost_blob_index;
|
||||
use streamer::{blob_receiver, receiver, responder, window};
|
||||
use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};
|
||||
use streamer::PacketReceiver;
|
||||
use streamer::{receiver, responder};
|
||||
|
||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||
for _t in 0..5 {
|
||||
@ -1022,144 +215,4 @@ mod test {
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
}
|
||||
|
||||
fn get_blobs(r: BlobReceiver, num: &mut usize) {
|
||||
for _t in 0..5 {
|
||||
let timer = Duration::new(1, 0);
|
||||
match r.recv_timeout(timer) {
|
||||
Ok(m) => {
|
||||
for (i, v) in m.iter().enumerate() {
|
||||
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
|
||||
}
|
||||
*num += m.len();
|
||||
}
|
||||
e => info!("error {:?}", e),
|
||||
}
|
||||
if *num == 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn window_send_test() {
|
||||
logger::setup();
|
||||
let tn = TestNode::new_localhost();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
|
||||
let me_id = crdt_me.my_data().id;
|
||||
crdt_me.set_leader(me_id);
|
||||
let subs = Arc::new(RwLock::new(crdt_me));
|
||||
|
||||
let resp_recycler = BlobRecycler::default();
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = blob_receiver(
|
||||
exit.clone(),
|
||||
resp_recycler.clone(),
|
||||
tn.sockets.gossip,
|
||||
s_reader,
|
||||
).unwrap();
|
||||
let (s_window, r_window) = channel();
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let win = default_window();
|
||||
let t_window = window(
|
||||
subs,
|
||||
win,
|
||||
0,
|
||||
resp_recycler.clone(),
|
||||
r_reader,
|
||||
s_window,
|
||||
s_retransmit,
|
||||
);
|
||||
let t_responder = {
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = responder(
|
||||
"window_send_test",
|
||||
tn.sockets.replicate,
|
||||
resp_recycler.clone(),
|
||||
r_responder,
|
||||
);
|
||||
let mut msgs = VecDeque::new();
|
||||
for v in 0..10 {
|
||||
let i = 9 - v;
|
||||
let b = resp_recycler.allocate();
|
||||
{
|
||||
let mut w = b.write().unwrap();
|
||||
w.set_index(i).unwrap();
|
||||
w.set_id(me_id).unwrap();
|
||||
assert_eq!(i, w.get_index().unwrap());
|
||||
w.meta.size = PACKET_DATA_SIZE;
|
||||
w.meta.set_addr(&tn.data.contact_info.ncp);
|
||||
}
|
||||
msgs.push_back(b);
|
||||
}
|
||||
s_responder.send(msgs).expect("send");
|
||||
t_responder
|
||||
};
|
||||
|
||||
let mut num = 0;
|
||||
get_blobs(r_window, &mut num);
|
||||
assert_eq!(num, 10);
|
||||
let mut q = r_retransmit.recv().unwrap();
|
||||
while let Ok(mut nq) = r_retransmit.try_recv() {
|
||||
q.append(&mut nq);
|
||||
}
|
||||
assert_eq!(q.len(), 10);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
t_window.join().expect("join");
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn calculate_highest_lost_blob_index_test() {
|
||||
assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
|
||||
assert_eq!(calculate_highest_lost_blob_index(15, 10, 90), 75);
|
||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 90), 10);
|
||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
|
||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
|
||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
|
||||
assert_eq!(
|
||||
calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE),
|
||||
WINDOW_SIZE + 5
|
||||
);
|
||||
assert_eq!(
|
||||
calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE),
|
||||
WINDOW_SIZE + 9
|
||||
);
|
||||
assert_eq!(
|
||||
calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE),
|
||||
WINDOW_SIZE + 9
|
||||
);
|
||||
assert_eq!(
|
||||
calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE),
|
||||
WINDOW_SIZE + 9
|
||||
);
|
||||
}
|
||||
|
||||
fn wrap_blob_idx_in_window(
|
||||
debug_id: u64,
|
||||
pix: u64,
|
||||
consumed: u64,
|
||||
received: u64,
|
||||
) -> (bool, u64) {
|
||||
let mut received = received;
|
||||
let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received);
|
||||
(is_in_window, received)
|
||||
}
|
||||
#[test]
|
||||
pub fn blob_idx_in_window_test() {
|
||||
assert_eq!(
|
||||
wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100),
|
||||
(false, 90 + WINDOW_SIZE)
|
||||
);
|
||||
assert_eq!(
|
||||
wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100),
|
||||
(false, 91 + WINDOW_SIZE)
|
||||
);
|
||||
assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100));
|
||||
|
||||
assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
|
||||
assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
|
||||
}
|
||||
}
|
||||
|
@ -319,7 +319,7 @@ mod tests {
|
||||
use bank::Bank;
|
||||
use budget::Budget;
|
||||
use crdt::TestNode;
|
||||
use fullnode::FullNode;
|
||||
use fullnode::Fullnode;
|
||||
use ledger::LedgerWriter;
|
||||
use logger;
|
||||
use mint::Mint;
|
||||
@ -331,9 +331,11 @@ mod tests {
|
||||
use transaction::{Instruction, Plan};
|
||||
|
||||
fn tmp_ledger(name: &str, mint: &Mint) -> String {
|
||||
use std::env;
|
||||
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
|
||||
let keypair = Keypair::new();
|
||||
|
||||
let path = format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey());
|
||||
let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());
|
||||
|
||||
let mut writer = LedgerWriter::open(&path, true).unwrap();
|
||||
writer.write_entries(mint.create_entries()).unwrap();
|
||||
@ -354,12 +356,11 @@ mod tests {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let ledger_path = tmp_ledger("thin_client", &alice);
|
||||
|
||||
let server = FullNode::new_leader(
|
||||
let server = Fullnode::new_leader(
|
||||
leader_keypair,
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
&[],
|
||||
leader,
|
||||
exit.clone(),
|
||||
&ledger_path,
|
||||
@ -402,12 +403,11 @@ mod tests {
|
||||
let leader_data = leader.data.clone();
|
||||
let ledger_path = tmp_ledger("bad_sig", &alice);
|
||||
|
||||
let server = FullNode::new_leader(
|
||||
let server = Fullnode::new_leader(
|
||||
leader_keypair,
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
&[],
|
||||
leader,
|
||||
exit.clone(),
|
||||
&ledger_path,
|
||||
@ -462,12 +462,11 @@ mod tests {
|
||||
let leader_data = leader.data.clone();
|
||||
let ledger_path = tmp_ledger("client_check_signature", &alice);
|
||||
|
||||
let server = FullNode::new_leader(
|
||||
let server = Fullnode::new_leader(
|
||||
leader_keypair,
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
&[],
|
||||
leader,
|
||||
exit.clone(),
|
||||
&ledger_path,
|
||||
|
63
src/tvu.rs
63
src/tvu.rs
@ -2,29 +2,29 @@
|
||||
//! 3-stage transaction validation pipeline in software.
|
||||
//!
|
||||
//! ```text
|
||||
//! .--------------------------------------------.
|
||||
//! | |
|
||||
//! | .--------------------------------+------------.
|
||||
//! | | TVU | |
|
||||
//! | | | |
|
||||
//! | | | | .------------.
|
||||
//! | | .------------+-------------->| Validators |
|
||||
//! v | .-------. | | | `------------`
|
||||
//! .----+---. | | | .----+---. .----+---------. |
|
||||
//! | Leader |--------->| Blob | | Window | | Replicate | |
|
||||
//! `--------` | | Fetch |-->| Stage |-->| Stage / | |
|
||||
//! .------------. | | Stage | | | | Vote Stage | |
|
||||
//! | Validators |----->| | `--------` `----+---------` |
|
||||
//! `------------` | `-------` | |
|
||||
//! | | |
|
||||
//! | | |
|
||||
//! | | |
|
||||
//! `--------------------------------|------------`
|
||||
//! |
|
||||
//! v
|
||||
//! .------.
|
||||
//! | Bank |
|
||||
//! `------`
|
||||
//! .------------------------------------------------.
|
||||
//! | |
|
||||
//! | .------------------------------------+------------.
|
||||
//! | | TVU | |
|
||||
//! | | | |
|
||||
//! | | | | .------------.
|
||||
//! | | .----------------+-------------->| Validators |
|
||||
//! v | .-------. | | | `------------`
|
||||
//! .----+---. | | | .----+-------. .----+---------. |
|
||||
//! | Leader |--------->| Blob | | Retransmit | | Replicate | |
|
||||
//! `--------` | | Fetch |-->| Stage |-->| Stage / | |
|
||||
//! .------------. | | Stage | | | | Vote Stage | |
|
||||
//! | Validators |----->| | `------------` `----+---------` |
|
||||
//! `------------` | `-------` | |
|
||||
//! | | |
|
||||
//! | | |
|
||||
//! | | |
|
||||
//! `------------------------------------|------------`
|
||||
//! |
|
||||
//! v
|
||||
//! .------.
|
||||
//! | Bank |
|
||||
//! `------`
|
||||
//! ```
|
||||
//!
|
||||
//! 1. Fetch Stage
|
||||
@ -41,19 +41,19 @@ use blob_fetch_stage::BlobFetchStage;
|
||||
use crdt::Crdt;
|
||||
use packet::BlobRecycler;
|
||||
use replicate_stage::ReplicateStage;
|
||||
use retransmit_stage::RetransmitStage;
|
||||
use service::Service;
|
||||
use signature::Keypair;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use streamer::SharedWindow;
|
||||
use window_stage::WindowStage;
|
||||
use window::SharedWindow;
|
||||
|
||||
pub struct Tvu {
|
||||
replicate_stage: ReplicateStage,
|
||||
fetch_stage: BlobFetchStage,
|
||||
window_stage: WindowStage,
|
||||
retransmit_stage: RetransmitStage,
|
||||
}
|
||||
|
||||
impl Tvu {
|
||||
@ -90,7 +90,7 @@ impl Tvu {
|
||||
//TODO
|
||||
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
||||
//then sent to the window, which does the erasure coding reconstruction
|
||||
let (window_stage, blob_window_receiver) = WindowStage::new(
|
||||
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
|
||||
&crdt,
|
||||
window,
|
||||
entry_height,
|
||||
@ -112,7 +112,7 @@ impl Tvu {
|
||||
Tvu {
|
||||
replicate_stage,
|
||||
fetch_stage,
|
||||
window_stage,
|
||||
retransmit_stage,
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ impl Service for Tvu {
|
||||
let mut thread_hdls = vec![];
|
||||
thread_hdls.extend(self.replicate_stage.thread_hdls().into_iter());
|
||||
thread_hdls.extend(self.fetch_stage.thread_hdls().into_iter());
|
||||
thread_hdls.extend(self.window_stage.thread_hdls().into_iter());
|
||||
thread_hdls.extend(self.retransmit_stage.thread_hdls().into_iter());
|
||||
thread_hdls
|
||||
}
|
||||
|
||||
@ -159,16 +159,17 @@ pub mod tests {
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use streamer::{self, SharedWindow};
|
||||
use streamer;
|
||||
use transaction::Transaction;
|
||||
use tvu::Tvu;
|
||||
use window::{self, SharedWindow};
|
||||
|
||||
fn new_ncp(
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
listen: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<(Ncp, SharedWindow)> {
|
||||
let window = streamer::default_window();
|
||||
let window = window::default_window();
|
||||
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
|
||||
let ncp = Ncp::new(&crdt, window.clone(), None, listen, send_sock, exit)?;
|
||||
Ok((ncp, window))
|
||||
|
1043
src/window.rs
Normal file
1043
src/window.rs
Normal file
File diff suppressed because it is too large
Load Diff
@ -1,60 +0,0 @@
|
||||
//! The `window_stage` maintains the blob window
|
||||
|
||||
use crdt::Crdt;
|
||||
use packet::BlobRecycler;
|
||||
use service::Service;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use streamer::{self, BlobReceiver, SharedWindow};
|
||||
|
||||
pub struct WindowStage {
|
||||
thread_hdls: Vec<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl WindowStage {
|
||||
pub fn new(
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
retransmit_socket: UdpSocket,
|
||||
blob_recycler: &BlobRecycler,
|
||||
fetch_stage_receiver: BlobReceiver,
|
||||
) -> (Self, BlobReceiver) {
|
||||
let (retransmit_sender, retransmit_receiver) = channel();
|
||||
|
||||
let t_retransmit = streamer::retransmitter(
|
||||
retransmit_socket,
|
||||
crdt.clone(),
|
||||
blob_recycler.clone(),
|
||||
retransmit_receiver,
|
||||
);
|
||||
let (blob_sender, blob_receiver) = channel();
|
||||
let t_window = streamer::window(
|
||||
crdt.clone(),
|
||||
window,
|
||||
entry_height,
|
||||
blob_recycler.clone(),
|
||||
fetch_stage_receiver,
|
||||
blob_sender,
|
||||
retransmit_sender,
|
||||
);
|
||||
let thread_hdls = vec![t_retransmit, t_window];
|
||||
|
||||
(WindowStage { thread_hdls }, blob_receiver)
|
||||
}
|
||||
}
|
||||
|
||||
impl Service for WindowStage {
|
||||
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
|
||||
self.thread_hdls
|
||||
}
|
||||
|
||||
fn join(self) -> thread::Result<()> {
|
||||
for thread_hdl in self.thread_hdls() {
|
||||
thread_hdl.join()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
189
tests/multinode.rs
Executable file → Normal file
189
tests/multinode.rs
Executable file → Normal file
@ -7,7 +7,7 @@ extern crate solana;
|
||||
|
||||
use solana::crdt::{Crdt, NodeInfo, TestNode};
|
||||
use solana::entry::Entry;
|
||||
use solana::fullnode::FullNode;
|
||||
use solana::fullnode::Fullnode;
|
||||
use solana::hash::Hash;
|
||||
use solana::ledger::LedgerWriter;
|
||||
use solana::logger;
|
||||
@ -16,10 +16,9 @@ use solana::ncp::Ncp;
|
||||
use solana::result;
|
||||
use solana::service::Service;
|
||||
use solana::signature::{Keypair, KeypairUtil, Pubkey};
|
||||
use solana::streamer::{default_window, WINDOW_SIZE};
|
||||
use solana::thin_client::ThinClient;
|
||||
use solana::timing::duration_as_s;
|
||||
use std::cmp::max;
|
||||
use solana::timing::{duration_as_ms, duration_as_s};
|
||||
use solana::window::{default_window, WINDOW_SIZE};
|
||||
use std::env;
|
||||
use std::fs::{copy, create_dir_all, remove_dir_all};
|
||||
use std::net::UdpSocket;
|
||||
@ -146,7 +145,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
|
||||
writer.write_entries(entries).unwrap();
|
||||
}
|
||||
|
||||
let leader = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
let leader = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
|
||||
// Send leader some tokens to vote
|
||||
let leader_balance =
|
||||
@ -158,7 +157,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
|
||||
let keypair = Keypair::new();
|
||||
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let validator_data = validator.data.clone();
|
||||
let validator = FullNode::new(
|
||||
let validator = Fullnode::new(
|
||||
validator,
|
||||
false,
|
||||
&zero_ledger_path,
|
||||
@ -219,7 +218,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
|
||||
);
|
||||
ledger_paths.push(zero_ledger_path.clone());
|
||||
|
||||
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
let server = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
|
||||
// Send leader some tokens to vote
|
||||
let leader_balance =
|
||||
@ -236,7 +235,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
|
||||
);
|
||||
ledger_paths.push(ledger_path.clone());
|
||||
|
||||
let mut val = FullNode::new(
|
||||
let mut val = Fullnode::new(
|
||||
validator,
|
||||
false,
|
||||
&ledger_path,
|
||||
@ -271,7 +270,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
|
||||
// balances
|
||||
let keypair = Keypair::new();
|
||||
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let val = FullNode::new(
|
||||
let val = Fullnode::new(
|
||||
validator,
|
||||
false,
|
||||
&zero_ledger_path,
|
||||
@ -336,7 +335,7 @@ fn test_multi_node_basic() {
|
||||
|
||||
let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000);
|
||||
ledger_paths.push(leader_ledger_path.clone());
|
||||
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
let server = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
|
||||
// Send leader some tokens to vote
|
||||
let leader_balance =
|
||||
@ -349,7 +348,7 @@ fn test_multi_node_basic() {
|
||||
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic");
|
||||
ledger_paths.push(ledger_path.clone());
|
||||
let val = FullNode::new(
|
||||
let val = Fullnode::new(
|
||||
validator,
|
||||
false,
|
||||
&ledger_path,
|
||||
@ -397,7 +396,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
|
||||
ledger_paths.push(leader_ledger_path.clone());
|
||||
|
||||
let leader_data = leader.data.clone();
|
||||
let leader_fullnode = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
let leader_fullnode = Fullnode::new(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
let leader_balance =
|
||||
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
|
||||
assert_eq!(leader_balance, 500);
|
||||
@ -410,7 +409,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
|
||||
let validator_data = validator.data.clone();
|
||||
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file");
|
||||
ledger_paths.push(ledger_path.clone());
|
||||
let val_fullnode = FullNode::new(
|
||||
let val_fullnode = Fullnode::new(
|
||||
validator,
|
||||
false,
|
||||
&ledger_path,
|
||||
@ -430,11 +429,11 @@ fn test_boot_validator_from_file() -> result::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
|
||||
fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) {
|
||||
let leader_keypair = Keypair::new();
|
||||
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
|
||||
let leader_data = leader.data.clone();
|
||||
let leader_fullnode = FullNode::new(leader, true, &ledger_path, leader_keypair, None);
|
||||
let leader_fullnode = Fullnode::new(leader, true, &ledger_path, leader_keypair, None);
|
||||
(leader_data, leader_fullnode)
|
||||
}
|
||||
|
||||
@ -479,7 +478,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
|
||||
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let validator_data = validator.data.clone();
|
||||
|
||||
let val_fullnode = FullNode::new(
|
||||
let val_fullnode = Fullnode::new(
|
||||
validator,
|
||||
false,
|
||||
&stale_ledger_path,
|
||||
@ -524,15 +523,7 @@ fn test_multi_node_dynamic_network() {
|
||||
Ok(val) => val
|
||||
.parse()
|
||||
.expect(&format!("env var {} is not parse-able as usize", key)),
|
||||
Err(_) => 100,
|
||||
};
|
||||
|
||||
let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";
|
||||
let purge_lag: usize = match env::var(purge_key) {
|
||||
Ok(val) => val
|
||||
.parse()
|
||||
.expect(&format!("env var {} is not parse-able as usize", purge_key)),
|
||||
Err(_) => std::usize::MAX,
|
||||
Err(_) => 200,
|
||||
};
|
||||
|
||||
let leader_keypair = Keypair::new();
|
||||
@ -548,7 +539,7 @@ fn test_multi_node_dynamic_network() {
|
||||
let leader_data = leader.data.clone();
|
||||
|
||||
let server =
|
||||
FullNode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
Fullnode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None);
|
||||
|
||||
// Send leader some tokens to vote
|
||||
let leader_balance = send_tx_and_retry_get_balance(
|
||||
@ -616,7 +607,7 @@ fn test_multi_node_dynamic_network() {
|
||||
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let rd = validator.data.clone();
|
||||
info!("starting {} {:x}", keypair.pubkey(), rd.debug_id());
|
||||
let val = FullNode::new_without_sigverify(
|
||||
let val = Fullnode::new_without_sigverify(
|
||||
validator,
|
||||
false,
|
||||
&ledger_path,
|
||||
@ -631,94 +622,78 @@ fn test_multi_node_dynamic_network() {
|
||||
|
||||
let mut validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect();
|
||||
|
||||
let now = Instant::now();
|
||||
let mut client = mk_client(&leader_data);
|
||||
let mut last_finality = client.get_finality();
|
||||
info!("Last finality {}", last_finality);
|
||||
let start = Instant::now();
|
||||
let mut consecutive_success = 0;
|
||||
let mut failures = 0;
|
||||
let mut max_distance_increase = 0i64;
|
||||
for i in 0..num_nodes {
|
||||
//verify leader can do transfer
|
||||
let expected = ((i + 3) * 500) as i64;
|
||||
let leader_balance = retry_send_tx_and_retry_get_balance(
|
||||
&leader_data,
|
||||
&alice_arc.read().unwrap(),
|
||||
&bob_pubkey,
|
||||
Some(expected),
|
||||
).unwrap();
|
||||
if leader_balance != expected {
|
||||
info!(
|
||||
"leader dropped transaction {} {:?} {:?}",
|
||||
i, leader_balance, expected
|
||||
);
|
||||
let mut expected_balance = leader_balance;
|
||||
for i in 0..std::cmp::max(20, num_nodes) {
|
||||
trace!("getting leader last_id");
|
||||
let last_id = client.get_last_id();
|
||||
trace!("executing leader transfer");
|
||||
let sig = client
|
||||
.transfer(
|
||||
500,
|
||||
&alice_arc.read().unwrap().keypair(),
|
||||
bob_pubkey,
|
||||
&last_id,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
expected_balance += 500;
|
||||
|
||||
assert!(client.poll_for_signature(&sig).is_ok());
|
||||
|
||||
let now = Instant::now();
|
||||
let mut finality = client.get_finality();
|
||||
|
||||
// Need this to make sure the finality is updated
|
||||
// (i.e. the node is not returning stale value)
|
||||
while last_finality == finality {
|
||||
finality = client.get_finality();
|
||||
}
|
||||
//verify all validators have the same balance
|
||||
{
|
||||
let mut success = 0usize;
|
||||
let mut max_distance = 0i64;
|
||||
let mut total_distance = 0i64;
|
||||
|
||||
while duration_as_ms(&now.elapsed()) < finality as u64 {
|
||||
sleep(Duration::from_millis(100));
|
||||
finality = client.get_finality()
|
||||
}
|
||||
|
||||
last_finality = finality;
|
||||
|
||||
let balance = retry_get_balance(&mut client, &bob_pubkey, Some(expected_balance));
|
||||
assert_eq!(balance, Some(expected_balance));
|
||||
consecutive_success += 1;
|
||||
|
||||
info!(
|
||||
"SUCCESS[{}] balance: {}, finality: {} ms",
|
||||
i, expected_balance, last_finality,
|
||||
);
|
||||
|
||||
if consecutive_success == 10 {
|
||||
info!("Took {} s to converge", duration_as_s(&start.elapsed()),);
|
||||
info!("Verifying signature of the last transaction in the validators");
|
||||
|
||||
let mut num_nodes_behind = 0i64;
|
||||
validators.retain(|server| {
|
||||
let mut retain_me = true;
|
||||
let mut client = mk_client(&server.0);
|
||||
trace!("{:x} {} get_balance start", server.0.debug_id(), i);
|
||||
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
|
||||
trace!(
|
||||
"{:x} {} get_balance: {:?} leader_balance: {}",
|
||||
server.0.debug_id(),
|
||||
i,
|
||||
getbal,
|
||||
leader_balance
|
||||
);
|
||||
let bal = getbal.unwrap_or(0);
|
||||
let distance = (leader_balance - bal) / 500;
|
||||
max_distance = max(distance, max_distance);
|
||||
total_distance += distance;
|
||||
if distance > max_distance_increase {
|
||||
info!("Node {:x} is behind by {}", server.0.debug_id(), distance);
|
||||
max_distance_increase = distance;
|
||||
if max_distance_increase as u64 > purge_lag as u64 {
|
||||
server.1.exit();
|
||||
info!("Node {:x} is exiting", server.0.debug_id());
|
||||
retain_me = false;
|
||||
}
|
||||
}
|
||||
if distance > 0 {
|
||||
num_nodes_behind += 1;
|
||||
}
|
||||
if let Some(bal) = getbal {
|
||||
if bal == leader_balance {
|
||||
success += 1;
|
||||
}
|
||||
}
|
||||
retain_me
|
||||
trace!("{:x} polling for signature", server.0.debug_id());
|
||||
num_nodes_behind += match client.poll_for_signature(&sig) {
|
||||
Ok(_) => 0,
|
||||
Err(_) => 1,
|
||||
};
|
||||
true
|
||||
});
|
||||
if num_nodes_behind != 0 {
|
||||
info!("{} nodes are lagging behind leader", num_nodes_behind);
|
||||
}
|
||||
|
||||
info!(
|
||||
"SUCCESS[{}] {} out of {} distance: {} max_distance: {} finality: {}",
|
||||
i,
|
||||
success,
|
||||
"Validators lagging: {}/{}",
|
||||
num_nodes_behind,
|
||||
validators.len(),
|
||||
total_distance,
|
||||
max_distance,
|
||||
get_finality(&leader_data)
|
||||
);
|
||||
if success == validators.len() && total_distance == 0 {
|
||||
consecutive_success += 1;
|
||||
} else {
|
||||
consecutive_success = 0;
|
||||
failures += 1;
|
||||
}
|
||||
if consecutive_success == 10 {
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Took {} s to converge total failures: {}",
|
||||
duration_as_s(&now.elapsed()),
|
||||
failures
|
||||
);
|
||||
|
||||
assert_eq!(consecutive_success, 10);
|
||||
for (_, node) in &validators {
|
||||
node.exit();
|
||||
@ -821,9 +796,3 @@ fn retry_send_tx_and_retry_get_balance(
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn get_finality(leader: &NodeInfo) -> usize {
|
||||
let mut client = mk_client(leader);
|
||||
trace!("getting leader finality");
|
||||
client.get_finality()
|
||||
}
|
||||
|
Reference in New Issue
Block a user