Compare commits

...

41 Commits

Author SHA1 Message Date
fd338c3097 Run release binary for leader node 2018-06-01 17:10:48 -06:00
b66ebf5dec Version bump 2018-06-01 17:10:37 -06:00
5da99de579 Review feedback 2018-06-01 13:43:38 -06:00
3aa2907bd6 Restore shellcheck 2018-06-01 13:43:38 -06:00
05d1618659 Add more detail to testnet setup 2018-06-01 13:43:38 -06:00
86113811f2 Readme/demo cleanup 2018-06-01 13:43:38 -06:00
53ecaa03f1 Need another beta 2018-05-31 19:08:09 -06:00
205c1aa505 Version bump 2018-05-31 18:49:41 -06:00
9b54c1542b Move defaults from bash to Rust 2018-05-31 17:18:11 -07:00
93d5d1b2ad Default to 1 node 2018-05-31 17:18:11 -07:00
4c0f3ed6f3 Attempt to revive the singlenode demo 2018-05-31 17:18:11 -07:00
2580155bf2 Enable last of the ignored tests 2018-05-31 16:45:21 -06:00
6ab0dd4df9 Remove config options from fullnode 2018-05-31 16:15:02 -06:00
4b8c36b6b9 Add solana-fullnode-config 2018-05-31 16:15:02 -06:00
359a8397c0 Make bootstrapping functions accessible to other binaries 2018-05-31 16:15:02 -06:00
c9fd5d74b5 Boot futures 0.1
We added them thinking it'd be a good stepping stone towards an
asynchronous thin client, but it's used inconsistently and where
it used, the function is still synchronous, which is just confusing.
2018-05-31 14:13:09 -06:00
391744af97 Speed up the creation of the million accounts
All threads were locked on the same set of signatures.
2018-05-31 12:13:18 -06:00
587ab29e09 Don't register entry ID until after processing its transactions 2018-05-31 12:13:18 -06:00
80f07dadc5 Generalize process_entries()
And use it in fullnode
2018-05-31 12:13:18 -06:00
60609a44ba Initialize recorder from bank's last_id 2018-05-31 12:13:18 -06:00
30c8fa46b4 rustc version bump 2018-05-30 20:49:55 -06:00
7aab7d2f82 Sleep between events if PoH is disabled 2018-05-30 15:55:10 -06:00
a8e1c44663 names 2018-05-30 14:50:53 -06:00
a2b92c35e1 thread names 2018-05-30 14:50:53 -06:00
9f2086c772 names 2018-05-30 14:50:53 -06:00
3eb005d492 names for threds 2018-05-30 14:50:53 -06:00
68955bfcf4 Change multinode script argument to leader path
Some may have cloned their code in different place
2018-05-30 14:49:42 -06:00
9ac7070e08 fix ci 2018-05-30 14:04:48 -06:00
e44e81bd17 fmt 2018-05-30 14:04:48 -06:00
f5eedd2d19 fmt 2018-05-30 14:04:48 -06:00
46059a37eb skip shell check 2018-05-30 14:04:48 -06:00
adc655a3a2 scripts 2018-05-30 14:04:48 -06:00
3058f80489 log 2018-05-30 14:04:48 -06:00
df98cae4b6 cleanup 2018-05-30 14:04:48 -06:00
d327e0aabd warn on tx verify sig 2018-05-30 14:04:48 -06:00
17d3a6763c update 2018-05-30 14:04:48 -06:00
02c5b0343b fixed cloned 2018-05-30 14:04:48 -06:00
2888e45fea comments 2018-05-30 14:04:48 -06:00
f1311075d9 integration tests 2018-05-30 14:04:48 -06:00
6c380e04a3 fix 2018-05-30 14:04:48 -06:00
cef1c208a5 Crdt pipeline, coalesce window repair requests in the listener by examining all of them at once, and ublock those threads from doing io. 2018-05-30 14:04:48 -06:00
27 changed files with 1129 additions and 859 deletions

View File

@ -1,7 +1,7 @@
[package]
name = "solana"
description = "The World's Fastest Blockchain"
version = "0.6.0-beta"
description = "Blockchain Rebuilt for Scale"
version = "0.6.0"
documentation = "https://docs.rs/solana"
homepage = "http://solana.com/"
repository = "https://github.com/solana-labs/solana"
@ -20,6 +20,10 @@ path = "src/bin/client-demo.rs"
name = "solana-fullnode"
path = "src/bin/fullnode.rs"
[[bin]]
name = "solana-fullnode-config"
path = "src/bin/fullnode-config.rs"
[[bin]]
name = "solana-genesis"
path = "src/bin/genesis.rs"
@ -63,6 +67,5 @@ byteorder = "^1.2.1"
libc = "^0.2.1"
getopts = "^0.2"
isatty = "0.1"
futures = "0.1"
rand = "0.4.2"
pnet = "^0.21.0"

121
README.md
View File

