Compare commits
41 Commits
v0.6.0-bet
...
v0.6.0
Author | SHA1 | Date | |
---|---|---|---|
fd338c3097 | |||
b66ebf5dec | |||
5da99de579 | |||
3aa2907bd6 | |||
05d1618659 | |||
86113811f2 | |||
53ecaa03f1 | |||
205c1aa505 | |||
9b54c1542b | |||
93d5d1b2ad | |||
4c0f3ed6f3 | |||
2580155bf2 | |||
6ab0dd4df9 | |||
4b8c36b6b9 | |||
359a8397c0 | |||
c9fd5d74b5 | |||
391744af97 | |||
587ab29e09 | |||
80f07dadc5 | |||
60609a44ba | |||
30c8fa46b4 | |||
7aab7d2f82 | |||
a8e1c44663 | |||
a2b92c35e1 | |||
9f2086c772 | |||
3eb005d492 | |||
68955bfcf4 | |||
9ac7070e08 | |||
e44e81bd17 | |||
f5eedd2d19 | |||
46059a37eb | |||
adc655a3a2 | |||
3058f80489 | |||
df98cae4b6 | |||
d327e0aabd | |||
17d3a6763c | |||
02c5b0343b | |||
2888e45fea | |||
f1311075d9 | |||
6c380e04a3 | |||
cef1c208a5 |
@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "solana"
|
||||
description = "The World's Fastest Blockchain"
|
||||
version = "0.6.0-beta"
|
||||
description = "Blockchain Rebuilt for Scale"
|
||||
version = "0.6.0"
|
||||
documentation = "https://docs.rs/solana"
|
||||
homepage = "http://solana.com/"
|
||||
repository = "https://github.com/solana-labs/solana"
|
||||
@ -20,6 +20,10 @@ path = "src/bin/client-demo.rs"
|
||||
name = "solana-fullnode"
|
||||
path = "src/bin/fullnode.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "solana-fullnode-config"
|
||||
path = "src/bin/fullnode-config.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "solana-genesis"
|
||||
path = "src/bin/genesis.rs"
|
||||
@ -63,6 +67,5 @@ byteorder = "^1.2.1"
|
||||
libc = "^0.2.1"
|
||||
getopts = "^0.2"
|
||||
isatty = "0.1"
|
||||
futures = "0.1"
|
||||
rand = "0.4.2"
|
||||
pnet = "^0.21.0"
|
||||
|
121
README.md
121
README.md
@ -8,20 +8,35 @@ Disclaimer
|
||||
|
||||
All claims, content, designs, algorithms, estimates, roadmaps, specifications, and performance measurements described in this project are done with the author's best effort. It is up to the reader to check and validate their accuracy and truthfulness. Furthermore nothing in this project constitutes a solicitation for investment.
|
||||
|
||||
Solana: High Performance Blockchain
|
||||
Solana: Blockchain Rebuilt for Scale
|
||||
===
|
||||
|
||||
Solana™ is a new architecture for a high performance blockchain. It aims to support
|
||||
over 700 thousand transactions per second on a gigabit network.
|
||||
Solana™ is a new blockchain architecture built from the ground up for scale. The architecture supports
|
||||
up to 710 thousand transactions per second on a gigabit network.
|
||||
|
||||
Introduction
|
||||
===
|
||||
|
||||
It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 178 bytes. A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H.T.Kung, J.T.Robinson (1981)]. At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one-another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA 256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well in route towards that theoretical limit of 710,000 transactions per second.
|
||||
It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 176 bytes. A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H.T.Kung, J.T.Robinson (1981)]. At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one-another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA 256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well in route towards that theoretical limit of 710,000 transactions per second.
|
||||
|
||||
Running the demo
|
||||
|
||||
Testnet Demos
|
||||
===
|
||||
|
||||
The Solana repo contains all the scripts you might need to spin up your own
|
||||
local testnet. Depending on what you're looking to achieve, you may want to
|
||||
run a different variation, as the full-fledged, performance-enhanced
|
||||
multinode testnet is considerably more complex to set up than a Rust-only,
|
||||
singlenode testnode. If you are looking to develop high-level features, such
|
||||
as experimenting with smart contracts, save yourself some setup headaches and
|
||||
stick to the Rust-only singlenode demo. If you're doing performance optimization
|
||||
of the transaction pipeline, consider the enhanced singlenode demo. If you're
|
||||
doing consensus work, you'll need at least a Rust-only multinode demo. If you want
|
||||
to reproduce our TPS metrics, run the enhanced multinode demo.
|
||||
|
||||
For all four variations, you'd need the latest Rust toolchain and the Solana
|
||||
source code:
|
||||
|
||||
First, install Rust's package manager Cargo.
|
||||
|
||||
```bash
|
||||
@ -36,6 +51,19 @@ $ git clone https://github.com/solana-labs/solana.git
|
||||
$ cd solana
|
||||
```
|
||||
|
||||
The demo code is sometimes broken between releases as we add new low-level
|
||||
features, so if this is your first time running the demo, you'll improve
|
||||
your odds of success if you check out the
|
||||
[latest release](https://github.com/solana-labs/solana/releases)
|
||||
before proceeding:
|
||||
|
||||
```bash
|
||||
$ git checkout v0.6.0
|
||||
```
|
||||
|
||||
Singlenode Testnet
|
||||
---
|
||||
|
||||
The fullnode server is initialized with a ledger from stdin and
|
||||
generates new ledger entries on stdout. To create the input ledger, we'll need
|
||||
to create *the mint* and use it to generate a *genesis ledger*. It's done in
|
||||
@ -47,51 +75,87 @@ $ echo 1000000000 | cargo run --release --bin solana-mint-demo > mint-demo.json
|
||||
$ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log
|
||||
```
|
||||
|
||||
Before you start the server, make sure you know the IP address of the machine ou want to be the leader for the demo, and make sure that udp ports 8000-10000 are open on all the machines you wan to test with. Now you can start the server:
|
||||
Before you start a fullnode, make sure you know the IP address of the machine you
|
||||
want to be the leader for the demo, and make sure that udp ports 8000-10000 are
|
||||
open on all the machines you want to test with.
|
||||
|
||||
Generate a leader configuration file with:
|
||||
|
||||
```bash
|
||||
cargo run --release --bin solana-fullnode-config > leader.json
|
||||
```
|
||||
|
||||
Now start the server:
|
||||
|
||||
```bash
|
||||
$ cat ./multinode-demo/leader.sh
|
||||
#!/bin/bash
|
||||
export RUST_LOG=solana=info
|
||||
sudo sysctl -w net.core.rmem_max=26214400
|
||||
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
|
||||
$ ./multinode-demo/leader.sh
|
||||
cargo run --release --bin solana-fullnode -- -l leader.json < genesis.log
|
||||
$ ./multinode-demo/leader.sh > leader-txs.log
|
||||
```
|
||||
|
||||
Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
|
||||
to start sending it transactions.
|
||||
To run a performance-enhanced fullnode on Linux, download `libcuda_verify_ed25519.a`. Enable
|
||||
it by adding `--features=cuda` to the line that runs `solana-fullnode` in `leader.sh`.
|
||||
|
||||
Now you can start some validators:
|
||||
```bash
|
||||
$ wget https://solana-build-artifacts.s3.amazonaws.com/v0.6.0/libcuda_verify_ed25519.a
|
||||
cargo run --release --features=cuda --bin solana-fullnode -- -l leader.json < genesis.log
|
||||
```
|
||||
|
||||
Wait a few seconds for the server to initialize. It will print "Ready." when it's ready to
|
||||
receive transactions.
|
||||
|
||||
Multinode Testnet
|
||||
---
|
||||
|
||||
To run a multinode testnet, after starting a leader node, spin up some validator nodes:
|
||||
|
||||
```bash
|
||||
$ cat ./multinode-demo/validator.sh
|
||||
#!/bin/bash
|
||||
rsync -v -e ssh $1:~/solana/mint-demo.json .
|
||||
rsync -v -e ssh $1:~/solana/leader.json .
|
||||
rsync -v -e ssh $1:~/solana/genesis.log .
|
||||
rsync -v -e ssh $1:~/solana/leader.log .
|
||||
rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a .
|
||||
rsync -v -e ssh $1/mint-demo.json .
|
||||
rsync -v -e ssh $1/leader.json .
|
||||
rsync -v -e ssh $1/genesis.log .
|
||||
export RUST_LOG=solana=info
|
||||
sudo sysctl -w net.core.rmem_max=26214400
|
||||
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
|
||||
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51 #The leader machine
|
||||
cargo run --release --bin solana-fullnode -- -l validator.json -v leader.json -b 9000 -d < genesis.log
|
||||
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana > validator-txs.log #The leader machine
|
||||
```
|
||||
|
||||
As with the leader node, you can run a performance-enhanced validator fullnode by adding
|
||||
`--features=cuda` to the line that runs `solana-fullnode` in `validator.sh`.
|
||||
|
||||
```bash
|
||||
cargo run --release --features=cuda --bin solana-fullnode -- -l validator.json -v leader.json -b 9000 -d < genesis.log
|
||||
```
|
||||
|
||||
|
||||
Then, in a separate shell, let's execute some transactions. Note we pass in
|
||||
Testnet Client Demo
|
||||
---
|
||||
|
||||
Now that your singlenode or multinode testnet is up and running, in a separate shell, let's send it some transactions! Note we pass in
|
||||
the JSON configuration file here, not the genesis ledger.
|
||||
|
||||
```bash
|
||||
$ cat ./multinode-demo/client.sh
|
||||
#!/bin/bash
|
||||
export RUST_LOG=solana=info
|
||||
rsync -v -e ssh $1:~/solana/leader.json .
|
||||
rsync -v -e ssh $1:~/solana/mint-demo.json .
|
||||
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json -c 8100 -n 1
|
||||
$ ./multinode-demo/client.sh ubuntu@10.0.1.51 #The leader machine
|
||||
rsync -v -e ssh $1/leader.json .
|
||||
rsync -v -e ssh $1/mint-demo.json .
|
||||
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json
|
||||
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana #The leader machine
|
||||
```
|
||||
|
||||
Try starting a more validators and reruning the client demo!
|
||||
What just happened? The client demo spins up several threads to send 500,000 transactions
|
||||
to the testnet as quickly as it can. The client then pings the testnet periodically to see
|
||||
how many transactions it processed in that time. Take note that the demo intentionally
|
||||
floods the network with UDP packets, such that the network will almost certainly drop a
|
||||
bunch of them. This ensures the testnet has an opportunity to reach 710k TPS. The client
|
||||
demo completes after it has convinced itself the testnet won't process any additional
|
||||
transactions. You should see several TPS measurements printed to the screen. In the
|
||||
multinode variation, you'll see TPS measurements for each validator node as well.
|
||||
|
||||
Developing
|
||||
===
|
||||
@ -107,7 +171,7 @@ $ source $HOME/.cargo/env
|
||||
$ rustup component add rustfmt-preview
|
||||
```
|
||||
|
||||
If your rustc version is lower than 1.25.0, please update it:
|
||||
If your rustc version is lower than 1.26.1, please update it:
|
||||
|
||||
```bash
|
||||
$ rustup update
|
||||
@ -161,13 +225,6 @@ Run the benchmarks:
|
||||
$ cargo +nightly bench --features="unstable"
|
||||
```
|
||||
|
||||
To run the benchmarks on Linux with GPU optimizations enabled:
|
||||
|
||||
```bash
|
||||
$ wget https://solana-build-artifacts.s3.amazonaws.com/v0.5.0/libcuda_verify_ed25519.a
|
||||
$ cargo +nightly bench --features="unstable,cuda"
|
||||
```
|
||||
|
||||
Code coverage
|
||||
---
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
if [[ -z "$1" ]]; then
|
||||
echo "usage: $0 [leader machine]"
|
||||
echo "usage: $0 [network path to solana repo on leader machine]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@ -9,8 +9,8 @@ LEADER="$1"
|
||||
|
||||
set -x
|
||||
export RUST_LOG=solana=info
|
||||
rsync -v -e ssh "$LEADER:~/solana/leader.json" .
|
||||
rsync -v -e ssh "$LEADER:~/solana/mint-demo.json" .
|
||||
rsync -v -e ssh "$LEADER/leader.json" .
|
||||
rsync -v -e ssh "$LEADER/mint-demo.json" .
|
||||
|
||||
cargo run --release --bin solana-client-demo -- \
|
||||
-l leader.json -c 8100 -n 1 < mint-demo.json
|
||||
-l leader.json < mint-demo.json 2>&1 | tee client.log
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
export RUST_LOG=solana=info
|
||||
sudo sysctl -w net.core.rmem_max=26214400
|
||||
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
|
||||
cargo run --release --bin solana-fullnode -- -l leader.json < genesis.log
|
||||
|
@ -1,7 +1,7 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
if [[ -z "$1" ]]; then
|
||||
echo "usage: $0 [leader machine]"
|
||||
echo "usage: $0 [network path to solana repo on leader machine]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@ -9,16 +9,13 @@ LEADER="$1"
|
||||
|
||||
set -x
|
||||
|
||||
rsync -v -e ssh "$LEADER:~/solana/mint-demo.json" .
|
||||
rsync -v -e ssh "$LEADER:~/solana/leader.json" .
|
||||
rsync -v -e ssh "$LEADER:~/solana/genesis.log" .
|
||||
rsync -v -e ssh "$LEADER:~/solana/leader.log" .
|
||||
rsync -v -e ssh "$LEADER:~/solana/libcuda_verify_ed25519.a" .
|
||||
rsync -v -e ssh "$LEADER/mint-demo.json" .
|
||||
rsync -v -e ssh "$LEADER/leader.json" .
|
||||
rsync -v -e ssh "$LEADER/genesis.log" .
|
||||
|
||||
export RUST_LOG=solana=info
|
||||
|
||||
sudo sysctl -w net.core.rmem_max=26214400
|
||||
|
||||
cat genesis.log leader.log | \
|
||||
cargo run --release --features cuda --bin solana-fullnode -- \
|
||||
-l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
|
||||
cargo run --release --features=cuda --bin solana-fullnode -- \
|
||||
-l validator.json -v leader.json -b 9000 -d < genesis.log
|
||||
|
@ -260,12 +260,15 @@ impl Bank {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<()> {
|
||||
pub fn process_entries<I>(&self, entries: I) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = Entry>,
|
||||
{
|
||||
for entry in entries {
|
||||
self.register_entry_id(&entry.id);
|
||||
for result in self.process_transactions(entry.transactions) {
|
||||
result?;
|
||||
}
|
||||
self.register_entry_id(&entry.id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::Arc;
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use timing;
|
||||
@ -30,19 +30,22 @@ impl BankingStage {
|
||||
packet_recycler: packet::PacketRecycler,
|
||||
) -> Self {
|
||||
let (signal_sender, signal_receiver) = channel();
|
||||
let thread_hdl = spawn(move || loop {
|
||||
let e = Self::process_packets(
|
||||
bank.clone(),
|
||||
&verified_receiver,
|
||||
&signal_sender,
|
||||
&packet_recycler,
|
||||
);
|
||||
if e.is_err() {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-banking-stage".to_string())
|
||||
.spawn(move || loop {
|
||||
let e = Self::process_packets(
|
||||
bank.clone(),
|
||||
&verified_receiver,
|
||||
&signal_sender,
|
||||
&packet_recycler,
|
||||
);
|
||||
if e.is_err() {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
BankingStage {
|
||||
thread_hdl,
|
||||
signal_receiver,
|
||||
|
@ -1,4 +1,3 @@
|
||||
extern crate futures;
|
||||
extern crate getopts;
|
||||
extern crate isatty;
|
||||
extern crate pnet;
|
||||
@ -6,12 +5,12 @@ extern crate rayon;
|
||||
extern crate serde_json;
|
||||
extern crate solana;
|
||||
|
||||
use futures::Future;
|
||||
use getopts::Options;
|
||||
use isatty::stdin_isatty;
|
||||
use pnet::datalink;
|
||||
use rayon::prelude::*;
|
||||
use solana::crdt::{Crdt, ReplicatedData};
|
||||
use solana::data_replicator::DataReplicator;
|
||||
use solana::mint::MintDemo;
|
||||
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
|
||||
use solana::streamer::default_window;
|
||||
@ -20,7 +19,7 @@ use solana::transaction::Transaction;
|
||||
use std::env;
|
||||
use std::fs::File;
|
||||
use std::io::{stdin, Read};
|
||||
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||
use std::process::exit;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
@ -51,13 +50,13 @@ fn get_ip_addr() -> Option<IpAddr> {
|
||||
|
||||
fn main() {
|
||||
let mut threads = 4usize;
|
||||
let mut num_nodes = 10usize;
|
||||
let mut leader = "leader.json".to_string();
|
||||
let mut num_nodes = 1usize;
|
||||
|
||||
let mut opts = Options::new();
|
||||
opts.optopt("l", "", "leader", "leader.json");
|
||||
opts.optopt("c", "", "client port", "port");
|
||||
opts.optopt("t", "", "number of threads", &format!("{}", threads));
|
||||
opts.optflag("d", "dyn", "detect network address dynamically");
|
||||
opts.optopt(
|
||||
"n",
|
||||
"",
|
||||
@ -79,15 +78,14 @@ fn main() {
|
||||
print_usage(&program, opts);
|
||||
return;
|
||||
}
|
||||
if matches.opt_present("l") {
|
||||
leader = matches.opt_str("l").unwrap();
|
||||
}
|
||||
let mut addr: SocketAddr = "127.0.0.1:8010".parse().unwrap();
|
||||
let mut addr: SocketAddr = "0.0.0.0:8100".parse().unwrap();
|
||||
if matches.opt_present("c") {
|
||||
let port = matches.opt_str("c").unwrap().parse().unwrap();
|
||||
addr.set_port(port);
|
||||
}
|
||||
addr.set_ip(get_ip_addr().unwrap());
|
||||
if matches.opt_present("d") {
|
||||
addr.set_ip(get_ip_addr().unwrap());
|
||||
}
|
||||
let client_addr: Arc<RwLock<SocketAddr>> = Arc::new(RwLock::new(addr));
|
||||
if matches.opt_present("t") {
|
||||
threads = matches.opt_str("t").unwrap().parse().expect("integer");
|
||||
@ -96,7 +94,13 @@ fn main() {
|
||||
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
|
||||
}
|
||||
|
||||
let leader: ReplicatedData = read_leader(leader);
|
||||
let leader = if matches.opt_present("l") {
|
||||
read_leader(matches.opt_str("l").unwrap())
|
||||
} else {
|
||||
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
|
||||
ReplicatedData::new_leader(&server_addr)
|
||||
};
|
||||
|
||||
let signal = Arc::new(AtomicBool::new(false));
|
||||
let mut c_threads = vec![];
|
||||
let validators = converge(
|
||||
@ -127,7 +131,7 @@ fn main() {
|
||||
let mut client = mk_client(&client_addr, &leader);
|
||||
|
||||
println!("Get last ID...");
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
let last_id = client.get_last_id();
|
||||
println!("Got last ID {:?}", last_id);
|
||||
|
||||
let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes());
|
||||
@ -160,7 +164,11 @@ fn main() {
|
||||
let sz = transactions.len() / threads;
|
||||
let chunks: Vec<_> = transactions.chunks(sz).collect();
|
||||
chunks.into_par_iter().for_each(|txs| {
|
||||
println!("Transferring 1 unit {} times... to", txs.len());
|
||||
println!(
|
||||
"Transferring 1 unit {} times... to {:?}",
|
||||
txs.len(),
|
||||
leader.transactions_addr
|
||||
);
|
||||
let client = mk_client(&client_addr, &leader);
|
||||
for tx in txs {
|
||||
client.transfer_signed(tx.clone()).unwrap();
|
||||
@ -245,9 +253,15 @@ fn converge(
|
||||
spy_crdt.insert(&leader);
|
||||
spy_crdt.set_leader(leader.id);
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let spy_window = default_window();
|
||||
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
|
||||
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||
let window = default_window();
|
||||
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
|
||||
let data_replicator = DataReplicator::new(
|
||||
spy_ref.clone(),
|
||||
window.clone(),
|
||||
spy_gossip,
|
||||
gossip_send_socket,
|
||||
exit.clone(),
|
||||
).expect("DataReplicator::new");
|
||||
//wait for the network to converge
|
||||
for _ in 0..30 {
|
||||
let min = spy_ref.read().unwrap().convergence();
|
||||
@ -257,8 +271,7 @@ fn converge(
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
threads.push(t_spy_listen);
|
||||
threads.push(t_spy_gossip);
|
||||
threads.extend(data_replicator.thread_hdls.into_iter());
|
||||
let v: Vec<ReplicatedData> = spy_ref
|
||||
.read()
|
||||
.unwrap()
|
||||
|
52
src/bin/fullnode-config.rs
Normal file
52
src/bin/fullnode-config.rs
Normal file
@ -0,0 +1,52 @@
|
||||
extern crate getopts;
|
||||
extern crate serde_json;
|
||||
extern crate solana;
|
||||
|
||||
use getopts::Options;
|
||||
use solana::crdt::{get_ip_addr, parse_port_or_addr, ReplicatedData};
|
||||
use std::env;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::process::exit;
|
||||
|
||||
fn print_usage(program: &str, opts: Options) {
|
||||
let mut brief = format!("Usage: {} [options]\n\n", program);
|
||||
brief += " Create a solana fullnode config file\n";
|
||||
|
||||
print!("{}", opts.usage(&brief));
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let mut opts = Options::new();
|
||||
opts.optopt("b", "", "bind", "bind to port or address");
|
||||
opts.optflag("d", "dyn", "detect network address dynamically");
|
||||
opts.optflag("h", "help", "print help");
|
||||
let args: Vec<String> = env::args().collect();
|
||||
let matches = match opts.parse(&args[1..]) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
eprintln!("{}", e);
|
||||
exit(1);
|
||||
}
|
||||
};
|
||||
if matches.opt_present("h") {
|
||||
let program = args[0].clone();
|
||||
print_usage(&program, opts);
|
||||
return;
|
||||
}
|
||||
|
||||
let bind_addr: SocketAddr = {
|
||||
let mut bind_addr = parse_port_or_addr(matches.opt_str("b"));
|
||||
if matches.opt_present("d") {
|
||||
let ip = get_ip_addr().unwrap();
|
||||
bind_addr.set_ip(ip);
|
||||
}
|
||||
bind_addr
|
||||
};
|
||||
|
||||
// we need all the receiving sockets to be bound within the expected
|
||||
// port range that we open on aws
|
||||
let repl_data = ReplicatedData::new_leader(&bind_addr);
|
||||
let stdout = io::stdout();
|
||||
serde_json::to_writer(stdout, &repl_data).expect("serialize");
|
||||
}
|
@ -1,24 +1,23 @@
|
||||
extern crate env_logger;
|
||||
extern crate getopts;
|
||||
extern crate isatty;
|
||||
extern crate pnet;
|
||||
extern crate serde_json;
|
||||
extern crate solana;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use getopts::Options;
|
||||
use isatty::stdin_isatty;
|
||||
use pnet::datalink;
|
||||
use solana::bank::Bank;
|
||||
use solana::crdt::ReplicatedData;
|
||||
use solana::entry::Entry;
|
||||
use solana::payment_plan::PaymentPlan;
|
||||
use solana::server::Server;
|
||||
use solana::signature::{KeyPair, KeyPairUtil};
|
||||
use solana::transaction::Instruction;
|
||||
use std::env;
|
||||
use std::fs::File;
|
||||
use std::io::{stdin, Read};
|
||||
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||
use std::process::exit;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
@ -36,9 +35,6 @@ fn print_usage(program: &str, opts: Options) {
|
||||
fn main() {
|
||||
env_logger::init().unwrap();
|
||||
let mut opts = Options::new();
|
||||
opts.optopt("b", "", "bind", "bind to port or address");
|
||||
opts.optflag("d", "dyn", "detect network address dynamically");
|
||||
opts.optopt("s", "", "save", "save my identity to path.json");
|
||||
opts.optopt("l", "", "load", "load my identity to path.json");
|
||||
opts.optflag("h", "help", "print help");
|
||||
opts.optopt(
|
||||
@ -60,14 +56,6 @@ fn main() {
|
||||
print_usage(&program, opts);
|
||||
return;
|
||||
}
|
||||
let bind_addr: SocketAddr = {
|
||||
let mut bind_addr = parse_port_or_addr(matches.opt_str("b"));
|
||||
if matches.opt_present("d") {
|
||||
let ip = get_ip_addr().unwrap();
|
||||
bind_addr.set_ip(ip);
|
||||
}
|
||||
bind_addr
|
||||
};
|
||||
if stdin_isatty() {
|
||||
eprintln!("nothing found on stdin, expected a log file");
|
||||
exit(1);
|
||||
@ -112,30 +100,21 @@ fn main() {
|
||||
bank.register_entry_id(&entry1.id);
|
||||
|
||||
eprintln!("processing entries...");
|
||||
|
||||
let mut last_id = entry1.id;
|
||||
for entry in entries {
|
||||
last_id = entry.id;
|
||||
let results = bank.process_transactions(entry.transactions);
|
||||
for result in results {
|
||||
if let Err(e) = result {
|
||||
eprintln!("failed to process transaction {:?}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
bank.register_entry_id(&last_id);
|
||||
}
|
||||
bank.process_entries(entries).expect("process_entries");
|
||||
|
||||
eprintln!("creating networking stack...");
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
// we need all the receiving sockets to be bound within the expected
|
||||
// port range that we open on aws
|
||||
let mut repl_data = make_repl_data(&bind_addr);
|
||||
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
|
||||
let mut repl_data = ReplicatedData::new_leader(&bind_addr);
|
||||
if matches.opt_present("l") {
|
||||
let path = matches.opt_str("l").unwrap();
|
||||
if let Ok(file) = File::open(path) {
|
||||
repl_data = serde_json::from_reader(file).expect("parse");
|
||||
if let Ok(file) = File::open(path.clone()) {
|
||||
if let Ok(data) = serde_json::from_reader(file) {
|
||||
repl_data = data;
|
||||
} else {
|
||||
warn!("failed to parse leader {}, generating new identity", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
let threads = if matches.opt_present("v") {
|
||||
@ -160,7 +139,6 @@ fn main() {
|
||||
let file = File::create("leader.log").expect("leader.log create");
|
||||
let server = Server::new_leader(
|
||||
bank,
|
||||
last_id,
|
||||
//Some(Duration::from_millis(1000)),
|
||||
None,
|
||||
repl_data.clone(),
|
||||
@ -174,73 +152,9 @@ fn main() {
|
||||
);
|
||||
server.thread_hdls
|
||||
};
|
||||
if matches.opt_present("s") {
|
||||
let path = matches.opt_str("s").unwrap();
|
||||
let file = File::create(path).expect("file");
|
||||
serde_json::to_writer(file, &repl_data).expect("serialize");
|
||||
}
|
||||
eprintln!("Ready. Listening on {}", repl_data.transactions_addr);
|
||||
|
||||
for t in threads {
|
||||
t.join().expect("join");
|
||||
}
|
||||
}
|
||||
|
||||
fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr {
|
||||
let mut gossip_addr = server_addr.clone();
|
||||
gossip_addr.set_port(server_addr.port() + nxt);
|
||||
gossip_addr
|
||||
}
|
||||
|
||||
fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData {
|
||||
let transactions_addr = bind_addr.clone();
|
||||
let gossip_addr = next_port(&bind_addr, 1);
|
||||
let replicate_addr = next_port(&bind_addr, 2);
|
||||
let requests_addr = next_port(&bind_addr, 3);
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip_addr,
|
||||
replicate_addr,
|
||||
requests_addr,
|
||||
transactions_addr,
|
||||
)
|
||||
}
|
||||
|
||||
fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
|
||||
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
|
||||
if let Some(addrstr) = optstr {
|
||||
if let Ok(port) = addrstr.parse() {
|
||||
let mut addr = daddr.clone();
|
||||
addr.set_port(port);
|
||||
addr
|
||||
} else if let Ok(addr) = addrstr.parse() {
|
||||
addr
|
||||
} else {
|
||||
daddr
|
||||
}
|
||||
} else {
|
||||
daddr
|
||||
}
|
||||
}
|
||||
|
||||
fn get_ip_addr() -> Option<IpAddr> {
|
||||
for iface in datalink::interfaces() {
|
||||
for p in iface.ips {
|
||||
if !p.ip().is_loopback() && !p.ip().is_multicast() {
|
||||
return Some(p.ip());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_port_or_addr() {
|
||||
let p1 = parse_port_or_addr(Some("9000".to_string()));
|
||||
assert_eq!(p1.port(), 9000);
|
||||
let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string()));
|
||||
assert_eq!(p2.port(), 7000);
|
||||
let p3 = parse_port_or_addr(None);
|
||||
assert_eq!(p3.port(), 8000);
|
||||
}
|
||||
|
@ -41,33 +41,37 @@ fn main() {
|
||||
let mint_keypair = demo.mint.keypair();
|
||||
let last_id = demo.mint.last_id();
|
||||
|
||||
eprintln!("Signing {} transactions...", num_accounts);
|
||||
let transactions: Vec<_> = keypairs
|
||||
.into_par_iter()
|
||||
.map(|rando| {
|
||||
let last_id = demo.mint.last_id();
|
||||
Transaction::new(&mint_keypair, rando.pubkey(), tokens_per_user, last_id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
for entry in demo.mint.create_entries() {
|
||||
println!("{}", serde_json::to_string(&entry).unwrap());
|
||||
}
|
||||
|
||||
eprintln!("Logging the creation of {} accounts...", num_accounts);
|
||||
let entry = Entry::new(&last_id, 0, transactions);
|
||||
println!("{}", serde_json::to_string(&entry).unwrap());
|
||||
|
||||
eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS);
|
||||
|
||||
// Offer client lots of entry IDs to use for each transaction's last_id.
|
||||
let mut last_id = last_id;
|
||||
let mut last_ids = vec![];
|
||||
for _ in 0..MAX_ENTRY_IDS {
|
||||
let entry = next_entry(&last_id, 1, vec![]);
|
||||
last_id = entry.id;
|
||||
last_ids.push(last_id);
|
||||
let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| {
|
||||
eprintln!("failed to serialize: {}", e);
|
||||
exit(1);
|
||||
});
|
||||
println!("{}", serialized);
|
||||
}
|
||||
|
||||
eprintln!("Creating {} transactions...", num_accounts);
|
||||
let transactions: Vec<_> = keypairs
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(i, rando)| {
|
||||
let last_id = last_ids[i % MAX_ENTRY_IDS];
|
||||
Transaction::new(&mint_keypair, rando.pubkey(), tokens_per_user, last_id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
eprintln!("Logging the creation of {} accounts...", num_accounts);
|
||||
let entry = Entry::new(&last_id, 0, transactions);
|
||||
println!("{}", serde_json::to_string(&entry).unwrap());
|
||||
}
|
||||
|
600
src/crdt.rs
600
src/crdt.rs
@ -16,27 +16,59 @@
|
||||
use bincode::{deserialize, serialize};
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use hash::Hash;
|
||||
use packet::{SharedBlob, BLOB_SIZE};
|
||||
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
||||
use pnet::datalink;
|
||||
use rayon::prelude::*;
|
||||
use result::{Error, Result};
|
||||
use ring::rand::{SecureRandom, SystemRandom};
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use signature::{PublicKey, Signature};
|
||||
use std;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::Cursor;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{sleep, spawn, JoinHandle};
|
||||
use std::thread::{sleep, Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use streamer::{BlobReceiver, BlobSender};
|
||||
|
||||
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
|
||||
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
|
||||
if let Some(addrstr) = optstr {
|
||||
if let Ok(port) = addrstr.parse() {
|
||||
let mut addr = daddr.clone();
|
||||
addr.set_port(port);
|
||||
addr
|
||||
} else if let Ok(addr) = addrstr.parse() {
|
||||
addr
|
||||
} else {
|
||||
daddr
|
||||
}
|
||||
} else {
|
||||
daddr
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_ip_addr() -> Option<IpAddr> {
|
||||
for iface in datalink::interfaces() {
|
||||
for p in iface.ips {
|
||||
if !p.ip().is_loopback() && !p.ip().is_multicast() {
|
||||
return Some(p.ip());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Structure to be replicated by the network
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct ReplicatedData {
|
||||
pub id: PublicKey,
|
||||
sig: Signature,
|
||||
/// should always be increasing
|
||||
version: u64,
|
||||
pub version: u64,
|
||||
/// address to connect to for gossip
|
||||
pub gossip_addr: SocketAddr,
|
||||
/// address to connect to for replication
|
||||
@ -74,6 +106,27 @@ impl ReplicatedData {
|
||||
last_verified_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
|
||||
let mut nxt_addr = addr.clone();
|
||||
nxt_addr.set_port(addr.port() + nxt);
|
||||
nxt_addr
|
||||
}
|
||||
|
||||
pub fn new_leader(bind_addr: &SocketAddr) -> Self {
|
||||
let transactions_addr = bind_addr.clone();
|
||||
let gossip_addr = Self::next_port(&bind_addr, 1);
|
||||
let replicate_addr = Self::next_port(&bind_addr, 2);
|
||||
let requests_addr = Self::next_port(&bind_addr, 3);
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip_addr,
|
||||
replicate_addr,
|
||||
requests_addr,
|
||||
transactions_addr,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// `Crdt` structure keeps a table of `ReplicatedData` structs
|
||||
@ -149,15 +202,20 @@ impl Crdt {
|
||||
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
||||
//somehow we signed a message for our own identity with a higher version that
|
||||
// we have stored ourselves
|
||||
trace!("me: {:?}", self.me[0]);
|
||||
trace!("v.id: {:?}", v.id[0]);
|
||||
trace!("insert! {}", v.version);
|
||||
trace!(
|
||||
"me: {:?} v.id: {:?} version: {}",
|
||||
&self.me[..4],
|
||||
&v.id[..4],
|
||||
v.version
|
||||
);
|
||||
self.update_index += 1;
|
||||
let _ = self.table.insert(v.id.clone(), v.clone());
|
||||
let _ = self.local.insert(v.id, self.update_index);
|
||||
} else {
|
||||
trace!(
|
||||
"INSERT FAILED new.version: {} me.version: {}",
|
||||
"INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}",
|
||||
&self.me[..4],
|
||||
&v.id[..4],
|
||||
v.version,
|
||||
self.table[&v.id].version
|
||||
);
|
||||
@ -352,18 +410,32 @@ impl Crdt {
|
||||
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
||||
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
|
||||
if options.len() < 1 {
|
||||
trace!("crdt too small for gossip");
|
||||
trace!(
|
||||
"crdt too small for gossip {:?} {}",
|
||||
&self.me[..4],
|
||||
self.table.len()
|
||||
);
|
||||
return Err(Error::CrdtTooSmall);
|
||||
}
|
||||
let n = (Self::random() as usize) % options.len();
|
||||
let v = options[n].clone();
|
||||
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
||||
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
||||
trace!(
|
||||
"created gossip request from {:?} to {:?} {}",
|
||||
&self.me[..4],
|
||||
&v.id[..4],
|
||||
v.gossip_addr
|
||||
);
|
||||
Ok((v.gossip_addr, req))
|
||||
}
|
||||
|
||||
/// At random pick a node and try to get updated changes from them
|
||||
fn run_gossip(obj: &Arc<RwLock<Self>>) -> Result<()> {
|
||||
fn run_gossip(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
blob_sender: &BlobSender,
|
||||
blob_recycler: &BlobRecycler,
|
||||
) -> Result<()> {
|
||||
//TODO we need to keep track of stakes and weight the selection by stake size
|
||||
//TODO cache sockets
|
||||
|
||||
@ -372,12 +444,12 @@ impl Crdt {
|
||||
let (remote_gossip_addr, req) = obj.read()
|
||||
.expect("'obj' read lock in fn run_gossip")
|
||||
.gossip_request()?;
|
||||
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
||||
// TODO this will get chatty, so we need to first ask for number of updates since
|
||||
// then only ask for specific data that we dont have
|
||||
let r = serialize(&req)?;
|
||||
trace!("sending gossip request to {}", remote_gossip_addr);
|
||||
sock.send_to(&r, remote_gossip_addr)?;
|
||||
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
|
||||
let mut q: VecDeque<SharedBlob> = VecDeque::new();
|
||||
q.push_back(blob);
|
||||
blob_sender.send(q)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -397,90 +469,111 @@ impl Crdt {
|
||||
}
|
||||
|
||||
/// randomly pick a node and ask them for updates asynchronously
|
||||
pub fn gossip(obj: Arc<RwLock<Self>>, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
let _ = Self::run_gossip(&obj);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
//TODO this should be a tuned parameter
|
||||
sleep(
|
||||
obj.read()
|
||||
.expect("'obj' read lock in pub fn gossip")
|
||||
.timeout,
|
||||
);
|
||||
})
|
||||
pub fn gossip(
|
||||
obj: Arc<RwLock<Self>>,
|
||||
blob_recycler: BlobRecycler,
|
||||
blob_sender: BlobSender,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-gossip".to_string())
|
||||
.spawn(move || loop {
|
||||
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
//TODO this should be a tuned parameter
|
||||
sleep(
|
||||
obj.read()
|
||||
.expect("'obj' read lock in pub fn gossip")
|
||||
.timeout,
|
||||
);
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
fn run_window_request(
|
||||
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
sock: &UdpSocket,
|
||||
from: &ReplicatedData,
|
||||
ix: u64,
|
||||
) -> Result<()> {
|
||||
blob_recycler: &BlobRecycler,
|
||||
) -> Option<SharedBlob> {
|
||||
let pos = (ix as usize) % window.read().unwrap().len();
|
||||
let mut outblob = vec![];
|
||||
if let &Some(ref blob) = &window.read().unwrap()[pos] {
|
||||
let rblob = blob.read().unwrap();
|
||||
let blob_ix = rblob.get_index().expect("run_window_request get_index");
|
||||
if blob_ix == ix {
|
||||
let out = blob_recycler.allocate();
|
||||
// copy to avoid doing IO inside the lock
|
||||
outblob.extend(&rblob.data[..rblob.meta.size]);
|
||||
{
|
||||
let mut outblob = out.write().unwrap();
|
||||
let sz = rblob.meta.size;
|
||||
outblob.meta.size = sz;
|
||||
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
|
||||
outblob.meta.set_addr(&from.replicate_addr);
|
||||
//TODO, set the sender id to the requester so we dont retransmit
|
||||
//come up with a cleaner solution for this when sender signatures are checked
|
||||
outblob.set_id(from.id).expect("blob set_id");
|
||||
}
|
||||
return Some(out);
|
||||
}
|
||||
} else {
|
||||
assert!(window.read().unwrap()[pos].is_none());
|
||||
info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr);
|
||||
}
|
||||
if outblob.len() > 0 {
|
||||
info!(
|
||||
"responding RequestWindowIndex {} {}",
|
||||
ix, from.replicate_addr
|
||||
);
|
||||
assert!(outblob.len() < BLOB_SIZE);
|
||||
sock.send_to(&outblob, from.replicate_addr)?;
|
||||
}
|
||||
Ok(())
|
||||
None
|
||||
}
|
||||
/// Process messages from the network
|
||||
fn run_listen(
|
||||
|
||||
//TODO we should first coalesce all the requests
|
||||
fn handle_blob(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
sock: &UdpSocket,
|
||||
) -> Result<()> {
|
||||
//TODO cache connections
|
||||
let mut buf = vec![0u8; BLOB_SIZE];
|
||||
trace!("recv_from on {}", sock.local_addr().unwrap());
|
||||
let (amt, src) = sock.recv_from(&mut buf)?;
|
||||
trace!("got request from {}", src);
|
||||
buf.resize(amt, 0);
|
||||
let r = deserialize(&buf)?;
|
||||
match r {
|
||||
blob_recycler: &BlobRecycler,
|
||||
blob: &Blob,
|
||||
) -> Option<SharedBlob> {
|
||||
match deserialize(&blob.data[..blob.meta.size]) {
|
||||
// TODO sigverify these
|
||||
Protocol::RequestUpdates(v, reqdata) => {
|
||||
trace!("RequestUpdates {} from {}", v, src);
|
||||
Ok(Protocol::RequestUpdates(v, reqdata)) => {
|
||||
trace!("RequestUpdates {}", v);
|
||||
let addr = reqdata.gossip_addr;
|
||||
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||
let (from, ups, data) = obj.read()
|
||||
.expect("'obj' read lock in RequestUpdates")
|
||||
.get_updates_since(v);
|
||||
trace!("get updates since response {} {}", v, data.len());
|
||||
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
|
||||
trace!("send_to {}", addr);
|
||||
//TODO verify reqdata belongs to sender
|
||||
obj.write()
|
||||
.expect("'obj' write lock in RequestUpdates")
|
||||
.insert(&reqdata);
|
||||
assert!(rsp.len() < BLOB_SIZE);
|
||||
sock.send_to(&rsp, addr)
|
||||
.expect("'sock.send_to' in RequestUpdates");
|
||||
trace!("send_to done!");
|
||||
let len = data.len();
|
||||
let rsp = Protocol::ReceiveUpdates(from, ups, data);
|
||||
obj.write().unwrap().insert(&reqdata);
|
||||
if len < 1 {
|
||||
let me = obj.read().unwrap();
|
||||
trace!(
|
||||
"no updates me {:?} ix {} since {}",
|
||||
&me.me[..4],
|
||||
me.update_index,
|
||||
v
|
||||
);
|
||||
None
|
||||
} else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) {
|
||||
trace!(
|
||||
"sending updates me {:?} len {} to {:?} {}",
|
||||
&obj.read().unwrap().me[..4],
|
||||
len,
|
||||
&reqdata.id[..4],
|
||||
addr,
|
||||
);
|
||||
Some(r)
|
||||
} else {
|
||||
warn!("to_blob failed");
|
||||
None
|
||||
}
|
||||
}
|
||||
Protocol::ReceiveUpdates(from, ups, data) => {
|
||||
trace!("ReceivedUpdates {} from {}", ups, src);
|
||||
Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
|
||||
trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
|
||||
obj.write()
|
||||
.expect("'obj' write lock in ReceiveUpdates")
|
||||
.apply_updates(from, ups, &data);
|
||||
None
|
||||
}
|
||||
Protocol::RequestWindowIndex(from, ix) => {
|
||||
Ok(Protocol::RequestWindowIndex(from, ix)) => {
|
||||
//TODO verify from is signed
|
||||
obj.write().unwrap().insert(&from);
|
||||
let me = obj.read().unwrap().my_data().clone();
|
||||
@ -491,161 +584,130 @@ impl Crdt {
|
||||
me.replicate_addr
|
||||
);
|
||||
assert_ne!(from.replicate_addr, me.replicate_addr);
|
||||
let _ = Self::run_window_request(window, sock, &from, ix);
|
||||
Self::run_window_request(&window, &from, ix, blob_recycler)
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("deserialize crdt packet failed");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Process messages from the network
|
||||
fn run_listen(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
requests_receiver: &BlobReceiver,
|
||||
response_sender: &BlobSender,
|
||||
) -> Result<()> {
|
||||
//TODO cache connections
|
||||
let timeout = Duration::new(1, 0);
|
||||
let mut reqs = requests_receiver.recv_timeout(timeout)?;
|
||||
while let Ok(mut more) = requests_receiver.try_recv() {
|
||||
reqs.append(&mut more);
|
||||
}
|
||||
let resp: VecDeque<_> = reqs.iter()
|
||||
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
|
||||
.collect();
|
||||
response_sender.send(resp)?;
|
||||
while let Some(r) = reqs.pop_front() {
|
||||
blob_recycler.recycle(r);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
pub fn listen(
|
||||
obj: Arc<RwLock<Self>>,
|
||||
window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
sock: UdpSocket,
|
||||
blob_recycler: BlobRecycler,
|
||||
requests_receiver: BlobReceiver,
|
||||
response_sender: BlobSender,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
sock.set_read_timeout(Some(Duration::new(2, 0)))
|
||||
.expect("'sock.set_read_timeout' in crdt.rs");
|
||||
spawn(move || loop {
|
||||
let e = Self::run_listen(&obj, &window, &sock);
|
||||
if e.is_err() {
|
||||
info!(
|
||||
"run_listen timeout, table size: {}",
|
||||
obj.read().unwrap().table.len()
|
||||
Builder::new()
|
||||
.name("solana-listen".to_string())
|
||||
.spawn(move || loop {
|
||||
let e = Self::run_listen(
|
||||
&obj,
|
||||
&window,
|
||||
&blob_recycler,
|
||||
&requests_receiver,
|
||||
&response_sender,
|
||||
);
|
||||
}
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
})
|
||||
if e.is_err() {
|
||||
info!(
|
||||
"run_listen timeout, table size: {}",
|
||||
obj.read().unwrap().table.len()
|
||||
);
|
||||
}
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Sockets {
|
||||
pub gossip: UdpSocket,
|
||||
pub gossip_send: UdpSocket,
|
||||
pub requests: UdpSocket,
|
||||
pub replicate: UdpSocket,
|
||||
pub transaction: UdpSocket,
|
||||
pub respond: UdpSocket,
|
||||
pub broadcast: UdpSocket,
|
||||
}
|
||||
|
||||
pub struct TestNode {
|
||||
pub data: ReplicatedData,
|
||||
pub sockets: Sockets,
|
||||
}
|
||||
|
||||
impl TestNode {
|
||||
pub fn new() -> TestNode {
|
||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let data = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
requests.local_addr().unwrap(),
|
||||
transaction.local_addr().unwrap(),
|
||||
);
|
||||
TestNode {
|
||||
data: data,
|
||||
sockets: Sockets {
|
||||
gossip,
|
||||
gossip_send,
|
||||
requests,
|
||||
replicate,
|
||||
transaction,
|
||||
respond,
|
||||
broadcast,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use logger;
|
||||
use packet::Blob;
|
||||
use rayon::iter::*;
|
||||
use signature::KeyPair;
|
||||
use signature::KeyPairUtil;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{sleep, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
|
||||
fn test_node() -> (Crdt, UdpSocket, UdpSocket, UdpSocket) {
|
||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let transactions = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
serve.local_addr().unwrap(),
|
||||
transactions.local_addr().unwrap(),
|
||||
);
|
||||
let crdt = Crdt::new(d);
|
||||
trace!(
|
||||
"id: {} gossip: {} replicate: {} serve: {}",
|
||||
crdt.my_data().id[0],
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
serve.local_addr().unwrap(),
|
||||
);
|
||||
(crdt, gossip, replicate, serve)
|
||||
}
|
||||
|
||||
/// Test that the network converges.
|
||||
/// Run until every node in the network has a full ReplicatedData set.
|
||||
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
|
||||
/// tests that actually use this function are below
|
||||
fn run_gossip_topo<F>(topo: F)
|
||||
where
|
||||
F: Fn(&Vec<(Arc<RwLock<Crdt>>, JoinHandle<()>)>) -> (),
|
||||
{
|
||||
let num: usize = 5;
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let listen: Vec<_> = (0..num)
|
||||
.map(|_| {
|
||||
let (crdt, gossip, _, _) = test_node();
|
||||
let c = Arc::new(RwLock::new(crdt));
|
||||
let w = Arc::new(RwLock::new(vec![]));
|
||||
let l = Crdt::listen(c.clone(), w, gossip, exit.clone());
|
||||
(c, l)
|
||||
})
|
||||
.collect();
|
||||
topo(&listen);
|
||||
let gossip: Vec<_> = listen
|
||||
.iter()
|
||||
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
|
||||
.collect();
|
||||
let mut done = true;
|
||||
for i in 0..(num * 32) {
|
||||
done = false;
|
||||
trace!("round {}", i);
|
||||
for &(ref c, _) in listen.iter() {
|
||||
if num == c.read().unwrap().convergence() as usize {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
//at least 1 node converged
|
||||
if done == true {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for j in gossip {
|
||||
j.join().unwrap();
|
||||
}
|
||||
for (c, j) in listen.into_iter() {
|
||||
j.join().unwrap();
|
||||
// make it clear what failed
|
||||
// protocol is to chatty, updates should stop after everyone receives `num`
|
||||
assert!(c.read().unwrap().update_index <= num as u64);
|
||||
// protocol is not chatty enough, everyone should get `num` entries
|
||||
assert_eq!(c.read().unwrap().table.len(), num);
|
||||
}
|
||||
assert!(done);
|
||||
}
|
||||
/// ring a -> b -> c -> d -> e -> a
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn gossip_ring_test() {
|
||||
logger::setup();
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
for n in 0..num {
|
||||
let y = n % listen.len();
|
||||
let x = (n + 1) % listen.len();
|
||||
let mut xv = listen[x].0.write().unwrap();
|
||||
let yv = listen[y].0.read().unwrap();
|
||||
let mut d = yv.table[&yv.me].clone();
|
||||
d.version = 0;
|
||||
xv.insert(&d);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// star (b,c,d,e) -> a
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn gossip_star_test() {
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
for n in 0..(num - 1) {
|
||||
let x = 0;
|
||||
let y = (n + 1) % listen.len();
|
||||
let mut xv = listen[x].0.write().unwrap();
|
||||
let yv = listen[y].0.read().unwrap();
|
||||
let mut d = yv.table[&yv.me].clone();
|
||||
d.version = 0;
|
||||
xv.insert(&d);
|
||||
}
|
||||
});
|
||||
fn test_parse_port_or_addr() {
|
||||
let p1 = parse_port_or_addr(Some("9000".to_string()));
|
||||
assert_eq!(p1.port(), 9000);
|
||||
let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string()));
|
||||
assert_eq!(p2.port(), 7000);
|
||||
let p3 = parse_port_or_addr(None);
|
||||
assert_eq!(p3.port(), 8000);
|
||||
}
|
||||
|
||||
/// Test that insert drops messages that are older
|
||||
@ -668,77 +730,59 @@ mod tests {
|
||||
crdt.insert(&d);
|
||||
assert_eq!(crdt.table[&d.id].version, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
pub fn test_crdt_retransmit() {
|
||||
logger::setup();
|
||||
trace!("c1:");
|
||||
let (mut c1, s1, r1, e1) = test_node();
|
||||
trace!("c2:");
|
||||
let (mut c2, s2, r2, _) = test_node();
|
||||
trace!("c3:");
|
||||
let (mut c3, s3, r3, _) = test_node();
|
||||
let c1_id = c1.my_data().id;
|
||||
c1.set_leader(c1_id);
|
||||
|
||||
c2.insert(&c1.my_data());
|
||||
c3.insert(&c1.my_data());
|
||||
|
||||
c2.set_leader(c1.my_data().id);
|
||||
c3.set_leader(c1.my_data().id);
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// Create listen threads
|
||||
let win1 = Arc::new(RwLock::new(vec![]));
|
||||
let a1 = Arc::new(RwLock::new(c1));
|
||||
let t1 = Crdt::listen(a1.clone(), win1, s1, exit.clone());
|
||||
|
||||
let a2 = Arc::new(RwLock::new(c2));
|
||||
let win2 = Arc::new(RwLock::new(vec![]));
|
||||
let t2 = Crdt::listen(a2.clone(), win2, s2, exit.clone());
|
||||
|
||||
let a3 = Arc::new(RwLock::new(c3));
|
||||
let win3 = Arc::new(RwLock::new(vec![]));
|
||||
let t3 = Crdt::listen(a3.clone(), win3, s3, exit.clone());
|
||||
|
||||
// Create gossip threads
|
||||
let t1_gossip = Crdt::gossip(a1.clone(), exit.clone());
|
||||
let t2_gossip = Crdt::gossip(a2.clone(), exit.clone());
|
||||
let t3_gossip = Crdt::gossip(a3.clone(), exit.clone());
|
||||
|
||||
//wait to converge
|
||||
trace!("waitng to converge:");
|
||||
let mut done = false;
|
||||
for _ in 0..30 {
|
||||
done = a1.read().unwrap().table.len() == 3 && a2.read().unwrap().table.len() == 3
|
||||
&& a3.read().unwrap().table.len() == 3;
|
||||
if done {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
assert!(done);
|
||||
let mut b = Blob::default();
|
||||
b.meta.size = 10;
|
||||
Crdt::retransmit(&a1, &Arc::new(RwLock::new(b)), &e1).unwrap();
|
||||
let res: Vec<_> = [r1, r2, r3]
|
||||
.into_par_iter()
|
||||
.map(|s| {
|
||||
let mut b = Blob::default();
|
||||
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||
let res = s.recv_from(&mut b.data);
|
||||
res.is_err() //true if failed to receive the retransmit packet
|
||||
})
|
||||
.collect();
|
||||
//true if failed receive the retransmit packet, r2, and r3 should succeed
|
||||
//r1 was the sender, so it should fail to receive the packet
|
||||
assert_eq!(res, [true, false, false]);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
let threads = vec![t1, t2, t3, t1_gossip, t2_gossip, t3_gossip];
|
||||
for t in threads.into_iter() {
|
||||
t.join().unwrap();
|
||||
}
|
||||
fn sorted(ls: &Vec<ReplicatedData>) -> Vec<ReplicatedData> {
|
||||
let mut copy: Vec<_> = ls.iter().cloned().collect();
|
||||
copy.sort_by(|x, y| x.id.cmp(&y.id));
|
||||
copy
|
||||
}
|
||||
#[test]
|
||||
fn update_test() {
|
||||
let d1 = ReplicatedData::new(
|
||||
KeyPair::new().pubkey(),
|
||||
"127.0.0.1:1234".parse().unwrap(),
|
||||
"127.0.0.1:1235".parse().unwrap(),
|
||||
"127.0.0.1:1236".parse().unwrap(),
|
||||
"127.0.0.1:1237".parse().unwrap(),
|
||||
);
|
||||
let d2 = ReplicatedData::new(
|
||||
KeyPair::new().pubkey(),
|
||||
"127.0.0.1:1234".parse().unwrap(),
|
||||
"127.0.0.1:1235".parse().unwrap(),
|
||||
"127.0.0.1:1236".parse().unwrap(),
|
||||
"127.0.0.1:1237".parse().unwrap(),
|
||||
);
|
||||
let d3 = ReplicatedData::new(
|
||||
KeyPair::new().pubkey(),
|
||||
"127.0.0.1:1234".parse().unwrap(),
|
||||
"127.0.0.1:1235".parse().unwrap(),
|
||||
"127.0.0.1:1236".parse().unwrap(),
|
||||
"127.0.0.1:1237".parse().unwrap(),
|
||||
);
|
||||
let mut crdt = Crdt::new(d1.clone());
|
||||
let (key, ix, ups) = crdt.get_updates_since(0);
|
||||
assert_eq!(key, d1.id);
|
||||
assert_eq!(ix, 1);
|
||||
assert_eq!(ups.len(), 1);
|
||||
assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
|
||||
crdt.insert(&d2);
|
||||
let (key, ix, ups) = crdt.get_updates_since(0);
|
||||
assert_eq!(key, d1.id);
|
||||
assert_eq!(ix, 2);
|
||||
assert_eq!(ups.len(), 2);
|
||||
assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
|
||||
crdt.insert(&d3);
|
||||
let (key, ix, ups) = crdt.get_updates_since(0);
|
||||
assert_eq!(key, d1.id);
|
||||
assert_eq!(ix, 3);
|
||||
assert_eq!(ups.len(), 3);
|
||||
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
|
||||
let mut crdt2 = Crdt::new(d2.clone());
|
||||
crdt2.apply_updates(key, ix, &ups);
|
||||
assert_eq!(crdt2.table.values().len(), 3);
|
||||
assert_eq!(
|
||||
sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
|
||||
sorted(&crdt.table.values().map(|x| x.clone()).collect())
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
84
src/data_replicator.rs
Normal file
84
src/data_replicator.rs
Normal file
@ -0,0 +1,84 @@
|
||||
use crdt;
|
||||
use packet;
|
||||
use result::Result;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::JoinHandle;
|
||||
use streamer;
|
||||
|
||||
pub struct DataReplicator {
|
||||
pub thread_hdls: Vec<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl DataReplicator {
|
||||
pub fn new(
|
||||
crdt: Arc<RwLock<crdt::Crdt>>,
|
||||
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>,
|
||||
gossip_listen_socket: UdpSocket,
|
||||
gossip_send_socket: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<DataReplicator> {
|
||||
let blob_recycler = packet::BlobRecycler::default();
|
||||
let (request_sender, request_receiver) = channel();
|
||||
trace!(
|
||||
"DataReplicator: id: {:?}, listening on: {:?}",
|
||||
&crdt.read().unwrap().me[..4],
|
||||
gossip_listen_socket.local_addr().unwrap()
|
||||
);
|
||||
let t_receiver = streamer::blob_receiver(
|
||||
exit.clone(),
|
||||
blob_recycler.clone(),
|
||||
gossip_listen_socket,
|
||||
request_sender,
|
||||
)?;
|
||||
let (response_sender, response_receiver) = channel();
|
||||
let t_responder = streamer::responder(
|
||||
gossip_send_socket,
|
||||
exit.clone(),
|
||||
blob_recycler.clone(),
|
||||
response_receiver,
|
||||
);
|
||||
let t_listen = crdt::Crdt::listen(
|
||||
crdt.clone(),
|
||||
window,
|
||||
blob_recycler.clone(),
|
||||
request_receiver,
|
||||
response_sender.clone(),
|
||||
exit.clone(),
|
||||
);
|
||||
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
|
||||
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
|
||||
Ok(DataReplicator { thread_hdls })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crdt::{Crdt, TestNode};
|
||||
use data_replicator::DataReplicator;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
#[test]
|
||||
// test that stage will exit when flag is set
|
||||
fn test_exit() {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let tn = TestNode::new();
|
||||
let crdt = Crdt::new(tn.data.clone());
|
||||
let c = Arc::new(RwLock::new(crdt));
|
||||
let w = Arc::new(RwLock::new(vec![]));
|
||||
let d = DataReplicator::new(
|
||||
c.clone(),
|
||||
w,
|
||||
tn.sockets.gossip,
|
||||
tn.sockets.gossip_send,
|
||||
exit.clone(),
|
||||
).unwrap();
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for t in d.thread_hdls {
|
||||
t.join().expect("thread join");
|
||||
}
|
||||
}
|
||||
}
|
@ -3,6 +3,7 @@ pub mod bank;
|
||||
pub mod banking_stage;
|
||||
pub mod budget;
|
||||
pub mod crdt;
|
||||
pub mod data_replicator;
|
||||
pub mod entry;
|
||||
pub mod entry_writer;
|
||||
#[cfg(feature = "erasure")]
|
||||
@ -45,12 +46,11 @@ extern crate ring;
|
||||
extern crate serde;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate pnet;
|
||||
extern crate serde_json;
|
||||
extern crate sha2;
|
||||
extern crate untrusted;
|
||||
|
||||
extern crate futures;
|
||||
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate matches;
|
||||
|
@ -180,10 +180,10 @@ impl Packets {
|
||||
socket.set_nonblocking(false)?;
|
||||
for p in &mut self.packets {
|
||||
p.meta.size = 0;
|
||||
trace!("receiving");
|
||||
trace!("receiving on {}", socket.local_addr().unwrap());
|
||||
match socket.recv_from(&mut p.data) {
|
||||
Err(_) if i > 0 => {
|
||||
debug!("got {:?} messages", i);
|
||||
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
@ -250,6 +250,7 @@ pub fn to_blob<T: Serialize>(
|
||||
// the raw bytes are being serialized and sent, this isn't the
|
||||
// right interface, and we should create a separate path for
|
||||
// sending request responses in the RPU
|
||||
assert!(len < BLOB_SIZE);
|
||||
b.data[..len].copy_from_slice(&v);
|
||||
b.meta.size = len;
|
||||
b.meta.set_addr(&rsp_addr);
|
||||
@ -283,7 +284,8 @@ impl Blob {
|
||||
self.data[..BLOB_INDEX_END].clone_from_slice(&wtr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// sender id, we use this for identifying if its a blob from the leader that we should
|
||||
/// retransmit. eventually blobs should have a signature that we can use ffor spam filtering
|
||||
pub fn get_id(&self) -> Result<PublicKey> {
|
||||
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
|
||||
Ok(e)
|
||||
@ -317,9 +319,10 @@ impl Blob {
|
||||
let r = re.allocate();
|
||||
{
|
||||
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
|
||||
trace!("receiving on {}", socket.local_addr().unwrap());
|
||||
match socket.recv_from(&mut p.data) {
|
||||
Err(_) if i > 0 => {
|
||||
trace!("got {:?} messages", i);
|
||||
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
|
@ -8,8 +8,8 @@
|
||||
use entry::Entry;
|
||||
use hash::Hash;
|
||||
use recorder::Recorder;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
use transaction::Transaction;
|
||||
|
||||
@ -27,31 +27,17 @@ pub struct RecordStage {
|
||||
impl RecordStage {
|
||||
/// A background thread that will continue tagging received Event messages and
|
||||
/// sending back Entry messages until either the receiver or sender channel is closed.
|
||||
pub fn new(
|
||||
transaction_receiver: Receiver<Signal>,
|
||||
start_hash: &Hash,
|
||||
tick_duration: Option<Duration>,
|
||||
) -> Self {
|
||||
pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self {
|
||||
let (entry_sender, entry_receiver) = channel();
|
||||
let start_hash = start_hash.clone();
|
||||
|
||||
let thread_hdl = spawn(move || {
|
||||
let mut recorder = Recorder::new(start_hash);
|
||||
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
|
||||
loop {
|
||||
if let Err(_) = Self::process_transactions(
|
||||
&mut recorder,
|
||||
duration_data,
|
||||
&transaction_receiver,
|
||||
&entry_sender,
|
||||
) {
|
||||
return;
|
||||
}
|
||||
if duration_data.is_some() {
|
||||
recorder.hash();
|
||||
}
|
||||
}
|
||||
});
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-record-stage".to_string())
|
||||
.spawn(move || {
|
||||
let mut recorder = Recorder::new(start_hash);
|
||||
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
RecordStage {
|
||||
entry_receiver,
|
||||
@ -59,29 +45,81 @@ impl RecordStage {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_transactions(
|
||||
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
|
||||
pub fn new_with_clock(
|
||||
signal_receiver: Receiver<Signal>,
|
||||
start_hash: &Hash,
|
||||
tick_duration: Duration,
|
||||
) -> Self {
|
||||
let (entry_sender, entry_receiver) = channel();
|
||||
let start_hash = start_hash.clone();
|
||||
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-record-stage".to_string())
|
||||
.spawn(move || {
|
||||
let mut recorder = Recorder::new(start_hash);
|
||||
let start_time = Instant::now();
|
||||
loop {
|
||||
if let Err(_) = Self::try_process_signals(
|
||||
&mut recorder,
|
||||
start_time,
|
||||
tick_duration,
|
||||
&signal_receiver,
|
||||
&entry_sender,
|
||||
) {
|
||||
return;
|
||||
}
|
||||
recorder.hash();
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
RecordStage {
|
||||
entry_receiver,
|
||||
thread_hdl,
|
||||
}
|
||||
}
|
||||
|
||||
fn process_signal(
|
||||
signal: Signal,
|
||||
recorder: &mut Recorder,
|
||||
sender: &Sender<Entry>,
|
||||
) -> Result<(), ()> {
|
||||
let txs = if let Signal::Events(txs) = signal {
|
||||
txs
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
let entry = recorder.record(txs);
|
||||
sender.send(entry).map_err(|_| ())
|
||||
}
|
||||
|
||||
fn process_signals(
|
||||
recorder: &mut Recorder,
|
||||
duration_data: Option<(Instant, Duration)>,
|
||||
receiver: &Receiver<Signal>,
|
||||
sender: &Sender<Entry>,
|
||||
) -> Result<(), ()> {
|
||||
loop {
|
||||
if let Some((start_time, tick_duration)) = duration_data {
|
||||
if let Some(entry) = recorder.tick(start_time, tick_duration) {
|
||||
sender.send(entry).or(Err(()))?;
|
||||
}
|
||||
match receiver.recv() {
|
||||
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
|
||||
Err(RecvError) => return Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_process_signals(
|
||||
recorder: &mut Recorder,
|
||||
start_time: Instant,
|
||||
tick_duration: Duration,
|
||||
receiver: &Receiver<Signal>,
|
||||
sender: &Sender<Entry>,
|
||||
) -> Result<(), ()> {
|
||||
loop {
|
||||
if let Some(entry) = recorder.tick(start_time, tick_duration) {
|
||||
sender.send(entry).or(Err(()))?;
|
||||
}
|
||||
match receiver.try_recv() {
|
||||
Ok(signal) => match signal {
|
||||
Signal::Tick => {
|
||||
let entry = recorder.record(vec![]);
|
||||
sender.send(entry).or(Err(()))?;
|
||||
}
|
||||
Signal::Events(transactions) => {
|
||||
let entry = recorder.record(transactions);
|
||||
sender.send(entry).or(Err(()))?;
|
||||
}
|
||||
},
|
||||
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
|
||||
Err(TryRecvError::Empty) => return Ok(()),
|
||||
Err(TryRecvError::Disconnected) => return Err(()),
|
||||
};
|
||||
@ -101,7 +139,7 @@ mod tests {
|
||||
fn test_historian() {
|
||||
let (tx_sender, tx_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let record_stage = RecordStage::new(tx_receiver, &zero, None);
|
||||
let record_stage = RecordStage::new(tx_receiver, &zero);
|
||||
|
||||
tx_sender.send(Signal::Tick).unwrap();
|
||||
sleep(Duration::new(0, 1_000_000));
|
||||
@ -127,7 +165,7 @@ mod tests {
|
||||
fn test_historian_closed_sender() {
|
||||
let (tx_sender, tx_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let record_stage = RecordStage::new(tx_receiver, &zero, None);
|
||||
let record_stage = RecordStage::new(tx_receiver, &zero);
|
||||
drop(record_stage.entry_receiver);
|
||||
tx_sender.send(Signal::Tick).unwrap();
|
||||
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
|
||||
@ -137,7 +175,7 @@ mod tests {
|
||||
fn test_transactions() {
|
||||
let (tx_sender, signal_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let record_stage = RecordStage::new(signal_receiver, &zero, None);
|
||||
let record_stage = RecordStage::new(signal_receiver, &zero);
|
||||
let alice_keypair = KeyPair::new();
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
|
||||
@ -149,11 +187,11 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_ticking_historian() {
|
||||
fn test_clock() {
|
||||
let (tx_sender, tx_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20)));
|
||||
let record_stage =
|
||||
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
|
||||
sleep(Duration::from_millis(900));
|
||||
tx_sender.send(Signal::Tick).unwrap();
|
||||
drop(tx_sender);
|
||||
|
@ -6,7 +6,7 @@ use packet;
|
||||
use result::Result;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use streamer;
|
||||
|
||||
@ -41,12 +41,15 @@ impl ReplicateStage {
|
||||
window_receiver: streamer::BlobReceiver,
|
||||
blob_recycler: packet::BlobRecycler,
|
||||
) -> Self {
|
||||
let thread_hdl = spawn(move || loop {
|
||||
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
|
||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
});
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-replicate-stage".to_string())
|
||||
.spawn(move || loop {
|
||||
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
|
||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
ReplicateStage { thread_hdl }
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, Receiver};
|
||||
use std::sync::Arc;
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::Instant;
|
||||
use streamer;
|
||||
use timing;
|
||||
@ -90,20 +90,23 @@ impl RequestStage {
|
||||
let request_processor = Arc::new(request_processor);
|
||||
let request_processor_ = request_processor.clone();
|
||||
let (blob_sender, blob_receiver) = channel();
|
||||
let thread_hdl = spawn(move || loop {
|
||||
let e = Self::process_request_packets(
|
||||
&request_processor_,
|
||||
&packet_receiver,
|
||||
&blob_sender,
|
||||
&packet_recycler,
|
||||
&blob_recycler,
|
||||
);
|
||||
if e.is_err() {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-request-stage".to_string())
|
||||
.spawn(move || loop {
|
||||
let e = Self::process_request_packets(
|
||||
&request_processor_,
|
||||
&packet_receiver,
|
||||
&blob_sender,
|
||||
&packet_recycler,
|
||||
&blob_recycler,
|
||||
);
|
||||
if e.is_err() {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
RequestStage {
|
||||
thread_hdl,
|
||||
blob_receiver,
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
use bank::Bank;
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use hash::Hash;
|
||||
use data_replicator::DataReplicator;
|
||||
use packet;
|
||||
use rpu::Rpu;
|
||||
use std::io::Write;
|
||||
@ -22,7 +22,6 @@ pub struct Server {
|
||||
impl Server {
|
||||
pub fn new_leader<W: Write + Send + 'static>(
|
||||
bank: Bank,
|
||||
start_hash: Hash,
|
||||
tick_duration: Option<Duration>,
|
||||
me: ReplicatedData,
|
||||
requests_socket: UdpSocket,
|
||||
@ -41,7 +40,6 @@ impl Server {
|
||||
let blob_recycler = packet::BlobRecycler::default();
|
||||
let tpu = Tpu::new(
|
||||
bank.clone(),
|
||||
start_hash,
|
||||
tick_duration,
|
||||
transactions_socket,
|
||||
blob_recycler.clone(),
|
||||
@ -51,19 +49,26 @@ impl Server {
|
||||
thread_hdls.extend(tpu.thread_hdls);
|
||||
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
||||
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
||||
let window = streamer::default_window();
|
||||
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone());
|
||||
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
|
||||
let data_replicator = DataReplicator::new(
|
||||
crdt.clone(),
|
||||
window.clone(),
|
||||
gossip_socket,
|
||||
gossip_send_socket,
|
||||
exit.clone(),
|
||||
).expect("DataReplicator::new");
|
||||
thread_hdls.extend(data_replicator.thread_hdls);
|
||||
|
||||
let t_broadcast = streamer::broadcaster(
|
||||
broadcast_socket,
|
||||
exit.clone(),
|
||||
crdt.clone(),
|
||||
crdt,
|
||||
window,
|
||||
blob_recycler.clone(),
|
||||
tpu.blob_receiver,
|
||||
);
|
||||
thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]);
|
||||
thread_hdls.extend(vec![t_broadcast]);
|
||||
|
||||
Server { thread_hdls }
|
||||
}
|
||||
|
254
src/streamer.rs
254
src/streamer.rs
@ -10,7 +10,7 @@ use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
pub const WINDOW_SIZE: usize = 2 * 1024;
|
||||
@ -58,10 +58,13 @@ pub fn receiver(
|
||||
if res.is_err() {
|
||||
panic!("streamer::receiver set_read_timeout error");
|
||||
}
|
||||
spawn(move || {
|
||||
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
|
||||
()
|
||||
})
|
||||
Builder::new()
|
||||
.name("solana-receiver".to_string())
|
||||
.spawn(move || {
|
||||
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
|
||||
()
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
||||
@ -96,16 +99,20 @@ pub fn responder(
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
})
|
||||
Builder::new()
|
||||
.name("solana-responder".to_string())
|
||||
.spawn(move || loop {
|
||||
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
//TODO, we would need to stick block authentication before we create the
|
||||
//window.
|
||||
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
|
||||
trace!("receiving on {}", sock.local_addr().unwrap());
|
||||
let dq = Blob::recv_from(recycler, sock)?;
|
||||
if !dq.is_empty() {
|
||||
s.send(dq)?;
|
||||
@ -123,12 +130,15 @@ pub fn blob_receiver(
|
||||
//1 second timeout on socket read
|
||||
let timer = Duration::new(1, 0);
|
||||
sock.set_read_timeout(Some(timer))?;
|
||||
let t = spawn(move || loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = recv_blobs(&recycler, &sock, &s);
|
||||
});
|
||||
let t = Builder::new()
|
||||
.name("solana-blob_receiver".to_string())
|
||||
.spawn(move || loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = recv_blobs(&recycler, &sock, &s);
|
||||
})
|
||||
.unwrap();
|
||||
Ok(t)
|
||||
}
|
||||
|
||||
@ -319,35 +329,38 @@ pub fn window(
|
||||
s: BlobSender,
|
||||
retransmit: BlobSender,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || {
|
||||
let mut consumed = 0;
|
||||
let mut received = 0;
|
||||
let mut last = 0;
|
||||
let mut times = 0;
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
Builder::new()
|
||||
.name("solana-window".to_string())
|
||||
.spawn(move || {
|
||||
let mut consumed = 0;
|
||||
let mut received = 0;
|
||||
let mut last = 0;
|
||||
let mut times = 0;
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = recv_window(
|
||||
&window,
|
||||
&crdt,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
);
|
||||
let _ = repair_window(
|
||||
&window,
|
||||
&crdt,
|
||||
&mut last,
|
||||
&mut times,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
);
|
||||
}
|
||||
let _ = recv_window(
|
||||
&window,
|
||||
&crdt,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
);
|
||||
let _ = repair_window(
|
||||
&window,
|
||||
&crdt,
|
||||
&mut last,
|
||||
&mut times,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
);
|
||||
}
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn broadcast(
|
||||
@ -414,15 +427,18 @@ pub fn broadcaster(
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || {
|
||||
let mut transmit_index = 0;
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
Builder::new()
|
||||
.name("solana-broadcaster".to_string())
|
||||
.spawn(move || {
|
||||
let mut transmit_index = 0;
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
|
||||
}
|
||||
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
|
||||
}
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn retransmit(
|
||||
@ -462,17 +478,20 @@ pub fn retransmitter(
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || {
|
||||
trace!("retransmitter started");
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
Builder::new()
|
||||
.name("solana-retransmitter".to_string())
|
||||
.spawn(move || {
|
||||
trace!("retransmitter started");
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
// TODO: handle this error
|
||||
let _ = retransmit(&crdt, &recycler, &r, &sock);
|
||||
}
|
||||
// TODO: handle this error
|
||||
let _ = retransmit(&crdt, &recycler, &r, &sock);
|
||||
}
|
||||
trace!("exiting retransmitter");
|
||||
})
|
||||
trace!("exiting retransmitter");
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "unstable", test))]
|
||||
@ -584,7 +603,6 @@ mod bench {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use logger;
|
||||
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
|
||||
use signature::KeyPair;
|
||||
use signature::KeyPairUtil;
|
||||
@ -595,9 +613,8 @@ mod test {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
||||
use streamer::{blob_receiver, receiver, responder, window};
|
||||
use streamer::{default_window, BlobReceiver, PacketReceiver};
|
||||
|
||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||
@ -735,111 +752,4 @@ mod test {
|
||||
t_responder.join().expect("join");
|
||||
t_window.join().expect("join");
|
||||
}
|
||||
|
||||
fn test_node() -> (Arc<RwLock<Crdt>>, UdpSocket, UdpSocket, UdpSocket) {
|
||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
serve.local_addr().unwrap(),
|
||||
transaction.local_addr().unwrap(),
|
||||
);
|
||||
trace!("data: {:?}", d);
|
||||
let crdt = Crdt::new(d);
|
||||
(Arc::new(RwLock::new(crdt)), gossip, replicate, serve)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
//retransmit from leader to replicate target
|
||||
pub fn retransmit() {
|
||||
logger::setup();
|
||||
trace!("retransmit test start");
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (crdt_leader, sock_gossip_leader, _, sock_leader) = test_node();
|
||||
let (crdt_target, sock_gossip_target, sock_replicate_target, _) = test_node();
|
||||
let leader_data = crdt_leader.read().unwrap().my_data().clone();
|
||||
crdt_leader.write().unwrap().insert(&leader_data);
|
||||
crdt_leader.write().unwrap().set_leader(leader_data.id);
|
||||
let t_crdt_leader_g = Crdt::gossip(crdt_leader.clone(), exit.clone());
|
||||
let window_leader = Arc::new(RwLock::new(vec![]));
|
||||
let t_crdt_leader_l = Crdt::listen(
|
||||
crdt_leader.clone(),
|
||||
window_leader,
|
||||
sock_gossip_leader,
|
||||
exit.clone(),
|
||||
);
|
||||
|
||||
crdt_target.write().unwrap().insert(&leader_data);
|
||||
crdt_target.write().unwrap().set_leader(leader_data.id);
|
||||
let t_crdt_target_g = Crdt::gossip(crdt_target.clone(), exit.clone());
|
||||
let window_target = Arc::new(RwLock::new(vec![]));
|
||||
let t_crdt_target_l = Crdt::listen(
|
||||
crdt_target.clone(),
|
||||
window_target,
|
||||
sock_gossip_target,
|
||||
exit.clone(),
|
||||
);
|
||||
//leader retransmitter
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let saddr = sock_leader.local_addr().unwrap();
|
||||
let t_retransmit = retransmitter(
|
||||
sock_leader,
|
||||
exit.clone(),
|
||||
crdt_leader.clone(),
|
||||
blob_recycler.clone(),
|
||||
r_retransmit,
|
||||
);
|
||||
|
||||
//target receiver
|
||||
let (s_blob_receiver, r_blob_receiver) = channel();
|
||||
let t_receiver = blob_receiver(
|
||||
exit.clone(),
|
||||
blob_recycler.clone(),
|
||||
sock_replicate_target,
|
||||
s_blob_receiver,
|
||||
).unwrap();
|
||||
for _ in 0..10 {
|
||||
let done = crdt_target.read().unwrap().update_index == 2
|
||||
&& crdt_leader.read().unwrap().update_index == 2;
|
||||
if done {
|
||||
break;
|
||||
}
|
||||
let timer = Duration::new(1, 0);
|
||||
sleep(timer);
|
||||
}
|
||||
|
||||
//send the data through
|
||||
let mut bq = VecDeque::new();
|
||||
let b = blob_recycler.allocate();
|
||||
b.write().unwrap().meta.size = 10;
|
||||
bq.push_back(b);
|
||||
s_retransmit.send(bq).unwrap();
|
||||
let timer = Duration::new(5, 0);
|
||||
trace!("Waiting for timeout");
|
||||
let mut oq = r_blob_receiver.recv_timeout(timer).unwrap();
|
||||
assert_eq!(oq.len(), 1);
|
||||
let o = oq.pop_front().unwrap();
|
||||
let ro = o.read().unwrap();
|
||||
assert_eq!(ro.meta.size, 10);
|
||||
assert_eq!(ro.meta.addr(), saddr);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
let threads = vec![
|
||||
t_receiver,
|
||||
t_retransmit,
|
||||
t_crdt_target_g,
|
||||
t_crdt_target_l,
|
||||
t_crdt_leader_g,
|
||||
t_crdt_leader_l,
|
||||
];
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,6 @@
|
||||
//! unstable and may change in future releases.
|
||||
|
||||
use bincode::{deserialize, serialize};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use hash::Hash;
|
||||
use request::{Request, Response};
|
||||
use signature::{KeyPair, PublicKey, Signature};
|
||||
@ -138,7 +137,7 @@ impl ThinClient {
|
||||
|
||||
/// Request the last Entry ID from the server. This method blocks
|
||||
/// until the server sends a response.
|
||||
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
|
||||
pub fn get_last_id(&mut self) -> Hash {
|
||||
info!("get_last_id");
|
||||
let req = Request::GetLastId;
|
||||
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
|
||||
@ -153,7 +152,7 @@ impl ThinClient {
|
||||
}
|
||||
self.process_response(resp);
|
||||
}
|
||||
ok(self.last_id.expect("some last_id"))
|
||||
self.last_id.expect("some last_id")
|
||||
}
|
||||
|
||||
pub fn poll_get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
|
||||
@ -177,7 +176,7 @@ mod tests {
|
||||
use super::*;
|
||||
use bank::Bank;
|
||||
use budget::Budget;
|
||||
use futures::Future;
|
||||
use crdt::TestNode;
|
||||
use logger;
|
||||
use mint::Mint;
|
||||
use server::Server;
|
||||
@ -188,7 +187,6 @@ mod tests {
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use transaction::{Instruction, Plan};
|
||||
use tvu::TestNode;
|
||||
|
||||
#[test]
|
||||
fn test_thin_client() {
|
||||
@ -202,7 +200,6 @@ mod tests {
|
||||
|
||||
let server = Server::new_leader(
|
||||
bank,
|
||||
alice.last_id(),
|
||||
Some(Duration::from_millis(30)),
|
||||
leader.data.clone(),
|
||||
leader.sockets.requests,
|
||||
@ -224,7 +221,7 @@ mod tests {
|
||||
leader.data.transactions_addr,
|
||||
transactions_socket,
|
||||
);
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
let last_id = client.get_last_id();
|
||||
let _sig = client
|
||||
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
.unwrap();
|
||||
@ -247,7 +244,6 @@ mod tests {
|
||||
|
||||
let server = Server::new_leader(
|
||||
bank,
|
||||
alice.last_id(),
|
||||
Some(Duration::from_millis(30)),
|
||||
leader.data.clone(),
|
||||
leader.sockets.requests,
|
||||
@ -271,13 +267,13 @@ mod tests {
|
||||
leader.data.transactions_addr,
|
||||
transactions_socket,
|
||||
);
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
let last_id = client.get_last_id();
|
||||
|
||||
let tx = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
|
||||
|
||||
let _sig = client.transfer_signed(tx).unwrap();
|
||||
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
let last_id = client.get_last_id();
|
||||
|
||||
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
|
||||
if let Instruction::NewContract(contract) = &mut tr2.instruction {
|
||||
|
13
src/tpu.rs
13
src/tpu.rs
@ -4,7 +4,6 @@
|
||||
use bank::Bank;
|
||||
use banking_stage::BankingStage;
|
||||
use fetch_stage::FetchStage;
|
||||
use hash::Hash;
|
||||
use packet::{BlobRecycler, PacketRecycler};
|
||||
use record_stage::RecordStage;
|
||||
use sigverify_stage::SigVerifyStage;
|
||||
@ -25,7 +24,6 @@ pub struct Tpu {
|
||||
impl Tpu {
|
||||
pub fn new<W: Write + Send + 'static>(
|
||||
bank: Arc<Bank>,
|
||||
start_hash: Hash,
|
||||
tick_duration: Option<Duration>,
|
||||
transactions_socket: UdpSocket,
|
||||
blob_recycler: BlobRecycler,
|
||||
@ -46,8 +44,14 @@ impl Tpu {
|
||||
packet_recycler.clone(),
|
||||
);
|
||||
|
||||
let record_stage =
|
||||
RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration);
|
||||
let record_stage = match tick_duration {
|
||||
Some(tick_duration) => RecordStage::new_with_clock(
|
||||
banking_stage.signal_receiver,
|
||||
&bank.last_id(),
|
||||
tick_duration,
|
||||
),
|
||||
None => RecordStage::new(banking_stage.signal_receiver, &bank.last_id()),
|
||||
};
|
||||
|
||||
let write_stage = WriteStage::new(
|
||||
bank.clone(),
|
||||
@ -56,7 +60,6 @@ impl Tpu {
|
||||
Mutex::new(writer),
|
||||
record_stage.entry_receiver,
|
||||
);
|
||||
|
||||
let mut thread_hdls = vec![
|
||||
fetch_stage.thread_hdl,
|
||||
banking_stage.thread_hdl,
|
||||
|
@ -149,6 +149,7 @@ impl Transaction {
|
||||
}
|
||||
|
||||
pub fn verify_sig(&self) -> bool {
|
||||
warn!("transaction signature verification called");
|
||||
self.sig.verify(&self.from, &self.get_sign_data())
|
||||
}
|
||||
|
||||
|
120
src/tvu.rs
120
src/tvu.rs
@ -22,9 +22,9 @@
|
||||
|
||||
use bank::Bank;
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use data_replicator::DataReplicator;
|
||||
use packet;
|
||||
use replicate_stage::ReplicateStage;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::channel;
|
||||
@ -49,7 +49,7 @@ impl Tvu {
|
||||
pub fn new(
|
||||
bank: Arc<Bank>,
|
||||
me: ReplicatedData,
|
||||
gossip: UdpSocket,
|
||||
gossip_listen_socket: UdpSocket,
|
||||
replicate: UdpSocket,
|
||||
leader: ReplicatedData,
|
||||
exit: Arc<AtomicBool>,
|
||||
@ -62,9 +62,15 @@ impl Tvu {
|
||||
crdt.write()
|
||||
.expect("'crdt' write lock before insert() in pub fn replicate")
|
||||
.insert(&leader);
|
||||
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
||||
let window = streamer::default_window();
|
||||
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
|
||||
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
|
||||
let data_replicator = DataReplicator::new(
|
||||
crdt.clone(),
|
||||
window.clone(),
|
||||
gossip_listen_socket,
|
||||
gossip_send_socket,
|
||||
exit.clone(),
|
||||
).expect("DataReplicator::new");
|
||||
|
||||
// TODO pull this socket out through the public interface
|
||||
// make sure we are on the same interface
|
||||
@ -111,108 +117,52 @@ impl Tvu {
|
||||
blob_recycler.clone(),
|
||||
);
|
||||
|
||||
let threads = vec![
|
||||
let mut threads = vec![
|
||||
//replicate threads
|
||||
t_blob_receiver,
|
||||
t_retransmit,
|
||||
t_window,
|
||||
replicate_stage.thread_hdl,
|
||||
t_gossip,
|
||||
t_listen,
|
||||
];
|
||||
threads.extend(data_replicator.thread_hdls.into_iter());
|
||||
Tvu {
|
||||
thread_hdls: threads,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Sockets {
|
||||
pub gossip: UdpSocket,
|
||||
pub requests: UdpSocket,
|
||||
pub replicate: UdpSocket,
|
||||
pub transaction: UdpSocket,
|
||||
pub respond: UdpSocket,
|
||||
pub broadcast: UdpSocket,
|
||||
}
|
||||
|
||||
pub struct TestNode {
|
||||
pub data: ReplicatedData,
|
||||
pub sockets: Sockets,
|
||||
}
|
||||
|
||||
impl TestNode {
|
||||
pub fn new() -> TestNode {
|
||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let data = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
requests.local_addr().unwrap(),
|
||||
transaction.local_addr().unwrap(),
|
||||
);
|
||||
TestNode {
|
||||
data: data,
|
||||
sockets: Sockets {
|
||||
gossip,
|
||||
requests,
|
||||
replicate,
|
||||
transaction,
|
||||
respond,
|
||||
broadcast,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::time::Duration;
|
||||
|
||||
let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
requests_socket
|
||||
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||
.unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
requests_socket.local_addr().unwrap(),
|
||||
transactions_socket.local_addr().unwrap(),
|
||||
);
|
||||
(d, gossip, replicate, requests_socket, transactions_socket)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use bank::Bank;
|
||||
use bincode::serialize;
|
||||
use crdt::Crdt;
|
||||
use crdt::{Crdt, TestNode};
|
||||
use data_replicator::DataReplicator;
|
||||
use entry::Entry;
|
||||
use hash::{hash, Hash};
|
||||
use logger;
|
||||
use mint::Mint;
|
||||
use packet::BlobRecycler;
|
||||
use result::Result;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::collections::VecDeque;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use streamer;
|
||||
use transaction::Transaction;
|
||||
use tvu::{TestNode, Tvu};
|
||||
use tvu::Tvu;
|
||||
|
||||
fn new_replicator(
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
listen: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<DataReplicator> {
|
||||
let window = streamer::default_window();
|
||||
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
|
||||
DataReplicator::new(crdt, window, listen, send_sock, exit)
|
||||
}
|
||||
/// Test that message sent from leader to target1 and replicated to target2
|
||||
#[test]
|
||||
fn test_replicate() {
|
||||
@ -227,9 +177,7 @@ pub mod tests {
|
||||
crdt_l.set_leader(leader.data.id);
|
||||
|
||||
let cref_l = Arc::new(RwLock::new(crdt_l));
|
||||
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
|
||||
let window1 = streamer::default_window();
|
||||
let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone());
|
||||
let dr_l = new_replicator(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
|
||||
|
||||
//start crdt2
|
||||
let mut crdt2 = Crdt::new(target2.data.clone());
|
||||
@ -237,9 +185,7 @@ pub mod tests {
|
||||
crdt2.set_leader(leader.data.id);
|
||||
let leader_id = leader.data.id;
|
||||
let cref2 = Arc::new(RwLock::new(crdt2));
|
||||
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
|
||||
let window2 = streamer::default_window();
|
||||
let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone());
|
||||
let dr_2 = new_replicator(cref2, target2.sockets.gossip, exit.clone()).unwrap();
|
||||
|
||||
// setup some blob services to send blobs into the socket
|
||||
// to simulate the source peer and get blobs out of the socket to
|
||||
@ -337,11 +283,13 @@ pub mod tests {
|
||||
for t in tvu.thread_hdls {
|
||||
t.join().expect("join");
|
||||
}
|
||||
t2_gossip.join().expect("join");
|
||||
t2_listen.join().expect("join");
|
||||
for t in dr_l.thread_hdls {
|
||||
t.join().expect("join");
|
||||
}
|
||||
for t in dr_2.thread_hdls {
|
||||
t.join().expect("join");
|
||||
}
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
t_l_gossip.join().expect("join");
|
||||
t_l_listen.join().expect("join");
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ use std::io::Write;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, Receiver};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use streamer;
|
||||
|
||||
pub struct WriteStage {
|
||||
@ -26,19 +26,22 @@ impl WriteStage {
|
||||
entry_receiver: Receiver<Entry>,
|
||||
) -> Self {
|
||||
let (blob_sender, blob_receiver) = channel();
|
||||
let thread_hdl = spawn(move || loop {
|
||||
let entry_writer = EntryWriter::new(&bank);
|
||||
let _ = entry_writer.write_and_send_entries(
|
||||
&blob_sender,
|
||||
&blob_recycler,
|
||||
&writer,
|
||||
&entry_receiver,
|
||||
);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("broadcat_service exiting");
|
||||
break;
|
||||
}
|
||||
});
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-writer".to_string())
|
||||
.spawn(move || loop {
|
||||
let entry_writer = EntryWriter::new(&bank);
|
||||
let _ = entry_writer.write_and_send_entries(
|
||||
&blob_sender,
|
||||
&blob_recycler,
|
||||
&writer,
|
||||
&entry_receiver,
|
||||
);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("broadcat_service exiting");
|
||||
break;
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
WriteStage {
|
||||
thread_hdl,
|
||||
@ -52,16 +55,19 @@ impl WriteStage {
|
||||
entry_receiver: Receiver<Entry>,
|
||||
) -> Self {
|
||||
let (_blob_sender, blob_receiver) = channel();
|
||||
let thread_hdl = spawn(move || {
|
||||
let entry_writer = EntryWriter::new(&bank);
|
||||
loop {
|
||||
let _ = entry_writer.drain_entries(&entry_receiver);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("drain_service exiting");
|
||||
break;
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-drain".to_string())
|
||||
.spawn(move || {
|
||||
let entry_writer = EntryWriter::new(&bank);
|
||||
loop {
|
||||
let _ = entry_writer.drain_entries(&entry_receiver);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("drain_service exiting");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
WriteStage {
|
||||
thread_hdl,
|
||||
|
184
tests/data_replicator.rs
Normal file
184
tests/data_replicator.rs
Normal file
@ -0,0 +1,184 @@
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate rayon;
|
||||
extern crate solana;
|
||||
|
||||
use rayon::iter::*;
|
||||
use solana::crdt::{Crdt, TestNode};
|
||||
use solana::data_replicator::DataReplicator;
|
||||
use solana::logger;
|
||||
use solana::packet::Blob;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSocket) {
|
||||
let tn = TestNode::new();
|
||||
let crdt = Crdt::new(tn.data.clone());
|
||||
let c = Arc::new(RwLock::new(crdt));
|
||||
let w = Arc::new(RwLock::new(vec![]));
|
||||
let d = DataReplicator::new(
|
||||
c.clone(),
|
||||
w,
|
||||
tn.sockets.gossip,
|
||||
tn.sockets.gossip_send,
|
||||
exit,
|
||||
).unwrap();
|
||||
(c, d, tn.sockets.replicate)
|
||||
}
|
||||
|
||||
/// Test that the network converges.
|
||||
/// Run until every node in the network has a full ReplicatedData set.
|
||||
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
|
||||
/// tests that actually use this function are below
|
||||
fn run_gossip_topo<F>(topo: F)
|
||||
where
|
||||
F: Fn(&Vec<(Arc<RwLock<Crdt>>, DataReplicator, UdpSocket)>) -> (),
|
||||
{
|
||||
let num: usize = 5;
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect();
|
||||
topo(&listen);
|
||||
let mut done = true;
|
||||
for i in 0..(num * 32) {
|
||||
done = false;
|
||||
trace!("round {}", i);
|
||||
for &(ref c, _, _) in listen.iter() {
|
||||
if num == c.read().unwrap().convergence() as usize {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
//at least 1 node converged
|
||||
if done == true {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for (c, dr, _) in listen.into_iter() {
|
||||
for j in dr.thread_hdls.into_iter() {
|
||||
j.join().unwrap();
|
||||
}
|
||||
// make it clear what failed
|
||||
// protocol is to chatty, updates should stop after everyone receives `num`
|
||||
assert!(c.read().unwrap().update_index <= num as u64);
|
||||
// protocol is not chatty enough, everyone should get `num` entries
|
||||
assert_eq!(c.read().unwrap().table.len(), num);
|
||||
}
|
||||
assert!(done);
|
||||
}
|
||||
/// ring a -> b -> c -> d -> e -> a
|
||||
#[test]
|
||||
fn gossip_ring() {
|
||||
logger::setup();
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
for n in 0..num {
|
||||
let y = n % listen.len();
|
||||
let x = (n + 1) % listen.len();
|
||||
let mut xv = listen[x].0.write().unwrap();
|
||||
let yv = listen[y].0.read().unwrap();
|
||||
let mut d = yv.table[&yv.me].clone();
|
||||
d.version = 0;
|
||||
xv.insert(&d);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// star a -> (b,c,d,e)
|
||||
#[test]
|
||||
fn gossip_star() {
|
||||
logger::setup();
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
for n in 0..(num - 1) {
|
||||
let x = 0;
|
||||
let y = (n + 1) % listen.len();
|
||||
let mut xv = listen[x].0.write().unwrap();
|
||||
let yv = listen[y].0.read().unwrap();
|
||||
let mut yd = yv.table[&yv.me].clone();
|
||||
yd.version = 0;
|
||||
xv.insert(&yd);
|
||||
trace!("star leader {:?}", &xv.me[..4]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// rstar a <- (b,c,d,e)
|
||||
#[test]
|
||||
fn gossip_rstar() {
|
||||
logger::setup();
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
let xd = {
|
||||
let xv = listen[0].0.read().unwrap();
|
||||
xv.table[&xv.me].clone()
|
||||
};
|
||||
trace!("rstar leader {:?}", &xd.id[..4]);
|
||||
for n in 0..(num - 1) {
|
||||
let y = (n + 1) % listen.len();
|
||||
let mut yv = listen[y].0.write().unwrap();
|
||||
yv.insert(&xd);
|
||||
trace!("rstar insert {:?} into {:?}", &xd.id[..4], &yv.me[..4]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn crdt_retransmit() {
|
||||
logger::setup();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
trace!("c1:");
|
||||
let (c1, dr1, tn1) = test_node(exit.clone());
|
||||
trace!("c2:");
|
||||
let (c2, dr2, tn2) = test_node(exit.clone());
|
||||
trace!("c3:");
|
||||
let (c3, dr3, tn3) = test_node(exit.clone());
|
||||
let c1_data = c1.read().unwrap().my_data().clone();
|
||||
c1.write().unwrap().set_leader(c1_data.id);
|
||||
|
||||
c2.write().unwrap().insert(&c1_data);
|
||||
c3.write().unwrap().insert(&c1_data);
|
||||
|
||||
c2.write().unwrap().set_leader(c1_data.id);
|
||||
c3.write().unwrap().set_leader(c1_data.id);
|
||||
|
||||
//wait to converge
|
||||
trace!("waiting to converge:");
|
||||
let mut done = false;
|
||||
for _ in 0..30 {
|
||||
done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
|
||||
&& c3.read().unwrap().table.len() == 3;
|
||||
if done {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
assert!(done);
|
||||
let mut b = Blob::default();
|
||||
b.meta.size = 10;
|
||||
Crdt::retransmit(&c1, &Arc::new(RwLock::new(b)), &tn1).unwrap();
|
||||
let res: Vec<_> = [tn1, tn2, tn3]
|
||||
.into_par_iter()
|
||||
.map(|s| {
|
||||
let mut b = Blob::default();
|
||||
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||
let res = s.recv_from(&mut b.data);
|
||||
res.is_err() //true if failed to receive the retransmit packet
|
||||
})
|
||||
.collect();
|
||||
//true if failed receive the retransmit packet, r2, and r3 should succeed
|
||||
//r1 was the sender, so it should fail to receive the packet
|
||||
assert_eq!(res, [true, false, false]);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
let mut threads = vec![];
|
||||
threads.extend(dr1.thread_hdls.into_iter());
|
||||
threads.extend(dr2.thread_hdls.into_iter());
|
||||
threads.extend(dr3.thread_hdls.into_iter());
|
||||
for t in threads.into_iter() {
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
@ -1,19 +1,18 @@
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate bincode;
|
||||
extern crate futures;
|
||||
extern crate solana;
|
||||
|
||||
use futures::Future;
|
||||
use solana::bank::Bank;
|
||||
use solana::crdt::TestNode;
|
||||
use solana::crdt::{Crdt, ReplicatedData};
|
||||
use solana::data_replicator::DataReplicator;
|
||||
use solana::logger;
|
||||
use solana::mint::Mint;
|
||||
use solana::server::Server;
|
||||
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
|
||||
use solana::streamer::default_window;
|
||||
use solana::thin_client::ThinClient;
|
||||
use solana::tvu::TestNode;
|
||||
use std::io;
|
||||
use std::io::sink;
|
||||
use std::net::UdpSocket;
|
||||
@ -59,16 +58,15 @@ fn converge(
|
||||
let mut spy_crdt = Crdt::new(spy.data);
|
||||
spy_crdt.insert(&leader);
|
||||
spy_crdt.set_leader(leader.id);
|
||||
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let spy_window = default_window();
|
||||
let t_spy_listen = Crdt::listen(
|
||||
let dr = DataReplicator::new(
|
||||
spy_ref.clone(),
|
||||
spy_window,
|
||||
spy.sockets.gossip,
|
||||
exit.clone(),
|
||||
);
|
||||
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||
spy.sockets.gossip_send,
|
||||
exit,
|
||||
).unwrap();
|
||||
//wait for the network to converge
|
||||
let mut converged = false;
|
||||
for _ in 0..30 {
|
||||
@ -80,8 +78,7 @@ fn converge(
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
assert!(converged);
|
||||
threads.push(t_spy_listen);
|
||||
threads.push(t_spy_gossip);
|
||||
threads.extend(dr.thread_hdls.into_iter());
|
||||
let v: Vec<ReplicatedData> = spy_ref
|
||||
.read()
|
||||
.unwrap()
|
||||
@ -107,7 +104,6 @@ fn test_multi_node() {
|
||||
let leader_bank = Bank::new(&alice);
|
||||
let server = Server::new_leader(
|
||||
leader_bank,
|
||||
alice.last_id(),
|
||||
None,
|
||||
leader.data.clone(),
|
||||
leader.sockets.requests,
|
||||
@ -169,7 +165,7 @@ fn tx_and_retry_get_balance(
|
||||
) -> io::Result<i64> {
|
||||
let mut client = mk_client(leader);
|
||||
trace!("getting leader last_id");
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
let last_id = client.get_last_id();
|
||||
info!("executing leader transer");
|
||||
let _sig = client
|
||||
.transfer(500, &alice.keypair(), *bob_pubkey, &last_id)
|
||||
|
Reference in New Issue
Block a user