@ -8,20 +8,35 @@ Disclaimer
All claims, content, designs, algorithms, estimates, roadmaps, specifications, and performance measurements described in this project are done with the author's best effort. It is up to the reader to check and validate their accuracy and truthfulness. Furthermore nothing in this project constitutes a solicitation for investment.
Solana: High Performance Blockchain
Solana: Blockchain Rebuilt for Scale
===
Solana™ is a new architecture for a high performance blockchain. It aims to support
over 700 thousand transactions per second on a gigabit network.
Solana™ is a new blockchain architecture built from the ground up for scale. The architecture supports
up to 710 thousand transactions per second on a gigabit network.
Introduction
===
It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 178 bytes. A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H.T.Kung, J.T.Robinson (1981)]. At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one-another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA 256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well in route towards that theoretical limit of 710,000 transactions per second.
It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 176 bytes. A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H.T.Kung, J.T.Robinson (1981)]. At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one-another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA 256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well in route towards that theoretical limit of 710,000 transactions per second.
Running the demo
Testnet Demos
===
The Solana repo contains all the scripts you might need to spin up your own
local testnet. Depending on what you're looking to achieve, you may want to
run a different variation, as the full-fledged, performance-enhanced
multinode testnet is considerably more complex to set up than a Rust-only,
singlenode testnode. If you are looking to develop high-level features, such
as experimenting with smart contracts, save yourself some setup headaches and
stick to the Rust-only singlenode demo. If you're doing performance optimization
of the transaction pipeline, consider the enhanced singlenode demo. If you're
doing consensus work, you'll need at least a Rust-only multinode demo. If you want
to reproduce our TPS metrics, run the enhanced multinode demo.
For all four variations, you'd need the latest Rust toolchain and the Solana
source code:
First, install Rust's package manager Cargo.
```bash
@ -36,6 +51,19 @@ $ git clone https://github.com/solana-labs/solana.git
$ cd solana
```
The demo code is sometimes broken between releases as we add new low-level
features, so if this is your first time running the demo, you'll improve
your odds of success if you check out the
[latest release](https://github.com/solana-labs/solana/releases)
before proceeding:
```bash
$ git checkout v0.6.0
```
Singlenode Testnet
---
The fullnode server is initialized with a ledger from stdin and
generates new ledger entries on stdout. To create the input ledger, we'll need
to create *the mint* and use it to generate a *genesis ledger*. It's done in
@ -47,51 +75,87 @@ $ echo 1000000000 | cargo run --release --bin solana-mint-demo > mint-demo.json
$ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log
```
Before you start the server, make sure you know the IP address of the machine ou want to be the leader for the demo, and make sure that udp ports 8000-10000 are open on all the machines you wan to test with. Now you can start the server:
Before you start a fullnode, make sure you know the IP address of the machine you
want to be the leader for the demo, and make sure that udp ports 8000-10000 are
open on all the machines you want to test with.
Generate a leader configuration file with:
```bash
cargo run --release --bin solana-fullnode-config > leader.json
```
Now start the server:
```bash
$ cat ./multinode-demo/leader.sh
#!/bin/bash
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
$ ./multinode-demo/leader.sh
cargo run --release --bin solana-fullnode -- -l leader.json < genesis.log
$ ./multinode-demo/leader.sh > leader-txs.log
```
Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
to start sending it transactions.
To run a performance-enhanced fullnode on Linux, download `libcuda_verify_ed25519.a`. Enable
it by adding `--features=cuda` to the line that runs `solana-fullnode` in `leader.sh`.
Now you can start some validators:
```bash
$ wget https://solana-build-artifacts.s3.amazonaws.com/v0.6.0/libcuda_verify_ed25519.a
cargo run --release --features=cuda --bin solana-fullnode -- -l leader.json < genesis.log
```
Wait a few seconds for the server to initialize. It will print "Ready." when it's ready to
receive transactions.
Multinode Testnet
---
To run a multinode testnet, after starting a leader node, spin up some validator nodes:
```bash
$ cat ./multinode-demo/validator.sh
#!/bin/bash
rsync -v -e ssh $1:~/solana/mint-demo.json .
rsync -v -e ssh $1:~/solana/leader.json .
rsync -v -e ssh $1:~/solana/genesis.log .
rsync -v -e ssh $1:~/solana/leader.log .
rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a .
rsync -v -e ssh $1/mint-demo.json .
rsync -v -e ssh $1/leader.json .
rsync -v -e ssh $1/genesis.log .
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51 #The leader machine
cargo run --release --bin solana-fullnode -- -l validator.json -v leader.json -b 9000 -d < genesis.log
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana > validator-txs.log #The leader machine
```
As with the leader node, you can run a performance-enhanced validator fullnode by adding
`--features=cuda` to the line that runs `solana-fullnode` in `validator.sh`.
```bash
cargo run --release --features=cuda --bin solana-fullnode -- -l validator.json -v leader.json -b 9000 -d < genesis.log
```
Then, in a separate shell, let's execute some transactions. Note we pass in
Testnet Client Demo
---
Now that your singlenode or multinode testnet is up and running, in a separate shell, let's send it some transactions! Note we pass in
the JSON configuration file here, not the genesis ledger.
```bash
$ cat ./multinode-demo/client.sh
#!/bin/bash
export RUST_LOG=solana=info
rsync -v -e ssh $1:~/solana/leader.json .
rsync -v -e ssh $1:~/solana/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json -c 8100 -n 1
$ ./multinode-demo/client.sh ubuntu@10.0.1.51 #The leader machine
rsync -v -e ssh $1/leader.json .
rsync -v -e ssh $1/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana #The leader machine
```
Try starting a more validators and reruning the client demo!
What just happened? The client demo spins up several threads to send 500,000 transactions
to the testnet as quickly as it can. The client then pings the testnet periodically to see
how many transactions it processed in that time. Take note that the demo intentionally
floods the network with UDP packets, such that the network will almost certainly drop a
bunch of them. This ensures the testnet has an opportunity to reach 710k TPS. The client
demo completes after it has convinced itself the testnet won't process any additional
transactions. You should see several TPS measurements printed to the screen. In the
multinode variation, you'll see TPS measurements for each validator node as well.
Developing
===
@ -107,7 +171,7 @@ $ source $HOME/.cargo/env
$ rustup component add rustfmt-preview
```
If your rustc version is lower than 1.25.0, please update it:
If your rustc version is lower than 1.26.1, please update it:
```bash
$ rustup update
@ -161,13 +225,6 @@ Run the benchmarks:
$ cargo +nightly bench --features="unstable"
```
To run the benchmarks on Linux with GPU optimizations enabled:
```bash
$ wget https://solana-build-artifacts.s3.amazonaws.com/v0.5.0/libcuda_verify_ed25519.a
$ cargo +nightly bench --features="unstable,cuda"
```
Code coverage
---

View File

@ -1,7 +1,7 @@
#!/bin/bash -e
if [[ -z "$1" ]]; then
echo "usage: $0 [leader machine]"
echo "usage: $0 [network path to solana repo on leader machine]"
exit 1
fi
@ -9,8 +9,8 @@ LEADER="$1"
set -x
export RUST_LOG=solana=info
rsync -v -e ssh "$LEADER:~/solana/leader.json" .
rsync -v -e ssh "$LEADER:~/solana/mint-demo.json" .
rsync -v -e ssh "$LEADER/leader.json" .
rsync -v -e ssh "$LEADER/mint-demo.json" .
cargo run --release --bin solana-client-demo -- \
-l leader.json -c 8100 -n 1 < mint-demo.json
-l leader.json < mint-demo.json 2>&1 | tee client.log

View File

@ -1,4 +1,4 @@
#!/bin/bash
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
cargo run --release --bin solana-fullnode -- -l leader.json < genesis.log

View File

@ -1,7 +1,7 @@
#!/bin/bash -e
if [[ -z "$1" ]]; then
echo "usage: $0 [leader machine]"
echo "usage: $0 [network path to solana repo on leader machine]"
exit 1
fi
@ -9,16 +9,13 @@ LEADER="$1"
set -x
rsync -v -e ssh "$LEADER:~/solana/mint-demo.json" .
rsync -v -e ssh "$LEADER:~/solana/leader.json" .
rsync -v -e ssh "$LEADER:~/solana/genesis.log" .
rsync -v -e ssh "$LEADER:~/solana/leader.log" .
rsync -v -e ssh "$LEADER:~/solana/libcuda_verify_ed25519.a" .
rsync -v -e ssh "$LEADER/mint-demo.json" .
rsync -v -e ssh "$LEADER/leader.json" .
rsync -v -e ssh "$LEADER/genesis.log" .
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | \
cargo run --release --features cuda --bin solana-fullnode -- \
-l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
cargo run --release --features=cuda --bin solana-fullnode -- \
-l validator.json -v leader.json -b 9000 -d < genesis.log

View File

@ -260,12 +260,15 @@ impl Bank {
.collect()
}
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<()> {
pub fn process_entries<I>(&self, entries: I) -> Result<()>
where
I: IntoIterator<Item = Entry>,
{
for entry in entries {
self.register_entry_id(&entry.id);
for result in self.process_transactions(entry.transactions) {
result?;
}
self.register_entry_id(&entry.id);
}
Ok(())
}

View File

@ -11,7 +11,7 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use timing;
@ -30,19 +30,22 @@ impl BankingStage {
packet_recycler: packet::PacketRecycler,
) -> Self {
let (signal_sender, signal_receiver) = channel();
let thread_hdl = spawn(move || loop {
let e = Self::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string())
.spawn(move || loop {
let e = Self::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
}
});
})
.unwrap();
BankingStage {
thread_hdl,
signal_receiver,

View File

@ -1,4 +1,3 @@
extern crate futures;
extern crate getopts;
extern crate isatty;
extern crate pnet;
@ -6,12 +5,12 @@ extern crate rayon;
extern crate serde_json;
extern crate solana;
use futures::Future;
use getopts::Options;
use isatty::stdin_isatty;
use pnet::datalink;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::mint::MintDemo;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
@ -20,7 +19,7 @@ use solana::transaction::Transaction;
use std::env;
use std::fs::File;
use std::io::{stdin, Read};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@ -51,13 +50,13 @@ fn get_ip_addr() -> Option<IpAddr> {
fn main() {
let mut threads = 4usize;
let mut num_nodes = 10usize;
let mut leader = "leader.json".to_string();
let mut num_nodes = 1usize;
let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client port", "port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt(
"n",
"",
@ -79,15 +78,14 @@ fn main() {
print_usage(&program, opts);
return;
}
if matches.opt_present("l") {
leader = matches.opt_str("l").unwrap();
}
let mut addr: SocketAddr = "127.0.0.1:8010".parse().unwrap();
let mut addr: SocketAddr = "0.0.0.0:8100".parse().unwrap();
if matches.opt_present("c") {
let port = matches.opt_str("c").unwrap().parse().unwrap();
addr.set_port(port);
}
addr.set_ip(get_ip_addr().unwrap());
if matches.opt_present("d") {
addr.set_ip(get_ip_addr().unwrap());
}
let client_addr: Arc<RwLock<SocketAddr>> = Arc::new(RwLock::new(addr));
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
@ -96,7 +94,13 @@ fn main() {
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
}
let leader: ReplicatedData = read_leader(leader);
let leader = if matches.opt_present("l") {
read_leader(matches.opt_str("l").unwrap())
} else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
ReplicatedData::new_leader(&server_addr)
};
let signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let validators = converge(
@ -127,7 +131,7 @@ fn main() {
let mut client = mk_client(&client_addr, &leader);
println!("Get last ID...");
let last_id = client.get_last_id().wait().unwrap();
let last_id = client.get_last_id();
println!("Got last ID {:?}", last_id);
let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes());
@ -160,7 +164,11 @@ fn main() {
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|txs| {
println!("Transferring 1 unit {} times... to", txs.len());
println!(
"Transferring 1 unit {} times... to {:?}",
txs.len(),
leader.transactions_addr
);
let client = mk_client(&client_addr, &leader);
for tx in txs {
client.transfer_signed(tx.clone()).unwrap();
@ -245,9 +253,15 @@ fn converge(
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
let window = default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
spy_ref.clone(),
window.clone(),
spy_gossip,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
//wait for the network to converge
for _ in 0..30 {
let min = spy_ref.read().unwrap().convergence();
@ -257,8 +271,7 @@ fn converge(
}
sleep(Duration::new(1, 0));
}
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
threads.extend(data_replicator.thread_hdls.into_iter());
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()

View File

@ -0,0 +1,52 @@
extern crate getopts;
extern crate serde_json;
extern crate solana;
use getopts::Options;
use solana::crdt::{get_ip_addr, parse_port_or_addr, ReplicatedData};
use std::env;
use std::io;
use std::net::SocketAddr;
use std::process::exit;
fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: {} [options]\n\n", program);
brief += " Create a solana fullnode config file\n";
print!("{}", opts.usage(&brief));
}
fn main() {
let mut opts = Options::new();
opts.optopt("b", "", "bind", "bind to port or address");
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optflag("h", "help", "print help");
let args: Vec<String> = env::args().collect();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(e) => {
eprintln!("{}", e);
exit(1);
}
};
if matches.opt_present("h") {
let program = args[0].clone();
print_usage(&program, opts);
return;
}
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.opt_str("b"));
if matches.opt_present("d") {
let ip = get_ip_addr().unwrap();
bind_addr.set_ip(ip);
}
bind_addr
};
// we need all the receiving sockets to be bound within the expected
// port range that we open on aws
let repl_data = ReplicatedData::new_leader(&bind_addr);
let stdout = io::stdout();
serde_json::to_writer(stdout, &repl_data).expect("serialize");
}

View File

@ -1,24 +1,23 @@
extern crate env_logger;
extern crate getopts;
extern crate isatty;
extern crate pnet;
extern crate serde_json;
extern crate solana;
#[macro_use]
extern crate log;
use getopts::Options;
use isatty::stdin_isatty;
use pnet::datalink;
use solana::bank::Bank;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::payment_plan::PaymentPlan;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Instruction;
use std::env;
use std::fs::File;
use std::io::{stdin, Read};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
@ -36,9 +35,6 @@ fn print_usage(program: &str, opts: Options) {
fn main() {
env_logger::init().unwrap();
let mut opts = Options::new();
opts.optopt("b", "", "bind", "bind to port or address");
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt("s", "", "save", "save my identity to path.json");
opts.optopt("l", "", "load", "load my identity to path.json");
opts.optflag("h", "help", "print help");
opts.optopt(
@ -60,14 +56,6 @@ fn main() {
print_usage(&program, opts);
return;
}
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.opt_str("b"));
if matches.opt_present("d") {
let ip = get_ip_addr().unwrap();
bind_addr.set_ip(ip);
}
bind_addr
};
if stdin_isatty() {
eprintln!("nothing found on stdin, expected a log file");
exit(1);
@ -112,30 +100,21 @@ fn main() {
bank.register_entry_id(&entry1.id);
eprintln!("processing entries...");
let mut last_id = entry1.id;
for entry in entries {
last_id = entry.id;
let results = bank.process_transactions(entry.transactions);
for result in results {
if let Err(e) = result {
eprintln!("failed to process transaction {:?}", e);
exit(1);
}
}
bank.register_entry_id(&last_id);
}
bank.process_entries(entries).expect("process_entries");
eprintln!("creating networking stack...");
let exit = Arc::new(AtomicBool::new(false));
// we need all the receiving sockets to be bound within the expected
// port range that we open on aws
let mut repl_data = make_repl_data(&bind_addr);
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let mut repl_data = ReplicatedData::new_leader(&bind_addr);
if matches.opt_present("l") {
let path = matches.opt_str("l").unwrap();
if let Ok(file) = File::open(path) {
repl_data = serde_json::from_reader(file).expect("parse");
if let Ok(file) = File::open(path.clone()) {
if let Ok(data) = serde_json::from_reader(file) {
repl_data = data;
} else {
warn!("failed to parse leader {}, generating new identity", path);
}
}
}
let threads = if matches.opt_present("v") {
@ -160,7 +139,6 @@ fn main() {
let file = File::create("leader.log").expect("leader.log create");
let server = Server::new_leader(
bank,
last_id,
//Some(Duration::from_millis(1000)),
None,
repl_data.clone(),
@ -174,73 +152,9 @@ fn main() {
);
server.thread_hdls
};
if matches.opt_present("s") {
let path = matches.opt_str("s").unwrap();
let file = File::create(path).expect("file");
serde_json::to_writer(file, &repl_data).expect("serialize");
}
eprintln!("Ready. Listening on {}", repl_data.transactions_addr);
for t in threads {
t.join().expect("join");
}
}
fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut gossip_addr = server_addr.clone();
gossip_addr.set_port(server_addr.port() + nxt);
gossip_addr
}
fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData {
let transactions_addr = bind_addr.clone();
let gossip_addr = next_port(&bind_addr, 1);
let replicate_addr = next_port(&bind_addr, 2);
let requests_addr = next_port(&bind_addr, 3);
let pubkey = KeyPair::new().pubkey();
ReplicatedData::new(
pubkey,
gossip_addr,
replicate_addr,
requests_addr,
transactions_addr,
)
}
fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
let mut addr = daddr.clone();
addr.set_port(port);
addr
} else if let Ok(addr) = addrstr.parse() {
addr
} else {
daddr
}
} else {
daddr
}
}
fn get_ip_addr() -> Option<IpAddr> {
for iface in datalink::interfaces() {
for p in iface.ips {
if !p.ip().is_loopback() && !p.ip().is_multicast() {
return Some(p.ip());
}
}
}
None
}
#[test]
fn test_parse_port_or_addr() {
let p1 = parse_port_or_addr(Some("9000".to_string()));
assert_eq!(p1.port(), 9000);
let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string()));
assert_eq!(p2.port(), 7000);
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}

View File

@ -41,33 +41,37 @@ fn main() {
let mint_keypair = demo.mint.keypair();
let last_id = demo.mint.last_id();
eprintln!("Signing {} transactions...", num_accounts);
let transactions: Vec<_> = keypairs
.into_par_iter()
.map(|rando| {
let last_id = demo.mint.last_id();
Transaction::new(&mint_keypair, rando.pubkey(), tokens_per_user, last_id)
})
.collect();
for entry in demo.mint.create_entries() {
println!("{}", serde_json::to_string(&entry).unwrap());
}
eprintln!("Logging the creation of {} accounts...", num_accounts);
let entry = Entry::new(&last_id, 0, transactions);
println!("{}", serde_json::to_string(&entry).unwrap());
eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS);
// Offer client lots of entry IDs to use for each transaction's last_id.
let mut last_id = last_id;
let mut last_ids = vec![];
for _ in 0..MAX_ENTRY_IDS {
let entry = next_entry(&last_id, 1, vec![]);
last_id = entry.id;
last_ids.push(last_id);
let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| {
eprintln!("failed to serialize: {}", e);
exit(1);
});
println!("{}", serialized);
}
eprintln!("Creating {} transactions...", num_accounts);
let transactions: Vec<_> = keypairs
.into_par_iter()
.enumerate()
.map(|(i, rando)| {
let last_id = last_ids[i % MAX_ENTRY_IDS];
Transaction::new(&mint_keypair, rando.pubkey(), tokens_per_user, last_id)
})
.collect();
eprintln!("Logging the creation of {} accounts...", num_accounts);
let entry = Entry::new(&last_id, 0, transactions);
println!("{}", serde_json::to_string(&entry).unwrap());
}

View File

@ -16,27 +16,59 @@
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use hash::Hash;
use packet::{SharedBlob, BLOB_SIZE};
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet::datalink;
use rayon::prelude::*;
use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom};
use signature::{KeyPair, KeyPairUtil};
use signature::{PublicKey, Signature};
use std;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::{SocketAddr, UdpSocket};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, spawn, JoinHandle};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender};
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
let mut addr = daddr.clone();
addr.set_port(port);
addr
} else if let Ok(addr) = addrstr.parse() {
addr
} else {
daddr
}
} else {
daddr
}
}
pub fn get_ip_addr() -> Option<IpAddr> {
for iface in datalink::interfaces() {
for p in iface.ips {
if !p.ip().is_loopback() && !p.ip().is_multicast() {
return Some(p.ip());
}
}
}
None
}
/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicatedData {
pub id: PublicKey,
sig: Signature,
/// should always be increasing
version: u64,
pub version: u64,
/// address to connect to for gossip
pub gossip_addr: SocketAddr,
/// address to connect to for replication
@ -74,6 +106,27 @@ impl ReplicatedData {
last_verified_count: 0,
}
}
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = addr.clone();
nxt_addr.set_port(addr.port() + nxt);
nxt_addr
}
pub fn new_leader(bind_addr: &SocketAddr) -> Self {
let transactions_addr = bind_addr.clone();
let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3);
let pubkey = KeyPair::new().pubkey();
ReplicatedData::new(
pubkey,
gossip_addr,
replicate_addr,
requests_addr,
transactions_addr,
)
}
}
/// `Crdt` structure keeps a table of `ReplicatedData` structs
@ -149,15 +202,20 @@ impl Crdt {
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
//somehow we signed a message for our own identity with a higher version that
// we have stored ourselves
trace!("me: {:?}", self.me[0]);
trace!("v.id: {:?}", v.id[0]);
trace!("insert! {}", v.version);
trace!(
"me: {:?} v.id: {:?} version: {}",
&self.me[..4],
&v.id[..4],
v.version
);
self.update_index += 1;
let _ = self.table.insert(v.id.clone(), v.clone());
let _ = self.local.insert(v.id, self.update_index);
} else {
trace!(
"INSERT FAILED new.version: {} me.version: {}",
"INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}",
&self.me[..4],
&v.id[..4],
v.version,
self.table[&v.id].version
);
@ -352,18 +410,32 @@ impl Crdt {
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
if options.len() < 1 {
trace!("crdt too small for gossip");
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
return Err(Error::CrdtTooSmall);
}
let n = (Self::random() as usize) % options.len();
let v = options[n].clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
trace!(
"created gossip request from {:?} to {:?} {}",
&self.me[..4],
&v.id[..4],
v.gossip_addr
);
Ok((v.gossip_addr, req))
}
/// At random pick a node and try to get updated changes from them
fn run_gossip(obj: &Arc<RwLock<Self>>) -> Result<()> {
fn run_gossip(
obj: &Arc<RwLock<Self>>,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
) -> Result<()> {
//TODO we need to keep track of stakes and weight the selection by stake size
//TODO cache sockets
@ -372,12 +444,12 @@ impl Crdt {
let (remote_gossip_addr, req) = obj.read()
.expect("'obj' read lock in fn run_gossip")
.gossip_request()?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let r = serialize(&req)?;
trace!("sending gossip request to {}", remote_gossip_addr);
sock.send_to(&r, remote_gossip_addr)?;
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
let mut q: VecDeque<SharedBlob> = VecDeque::new();
q.push_back(blob);
blob_sender.send(q)?;
Ok(())
}
@ -397,90 +469,111 @@ impl Crdt {
}
/// randomly pick a node and ask them for updates asynchronously
pub fn gossip(obj: Arc<RwLock<Self>>, exit: Arc<AtomicBool>) -> JoinHandle<()> {
spawn(move || loop {
let _ = Self::run_gossip(&obj);
if exit.load(Ordering::Relaxed) {
return;
}
//TODO this should be a tuned parameter
sleep(
obj.read()
.expect("'obj' read lock in pub fn gossip")
.timeout,
);
})
pub fn gossip(
obj: Arc<RwLock<Self>>,
blob_recycler: BlobRecycler,
blob_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || loop {
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
if exit.load(Ordering::Relaxed) {
return;
}
//TODO this should be a tuned parameter
sleep(
obj.read()
.expect("'obj' read lock in pub fn gossip")
.timeout,
);
})
.unwrap()
}
fn run_window_request(
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
sock: &UdpSocket,
from: &ReplicatedData,
ix: u64,
) -> Result<()> {
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
let pos = (ix as usize) % window.read().unwrap().len();
let mut outblob = vec![];
if let &Some(ref blob) = &window.read().unwrap()[pos] {
let rblob = blob.read().unwrap();
let blob_ix = rblob.get_index().expect("run_window_request get_index");
if blob_ix == ix {
let out = blob_recycler.allocate();
// copy to avoid doing IO inside the lock
outblob.extend(&rblob.data[..rblob.meta.size]);
{
let mut outblob = out.write().unwrap();
let sz = rblob.meta.size;
outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
outblob.meta.set_addr(&from.replicate_addr);
//TODO, set the sender id to the requester so we dont retransmit
//come up with a cleaner solution for this when sender signatures are checked
outblob.set_id(from.id).expect("blob set_id");
}
return Some(out);
}
} else {
assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr);
}
if outblob.len() > 0 {
info!(
"responding RequestWindowIndex {} {}",
ix, from.replicate_addr
);
assert!(outblob.len() < BLOB_SIZE);
sock.send_to(&outblob, from.replicate_addr)?;
}
Ok(())
None
}
/// Process messages from the network
fn run_listen(
//TODO we should first coalesce all the requests
fn handle_blob(
obj: &Arc<RwLock<Self>>,
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
sock: &UdpSocket,
) -> Result<()> {
//TODO cache connections
let mut buf = vec![0u8; BLOB_SIZE];
trace!("recv_from on {}", sock.local_addr().unwrap());
let (amt, src) = sock.recv_from(&mut buf)?;
trace!("got request from {}", src);
buf.resize(amt, 0);
let r = deserialize(&buf)?;
match r {
blob_recycler: &BlobRecycler,
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
// TODO sigverify these
Protocol::RequestUpdates(v, reqdata) => {
trace!("RequestUpdates {} from {}", v, src);
Ok(Protocol::RequestUpdates(v, reqdata)) => {
trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr;
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read()
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
trace!("get updates since response {} {}", v, data.len());
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
trace!("send_to {}", addr);
//TODO verify reqdata belongs to sender
obj.write()
.expect("'obj' write lock in RequestUpdates")
.insert(&reqdata);
assert!(rsp.len() < BLOB_SIZE);
sock.send_to(&rsp, addr)
.expect("'sock.send_to' in RequestUpdates");
trace!("send_to done!");
let len = data.len();
let rsp = Protocol::ReceiveUpdates(from, ups, data);
obj.write().unwrap().insert(&reqdata);
if len < 1 {
let me = obj.read().unwrap();
trace!(
"no updates me {:?} ix {} since {}",
&me.me[..4],
me.update_index,
v
);
None
} else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) {
trace!(
"sending updates me {:?} len {} to {:?} {}",
&obj.read().unwrap().me[..4],
len,
&reqdata.id[..4],
addr,
);
Some(r)
} else {
warn!("to_blob failed");
None
}
}
Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates {} from {}", ups, src);
Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
None
}
Protocol::RequestWindowIndex(from, ix) => {
Ok(Protocol::RequestWindowIndex(from, ix)) => {
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
@ -491,161 +584,130 @@ impl Crdt {
me.replicate_addr
);
assert_ne!(from.replicate_addr, me.replicate_addr);
let _ = Self::run_window_request(window, sock, &from, ix);
Self::run_window_request(&window, &from, ix, blob_recycler)
}
Err(_) => {
warn!("deserialize crdt packet failed");
None
}
}
}
/// Process messages from the network
fn run_listen(
obj: &Arc<RwLock<Self>>,
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
blob_recycler: &BlobRecycler,
requests_receiver: &BlobReceiver,
response_sender: &BlobSender,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let mut reqs = requests_receiver.recv_timeout(timeout)?;
while let Ok(mut more) = requests_receiver.try_recv() {
reqs.append(&mut more);
}
let resp: VecDeque<_> = reqs.iter()
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
.collect();
response_sender.send(resp)?;
while let Some(r) = reqs.pop_front() {
blob_recycler.recycle(r);
}
Ok(())
}
pub fn listen(
obj: Arc<RwLock<Self>>,
window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
sock: UdpSocket,
blob_recycler: BlobRecycler,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
sock.set_read_timeout(Some(Duration::new(2, 0)))
.expect("'sock.set_read_timeout' in crdt.rs");
spawn(move || loop {
let e = Self::run_listen(&obj, &window, &sock);
if e.is_err() {
info!(
"run_listen timeout, table size: {}",
obj.read().unwrap().table.len()
Builder::new()
.name("solana-listen".to_string())
.spawn(move || loop {
let e = Self::run_listen(
&obj,
&window,
&blob_recycler,
&requests_receiver,
&response_sender,
);
}
if exit.load(Ordering::Relaxed) {
return;
}
})
if e.is_err() {
info!(
"run_listen timeout, table size: {}",
obj.read().unwrap().table.len()
);
}
if exit.load(Ordering::Relaxed) {
return;
}
})
.unwrap()
}
}
pub struct Sockets {
pub gossip: UdpSocket,
pub gossip_send: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub transaction: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
}
pub struct TestNode {
pub data: ReplicatedData,
pub sockets: Sockets,
}
impl TestNode {
pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
TestNode {
data: data,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
respond,
broadcast,
},
}
}
}
#[cfg(test)]
mod tests {
use crdt::{Crdt, ReplicatedData};
use logger;
use packet::Blob;
use rayon::iter::*;
use signature::KeyPair;
use signature::KeyPairUtil;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
use signature::{KeyPair, KeyPairUtil};
fn test_node() -> (Crdt, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
transactions.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
trace!(
"id: {} gossip: {} replicate: {} serve: {}",
crdt.my_data().id[0],
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(crdt, gossip, replicate, serve)
}
/// Test that the network converges.
/// Run until every node in the network has a full ReplicatedData set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, JoinHandle<()>)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num)
.map(|_| {
let (crdt, gossip, _, _) = test_node();
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let l = Crdt::listen(c.clone(), w, gossip, exit.clone());
(c, l)
})
.collect();
topo(&listen);
let gossip: Vec<_> = listen
.iter()
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
.collect();
let mut done = true;
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for &(ref c, _) in listen.iter() {
if num == c.read().unwrap().convergence() as usize {
done = true;
break;
}
}
//at least 1 node converged
if done == true {
break;
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
for j in gossip {
j.join().unwrap();
}
for (c, j) in listen.into_iter() {
j.join().unwrap();
// make it clear what failed
// protocol is to chatty, updates should stop after everyone receives `num`
assert!(c.read().unwrap().update_index <= num as u64);
// protocol is not chatty enough, everyone should get `num` entries
assert_eq!(c.read().unwrap().table.len(), num);
}
assert!(done);
}
/// ring a -> b -> c -> d -> e -> a
#[test]
#[ignore]
fn gossip_ring_test() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}
/// star (b,c,d,e) -> a
#[test]
#[ignore]
fn gossip_star_test() {
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
fn test_parse_port_or_addr() {
let p1 = parse_port_or_addr(Some("9000".to_string()));
assert_eq!(p1.port(), 9000);
let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string()));
assert_eq!(p2.port(), 7000);
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}
/// Test that insert drops messages that are older
@ -668,77 +730,59 @@ mod tests {
crdt.insert(&d);
assert_eq!(crdt.table[&d.id].version, 2);
}
#[test]
#[ignore]
pub fn test_crdt_retransmit() {
logger::setup();
trace!("c1:");
let (mut c1, s1, r1, e1) = test_node();
trace!("c2:");
let (mut c2, s2, r2, _) = test_node();
trace!("c3:");
let (mut c3, s3, r3, _) = test_node();
let c1_id = c1.my_data().id;
c1.set_leader(c1_id);
c2.insert(&c1.my_data());
c3.insert(&c1.my_data());
c2.set_leader(c1.my_data().id);
c3.set_leader(c1.my_data().id);
let exit = Arc::new(AtomicBool::new(false));
// Create listen threads
let win1 = Arc::new(RwLock::new(vec![]));
let a1 = Arc::new(RwLock::new(c1));
let t1 = Crdt::listen(a1.clone(), win1, s1, exit.clone());
let a2 = Arc::new(RwLock::new(c2));
let win2 = Arc::new(RwLock::new(vec![]));
let t2 = Crdt::listen(a2.clone(), win2, s2, exit.clone());
let a3 = Arc::new(RwLock::new(c3));
let win3 = Arc::new(RwLock::new(vec![]));
let t3 = Crdt::listen(a3.clone(), win3, s3, exit.clone());
// Create gossip threads
let t1_gossip = Crdt::gossip(a1.clone(), exit.clone());
let t2_gossip = Crdt::gossip(a2.clone(), exit.clone());
let t3_gossip = Crdt::gossip(a3.clone(), exit.clone());
//wait to converge
trace!("waitng to converge:");
let mut done = false;
for _ in 0..30 {
done = a1.read().unwrap().table.len() == 3 && a2.read().unwrap().table.len() == 3
&& a3.read().unwrap().table.len() == 3;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
let mut b = Blob::default();
b.meta.size = 10;
Crdt::retransmit(&a1, &Arc::new(RwLock::new(b)), &e1).unwrap();
let res: Vec<_> = [r1, r2, r3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);
exit.store(true, Ordering::Relaxed);
let threads = vec![t1, t2, t3, t1_gossip, t2_gossip, t3_gossip];
for t in threads.into_iter() {
t.join().unwrap();
}
fn sorted(ls: &Vec<ReplicatedData>) -> Vec<ReplicatedData> {
let mut copy: Vec<_> = ls.iter().cloned().collect();
copy.sort_by(|x, y| x.id.cmp(&y.id));
copy
}
#[test]
fn update_test() {
let d1 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let d2 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let d3 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let mut crdt = Crdt::new(d1.clone());
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 1);
assert_eq!(ups.len(), 1);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
crdt.insert(&d2);
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 2);
assert_eq!(ups.len(), 2);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
crdt.insert(&d3);
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 3);
assert_eq!(ups.len(), 3);
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
let mut crdt2 = Crdt::new(d2.clone());
crdt2.apply_updates(key, ix, &ups);
assert_eq!(crdt2.table.values().len(), 3);
assert_eq!(
sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
sorted(&crdt.table.values().map(|x| x.clone()).collect())
);
}
}

84
src/data_replicator.rs Normal file
View File

@ -0,0 +1,84 @@
use crdt;
use packet;
use result::Result;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use streamer;
pub struct DataReplicator {
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl DataReplicator {
pub fn new(
crdt: Arc<RwLock<crdt::Crdt>>,
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
let blob_recycler = packet::BlobRecycler::default();
let (request_sender, request_receiver) = channel();
trace!(
"DataReplicator: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me[..4],
gossip_listen_socket.local_addr().unwrap()
);
let t_receiver = streamer::blob_receiver(
exit.clone(),
blob_recycler.clone(),
gossip_listen_socket,
request_sender,
)?;
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder(
gossip_send_socket,
exit.clone(),
blob_recycler.clone(),
response_receiver,
);
let t_listen = crdt::Crdt::listen(
crdt.clone(),
window,
blob_recycler.clone(),
request_receiver,
response_sender.clone(),
exit.clone(),
);
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(DataReplicator { thread_hdls })
}
}
#[cfg(test)]
mod tests {
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
#[test]
// test that stage will exit when flag is set
fn test_exit() {
let exit = Arc::new(AtomicBool::new(false));
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
c.clone(),
w,
tn.sockets.gossip,
tn.sockets.gossip_send,
exit.clone(),
).unwrap();
exit.store(true, Ordering::Relaxed);
for t in d.thread_hdls {
t.join().expect("thread join");
}
}
}

View File

@ -3,6 +3,7 @@ pub mod bank;
pub mod banking_stage;
pub mod budget;
pub mod crdt;
pub mod data_replicator;
pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]
@ -45,12 +46,11 @@ extern crate ring;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate pnet;
extern crate serde_json;
extern crate sha2;
extern crate untrusted;
extern crate futures;
#[cfg(test)]
#[macro_use]
extern crate matches;

View File

@ -180,10 +180,10 @@ impl Packets {
socket.set_nonblocking(false)?;
for p in &mut self.packets {
p.meta.size = 0;
trace!("receiving");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
debug!("got {:?} messages", i);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {
@ -250,6 +250,7 @@ pub fn to_blob<T: Serialize>(
// the raw bytes are being serialized and sent, this isn't the
// right interface, and we should create a separate path for
// sending request responses in the RPU
assert!(len < BLOB_SIZE);
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
@ -283,7 +284,8 @@ impl Blob {
self.data[..BLOB_INDEX_END].clone_from_slice(&wtr);
Ok(())
}
/// sender id, we use this for identifying if its a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use ffor spam filtering
pub fn get_id(&self) -> Result<PublicKey> {
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
Ok(e)
@ -317,9 +319,10 @@ impl Blob {
let r = re.allocate();
{
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
trace!("got {:?} messages", i);
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {

View File

@ -8,8 +8,8 @@
use entry::Entry;
use hash::Hash;
use recorder::Recorder;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use transaction::Transaction;
@ -27,31 +27,17 @@ pub struct RecordStage {
impl RecordStage {
/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(
transaction_receiver: Receiver<Signal>,
start_hash: &Hash,
tick_duration: Option<Duration>,
) -> Self {
pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self {
let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();
let thread_hdl = spawn(move || {
let mut recorder = Recorder::new(start_hash);
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
loop {
if let Err(_) = Self::process_transactions(
&mut recorder,
duration_data,
&transaction_receiver,
&entry_sender,
) {
return;
}
if duration_data.is_some() {
recorder.hash();
}
}
});
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
})
.unwrap();
RecordStage {
entry_receiver,
@ -59,29 +45,81 @@ impl RecordStage {
}
}
pub fn process_transactions(
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
tick_duration: Duration,
) -> Self {
let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let start_time = Instant::now();
loop {
if let Err(_) = Self::try_process_signals(
&mut recorder,
start_time,
tick_duration,
&signal_receiver,
&entry_sender,
) {
return;
}
recorder.hash();
}
})
.unwrap();
RecordStage {
entry_receiver,
thread_hdl,
}
}
fn process_signal(
signal: Signal,
recorder: &mut Recorder,
sender: &Sender<Entry>,
) -> Result<(), ()> {
let txs = if let Signal::Events(txs) = signal {
txs
} else {
vec![]
};
let entry = recorder.record(txs);
sender.send(entry).map_err(|_| ())
}
fn process_signals(
recorder: &mut Recorder,
duration_data: Option<(Instant, Duration)>,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some((start_time, tick_duration)) = duration_data {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
}
match receiver.recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Err(RecvError) => return Err(()),
}
}
}
fn try_process_signals(
recorder: &mut Recorder,
start_time: Instant,
tick_duration: Duration,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
}
match receiver.try_recv() {
Ok(signal) => match signal {
Signal::Tick => {
let entry = recorder.record(vec![]);
sender.send(entry).or(Err(()))?;
}
Signal::Events(transactions) => {
let entry = recorder.record(transactions);
sender.send(entry).or(Err(()))?;
}
},
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()),
};
@ -101,7 +139,7 @@ mod tests {
fn test_historian() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None);
let record_stage = RecordStage::new(tx_receiver, &zero);
tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
@ -127,7 +165,7 @@ mod tests {
fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None);
let record_stage = RecordStage::new(tx_receiver, &zero);
drop(record_stage.entry_receiver);
tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
@ -137,7 +175,7 @@ mod tests {
fn test_transactions() {
let (tx_sender, signal_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero, None);
let record_stage = RecordStage::new(signal_receiver, &zero);
let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
@ -149,11 +187,11 @@ mod tests {
}
#[test]
#[ignore]
fn test_ticking_historian() {
fn test_clock() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20)));
let record_stage =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender);

View File

@ -6,7 +6,7 @@ use packet;
use result::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use streamer;
@ -41,12 +41,15 @@ impl ReplicateStage {
window_receiver: streamer::BlobReceiver,
blob_recycler: packet::BlobRecycler,
) -> Self {
let thread_hdl = spawn(move || loop {
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
let thread_hdl = Builder::new()
.name("solana-replicate-stage".to_string())
.spawn(move || loop {
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
.unwrap();
ReplicateStage { thread_hdl }
}
}

View File

@ -11,7 +11,7 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use std::thread::{Builder, JoinHandle};
use std::time::Instant;
use streamer;
use timing;
@ -90,20 +90,23 @@ impl RequestStage {
let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone();
let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop {
let e = Self::process_request_packets(
&request_processor_,
&packet_receiver,
&blob_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
let thread_hdl = Builder::new()
.name("solana-request-stage".to_string())
.spawn(move || loop {
let e = Self::process_request_packets(
&request_processor_,
&packet_receiver,
&blob_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
}
});
})
.unwrap();
RequestStage {
thread_hdl,
blob_receiver,

View File

@ -2,7 +2,7 @@
use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use data_replicator::DataReplicator;
use packet;
use rpu::Rpu;
use std::io::Write;
@ -22,7 +22,6 @@ pub struct Server {
impl Server {
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
@ -41,7 +40,6 @@ impl Server {
let blob_recycler = packet::BlobRecycler::default();
let tpu = Tpu::new(
bank.clone(),
start_hash,
tick_duration,
transactions_socket,
blob_recycler.clone(),
@ -51,19 +49,26 @@ impl Server {
thread_hdls.extend(tpu.thread_hdls);
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone());
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
crdt.clone(),
window.clone(),
gossip_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
thread_hdls.extend(data_replicator.thread_hdls);
let t_broadcast = streamer::broadcaster(
broadcast_socket,
exit.clone(),
crdt.clone(),
crdt,
window,
blob_recycler.clone(),
tpu.blob_receiver,
);
thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]);
thread_hdls.extend(vec![t_broadcast]);
Server { thread_hdls }
}

View File

@ -10,7 +10,7 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
pub const WINDOW_SIZE: usize = 2 * 1024;
@ -58,10 +58,13 @@ pub fn receiver(
if res.is_err() {
panic!("streamer::receiver set_read_timeout error");
}
spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
()
})
Builder::new()
.name("solana-receiver".to_string())
.spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
()
})
.unwrap()
}
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
@ -96,16 +99,20 @@ pub fn responder(
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
Builder::new()
.name("solana-responder".to_string())
.spawn(move || loop {
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
.unwrap()
}
//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
trace!("receiving on {}", sock.local_addr().unwrap());
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;
@ -123,12 +130,15 @@ pub fn blob_receiver(
//1 second timeout on socket read
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
let t = spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
});
let t = Builder::new()
.name("solana-blob_receiver".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
})
.unwrap();
Ok(t)
}
@ -319,35 +329,38 @@ pub fn window(
s: BlobSender,
retransmit: BlobSender,
) -> JoinHandle<()> {
spawn(move || {
let mut consumed = 0;
let mut received = 0;
let mut last = 0;
let mut times = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut consumed = 0;
let mut received = 0;
let mut last = 0;
let mut times = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(
&window,
&crdt,
&recycler,
&mut consumed,
&mut received,
&r,
&s,
&retransmit,
);
let _ = repair_window(
&window,
&crdt,
&mut last,
&mut times,
&mut consumed,
&mut received,
);
}
let _ = recv_window(
&window,
&crdt,
&recycler,
&mut consumed,
&mut received,
&r,
&s,
&retransmit,
);
let _ = repair_window(
&window,
&crdt,
&mut last,
&mut times,
&mut consumed,
&mut received,
);
}
})
})
.unwrap()
}
fn broadcast(
@ -414,15 +427,18 @@ pub fn broadcaster(
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || {
let mut transmit_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let mut transmit_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
}
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
}
})
})
.unwrap()
}
fn retransmit(
@ -462,17 +478,20 @@ pub fn retransmitter(
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || {
trace!("retransmitter started");
loop {
if exit.load(Ordering::Relaxed) {
break;
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}
// TODO: handle this error
let _ = retransmit(&crdt, &recycler, &r, &sock);
}
// TODO: handle this error
let _ = retransmit(&crdt, &recycler, &r, &sock);
}
trace!("exiting retransmitter");
})
trace!("exiting retransmitter");
})
.unwrap()
}
#[cfg(all(feature = "unstable", test))]
@ -584,7 +603,6 @@ mod bench {
#[cfg(test)]
mod test {
use crdt::{Crdt, ReplicatedData};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use signature::KeyPair;
use signature::KeyPairUtil;
@ -595,9 +613,8 @@ mod test {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
use streamer::{blob_receiver, receiver, responder, window};
use streamer::{default_window, BlobReceiver, PacketReceiver};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
@ -735,111 +752,4 @@ mod test {
t_responder.join().expect("join");
t_window.join().expect("join");
}
fn test_node() -> (Arc<RwLock<Crdt>>, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
trace!("data: {:?}", d);
let crdt = Crdt::new(d);
(Arc::new(RwLock::new(crdt)), gossip, replicate, serve)
}
#[test]
#[ignore]
//retransmit from leader to replicate target
pub fn retransmit() {
logger::setup();
trace!("retransmit test start");
let exit = Arc::new(AtomicBool::new(false));
let (crdt_leader, sock_gossip_leader, _, sock_leader) = test_node();
let (crdt_target, sock_gossip_target, sock_replicate_target, _) = test_node();
let leader_data = crdt_leader.read().unwrap().my_data().clone();
crdt_leader.write().unwrap().insert(&leader_data);
crdt_leader.write().unwrap().set_leader(leader_data.id);
let t_crdt_leader_g = Crdt::gossip(crdt_leader.clone(), exit.clone());
let window_leader = Arc::new(RwLock::new(vec![]));
let t_crdt_leader_l = Crdt::listen(
crdt_leader.clone(),
window_leader,
sock_gossip_leader,
exit.clone(),
);
crdt_target.write().unwrap().insert(&leader_data);
crdt_target.write().unwrap().set_leader(leader_data.id);
let t_crdt_target_g = Crdt::gossip(crdt_target.clone(), exit.clone());
let window_target = Arc::new(RwLock::new(vec![]));
let t_crdt_target_l = Crdt::listen(
crdt_target.clone(),
window_target,
sock_gossip_target,
exit.clone(),
);
//leader retransmitter
let (s_retransmit, r_retransmit) = channel();
let blob_recycler = BlobRecycler::default();
let saddr = sock_leader.local_addr().unwrap();
let t_retransmit = retransmitter(
sock_leader,
exit.clone(),
crdt_leader.clone(),
blob_recycler.clone(),
r_retransmit,
);
//target receiver
let (s_blob_receiver, r_blob_receiver) = channel();
let t_receiver = blob_receiver(
exit.clone(),
blob_recycler.clone(),
sock_replicate_target,
s_blob_receiver,
).unwrap();
for _ in 0..10 {
let done = crdt_target.read().unwrap().update_index == 2
&& crdt_leader.read().unwrap().update_index == 2;
if done {
break;
}
let timer = Duration::new(1, 0);
sleep(timer);
}
//send the data through
let mut bq = VecDeque::new();
let b = blob_recycler.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
s_retransmit.send(bq).unwrap();
let timer = Duration::new(5, 0);
trace!("Waiting for timeout");
let mut oq = r_blob_receiver.recv_timeout(timer).unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
let threads = vec![
t_receiver,
t_retransmit,
t_crdt_target_g,
t_crdt_target_l,
t_crdt_leader_g,
t_crdt_leader_l,
];
for t in threads {
t.join().unwrap();
}
}
}

View File

@ -4,7 +4,6 @@
//! unstable and may change in future releases.
use bincode::{deserialize, serialize};
use futures::future::{ok, FutureResult};
use hash::Hash;
use request::{Request, Response};
use signature::{KeyPair, PublicKey, Signature};
@ -138,7 +137,7 @@ impl ThinClient {
/// Request the last Entry ID from the server. This method blocks
/// until the server sends a response.
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
pub fn get_last_id(&mut self) -> Hash {
info!("get_last_id");
let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
@ -153,7 +152,7 @@ impl ThinClient {
}
self.process_response(resp);
}
ok(self.last_id.expect("some last_id"))
self.last_id.expect("some last_id")
}
pub fn poll_get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
@ -177,7 +176,7 @@ mod tests {
use super::*;
use bank::Bank;
use budget::Budget;
use futures::Future;
use crdt::TestNode;
use logger;
use mint::Mint;
use server::Server;
@ -188,7 +187,6 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
use transaction::{Instruction, Plan};
use tvu::TestNode;
#[test]
fn test_thin_client() {
@ -202,7 +200,6 @@ mod tests {
let server = Server::new_leader(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,
@ -224,7 +221,7 @@ mod tests {
leader.data.transactions_addr,
transactions_socket,
);
let last_id = client.get_last_id().wait().unwrap();
let last_id = client.get_last_id();
let _sig = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
@ -247,7 +244,6 @@ mod tests {
let server = Server::new_leader(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,
@ -271,13 +267,13 @@ mod tests {
leader.data.transactions_addr,
transactions_socket,
);
let last_id = client.get_last_id().wait().unwrap();
let last_id = client.get_last_id();
let tx = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
let _sig = client.transfer_signed(tx).unwrap();
let last_id = client.get_last_id().wait().unwrap();
let last_id = client.get_last_id();
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
if let Instruction::NewContract(contract) = &mut tr2.instruction {

View File

@ -4,7 +4,6 @@
use bank::Bank;
use banking_stage::BankingStage;
use fetch_stage::FetchStage;
use hash::Hash;
use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage;
use sigverify_stage::SigVerifyStage;
@ -25,7 +24,6 @@ pub struct Tpu {
impl Tpu {
pub fn new<W: Write + Send + 'static>(
bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
transactions_socket: UdpSocket,
blob_recycler: BlobRecycler,
@ -46,8 +44,14 @@ impl Tpu {
packet_recycler.clone(),
);
let record_stage =
RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration);
let record_stage = match tick_duration {
Some(tick_duration) => RecordStage::new_with_clock(
banking_stage.signal_receiver,
&bank.last_id(),
tick_duration,
),
None => RecordStage::new(banking_stage.signal_receiver, &bank.last_id()),
};
let write_stage = WriteStage::new(
bank.clone(),
@ -56,7 +60,6 @@ impl Tpu {
Mutex::new(writer),
record_stage.entry_receiver,
);
let mut thread_hdls = vec![
fetch_stage.thread_hdl,
banking_stage.thread_hdl,

View File

@ -149,6 +149,7 @@ impl Transaction {
}
pub fn verify_sig(&self) -> bool {
warn!("transaction signature verification called");
self.sig.verify(&self.from, &self.get_sign_data())
}

View File

@ -22,9 +22,9 @@
use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use packet;
use replicate_stage::ReplicateStage;
use signature::{KeyPair, KeyPairUtil};
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@ -49,7 +49,7 @@ impl Tvu {
pub fn new(
bank: Arc<Bank>,
me: ReplicatedData,
gossip: UdpSocket,
gossip_listen_socket: UdpSocket,
replicate: UdpSocket,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
@ -62,9 +62,15 @@ impl Tvu {
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&leader);
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
crdt.clone(),
window.clone(),
gossip_listen_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
// TODO pull this socket out through the public interface
// make sure we are on the same interface
@ -111,108 +117,52 @@ impl Tvu {
blob_recycler.clone(),
);
let threads = vec![
let mut threads = vec![
//replicate threads
t_blob_receiver,
t_retransmit,
t_window,
replicate_stage.thread_hdl,
t_gossip,
t_listen,
];
threads.extend(data_replicator.thread_hdls.into_iter());
Tvu {
thread_hdls: threads,
}
}
}
pub struct Sockets {
pub gossip: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub transaction: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
}
pub struct TestNode {
pub data: ReplicatedData,
pub sockets: Sockets,
}
impl TestNode {
pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
TestNode {
data: data,
sockets: Sockets {
gossip,
requests,
replicate,
transaction,
respond,
broadcast,
},
}
}
}
#[cfg(test)]
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
use signature::{KeyPair, KeyPairUtil};
use std::time::Duration;
let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests_socket.local_addr().unwrap(),
transactions_socket.local_addr().unwrap(),
);
(d, gossip, replicate, requests_socket, transactions_socket)
}
#[cfg(test)]
pub mod tests {
use bank::Bank;
use bincode::serialize;
use crdt::Crdt;
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use entry::Entry;
use hash::{hash, Hash};
use logger;
use mint::Mint;
use packet::BlobRecycler;
use result::Result;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use tvu::{TestNode, Tvu};
use tvu::Tvu;
fn new_replicator(
crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
let window = streamer::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
DataReplicator::new(crdt, window, listen, send_sock, exit)
}
/// Test that message sent from leader to target1 and replicated to target2
#[test]
fn test_replicate() {
@ -227,9 +177,7 @@ pub mod tests {
crdt_l.set_leader(leader.data.id);
let cref_l = Arc::new(RwLock::new(crdt_l));
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
let window1 = streamer::default_window();
let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone());
let dr_l = new_replicator(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
//start crdt2
let mut crdt2 = Crdt::new(target2.data.clone());
@ -237,9 +185,7 @@ pub mod tests {
crdt2.set_leader(leader.data.id);
let leader_id = leader.data.id;
let cref2 = Arc::new(RwLock::new(crdt2));
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
let window2 = streamer::default_window();
let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone());
let dr_2 = new_replicator(cref2, target2.sockets.gossip, exit.clone()).unwrap();
// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to
@ -337,11 +283,13 @@ pub mod tests {
for t in tvu.thread_hdls {
t.join().expect("join");
}
t2_gossip.join().expect("join");
t2_listen.join().expect("join");
for t in dr_l.thread_hdls {
t.join().expect("join");
}
for t in dr_2.thread_hdls {
t.join().expect("join");
}
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_l_gossip.join().expect("join");
t_l_listen.join().expect("join");
}
}

View File

@ -8,7 +8,7 @@ use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::thread::{Builder, JoinHandle};
use streamer;
pub struct WriteStage {
@ -26,19 +26,22 @@ impl WriteStage {
entry_receiver: Receiver<Entry>,
) -> Self {
let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&blob_sender,
&blob_recycler,
&writer,
&entry_receiver,
);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
break;
}
});
let thread_hdl = Builder::new()
.name("solana-writer".to_string())
.spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&blob_sender,
&blob_recycler,
&writer,
&entry_receiver,
);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
break;
}
})
.unwrap();
WriteStage {
thread_hdl,
@ -52,16 +55,19 @@ impl WriteStage {
entry_receiver: Receiver<Entry>,
) -> Self {
let (_blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
let thread_hdl = Builder::new()
.name("solana-drain".to_string())
.spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
}
}
}
});
})
.unwrap();
WriteStage {
thread_hdl,

184
tests/data_replicator.rs Normal file
View File

@ -0,0 +1,184 @@
#[macro_use]
extern crate log;
extern crate rayon;
extern crate solana;
use rayon::iter::*;
use solana::crdt::{Crdt, TestNode};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::packet::Blob;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSocket) {
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
c.clone(),
w,
tn.sockets.gossip,
tn.sockets.gossip_send,
exit,
).unwrap();
(c, d, tn.sockets.replicate)
}
/// Test that the network converges.
/// Run until every node in the network has a full ReplicatedData set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, DataReplicator, UdpSocket)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect();
topo(&listen);
let mut done = true;
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for &(ref c, _, _) in listen.iter() {
if num == c.read().unwrap().convergence() as usize {
done = true;
break;
}
}
//at least 1 node converged
if done == true {
break;
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
for (c, dr, _) in listen.into_iter() {
for j in dr.thread_hdls.into_iter() {
j.join().unwrap();
}
// make it clear what failed
// protocol is to chatty, updates should stop after everyone receives `num`
assert!(c.read().unwrap().update_index <= num as u64);
// protocol is not chatty enough, everyone should get `num` entries
assert_eq!(c.read().unwrap().table.len(), num);
}
assert!(done);
}
/// ring a -> b -> c -> d -> e -> a
#[test]
fn gossip_ring() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}
/// star a -> (b,c,d,e)
#[test]
fn gossip_star() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut yd = yv.table[&yv.me].clone();
yd.version = 0;
xv.insert(&yd);
trace!("star leader {:?}", &xv.me[..4]);
}
});
}
/// rstar a <- (b,c,d,e)
#[test]
fn gossip_rstar() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
let xd = {
let xv = listen[0].0.read().unwrap();
xv.table[&xv.me].clone()
};
trace!("rstar leader {:?}", &xd.id[..4]);
for n in 0..(num - 1) {
let y = (n + 1) % listen.len();
let mut yv = listen[y].0.write().unwrap();
yv.insert(&xd);
trace!("rstar insert {:?} into {:?}", &xd.id[..4], &yv.me[..4]);
}
});
}
#[test]
pub fn crdt_retransmit() {
logger::setup();
let exit = Arc::new(AtomicBool::new(false));
trace!("c1:");
let (c1, dr1, tn1) = test_node(exit.clone());
trace!("c2:");
let (c2, dr2, tn2) = test_node(exit.clone());
trace!("c3:");
let (c3, dr3, tn3) = test_node(exit.clone());
let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id);
c2.write().unwrap().insert(&c1_data);
c3.write().unwrap().insert(&c1_data);
c2.write().unwrap().set_leader(c1_data.id);
c3.write().unwrap().set_leader(c1_data.id);
//wait to converge
trace!("waiting to converge:");
let mut done = false;
for _ in 0..30 {
done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
&& c3.read().unwrap().table.len() == 3;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
let mut b = Blob::default();
b.meta.size = 10;
Crdt::retransmit(&c1, &Arc::new(RwLock::new(b)), &tn1).unwrap();
let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);
exit.store(true, Ordering::Relaxed);
let mut threads = vec![];
threads.extend(dr1.thread_hdls.into_iter());
threads.extend(dr2.thread_hdls.into_iter());
threads.extend(dr3.thread_hdls.into_iter());
for t in threads.into_iter() {
t.join().unwrap();
}
}

View File

@ -1,19 +1,18 @@
#[macro_use]
extern crate log;
extern crate bincode;
extern crate futures;
extern crate solana;
use futures::Future;
use solana::bank::Bank;
use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::mint::Mint;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::tvu::TestNode;
use std::io;
use std::io::sink;
use std::net::UdpSocket;
@ -59,16 +58,15 @@ fn converge(
let mut spy_crdt = Crdt::new(spy.data);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(
let dr = DataReplicator::new(
spy_ref.clone(),
spy_window,
spy.sockets.gossip,
exit.clone(),
);
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
spy.sockets.gossip_send,
exit,
).unwrap();
//wait for the network to converge
let mut converged = false;
for _ in 0..30 {
@ -80,8 +78,7 @@ fn converge(
sleep(Duration::new(1, 0));
}
assert!(converged);
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
threads.extend(dr.thread_hdls.into_iter());
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()
@ -107,7 +104,6 @@ fn test_multi_node() {
let leader_bank = Bank::new(&alice);
let server = Server::new_leader(
leader_bank,
alice.last_id(),
None,
leader.data.clone(),
leader.sockets.requests,
@ -169,7 +165,7 @@ fn tx_and_retry_get_balance(
) -> io::Result<i64> {
let mut client = mk_client(leader);
trace!("getting leader last_id");
let last_id = client.get_last_id().wait().unwrap();
let last_id = client.get_last_id();
info!("executing leader transer");
let _sig = client
.transfer(500, &alice.keypair(), *bob_pubkey, &last_id)