Compare commits

...

66 Commits

Author SHA1 Message Date
7aa05618a3 data_replicator -> ncp
Fixes #327
2018-06-07 17:11:17 -06:00
cdfbbe5e60 Fix diagram typos 2018-06-07 17:11:17 -06:00
fe7d1cb81c Race -> Or
Thanks for the suggestion @FishmanL!
2018-06-07 17:11:03 -06:00
c2a9395a4b perf counters 2018-06-07 14:59:21 -07:00
586279bcfc Add server diagrams 2018-06-07 15:24:44 -06:00
8bd10e7c4c Cleanup top-level lib doc 2018-06-07 15:24:44 -06:00
928e6165bc Add TPU & RPU diagrams 2018-06-07 15:24:44 -06:00
77c9e801aa fixed client demo (#325)
* fixed client demo
2018-06-07 13:51:15 -07:00
c78132417f fix deadlock 2018-06-07 13:52:33 -06:00
849928887e undo 2018-06-07 13:52:33 -06:00
ba1163d49f fix logs 2018-06-07 13:52:33 -06:00
6f9c89af39 fix deadlock 2018-06-07 13:52:33 -06:00
246b8b1242 No longer cat scripts
Because we keep changing those scripts and not updating the readme.

Also, this removes the "-b 9000" when starting validators. Is that right?
Or should we be passing that to the validator config?
2018-06-07 12:17:43 -06:00
f0db68cb75 Add note about validator.json and -d flag to config generating scripts 2018-06-07 11:15:41 -06:00
f0d1fdfb46 Add missing module descriptions 2018-06-07 09:25:36 -06:00
3b8b2e030a Better docs for transaction 2018-06-07 09:25:36 -06:00
b4fee677a5 Better docs for payment_plan 2018-06-07 09:25:36 -06:00
fe706583f9 Better docs for sigverify_stage 2018-06-07 09:25:36 -06:00
d0e0c17ece Better docs for rpu 2018-06-07 09:25:36 -06:00
5aaa38bcaf Better docs for write_stage 2018-06-07 09:25:36 -06:00
6ff9b27f8e Better docs for entry 2018-06-07 09:25:36 -06:00
3f4e035506 Better docs for budget 2018-06-07 09:25:36 -06:00
57d9fbb927 Better docs for banking_stage 2018-06-07 09:25:36 -06:00
ee44e51b30 Better docs for the bank 2018-06-07 09:25:36 -06:00
5011f24123 Move more interesting content into first header
The first header and its content is the only text displayed on
GitHub's mobile page. Reorder so that the disclaimer is the only
information people see.

Disclaimer: IANAL and assume reordering these doesn't matter. :)
2018-06-07 09:25:36 -06:00
d1eda334f3 gdb 2018-06-07 09:25:08 -06:00
2ae5ce9f2c Do not use cuda for multinode-demo validator component 2018-06-07 07:04:33 -06:00
4f5ac78b7e Add readme to crates.io 2018-06-06 15:00:25 -06:00
074c9af020 Shellcheck again 2018-06-05 15:32:25 -06:00
2da2d4e365 More shellcheck 2018-06-05 15:32:25 -06:00
8eb76ab2a5 Fix shellcheck 2018-06-05 15:32:25 -06:00
a710d95243 Fix non-erasure blob nulling 2018-06-05 15:32:25 -06:00
a06535d7ed cargo fmt 2018-06-05 15:32:25 -06:00
f511ac9be7 Fixes for receiving old blobs and nulling the window with coding 2018-06-05 15:32:25 -06:00
e28ad2177e Receive fixes 2018-06-05 15:32:25 -06:00
cb16fe84cd Rework to fix coding blob insertion 2018-06-05 15:32:25 -06:00
ec3569aa39 Move receive_index to correct place 2018-06-05 15:32:25 -06:00
246edecf53 Add receive_index for broadcast blobs and fix blobs_len position 2018-06-05 15:32:25 -06:00
34834c5af9 Store another size in the data block so it is coded as well 2018-06-05 15:32:25 -06:00
b845245614 Restore more of the blob window and add is_coding helper 2018-06-05 15:32:25 -06:00
5711fb9969 Generate coding for the current blob set not just the first coding set 2018-06-05 15:32:25 -06:00
d1eaecde9a Fix deadlock and only push to contq if it's not a coding blob 2018-06-05 15:32:25 -06:00
00c8505d1e Handle set_flags error 2018-06-05 15:32:25 -06:00
33f01efe69 Fixes for erasure coding 2018-06-05 15:32:25 -06:00
377d312c81 Revert log levels 2018-06-05 15:32:25 -06:00
badf5d5412 Add window recovery 2018-06-05 15:32:25 -06:00
0339f90b40 Fix gf-complete url and symlinks 2018-06-05 15:32:25 -06:00
5455e8e6a9 Review comments 2018-06-05 15:32:25 -06:00
6843b71a0d Debug erasure ci script 2018-06-05 15:32:25 -06:00
634408b5e8 Add erasure build to ci 2018-06-05 15:32:25 -06:00
d053f78b74 Erasure refinements, fix generating orders table 2018-06-05 15:32:25 -06:00
93b6fceb2f generate coding after indexing 2018-06-05 15:32:25 -06:00
ac7860c35d indexing blobs then coding 2018-06-05 15:32:25 -06:00
b0eab8729f Add erasure ci script 2018-06-05 15:32:25 -06:00
cb81f80b31 Enable logging for client demo 2018-06-05 15:32:25 -06:00
ea97529185 Fix erasure compilation 2018-06-05 15:32:25 -06:00
f1075191fe Clean up comments: Event -> Transaction 2018-06-04 21:43:46 -06:00
74c479fbc9 Delete bitrotted docs 2018-06-04 21:43:46 -06:00
7e788d3a17 No longer need explicit refs in rustc 1.26 2018-06-04 21:43:46 -06:00
69b3c75f0d Power of two chance (#314)
* fix validator script
* 1/2^30 that we fail due to random returning the same value
2018-06-04 13:32:34 -07:00
b2c2fa40a2 comments 2018-06-03 22:08:25 -06:00
50458d9524 more tests 2018-06-03 22:08:25 -06:00
9679e3e356 more tests 2018-06-03 22:08:25 -06:00
6db9f92b8a crdt gossip tests 2018-06-03 22:08:25 -06:00
4a44498d45 Fix args in validator script, readme version, client-demo perf print 2018-06-02 21:55:27 -06:00
216510c573 repair socket and receiver thread (#303)
repair socket and receiver thread
2018-06-02 08:32:51 -07:00
39 changed files with 1429 additions and 505 deletions


@ -1,9 +1,10 @@
[package]
name = "solana"
description = "Blockchain Rebuilt for Scale"
version = "0.6.0"
description = "Blockchain, Rebuilt for Scale"
version = "0.6.1"
documentation = "https://docs.rs/solana"
homepage = "http://solana.com/"
readme = "README.md"
repository = "https://github.com/solana-labs/solana"
authors = [
"Anatoly Yakovenko <anatoly@solana.com>",


@ -3,17 +3,17 @@
[![Build status](https://badge.buildkite.com/d4c4d7da9154e3a8fb7199325f430ccdb05be5fc1e92777e51.svg?branch=master)](https://buildkite.com/solana-labs/solana)
[![codecov](https://codecov.io/gh/solana-labs/solana/branch/master/graph/badge.svg)](https://codecov.io/gh/solana-labs/solana)
Disclaimer
===
All claims, content, designs, algorithms, estimates, roadmaps, specifications, and performance measurements described in this project are done with the author's best effort. It is up to the reader to check and validate their accuracy and truthfulness. Furthermore nothing in this project constitutes a solicitation for investment.
Solana: Blockchain Rebuilt for Scale
Blockchain, Rebuilt for Scale
===
Solana&trade; is a new blockchain architecture built from the ground up for scale. The architecture supports
up to 710 thousand transactions per second on a gigabit network.
Disclaimer
===
All claims, content, designs, algorithms, estimates, roadmaps, specifications, and performance measurements described in this project are done with the author's best effort. It is up to the reader to check and validate their accuracy and truthfulness. Furthermore nothing in this project constitutes a solicitation for investment.
Introduction
===
@ -82,17 +82,12 @@ open on all the machines you want to test with.
Generate a leader configuration file with:
```bash
cargo run --release --bin solana-fullnode-config > leader.json
cargo run --release --bin solana-fullnode-config -- -d > leader.json
```
Now start the server:
```bash
$ cat ./multinode-demo/leader.sh
#!/bin/bash
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cargo run --release --bin solana-fullnode -- -l leader.json < genesis.log
$ ./multinode-demo/leader.sh > leader-txs.log
```
@ -100,7 +95,7 @@ To run a performance-enhanced fullnode on Linux, download `libcuda_verify_ed25519.a` and enable
it by adding `--features=cuda` to the line that runs `solana-fullnode` in `leader.sh`.
```bash
$ wget https://solana-build-artifacts.s3.amazonaws.com/v0.6.0/libcuda_verify_ed25519.a
$ wget https://solana-build-artifacts.s3.amazonaws.com/v0.5.0/libcuda_verify_ed25519.a
cargo run --release --features=cuda --bin solana-fullnode -- -l leader.json < genesis.log
```
@ -112,15 +107,13 @@ Multinode Testnet
To run a multinode testnet, after starting a leader node, spin up some validator nodes:
Generate the validator's configuration file:
```bash
cargo run --release --bin solana-fullnode-config -- -d > validator.json
```
```bash
$ cat ./multinode-demo/validator.sh
#!/bin/bash
rsync -v -e ssh $1/mint-demo.json .
rsync -v -e ssh $1/leader.json .
rsync -v -e ssh $1/genesis.log .
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cargo run --release --bin solana-fullnode -- -l validator.json -v leader.json -b 9000 -d < genesis.log
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana > validator-txs.log #The leader machine
```
@ -128,7 +121,7 @@ As with the leader node, you can run a performance-enhanced validator fullnode by adding
`--features=cuda` to the line that runs `solana-fullnode` in `validator.sh`.
```bash
cargo run --release --features=cuda --bin solana-fullnode -- -l validator.json -v leader.json -b 9000 -d < genesis.log
cargo run --release --features=cuda --bin solana-fullnode -- -l validator.json -v leader.json < genesis.log
```
@ -139,13 +132,7 @@ Now that your singlenode or multinode testnet is up and running, in a separate s
the JSON configuration file here, not the genesis ledger.
```bash
$ cat ./multinode-demo/client.sh
#!/bin/bash
export RUST_LOG=solana=info
rsync -v -e ssh $1/leader.json .
rsync -v -e ssh $1/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana #The leader machine
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana 2 #The leader machine and the total number of nodes in the network
```
What just happened? The client demo spins up several threads to send 500,000 transactions
@ -210,6 +197,17 @@ to see the debug and info sections for streamer and server respectively. Generally
we are using debug for infrequent debug messages, trace for potentially frequent messages and
info for performance-related logging.
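For example, a setting along these lines (the module paths are assumed here, not taken from the repo) enables debug output for the streamer and info output for the server:
```bash
# Assumed module paths; substitute the crate's actual module names.
export RUST_LOG=solana::streamer=debug,solana::server=info
```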
Attaching to a running process with gdb
```
$ sudo gdb
attach <PID>
set logging on
thread apply all bt
```
This will dump all the threads' stack traces into gdb.txt
Benchmarking
---


@ -11,6 +11,8 @@ steps:
name: "cuda"
- command: "ci/shellcheck.sh"
name: "shellcheck [public]"
- command: "ci/test-erasure.sh"
name: "erasure"
- wait
- command: "ci/publish.sh"
name: "publish release artifacts"

ci/test-erasure.sh Executable file

@ -0,0 +1,29 @@
#!/bin/bash -e
set -o xtrace
cd "$(dirname "$0")/.."
if [[ -z "${libgf_complete_URL:-}" ]]; then
echo libgf_complete_URL undefined
exit 1
fi
if [[ -z "${libJerasure_URL:-}" ]]; then
echo libJerasure_URL undefined
exit 1
fi
curl -X GET -o libJerasure.so "$libJerasure_URL"
curl -X GET -o libgf_complete.so "$libgf_complete_URL"
ln -s libJerasure.so libJerasure.so.2
ln -s libJerasure.so libJerasure.so.2.0.0
ln -s libgf_complete.so libgf_complete.so.1.0.0
export LD_LIBRARY_PATH=$PWD:$LD_LIBRARY_PATH
# shellcheck disable=SC1090 # <-- shellcheck can't follow ~
source ~/.cargo/env
cargo test --features="erasure"
exit 0


@ -1,65 +0,0 @@
The Historian
===
Create a *Historian* and send it *events* to generate an *event log*, where each *entry*
is tagged with the historian's latest *hash*. Then ensure the order of events was not tampered
with by verifying each entry's hash can be generated from the hash in the previous entry:
![historian](https://user-images.githubusercontent.com/55449/36950845-459bdb58-1fb9-11e8-850e-894586f3729b.png)
```rust
extern crate solana;
use solana::historian::Historian;
use solana::ledger::{Block, Entry, Hash};
use solana::event::{generate_keypair, get_pubkey, sign_claim_data, Event};
use std::thread::sleep;
use std::time::Duration;
use std::sync::mpsc::SendError;
fn create_ledger(hist: &Historian<Hash>) -> Result<(), SendError<Event<Hash>>> {
sleep(Duration::from_millis(15));
let tokens = 42;
let keypair = generate_keypair();
let event0 = Event::new_claim(get_pubkey(&keypair), tokens, sign_claim_data(&tokens, &keypair));
hist.sender.send(event0)?;
sleep(Duration::from_millis(10));
Ok(())
}
fn main() {
let seed = Hash::default();
let hist = Historian::new(&seed, Some(10));
create_ledger(&hist).expect("send error");
drop(hist.sender);
let entries: Vec<Entry<Hash>> = hist.receiver.iter().collect();
for entry in &entries {
println!("{:?}", entry);
}
// Proof-of-History: Verify the historian learned about the events
// in the same order they appear in the vector.
assert!(entries[..].verify(&seed));
}
```
Running the program should produce a ledger similar to:
```rust
Entry { num_hashes: 0, id: [0, ...], event: Tick }
Entry { num_hashes: 3, id: [67, ...], event: Transaction { tokens: 42 } }
Entry { num_hashes: 3, id: [123, ...], event: Tick }
```
Proof-of-History
---
Take note of the last line:
```rust
assert!(entries[..].verify(&seed));
```
[It's a proof!](https://en.wikipedia.org/wiki/Curry%E2%80%93Howard_correspondence) For each entry returned by the
historian, we can verify that `id` is the result of applying a sha256 hash to the previous `id`
exactly `num_hashes` times, and then hashing the event data on top of that. Because the event data is
included in the hash, the events cannot be reordered without regenerating all the hashes.
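A minimal sketch of that check, assuming the `sha2` crate; `extend` and the final event-mixing step are hypothetical stand-ins for the crate's real helpers, not its actual `verify`:
```rust
use sha2::{Digest, Sha256};

type Hash = [u8; 32];

// Hypothetical stand-in for the crate's hash-chain step.
fn extend(id: &Hash) -> Hash {
    Sha256::digest(id).into()
}

// Check one entry: re-derive `id` from the previous entry's id by hashing
// `num_hashes` times, then folding the event data into the final hash.
fn verify_entry(prev_id: &Hash, num_hashes: u64, event_data: &[u8], id: &Hash) -> bool {
    let mut h = *prev_id;
    for _ in 0..num_hashes {
        h = extend(&h);
    }
    let mut hasher = Sha256::new();
    hasher.update(h);
    hasher.update(event_data);
    let derived: Hash = hasher.finalize().into();
    derived == *id
}
```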


@ -1,18 +0,0 @@
msc {
client,historian,recorder;
recorder=>historian [ label = "e0 = Entry{id: h0, n: 0, event: Tick}" ] ;
recorder=>recorder [ label = "h1 = hash(h0)" ] ;
recorder=>recorder [ label = "h2 = hash(h1)" ] ;
client=>historian [ label = "Transaction(d0)" ] ;
historian=>recorder [ label = "Transaction(d0)" ] ;
recorder=>recorder [ label = "h3 = hash(h2 + d0)" ] ;
recorder=>historian [ label = "e1 = Entry{id: hash(h3), n: 3, event: Transaction(d0)}" ] ;
recorder=>recorder [ label = "h4 = hash(h3)" ] ;
recorder=>recorder [ label = "h5 = hash(h4)" ] ;
recorder=>recorder [ label = "h6 = hash(h5)" ] ;
recorder=>historian [ label = "e2 = Entry{id: h6, n: 3, event: Tick}" ] ;
client=>historian [ label = "collect()" ] ;
historian=>client [ label = "entries = [e0, e1, e2]" ] ;
client=>client [ label = "entries.verify(h0)" ] ;
}


@ -1,11 +1,15 @@
#!/bin/bash -e
if [[ -z "$1" ]]; then
echo "usage: $0 [network path to solana repo on leader machine]"
echo "usage: $0 [network path to solana repo on leader machine] [number of nodes in the network if greater then 1]"
exit 1
fi
LEADER="$1"
COUNT="$2"
if [[ -z "$2" ]]; then
COUNT=1
fi
set -x
export RUST_LOG=solana=info
@ -13,4 +17,4 @@ rsync -v -e ssh "$LEADER/leader.json" .
rsync -v -e ssh "$LEADER/mint-demo.json" .
cargo run --release --bin solana-client-demo -- \
-l leader.json < mint-demo.json 2>&1 | tee client.log
-l leader.json -n $COUNT -d < mint-demo.json 2>&1 | tee client.log


@ -17,5 +17,5 @@ export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cargo run --release --features=cuda --bin solana-fullnode -- \
-l validator.json -v leader.json -b 9000 -d < genesis.log
cargo run --release --bin solana-fullnode -- \
-l validator.json -v leader.json < genesis.log


@ -1,6 +1,6 @@
//! The `bank` module tracks client balances, and the progress of pending
//! transactions. It offers a high-level public API that signs transactions
//! on behalf of the caller, and a private low-level API for when they have
//! The `bank` module tracks client balances and the progress of smart
//! contracts. It offers a high-level API that signs transactions
//! on behalf of the caller, and a low-level API for when they have
//! already been signed and verified.
extern crate libc;
@ -19,25 +19,69 @@ use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::sync::RwLock;
use transaction::{Instruction, Plan, Transaction};
/// The number of most recent `last_id` values that the bank will track the signatures
/// of. Once the bank discards a `last_id`, it will reject any transactions that use
/// that `last_id` in a transaction. Lowering this value reduces memory consumption,
/// but requires clients to update their `last_id` more frequently. Raising the value
/// lengthens the time a client must wait to be certain a missing transaction will
/// not be processed by the network.
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
/// Reasons a transaction might be rejected.
#[derive(Debug, PartialEq, Eq)]
pub enum BankError {
/// Attempt to debit from `PublicKey`, but found no record of a prior credit.
AccountNotFound(PublicKey),
/// The requested debit from `PublicKey` has the potential to draw the balance
/// below zero. This can occur when a debit and credit are processed in parallel.
/// The bank may reject the debit or push it to a future entry.
InsufficientFunds(PublicKey),
/// The bank has seen `Signature` before. This can occur under normal operation
/// when a UDP packet is duplicated, as a user error from a client not updating
/// its `last_id`, or as a double-spend attack.
DuplicateSiganture(Signature),
/// The bank has not seen the given `last_id` or the transaction is too old and
/// the `last_id` has been discarded.
LastIdNotFound(Hash),
/// The transaction is invalid and has requested a debit or credit of negative
/// tokens.
NegativeTokens,
}
pub type Result<T> = result::Result<T, BankError>;
/// The state of all accounts and contracts after processing its entries.
pub struct Bank {
/// A map of account public keys to the balance in that account.
balances: RwLock<HashMap<PublicKey, AtomicIsize>>,
/// A map of smart contract transaction signatures to what remains of its payment
/// plan. Each transaction that targets the plan should cause it to be reduced.
/// Once it cannot be reduced, final payments are made and it is discarded.
pending: RwLock<HashMap<Signature, Plan>>,
/// A FIFO queue of `last_id` items, where each item is a set of signatures
/// that have been processed using that `last_id`. The bank uses this data to
/// reject transactions with signatures it has seen before, as well as `last_id`
/// values so old that they have been pulled out of the queue.
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
/// The set of trusted timekeepers. A Timestamp transaction from a `PublicKey`
/// outside this set will be discarded. Note that if validators do not have the
/// same set as leaders, they may interpret the ledger differently.
time_sources: RwLock<HashSet<PublicKey>>,
/// The most recent timestamp from a trusted timekeeper. This timestamp is applied
/// to every smart contract when it enters the system. If the contract is waiting on a
/// timestamp witness at or before that time, the bank will execute it immediately.
last_time: RwLock<DateTime<Utc>>,
/// The number of transactions the bank has processed without error since the
/// start of the ledger.
transaction_count: AtomicUsize,
}
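A hypothetical, single-threaded sketch of how the `last_ids` FIFO enforces both rejections, with `u64` standing in for `Hash` and `Signature` (the real `Bank` wraps each signature set in an `RwLock` so reservations can happen in parallel):
```rust
use std::collections::{HashSet, VecDeque};

const MAX_ENTRY_IDS: usize = 1024 * 4;

// Sketch only: (last_id, signatures seen with it), oldest at the front.
struct LastIds {
    queue: VecDeque<(u64, HashSet<u64>)>,
}

impl LastIds {
    // Register a new last_id, discarding the oldest once the queue is full.
    fn register(&mut self, last_id: u64) {
        if self.queue.len() >= MAX_ENTRY_IDS {
            self.queue.pop_front();
        }
        self.queue.push_back((last_id, HashSet::new()));
    }

    // Err if the last_id is unknown (too old) or the signature is a duplicate.
    fn reserve(&mut self, last_id: u64, sig: u64) -> Result<(), ()> {
        let entry = self
            .queue
            .iter_mut()
            .rev()
            .find(|e| e.0 == last_id)
            .ok_or(())?;
        if !entry.1.insert(sig) {
            return Err(()); // duplicate signature
        }
        Ok(())
    }
}
```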
@ -67,7 +111,7 @@ impl Bank {
bank
}
/// Commit funds to the 'to' party.
/// Commit funds to the `payment.to` party.
fn apply_payment(&self, payment: &Payment) {
// First we check balances with a read lock to maximize potential parallelization.
if self.balances
@ -89,13 +133,14 @@ impl Bank {
}
}
/// Return the last entry ID registered
/// Return the last entry ID registered.
pub fn last_id(&self) -> Hash {
let last_ids = self.last_ids.read().expect("'last_ids' read lock");
let last_item = last_ids.iter().last().expect("empty 'last_ids' list");
last_item.0
}
/// Store the given signature. The bank will reject any transaction with the same signature.
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> Result<()> {
if signatures
.read()
@ -111,14 +156,16 @@ impl Bank {
Ok(())
}
fn forget_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) {
/// Forget the given `signature` because its transaction was rejected.
fn forget_signature(signatures: &RwLock<HashSet<Signature>>, signature: &Signature) {
signatures
.write()
.expect("'signatures' write lock in forget_signature")
.remove(sig);
.remove(signature);
}
fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) {
/// Forget the given `signature` with `last_id` because the transaction was rejected.
fn forget_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) {
if let Some(entry) = self.last_ids
.read()
.expect("'last_ids' read lock in forget_signature_with_last_id")
@ -126,11 +173,11 @@ impl Bank {
.rev()
.find(|x| x.0 == *last_id)
{
Self::forget_signature(&entry.1, sig);
Self::forget_signature(&entry.1, signature);
}
}
fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> Result<()> {
fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> {
if let Some(entry) = self.last_ids
.read()
.expect("'last_ids' read lock in reserve_signature_with_last_id")
@ -138,7 +185,7 @@ impl Bank {
.rev()
.find(|x| x.0 == *last_id)
{
return Self::reserve_signature(&entry.1, sig);
return Self::reserve_signature(&entry.1, signature);
}
Err(BankError::LastIdNotFound(*last_id))
}
@ -207,6 +254,8 @@ impl Bank {
}
}
/// Apply only a transaction's credits. Credits from multiple transactions
/// may safely be applied in parallel.
fn apply_credits(&self, tx: &Transaction) {
match &tx.instruction {
Instruction::NewContract(contract) => {
@ -215,8 +264,8 @@ impl Bank {
.read()
.expect("timestamp creation in apply_credits")));
if let Some(ref payment) = plan.final_payment() {
self.apply_payment(payment);
if let Some(payment) = plan.final_payment() {
self.apply_payment(&payment);
} else {
let mut pending = self.pending
.write()
@ -233,17 +282,17 @@ impl Bank {
}
}
/// Process a Transaction.
/// Process a Transaction. If it contains a payment plan that requires a witness
/// to progress, the payment plan will be stored in the bank.
fn process_transaction(&self, tx: &Transaction) -> Result<()> {
self.apply_debits(tx)?;
self.apply_credits(tx);
Ok(())
}
/// Process a batch of transactions.
/// Process a batch of transactions. It runs all debits first to filter out any
/// transactions that can't be processed in parallel deterministically.
pub fn process_transactions(&self, txs: Vec<Transaction>) -> Vec<Result<Transaction>> {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
info!("processing Transactions {}", txs.len());
let results: Vec<_> = txs.into_par_iter()
.map(|tx| self.apply_debits(&tx).map(|_| tx))
@ -260,6 +309,7 @@ impl Bank {
.collect()
}
/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<()>
where
I: IntoIterator<Item = Entry>,
@ -273,7 +323,8 @@ impl Bank {
Ok(())
}
/// Process a Witness Signature.
/// Process a Witness Signature. Any payment plans waiting on this signature
/// will progress one step.
fn apply_signature(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending
.write()
@ -290,7 +341,8 @@ impl Bank {
Ok(())
}
/// Process a Witness Timestamp.
/// Process a Witness Timestamp. Any payment plans waiting on this timestamp
/// will progress one step.
fn apply_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
// If this is the first timestamp we've seen, it probably came from the genesis block,
// so we'll trust it.
@ -329,8 +381,8 @@ impl Bank {
plan.apply_witness(&Witness::Timestamp(*self.last_time
.read()
.expect("'last_time' read lock when creating timestamp")));
if let Some(ref payment) = plan.final_payment() {
self.apply_payment(payment);
if let Some(payment) = plan.final_payment() {
self.apply_payment(&payment);
completed.push(key.clone());
}
}
@ -392,7 +444,7 @@ mod tests {
use signature::KeyPairUtil;
#[test]
fn test_bank() {
fn test_two_payments_to_one_party() {
let mint = Mint::new(10_000);
let pubkey = KeyPair::new().pubkey();
let bank = Bank::new(&mint);
@ -409,7 +461,7 @@ mod tests {
}
#[test]
fn test_invalid_tokens() {
fn test_negative_tokens() {
let mint = Mint::new(1);
let pubkey = KeyPair::new().pubkey();
let bank = Bank::new(&mint);
@ -433,7 +485,7 @@ mod tests {
}
#[test]
fn test_invalid_transfer() {
fn test_insufficient_funds() {
let mint = Mint::new(11_000);
let bank = Bank::new(&mint);
let pubkey = KeyPair::new().pubkey();
@ -570,7 +622,7 @@ mod tests {
}
#[test]
fn test_max_entry_ids() {
fn test_reject_old_last_id() {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let sig = Signature::default();


@ -1,14 +1,17 @@
//! The `banking_stage` processes Transaction messages.
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use bank::Bank;
use bincode::deserialize;
use counter::Counter;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use record_stage::Signal;
use result::Result;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
@ -17,12 +20,20 @@ use std::time::Instant;
use timing;
use transaction::Transaction;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
/// Handle to the stage's thread.
pub thread_hdl: JoinHandle<()>,
/// Output receiver for the following stage.
pub signal_receiver: Receiver<Signal>,
}
impl BankingStage {
/// Create the stage using `bank`. Exit when either `exit` is set or
/// when `verified_receiver` or the stage's output receiver is dropped.
/// Discard input packets using `packet_recycler` to minimize memory
/// allocations in a previous stage such as the `fetch_stage`.
pub fn new(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
@ -52,6 +63,8 @@ impl BankingStage {
}
}
/// Convert the transactions from a blob of binary data to a vector of transactions and
/// an unused `SocketAddr` that could be used to send a response.
fn deserialize_transactions(p: &packet::Packets) -> Vec<Option<(Transaction, SocketAddr)>> {
p.packets
.par_iter()
@ -63,6 +76,8 @@ impl BankingStage {
.collect()
}
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
/// Discard packets via `packet_recycler`.
fn process_packets(
bank: Arc<Bank>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
@ -80,6 +95,8 @@ impl BankingStage {
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let count = mms.iter().map(|x| x.1.len()).sum();
static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
@ -100,7 +117,7 @@ impl BankingStage {
debug!("process_transactions");
let results = bank.process_transactions(transactions);
let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(transactions))?;
signal_sender.send(Signal::Transactions(transactions))?;
debug!("done process_transactions");
packet_recycler.recycle(msgs);
@ -115,6 +132,7 @@ impl BankingStage {
reqs_len,
(reqs_len as f32) / (total_time_s)
);
inc_counter!(COUNTER, count, proc_start);
Ok(())
}
}
@ -293,7 +311,7 @@ mod bench {
&packet_recycler,
).unwrap();
let signal = signal_receiver.recv().unwrap();
if let Signal::Events(ref transactions) = signal {
if let Signal::Transactions(transactions) = signal {
assert_eq!(transactions.len(), tx);
} else {
assert!(false);


@ -1,3 +1,4 @@
extern crate env_logger;
extern crate getopts;
extern crate isatty;
extern crate pnet;
@ -10,8 +11,8 @@ use isatty::stdin_isatty;
use pnet::datalink;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::mint::MintDemo;
use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
@ -49,6 +50,7 @@ fn get_ip_addr() -> Option<IpAddr> {
}
fn main() {
env_logger::init().unwrap();
let mut threads = 4usize;
let mut num_nodes = 1usize;
@ -107,9 +109,10 @@ fn main() {
&client_addr,
&leader,
signal.clone(),
num_nodes + 2,
num_nodes,
&mut c_threads,
);
assert_eq!(validators.len(), num_nodes);
if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
@ -175,38 +178,63 @@ fn main() {
}
});
let sample_period = 1; // in seconds
println!("Sampling tps every second...",);
validators.into_par_iter().for_each(|val| {
let mut client = mk_client(&client_addr, &val);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
for i in 0..100 {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!(
"{}: Transactions processed {}",
val.transactions_addr, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
println!("{}: {} tps", val.transactions_addr, tps);
let total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
val.transactions_addr, total
);
if total == transactions.len() as u64 {
break;
let maxes: Vec<_> = validators
.into_par_iter()
.map(|val| {
let mut client = mk_client(&client_addr, &val);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
let mut total = 0;
for i in 0..100 {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!(
"{}: Transactions processed {}",
val.transactions_addr, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps {
max_tps = tps;
}
println!("{}: {} tps", val.transactions_addr, tps);
total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
val.transactions_addr, total
);
if total == transactions.len() as u64 {
break;
}
if i > 20 && sample == 0 {
break;
}
sleep(Duration::new(sample_period, 0));
}
if i > 20 && sample == 0 {
break;
}
sleep(Duration::new(1, 0));
(max_tps, total)
})
.collect();
let mut max_of_maxes = 0.0;
let mut total_txs = 0;
for (max, txs) in &maxes {
if *max > max_of_maxes {
max_of_maxes = *max;
}
});
total_txs += *txs;
}
println!(
"\nHighest TPS: {} sampling period {}s total transactions: {} clients: {}",
max_of_maxes,
sample_period,
total_txs,
maxes.len()
);
signal.store(true, Ordering::Relaxed);
for t in c_threads {
t.join().unwrap();
@ -235,7 +263,14 @@ fn spy_node(client_addr: &Arc<RwLock<SocketAddr>>) -> (ReplicatedData, UdpSocket
addr.set_port(port + 1);
let daddr = "0.0.0.0:0".parse().unwrap();
let pubkey = KeyPair::new().pubkey();
let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr);
let node = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
daddr,
daddr,
daddr,
daddr,
);
(node, gossip)
}
@ -255,33 +290,34 @@ fn converge(
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let window = default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
let ncp = Ncp::new(
spy_ref.clone(),
window.clone(),
spy_gossip,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
//wait for the network to converge
let mut rv = vec![];
//wait for the network to converge, 30 seconds should be plenty
for _ in 0..30 {
let min = spy_ref.read().unwrap().convergence();
if num_nodes as u64 == min {
println!("converged!");
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.requests_addr != daddr)
.cloned()
.collect();
if v.len() >= num_nodes {
println!("CONVERGED!");
rv.extend(v.into_iter());
break;
}
sleep(Duration::new(1, 0));
}
threads.extend(data_replicator.thread_hdls.into_iter());
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.requests_addr != daddr)
.map(|x| x.clone())
.collect();
v.clone()
threads.extend(ncp.thread_hdls.into_iter());
rv
}
fn read_leader(path: String) -> ReplicatedData {


@ -129,6 +129,7 @@ fn main() {
UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind(repl_data.replicate_addr).unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
UdpSocket::bind(repl_data.repair_addr).unwrap(),
leader,
exit.clone(),
);


@ -8,9 +8,13 @@ use payment_plan::{Payment, PaymentPlan, Witness};
use signature::PublicKey;
use std::mem;
/// A data type representing a `Witness` that the payment plan is waiting on.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum Condition {
/// Wait for a `Timestamp` `Witness` at or after the given `DateTime`.
Timestamp(DateTime<Utc>),
/// Wait for a `Signature` `Witness` from `PublicKey`.
Signature(PublicKey),
}
@ -18,19 +22,26 @@ impl Condition {
/// Return true if the given Witness satisfies this Condition.
pub fn is_satisfied(&self, witness: &Witness) -> bool {
match (self, witness) {
(&Condition::Signature(ref pubkey), &Witness::Signature(ref from)) => pubkey == from,
(&Condition::Timestamp(ref dt), &Witness::Timestamp(ref last_time)) => dt <= last_time,
(Condition::Signature(pubkey), Witness::Signature(from)) => pubkey == from,
(Condition::Timestamp(dt), Witness::Timestamp(last_time)) => dt <= last_time,
_ => false,
}
}
}
/// A data type representing a payment plan.
#[repr(C)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum Budget {
/// Make a payment.
Pay(Payment),
/// Make a payment after some condition.
After(Condition, Payment),
Race((Condition, Payment), (Condition, Payment)),
/// Either make a payment after one condition or a different payment after another
/// condition, whichever condition is satisfied first.
Or((Condition, Payment), (Condition, Payment)),
}
impl Budget {
@ -57,7 +68,7 @@ impl Budget {
tokens: i64,
to: PublicKey,
) -> Self {
Budget::Race(
Budget::Or(
(Condition::Timestamp(dt), Payment { tokens, to }),
(Condition::Signature(from), Payment { tokens, to: from }),
)
@ -67,31 +78,27 @@ impl Budget {
impl PaymentPlan for Budget {
/// Return Payment if the budget requires no additional Witnesses.
fn final_payment(&self) -> Option<Payment> {
match *self {
Budget::Pay(ref payment) => Some(payment.clone()),
match self {
Budget::Pay(payment) => Some(payment.clone()),
_ => None,
}
}
/// Return true if the budget spends exactly `spendable_tokens`.
fn verify(&self, spendable_tokens: i64) -> bool {
match *self {
Budget::Pay(ref payment) | Budget::After(_, ref payment) => {
payment.tokens == spendable_tokens
}
Budget::Race(ref a, ref b) => {
a.1.tokens == spendable_tokens && b.1.tokens == spendable_tokens
}
match self {
Budget::Pay(payment) | Budget::After(_, payment) => payment.tokens == spendable_tokens,
Budget::Or(a, b) => a.1.tokens == spendable_tokens && b.1.tokens == spendable_tokens,
}
}
/// Apply a witness to the budget to see if the budget can be reduced.
/// If so, modify the budget in-place.
fn apply_witness(&mut self, witness: &Witness) {
let new_payment = match *self {
Budget::After(ref cond, ref payment) if cond.is_satisfied(witness) => Some(payment),
Budget::Race((ref cond, ref payment), _) if cond.is_satisfied(witness) => Some(payment),
Budget::Race(_, (ref cond, ref payment)) if cond.is_satisfied(witness) => Some(payment),
let new_payment = match self {
Budget::After(cond, payment) if cond.is_satisfied(witness) => Some(payment),
Budget::Or((cond, payment), _) if cond.is_satisfied(witness) => Some(payment),
Budget::Or(_, (cond, payment)) if cond.is_satisfied(witness) => Some(payment),
_ => None,
}.cloned();

src/counter.rs Normal file

@ -0,0 +1,70 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub struct Counter {
pub name: &'static str,
pub counts: AtomicUsize,
pub nanos: AtomicUsize,
pub times: AtomicUsize,
pub lograte: usize,
}
macro_rules! create_counter {
($name:expr, $lograte:expr) => {
Counter {
name: $name,
counts: AtomicUsize::new(0),
nanos: AtomicUsize::new(0),
times: AtomicUsize::new(0),
lograte: $lograte,
}
};
}
macro_rules! inc_counter {
($name:expr, $count:expr, $start:expr) => {
unsafe { $name.inc($count, $start.elapsed()) };
};
}
impl Counter {
pub fn inc(&mut self, events: usize, dur: Duration) {
let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64;
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed);
if times % self.lograte == 0 && times > 0 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let now_ms = now.as_secs() * 1_000 + now.subsec_nanos() as u64 / 1_000_000;
info!(
"COUNTER:{{\"name:\":\"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {} \"rate\": {}, \"now\": {}}}",
self.name,
counts,
nanos,
times,
counts as f64 * 1e9 / nanos as f64,
now_ms,
);
}
}
}
#[cfg(test)]
mod tests {
use counter::Counter;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
#[test]
fn test_counter() {
static mut COUNTER: Counter = create_counter!("test", 100);
let start = Instant::now();
let count = 1;
inc_counter!(COUNTER, count, start);
unsafe {
assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.lograte, 100);
assert_eq!(COUNTER.name, "test");
}
}
}


@ -77,6 +77,9 @@ pub struct ReplicatedData {
pub requests_addr: SocketAddr,
/// transactions address
pub transactions_addr: SocketAddr,
/// repair address, we use this to jump ahead of the packets
/// destined to the replicate_addr
pub repair_addr: SocketAddr,
/// current leader identity
pub current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader
@ -92,6 +95,7 @@ impl ReplicatedData {
replicate_addr: SocketAddr,
requests_addr: SocketAddr,
transactions_addr: SocketAddr,
repair_addr: SocketAddr,
) -> ReplicatedData {
ReplicatedData {
id,
@ -101,6 +105,7 @@ impl ReplicatedData {
replicate_addr,
requests_addr,
transactions_addr,
repair_addr,
current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(),
last_verified_count: 0,
@ -118,6 +123,7 @@ impl ReplicatedData {
let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3);
let repair_addr = Self::next_port(&bind_addr, 4);
let pubkey = KeyPair::new().pubkey();
ReplicatedData::new(
pubkey,
@ -125,6 +131,7 @@ impl ReplicatedData {
replicate_addr,
requests_addr,
transactions_addr,
repair_addr,
)
}
}
@ -152,10 +159,9 @@ pub struct Crdt {
pub remote: HashMap<PublicKey, u64>,
pub update_index: u64,
pub me: PublicKey,
timeout: Duration,
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
enum Protocol {
/// forward your own latest data structure when requesting an update
/// this doesn't update the `remote` update index, but it allows the
@ -177,7 +183,6 @@ impl Crdt {
remote: HashMap::new(),
me: me.id,
update_index: 1,
timeout: Duration::from_millis(100),
};
g.local.insert(me.id, g.update_index);
g.table.insert(me.id, me);
@ -222,14 +227,38 @@ impl Crdt {
}
}
pub fn index_blobs(
obj: &Arc<RwLock<Self>>,
blobs: &Vec<SharedBlob>,
receive_index: &mut u64,
) -> Result<()> {
let me: ReplicatedData = {
let robj = obj.read().expect("'obj' read lock in crdt::index_blobs");
debug!("broadcast table {}", robj.table.len());
robj.table[&robj.me].clone()
};
// enumerate all the blobs, those are the indices
for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*receive_index + i as u64)
.expect("set_index in pub fn broadcast");
}
Ok(())
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast(
obj: &Arc<RwLock<Self>>,
blobs: &Vec<SharedBlob>,
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
s: &UdpSocket,
transmit_index: &mut u64,
received_index: u64,
) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking during IO
@ -259,31 +288,35 @@ impl Crdt {
return Err(Error::CrdtTooSmall);
}
trace!("nodes table {}", nodes.len());
trace!("blobs table {}", blobs.len());
// enumerate all the blobs, those are the indices
// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node
let orders: Vec<_> = blobs
.iter()
.enumerate()
.zip(
nodes
.iter()
.cycle()
.skip((*transmit_index as usize) % nodes.len()),
)
.collect();
let mut orders = Vec::new();
let window_l = window.write().unwrap();
for i in *transmit_index..received_index {
let is = i as usize;
let k = is % window_l.len();
assert!(window_l[k].is_some());
orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
}
trace!("orders table {}", orders.len());
let errs: Vec<_> = orders
.into_iter()
.map(|((i, b), v)| {
.map(|(b, v)| {
// only leader should be broadcasting
assert!(me.current_leader_id != v.id);
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*transmit_index + i as u64)
.expect("set_index in pub fn broadcast");
let bl = b.unwrap();
let blob = bl.read().expect("blob read lock in streamer::broadcast");
//TODO profile this, may need multiple sockets for par_iter
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
trace!(
"broadcast idx: {} sz: {} to {} coding: {}",
blob.get_index().unwrap(),
blob.meta.size,
v.replicate_addr,
blob.is_coding()
);
assert!(blob.meta.size < BLOB_SIZE);
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
@ -390,7 +423,7 @@ impl Crdt {
let daddr = "0.0.0.0:0".parse().unwrap();
let valid: Vec<_> = self.table
.values()
.filter(|r| r.id != self.me && r.replicate_addr != daddr)
.filter(|r| r.id != self.me && r.repair_addr != daddr)
.collect();
if valid.is_empty() {
return Err(Error::CrdtTooSmall);
@ -482,12 +515,9 @@ impl Crdt {
if exit.load(Ordering::Relaxed) {
return;
}
//TODO this should be a tuned parameter
sleep(
obj.read()
.expect("'obj' read lock in pub fn gossip")
.timeout,
);
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
sleep(Duration::from_millis(100));
})
.unwrap()
}
@ -498,7 +528,7 @@ impl Crdt {
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
let pos = (ix as usize) % window.read().unwrap().len();
if let &Some(ref blob) = &window.read().unwrap()[pos] {
if let Some(blob) = &window.read().unwrap()[pos] {
let rblob = blob.read().unwrap();
let blob_ix = rblob.get_index().expect("run_window_request get_index");
if blob_ix == ix {
@ -509,7 +539,7 @@ impl Crdt {
let sz = rblob.meta.size;
outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
outblob.meta.set_addr(&from.replicate_addr);
outblob.meta.set_addr(&from.repair_addr);
//TODO, set the sender id to the requester so we don't retransmit
//come up with a cleaner solution for this when sender signatures are checked
outblob.set_id(from.id).expect("blob set_id");
@ -518,7 +548,7 @@ impl Crdt {
}
} else {
assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr);
info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
}
None
}
@ -574,16 +604,18 @@ impl Crdt {
None
}
Ok(Protocol::RequestWindowIndex(from, ix)) => {
//TODO this doesn't depend on CRDT module, can be moved
//but we are using the listen thread to service these request
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
trace!(
"received RequestWindowIndex {} {} myaddr {}",
ix,
from.replicate_addr,
me.replicate_addr
from.repair_addr,
me.repair_addr
);
assert_ne!(from.replicate_addr, me.replicate_addr);
assert_ne!(from.repair_addr, me.repair_addr);
Self::run_window_request(&window, &from, ix, blob_recycler)
}
Err(_) => {
@ -656,6 +688,7 @@ pub struct Sockets {
pub transaction: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
pub repair: UdpSocket,
}
pub struct TestNode {
@ -672,6 +705,7 @@ impl TestNode {
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let repair = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
@ -679,6 +713,7 @@ impl TestNode {
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
repair.local_addr().unwrap(),
);
TestNode {
data: data,
@ -690,6 +725,7 @@ impl TestNode {
transaction,
respond,
broadcast,
repair,
},
}
}
@ -698,7 +734,14 @@ impl TestNode {
#[cfg(test)]
mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::default_window;
#[test]
fn test_parse_port_or_addr() {
@ -709,8 +752,6 @@ mod tests {
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}
/// Test that insert drops messages that are older
#[test]
fn insert_test() {
let mut d = ReplicatedData::new(
@ -719,6 +760,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
@ -736,6 +778,15 @@ mod tests {
copy
}
#[test]
fn replicated_data_new_leader() {
let d1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d1.gossip_addr, "127.0.0.1:1235".parse().unwrap());
assert_eq!(d1.replicate_addr, "127.0.0.1:1236".parse().unwrap());
assert_eq!(d1.requests_addr, "127.0.0.1:1237".parse().unwrap());
assert_eq!(d1.transactions_addr, "127.0.0.1:1234".parse().unwrap());
assert_eq!(d1.repair_addr, "127.0.0.1:1238".parse().unwrap());
}
#[test]
fn update_test() {
let d1 = ReplicatedData::new(
KeyPair::new().pubkey(),
@ -743,6 +794,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let d2 = ReplicatedData::new(
KeyPair::new().pubkey(),
@ -750,6 +802,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let d3 = ReplicatedData::new(
KeyPair::new().pubkey(),
@ -757,6 +810,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(d1.clone());
let (key, ix, ups) = crdt.get_updates_since(0);
@ -784,5 +838,165 @@ mod tests {
sorted(&crdt.table.values().map(|x| x.clone()).collect())
);
}
#[test]
fn window_index_request() {
let me = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(me.clone());
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtTooSmall));
let nxt = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"0.0.0.0:0".parse().unwrap(),
);
crdt.insert(&nxt);
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtTooSmall));
let nxt = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
crdt.insert(&nxt);
let rv = crdt.window_index_request(0).unwrap();
assert_eq!(nxt.gossip_addr, "127.0.0.2:1234".parse().unwrap());
assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap());
let nxt = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.3:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
crdt.insert(&nxt);
let mut one = false;
let mut two = false;
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = crdt.window_index_request(0).unwrap();
if rv.0 == "127.0.0.2:1234".parse().unwrap() {
one = true;
}
if rv.0 == "127.0.0.3:1234".parse().unwrap() {
two = true;
}
}
assert!(one && two);
}
/// test that gossip requests are eventually generated for all nodes
#[test]
fn gossip_request() {
let me = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(me.clone());
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtTooSmall));
let nxt1 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
crdt.insert(&nxt1);
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt1.gossip_addr);
let nxt2 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.3:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
crdt.insert(&nxt2);
// check that the service works
// and that it eventually produces a request for both nodes
let (sender, reader) = channel();
let recycler = BlobRecycler::default();
let exit = Arc::new(AtomicBool::new(false));
let obj = Arc::new(RwLock::new(crdt));
let thread = Crdt::gossip(obj, recycler, sender, exit.clone());
let mut one = false;
let mut two = false;
for _ in 0..30 {
//50% chance each try that we get a repeat
let mut rv = reader.recv_timeout(Duration::new(1, 0)).unwrap();
while let Ok(mut more) = reader.try_recv() {
rv.append(&mut more);
}
assert!(rv.len() > 0);
for i in rv.iter() {
if i.read().unwrap().meta.addr() == nxt1.gossip_addr {
one = true;
} else if i.read().unwrap().meta.addr() == nxt2.gossip_addr {
two = true;
} else {
//unexpected request
assert!(false);
}
}
if one && two {
break;
}
}
exit.store(true, Ordering::Relaxed);
thread.join().unwrap();
//created requests to both
assert!(one && two);
}
/// test window requests respond with the right blob, and do not overrun
#[test]
fn run_window_request() {
let window = default_window();
let me = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let recycler = BlobRecycler::default();
let rv = Crdt::run_window_request(&window, &me, 0, &recycler);
assert!(rv.is_none());
let out = recycler.allocate();
out.write().unwrap().meta.size = 200;
window.write().unwrap()[0] = Some(out);
let rv = Crdt::run_window_request(&window, &me, 0, &recycler);
assert!(rv.is_some());
let v = rv.unwrap();
//test we copied the blob
assert_eq!(v.read().unwrap().meta.size, 200);
let len = window.read().unwrap().len() as u64;
let rv = Crdt::run_window_request(&window, &me, len, &recycler);
assert!(rv.is_none());
}
}


@ -9,18 +9,29 @@ use transaction::Transaction;
/// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result
/// of hashing `id` from the previous entry `num_hashes` times. The `transactions`
/// field points to Events that took place shortly after `id` was generated.
/// field points to Transactions that took place shortly before `id` was generated.
///
/// If you divide `num_hashes` by the amount of time it takes to generate a new hash, you
/// get a duration estimate since the last Entry. Since processing power increases
/// over time, one should expect the duration `num_hashes` represents to decrease proportionally.
/// Though processing power varies across nodes, the network gives priority to the
/// fastest processor. Duration should therefore be estimated by assuming that the hash
/// was generated by the fastest processor at the time the entry was recorded.
/// An upper bound on Duration can be estimated by assuming each hash was generated by the
/// world's fastest processor at the time the entry was recorded. Or said another way, it
/// is physically not possible for a shorter duration to have occurred if one assumes the
/// hash was computed by the world's fastest processor at that time. The hash chain is both
/// a Verifiable Delay Function (VDF) and a Proof of Work (not to be confused with Proof of
/// Work consensus!)
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct Entry {
/// The number of hashes since the previous Entry ID.
pub num_hashes: u64,
/// The SHA-256 hash `num_hashes` after the previous Entry ID.
pub id: Hash,
/// An unordered list of transactions that were observed before the Entry ID was
/// generated. They may have been observed before a previous Entry ID but were
/// pushed back into this list to ensure deterministic interpretation of the ledger.
pub transactions: Vec<Transaction>,
}
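As a rough illustration of the duration bound described in the doc comment above (the hash rate below is an assumption for the example, not a measurement): dividing `num_hashes` by the fastest known hash rate gives a lower bound on the time since the previous entry.
```rust
// Illustrative arithmetic only; both numbers are assumed.
fn main() {
    let num_hashes: u64 = 3_000_000;
    let fastest_hash_rate = 60_000_000.0; // assumed hashes/sec of the fastest processor
    let min_elapsed = num_hashes as f64 / fastest_hash_rate;
    println!("at least {:.3} seconds elapsed since the previous entry", min_elapsed);
}
```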
@ -95,7 +106,7 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction
}
}
/// Creates the next Tick or Event Entry `num_hashes` after `start_hash`.
/// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`.
pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Entry {
Entry {
num_hashes,


@ -1,4 +1,6 @@
//! The `entry_writer` module helps implement the TPU's write stage.
//! The `entry_writer` module helps implement the TPU's write stage. It
//! writes entries to the given writer, which is typically a file or
//! stdout, and then sends the Entry to its output channel.
use bank::Bank;
use entry::Entry;


@ -1,17 +1,18 @@
// Support erasure coding
use packet::{BlobRecycler, SharedBlob};
use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
use std::result;
//TODO(sakridge) pick these values
const NUM_CODED: usize = 10;
const MAX_MISSING: usize = 2;
pub const NUM_CODED: usize = 20;
pub const MAX_MISSING: usize = 4;
const NUM_DATA: usize = NUM_CODED - MAX_MISSING;
#[derive(Debug, PartialEq, Eq)]
pub enum ErasureError {
NotEnoughBlocksToDecode,
DecodeError,
EncodeError,
InvalidBlockSize,
}
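With the new constants, each group of `NUM_CODED = 20` consecutive window slots carries `NUM_DATA = 16` data blobs followed by `MAX_MISSING = 4` coding blobs, so up to 4 losses per group are recoverable. A small sketch of that layout, assuming grouping by consecutive window index as `generate_coding` below does:
```rust
const NUM_CODED: usize = 20;
const MAX_MISSING: usize = 4;
const NUM_DATA: usize = NUM_CODED - MAX_MISSING; // 16

fn main() {
    // For each window slot, show its group and whether it holds data or coding.
    for i in 0..2 * NUM_CODED {
        let block_start = i - (i % NUM_CODED); // same grouping as generate_coding
        let kind = if i - block_start < NUM_DATA { "data" } else { "coding" };
        println!("slot {:2}: group {:2}..{:2} {}", i, block_start, block_start + NUM_CODED, kind);
    }
}
```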
@ -73,12 +74,22 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Resul
let mut data_arg = Vec::new();
for block in data {
if block_len != block.len() {
trace!(
"data block size incorrect {} expected {}",
block.len(),
block_len
);
return Err(ErasureError::InvalidBlockSize);
}
data_arg.push(block.as_ptr());
}
for mut block in coding {
if block_len != block.len() {
trace!(
"coding block size incorrect {} expected {}",
block.len(),
block_len
);
return Err(ErasureError::InvalidBlockSize);
}
coding_arg.push(block.as_mut_ptr());
@ -150,59 +161,128 @@ pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32])
Ok(())
}
// Generate coding blocks in window from consumed to consumed+NUM_DATA
// Allocate some coding blobs and insert into the blobs array
pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec<SharedBlob>, consumed: u64) {
let mut added = 0;
let blobs_len = blobs.len() as u64;
for i in consumed..consumed + blobs_len {
let is = i as usize;
if is != 0 && ((is + MAX_MISSING) % NUM_CODED) == 0 {
for _ in 0..MAX_MISSING {
trace!("putting coding at {}", (i - consumed));
let new_blob = recycler.allocate();
let new_blob_clone = new_blob.clone();
let mut new_blob_l = new_blob_clone.write().unwrap();
new_blob_l.set_size(0);
new_blob_l.set_coding().unwrap();
drop(new_blob_l);
blobs.insert((i - consumed) as usize, new_blob);
added += 1;
}
}
}
info!(
"add_coding consumed: {} blobs.len(): {} added: {}",
consumed,
blobs.len(),
added
);
}
// Generate coding blocks in window starting from consumed
pub fn generate_coding(
re: &BlobRecycler,
window: &mut Vec<SharedBlob>,
window: &mut Vec<Option<SharedBlob>>,
consumed: usize,
num_blobs: usize,
) -> Result<()> {
let mut data_blobs = Vec::new();
let mut coding_blobs = Vec::new();
let mut data_locks = Vec::new();
let mut data_ptrs: Vec<&[u8]> = Vec::new();
let mut coding_locks = Vec::new();
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
for i in consumed..consumed + NUM_DATA {
let n = i % window.len();
data_blobs.push(
window[n]
.clone()
.expect("'data_blobs' arr in pub fn generate_coding"),
);
}
for b in &data_blobs {
data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding"));
}
for (i, l) in data_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
data_ptrs.push(&l.data);
}
let mut block_start = consumed - (consumed % NUM_CODED);
// generate coding ptr array
let coding_start = consumed + NUM_DATA;
let coding_end = consumed + NUM_CODED;
for i in coding_start..coding_end {
let n = i % window.len();
window[n] = re.allocate();
coding_blobs.push(
window[n]
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
for b in &coding_blobs {
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
coding_ptrs.push(&mut l.data);
}
for i in consumed..consumed + num_blobs {
if (i % NUM_CODED) == (NUM_CODED - 1) {
let mut data_blobs = Vec::new();
let mut coding_blobs = Vec::new();
let mut data_locks = Vec::new();
let mut data_ptrs: Vec<&[u8]> = Vec::new();
let mut coding_locks = Vec::new();
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
trace!("consumed: {}", consumed);
info!(
"generate_coding start: {} end: {} consumed: {} num_blobs: {}",
block_start,
block_start + NUM_DATA,
consumed,
num_blobs
);
for i in block_start..block_start + NUM_DATA {
let n = i % window.len();
trace!("window[{}] = {:?}", n, window[n]);
if window[n].is_none() {
trace!("data block is null @ {}", n);
return Ok(());
}
data_blobs.push(
window[n]
.clone()
.expect("'data_blobs' arr in pub fn generate_coding"),
);
}
let mut max_data_size = 0;
for b in &data_blobs {
let lck = b.write().expect("'b' write lock in pub fn generate_coding");
if lck.meta.size > max_data_size {
max_data_size = lck.meta.size;
}
data_locks.push(lck);
}
trace!("max_data_size: {}", max_data_size);
for (i, l) in data_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
data_ptrs.push(&l.data[..max_data_size]);
}
// generate coding ptr array
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
for i in coding_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
trace!("coding block is null @ {}", n);
return Ok(());
}
let w_l = window[n].clone().unwrap();
w_l.write().unwrap().set_size(max_data_size);
if w_l.write().unwrap().set_coding().is_err() {
return Err(ErasureError::EncodeError);
}
coding_blobs.push(
window[n]
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
for b in &coding_blobs {
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size);
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
}
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(
"consumed: {} data: {}:{} coding: {}:{}",
consumed,
block_start,
block_start + NUM_DATA,
coding_start,
coding_end
);
block_start += NUM_CODED;
}
}
Ok(())
}
@ -214,75 +294,142 @@ pub fn recover(
re: &BlobRecycler,
window: &mut Vec<Option<SharedBlob>>,
consumed: usize,
received: usize,
) -> Result<()> {
//recover with erasure coding
let mut data_missing = 0;
let mut coded_missing = 0;
let coding_start = consumed + NUM_DATA;
let coding_end = consumed + NUM_CODED;
for i in consumed..coding_end {
let n = i % window.len();
if window[n].is_none() {
if i >= coding_start {
coded_missing += 1;
} else {
data_missing += 1;
}
}
if received <= consumed {
return Ok(());
}
trace!("missing: data: {} coding: {}", data_missing, coded_missing);
if data_missing > 0 {
if (data_missing + coded_missing) <= MAX_MISSING {
let mut blobs: Vec<SharedBlob> = Vec::new();
let mut locks = Vec::new();
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
let mut erasures: Vec<i32> = Vec::new();
for i in consumed..coding_end {
let j = i % window.len();
let mut b = &mut window[j];
if b.is_some() {
blobs.push(b.clone().expect("'blobs' arr in pub fn recover"));
continue;
}
let n = re.allocate();
*b = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push((i - consumed) as i32);
}
erasures.push(-1);
trace!("erasures: {:?}", erasures);
//lock everything
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pub fn recover"));
}
for (i, l) in locks.iter_mut().enumerate() {
if i >= NUM_DATA {
trace!("pushing coding: {}", i);
coding_ptrs.push(&l.data);
} else {
trace!("pushing data: {}", i);
data_ptrs.push(&mut l.data);
}
}
trace!(
"coding_ptrs.len: {} data_ptrs.len {}",
coding_ptrs.len(),
data_ptrs.len()
);
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
} else {
return Err(ErasureError::NotEnoughBlocksToDecode);
let num_blocks = (received - consumed) / NUM_CODED;
let mut block_start = consumed - (consumed % NUM_CODED);
if num_blocks > 0 {
debug!(
"num_blocks: {} received: {} consumed: {}",
num_blocks, received, consumed
);
}
for i in 0..num_blocks {
if i > 100 {
break;
}
let mut data_missing = 0;
let mut coded_missing = 0;
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
trace!(
"recover: block_start: {} coding_start: {} coding_end: {}",
block_start,
coding_start,
coding_end
);
for i in block_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
if i >= coding_start {
coded_missing += 1;
} else {
data_missing += 1;
}
}
}
if (data_missing + coded_missing) != NUM_CODED && (data_missing + coded_missing) != 0 {
debug!(
"1: start: {} recovering: data: {} coding: {}",
block_start, data_missing, coded_missing
);
}
if data_missing > 0 {
if (data_missing + coded_missing) <= MAX_MISSING {
debug!(
"2: recovering: data: {} coding: {}",
data_missing, coded_missing
);
let mut blobs: Vec<SharedBlob> = Vec::new();
let mut locks = Vec::new();
let mut erasures: Vec<i32> = Vec::new();
let mut meta = None;
let mut size = None;
for i in block_start..coding_end {
let j = i % window.len();
let mut b = &mut window[j];
if b.is_some() {
if i >= NUM_DATA && size.is_none() {
let bl = b.clone().unwrap();
size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE);
}
if meta.is_none() {
let bl = b.clone().unwrap();
meta = Some(bl.read().unwrap().meta.clone());
}
blobs.push(b.clone().expect("'blobs' arr in pub fn recover"));
continue;
}
let n = re.allocate();
*b = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push((i - block_start) as i32);
}
erasures.push(-1);
trace!(
"erasures: {:?} data_size: {} header_size: {}",
erasures,
size.unwrap(),
BLOB_HEADER_SIZE
);
//lock everything
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pub fn recover"));
}
{
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
for (i, l) in locks.iter_mut().enumerate() {
if i >= NUM_DATA {
trace!("pushing coding: {}", i);
coding_ptrs.push(&l.data()[..size.unwrap()]);
} else {
trace!("pushing data: {}", i);
data_ptrs.push(&mut l.data[..size.unwrap()]);
}
}
trace!(
"coding_ptrs.len: {} data_ptrs.len {}",
coding_ptrs.len(),
data_ptrs.len()
);
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
}
for i in &erasures[..erasures.len() - 1] {
let idx = *i as usize;
let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64;
locks[idx].meta = meta.clone().unwrap();
locks[idx].set_size(data_size as usize);
trace!(
"erasures[{}] size: {} data[0]: {}",
*i,
data_size,
locks[idx].data()[0]
);
}
}
}
block_start += NUM_CODED;
}
Ok(())
}
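A worked example of the scan bounds: with consumed = 35 and received = 90, `recover` walks (90 - 35) / 20 = 2 full blocks, starting from the block boundary at or below `consumed` (constants inlined here for illustration):
#[test]
fn recover_scan_bounds() {
    let (consumed, received, num_coded) = (35usize, 90usize, 20usize);
    let num_blocks = (received - consumed) / num_coded; // 55 / 20 = 2
    let block_start = consumed - (consumed % num_coded); // 35 rounds down to 20
    assert_eq!((num_blocks, block_start), (2, 20));
}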
#[cfg(test)]
mod test {
use crdt;
use erasure;
use packet::{BlobRecycler, SharedBlob, PACKET_DATA_SIZE};
use logger;
use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
use signature::KeyPair;
use signature::KeyPairUtil;
use std::sync::{Arc, RwLock};
#[test]
pub fn test_coding() {
@ -338,10 +485,15 @@ mod test {
for (i, w) in window.iter().enumerate() {
print!("window({}): ", i);
if w.is_some() {
let window_lock = w.clone().unwrap();
let window_data = window_lock.read().unwrap().data;
let window_l1 = w.clone().unwrap();
let window_l2 = window_l1.read().unwrap();
print!(
"index: {:?} meta.size: {} data: ",
window_l2.get_index(),
window_l2.meta.size
);
for i in 0..8 {
print!("{} ", window_data[i]);
print!("{} ", window_l2.data()[i]);
}
} else {
print!("null");
@ -350,45 +502,102 @@ mod test {
}
}
#[test]
pub fn test_window_recover() {
let mut window = Vec::new();
let blob_recycler = BlobRecycler::default();
let offset = 4;
for i in 0..(4 * erasure::NUM_CODED + 1) {
fn generate_window(
data_len: usize,
blob_recycler: &BlobRecycler,
offset: usize,
num_blobs: usize,
) -> (Vec<Option<SharedBlob>>, usize) {
let mut window = vec![None; 32];
let mut blobs = Vec::new();
for i in 0..num_blobs {
let b = blob_recycler.allocate();
let b_ = b.clone();
let data_len = b.read().unwrap().data.len();
let mut w = b.write().unwrap();
w.set_index(i as u64).unwrap();
assert_eq!(i as u64, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.set_size(data_len);
for k in 0..data_len {
w.data[k] = (k + i) as u8;
w.data_mut()[k] = (k + i) as u8;
}
window.push(Some(b_));
blobs.push(b_);
}
erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64);
let blobs_len = blobs.len();
let d = crdt::ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone())));
assert!(crdt::Crdt::index_blobs(&crdt, &blobs, &mut (offset as u64)).is_ok());
for b in blobs {
let idx = b.read().unwrap().get_index().unwrap() as usize;
window[idx] = Some(b);
}
(window, blobs_len)
}
#[test]
pub fn test_window_recover_basic() {
logger::setup();
let data_len = 16;
let blob_recycler = BlobRecycler::default();
// Generate a window
let offset = 1;
let num_blobs = erasure::NUM_DATA + 2;
let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, 0, num_blobs);
println!("** after-gen-window:");
print_window(&window);
// Generate the coding blocks
assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
println!("** after-gen-coding:");
print_window(&window);
let erase_offset = offset;
// Create a hole in the window
let refwindow = window[erase_offset].clone();
window[erase_offset] = None;
// Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
println!("** after-recover:");
print_window(&window);
// Check the result
let window_l = window[erase_offset].clone().unwrap();
let window_l2 = window_l.read().unwrap();
let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
assert_eq!(
window_l2.data[..(data_len + BLOB_HEADER_SIZE)],
ref_l2.data[..(data_len + BLOB_HEADER_SIZE)]
);
assert_eq!(window_l2.meta.size, ref_l2.meta.size);
assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64);
}
//TODO This needs to be reworked
#[test]
#[ignore]
pub fn test_window_recover() {
logger::setup();
let blob_recycler = BlobRecycler::default();
let offset = 4;
let data_len = 16;
let num_blobs = erasure::NUM_DATA + 2;
let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs);
println!("** after-gen:");
print_window(&window);
assert!(erasure::generate_coding(&blob_recycler, &mut window, offset).is_ok());
assert!(
erasure::generate_coding(&blob_recycler, &mut window, offset + erasure::NUM_CODED)
.is_ok()
);
assert!(
erasure::generate_coding(
&blob_recycler,
&mut window,
offset + (2 * erasure::NUM_CODED)
).is_ok()
);
assert!(
erasure::generate_coding(
&blob_recycler,
&mut window,
offset + (3 * erasure::NUM_CODED)
).is_ok()
);
assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
println!("** after-coding:");
print_window(&window);
let refwindow = window[offset + 1].clone();
@ -402,29 +611,14 @@ mod test {
window_l0.write().unwrap().data[0] = 55;
println!("** after-nulling:");
print_window(&window);
assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok());
assert!(erasure::recover(&blob_recycler, &mut window, offset + erasure::NUM_CODED).is_ok());
assert!(
erasure::recover(
&blob_recycler,
&mut window,
offset + (2 * erasure::NUM_CODED)
).is_err()
);
assert!(
erasure::recover(
&blob_recycler,
&mut window,
offset + (3 * erasure::NUM_CODED)
).is_ok()
);
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
println!("** after-restore:");
print_window(&window);
let window_l = window[offset + 1].clone().unwrap();
let ref_l = refwindow.clone().unwrap();
assert_eq!(
window_l.read().unwrap().data.to_vec(),
ref_l.read().unwrap().data.to_vec()
window_l.read().unwrap().data()[..data_len],
ref_l.read().unwrap().data()[..data_len]
);
}
}

View File

@ -1,9 +1,18 @@
//! The `solana` library implements the Solana high-performance blockchain architecture.
//! It includes a full Rust implementation of the architecture (see
//! [Server](server/struct.Server.html)) as well as hooks to GPU implementations of its most
//! parallelizable components (i.e. [SigVerify](sigverify/index.html)). It also includes
//! command-line tools to spin up fullnodes and a Rust library
//! (see [ThinClient](thin_client/struct.ThinClient.html)) to interact with them.
//!
#![cfg_attr(feature = "unstable", feature(test))]
#[macro_use]
pub mod counter;
pub mod bank;
pub mod banking_stage;
pub mod budget;
pub mod crdt;
pub mod data_replicator;
pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]
@ -13,6 +22,7 @@ pub mod hash;
pub mod ledger;
pub mod logger;
pub mod mint;
pub mod ncp;
pub mod packet;
pub mod payment_plan;
pub mod record_stage;

View File

@ -1,3 +1,6 @@
//! The `logger` module provides a setup function for `env_logger`. Its only function,
//! `setup()` may be called multiple times.
use std::sync::{Once, ONCE_INIT};
extern crate env_logger;

View File

@ -1,3 +1,5 @@
//! The `ncp` module implements the network control plane.
use crdt;
use packet;
use result::Result;
@ -8,22 +10,22 @@ use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use streamer;
pub struct DataReplicator {
pub struct Ncp {
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl DataReplicator {
impl Ncp {
pub fn new(
crdt: Arc<RwLock<crdt::Crdt>>,
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
) -> Result<Ncp> {
let blob_recycler = packet::BlobRecycler::default();
let (request_sender, request_receiver) = channel();
trace!(
"DataReplicator: id: {:?}, listening on: {:?}",
"Ncp: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me[..4],
gossip_listen_socket.local_addr().unwrap()
);
@ -50,14 +52,14 @@ impl DataReplicator {
);
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(DataReplicator { thread_hdls })
Ok(Ncp { thread_hdls })
}
}
#[cfg(test)]
mod tests {
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use ncp::Ncp;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@ -69,7 +71,7 @@ mod tests {
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
let d = Ncp::new(
c.clone(),
w,
tn.sockets.gossip,

View File

@ -1,6 +1,7 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use counter::Counter;
use result::{Error, Result};
use serde::Serialize;
use signature::PublicKey;
@ -9,7 +10,9 @@ use std::fmt;
use std::io;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
@ -169,6 +172,7 @@ impl<T: Default> Recycler<T> {
impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
static mut COUNTER: Counter = create_counter!("packets", 10);
self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
@ -178,11 +182,13 @@ impl Packets {
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
let mut start = Instant::now();
for p in &mut self.packets {
p.meta.size = 0;
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
inc_counter!(COUNTER, i, start);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
@ -194,6 +200,7 @@ impl Packets {
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
start = Instant::now();
socket.set_nonblocking(true)?;
}
}
@ -271,6 +278,17 @@ pub fn to_blobs<T: Serialize>(
const BLOB_INDEX_END: usize = size_of::<u64>();
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();
const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::<u64>();
macro_rules! align {
($x:expr, $align:expr) => {
$x + ($align - 1) & !($align - 1)
};
}
pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64);
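Worked through on a 64-bit target with a 32-byte PublicKey (both assumptions, though they hold for this codebase), the header layout is: BLOB_INDEX_END = 8, BLOB_ID_END = 8 + 8 + 32 = 48, BLOB_FLAGS_END = 52, BLOB_SIZE_END = 60, and align!(60, 64) = (60 + 63) & !63 = 64, so the header occupies exactly one 64-byte cache line. Note the macro relies on `+` binding tighter than `&`, parsing as `($x + ($align - 1)) & !($align - 1)`. A sanity check under those assumptions:
#[test]
fn header_is_one_cache_line() {
    assert_eq!(BLOB_SIZE_END, 60);
    assert_eq!(BLOB_HEADER_SIZE, 64);
}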
impl Blob {
pub fn get_index(&self) -> Result<u64> {
@ -297,14 +315,51 @@ impl Blob {
Ok(())
}
pub fn get_flags(&self) -> Result<u32> {
let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]);
let r = rdr.read_u32::<LittleEndian>()?;
Ok(r)
}
pub fn set_flags(&mut self, ix: u32) -> Result<()> {
let mut wtr = vec![];
wtr.write_u32::<LittleEndian>(ix)?;
self.data[BLOB_ID_END..BLOB_FLAGS_END].clone_from_slice(&wtr);
Ok(())
}
pub fn is_coding(&self) -> bool {
(self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0
}
pub fn set_coding(&mut self) -> Result<()> {
let flags = self.get_flags().unwrap();
self.set_flags(flags | BLOB_FLAG_IS_CODING)
}
pub fn get_data_size(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
}
pub fn set_data_size(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[BLOB_FLAGS_END..BLOB_SIZE_END].clone_from_slice(&wtr);
Ok(())
}
pub fn data(&self) -> &[u8] {
&self.data[BLOB_ID_END..]
&self.data[BLOB_HEADER_SIZE..]
}
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data[BLOB_ID_END..]
&mut self.data[BLOB_HEADER_SIZE..]
}
pub fn set_size(&mut self, size: usize) {
self.meta.size = size + BLOB_ID_END;
let new_size = size + BLOB_HEADER_SIZE;
self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap();
}
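`set_size` stores the payload length plus the header both in `meta.size` and in the on-wire size field, so a 100-byte payload is recorded as 164 when BLOB_HEADER_SIZE is 64. A sketch of the inverse, mirroring the subtraction `recover` performs in the erasure module:
// Sketch only: recover the payload length that set_size() recorded.
fn payload_len(blob: &Blob) -> usize {
    blob.get_data_size().unwrap() as usize - BLOB_HEADER_SIZE
}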
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
let mut v = VecDeque::new();

View File

@ -6,18 +6,27 @@
use chrono::prelude::*;
use signature::PublicKey;
/// The types of events a payment plan can process.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum Witness {
/// The current time.
Timestamp(DateTime<Utc>),
/// A signature from PublicKey.
Signature(PublicKey),
}
/// Some amount of tokens that should be sent to the `to` `PublicKey`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct Payment {
/// Amount to be paid.
pub tokens: i64,
/// The `PublicKey` that `tokens` should be paid to.
pub to: PublicKey,
}
/// Interface to smart contracts.
pub trait PaymentPlan {
/// Return Payment if the payment plan requires no additional Witnesses.
fn final_payment(&self) -> Option<Payment>;

View File

@ -1,8 +1,8 @@
//! The `record_stage` module provides an object for generating a Proof of History.
//! It records Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry, and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last transaction.
//! It records Transaction items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent a Transaction item. It
//! tags each Transaction with an Entry, and sends it back. The Entry includes the
//! Transaction, the latest hash, and the number of hashes since the last transaction.
//! The resulting stream of entries represents ordered transactions in time.
use entry::Entry;
@ -16,7 +16,7 @@ use transaction::Transaction;
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Signal {
Tick,
Events(Vec<Transaction>),
Transactions(Vec<Transaction>),
}
pub struct RecordStage {
@ -25,7 +25,7 @@ pub struct RecordStage {
}
impl RecordStage {
/// A background thread that will continue tagging received Event messages and
/// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self {
let (entry_sender, entry_receiver) = channel();
@ -85,7 +85,7 @@ impl RecordStage {
recorder: &mut Recorder,
sender: &Sender<Entry>,
) -> Result<(), ()> {
let txs = if let Signal::Events(txs) = signal {
let txs = if let Signal::Transactions(txs) = signal {
txs
} else {
vec![]
@ -180,7 +180,9 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
let tx1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero);
tx_sender.send(Signal::Events(vec![tx0, tx1])).unwrap();
tx_sender
.send(Signal::Transactions(vec![tx0, tx1]))
.unwrap();
drop(tx_sender);
let entries: Vec<_> = record_stage.entry_receiver.iter().collect();
assert_eq!(entries.len(), 1);

View File

@ -1,5 +1,5 @@
//! The `recorder` module provides an object for generating a Proof of History.
//! It records Event items on behalf of its users.
//! It records Transaction items on behalf of its users.
use entry::Entry;
use hash::{hash, Hash};

View File

@ -43,7 +43,7 @@ impl RequestStage {
) -> Result<()> {
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
info!(
debug!(
"@{:?} request_stage: processing: {}",
timing::timestamp(),
batch_len
@ -70,7 +70,7 @@ impl RequestStage {
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
debug!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
batch_len,

View File

@ -1,5 +1,27 @@
//! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software.
//! 3-stage transaction processing pipeline in software. It listens
//! for `Request` messages from clients and replies with `Response`
//! messages.
//!
//! ```text
//! .------.
//! | Bank |
//! `---+--`
//! |
//! .------------------|-------------------.
//! | RPU | |
//! | v |
//! .---------. | .-------. .---------. .---------. | .---------.
//! | Alice |--->| | | | | +---->| Alice |
//! `---------` | | Fetch | | Request | | Respond | | `---------`
//! | | Stage |->| Stage |->| Stage | |
//! .---------. | | | | | | | | .---------.
//! | Bob |--->| | | | | +---->| Bob |
//! `---------` | `-------` `---------` `---------` | `---------`
//! | |
//! | |
//! `--------------------------------------`
//! ```
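A minimal sketch of the wiring pattern the diagram describes, using plain std channels and hypothetical stage bodies rather than the actual FetchStage/RequestStage/RespondStage types:
use std::sync::mpsc::channel;
use std::thread;

fn pipeline_sketch() {
    let (fetch_tx, fetch_rx) = channel::<Vec<u8>>();
    let (resp_tx, resp_rx) = channel::<Vec<u8>>();
    // Request stage: pull from the fetch stage, consult the Bank, push replies.
    let request_stage = thread::spawn(move || {
        for packet in fetch_rx {
            resp_tx.send(packet).unwrap(); // ...process against the Bank here...
        }
    });
    // Respond stage: drain replies back toward the clients.
    let respond_stage = thread::spawn(move || for _reply in resp_rx {});
    fetch_tx.send(vec![1, 2, 3]).unwrap();
    drop(fetch_tx); // closing the channel lets both stages wind down
    request_stage.join().unwrap();
    respond_stage.join().unwrap();
}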
use bank::Bank;
use packet;

View File

@ -2,7 +2,7 @@
use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use ncp::Ncp;
use packet;
use rpu::Rpu;
use std::io::Write;
@ -20,6 +20,30 @@ pub struct Server {
}
impl Server {
/// Create a server instance acting as a leader.
///
/// ```text
/// .---------------------.
/// | Leader |
/// | |
/// .--------. | .-----. |
/// | |---->| | |
/// | Client | | | RPU | |
/// | |<----| | |
/// `----+---` | `-----` |
/// | | ^ |
/// | | | |
/// | | .--+---. |
/// | | | Bank | |
/// | | `------` |
/// | | ^ |
/// | | | | .------------.
/// | | .--+--. .-----. | | |
/// `-------->| TPU +-->| NCP +------>| Validators |
/// | `-----` `-----` | | |
/// | | `------------`
/// `---------------------`
/// ```
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
tick_duration: Option<Duration>,
@ -51,14 +75,14 @@ impl Server {
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
let ncp = Ncp::new(
crdt.clone(),
window.clone(),
gossip_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
thread_hdls.extend(data_replicator.thread_hdls);
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls);
let t_broadcast = streamer::broadcaster(
broadcast_socket,
@ -72,6 +96,31 @@ impl Server {
Server { thread_hdls }
}
/// Create a server instance acting as a validator.
///
/// ```text
/// .-------------------------------.
/// | Validator |
/// | |
/// .--------. | .-----. |
/// | |-------------->| | |
/// | Client | | | RPU | |
/// | |<--------------| | |
/// `--------` | `-----` |
/// | ^ |
/// | | |
/// | .--+---. |
/// | | Bank | |
/// | `------` |
/// | ^ |
/// .--------. | | | .------------.
/// | | | .-----. .--+--. .-----. | | |
/// | Leader |--->| NCP +-->| TVU +-->| NCP +------>| Validators |
/// | | | `-----` `-----` `-----` | | |
/// `--------` | | `------------`
/// `-------------------------------`
/// ```
pub fn new_validator(
bank: Bank,
me: ReplicatedData,
@ -79,6 +128,7 @@ impl Server {
respond_socket: UdpSocket,
replicate_socket: UdpSocket,
gossip_socket: UdpSocket,
repair_socket: UdpSocket,
leader_repl_data: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Self {
@ -91,6 +141,7 @@ impl Server {
me,
gossip_socket,
replicate_socket,
repair_socket,
leader_repl_data,
exit.clone(),
);
@ -98,3 +149,34 @@ impl Server {
Server { thread_hdls }
}
}
#[cfg(test)]
mod tests {
use bank::Bank;
use crdt::TestNode;
use mint::Mint;
use server::Server;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[test]
fn validator_exit() {
let tn = TestNode::new();
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let v = Server::new_validator(
bank,
tn.data.clone(),
tn.sockets.requests,
tn.sockets.respond,
tn.sockets.replicate,
tn.sockets.gossip,
tn.sockets.repair,
tn.data,
exit.clone(),
);
exit.store(true, Ordering::Relaxed);
for t in v.thread_hdls {
t.join().unwrap();
}
}
}

View File

@ -1,5 +1,14 @@
//! The `sigverify` module provides digital signature verification functions.
//! By default, signatures are verified in parallel using all available CPU
//! cores. When `--features=cuda` is enabled, signature verification is
//! offloaded to the GPU.
//!
use counter::Counter;
use packet::{Packet, SharedPackets};
use std::mem::size_of;
use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET};
pub const TX_OFFSET: usize = 0;
@ -61,8 +70,11 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
#[cfg(not(feature = "cuda"))]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*;
static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches));
batches
let rv = batches
.into_par_iter()
.map(|p| {
p.read()
@ -72,13 +84,17 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.map(verify_packet)
.collect()
})
.collect()
.collect();
inc_counter!(COUNTER, count, start);
rv
}
#[cfg(feature = "cuda")]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use packet::PACKET_DATA_SIZE;
static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new();
let mut elems = Vec::new();
@ -137,6 +153,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
num += 1;
}
}
inc_counter!(COUNTER, count, start);
rvs
}

View File

@ -1,4 +1,9 @@
//! The `sigverify_stage` implements the signature verification stage of the TPU.
//! The `sigverify_stage` implements the signature verification stage of the TPU. It
//! receives a list of lists of packets and outputs the same list, but tags each
//! top-level list with a list of booleans, telling the next stage whether the
//! signature in that packet is valid. It assumes each packet contains one
//! transaction. All processing is done on the CPU by default and on a GPU
//! if the `cuda` feature is enabled with `--features=cuda`.
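The output shape, sketched under the assumption (from `sigverify::ed25519_verify`) that each inner `Vec<u8>` carries one flag per packet, with 1 marking a valid signature:
// Illustrative helper, not part of the stage's API.
fn all_valid(flags: &[Vec<u8>]) -> bool {
    flags.iter().flatten().all(|&f| f == 1)
}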
use packet::SharedPackets;
use rand::{thread_rng, Rng};

View File

@ -170,12 +170,24 @@ fn find_next_missing(
fn repair_window(
locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
crdt: &Arc<RwLock<Crdt>>,
_recycler: &BlobRecycler,
last: &mut usize,
times: &mut usize,
consumed: &mut usize,
received: &mut usize,
) -> Result<()> {
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
#[cfg(feature = "erasure")]
{
if erasure::recover(
_recycler,
&mut locked_window.write().unwrap(),
*consumed,
*received,
).is_err()
{
trace!("erasure::recover failed");
}
}
//exponential backoff
if *last != *consumed {
*times = 0;
@ -187,6 +199,7 @@ fn repair_window(
trace!("repair_window counter {} {}", *times, *consumed);
return Ok(());
}
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs {
//todo cache socket
@ -261,16 +274,26 @@ fn recv_window(
if pix > *received {
*received = pix;
}
// Got a blob that has already been consumed; skip it.
// It probably came from a window repair request.
if pix < *consumed {
debug!(
"received: {} but older than consumed: {} skipping..",
pix, *consumed
);
continue;
}
let w = pix % WINDOW_SIZE;
//TODO, after the blocks are authenticated
//if we get different blocks at the same index
//that is a network failure/attack
trace!("window w: {} size: {}", w, p.meta.size);
drop(p);
{
let mut window = locked_window.write().unwrap();
if window[w].is_none() {
window[w] = Some(b_);
} else if let &Some(ref cblob) = &window[w] {
} else if let Some(cblob) = &window[w] {
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("overrun blob at index {:}", w);
} else {
@ -283,12 +306,54 @@ fn recv_window(
if window[k].is_none() {
break;
}
contq.push_back(window[k].clone().expect("clone in fn recv_window"));
window[k] = None;
*consumed += 1;
let mut is_coding = false;
if let &Some(ref cblob) = &window[k] {
if cblob
.read()
.expect("blob read lock for flags streamer::window")
.is_coding()
{
is_coding = true;
}
}
if !is_coding {
contq.push_back(window[k].clone().expect("clone in fn recv_window"));
*consumed += 1;
#[cfg(not(feature = "erasure"))]
{
window[k] = None;
}
} else {
#[cfg(feature = "erasure")]
{
let block_start = *consumed - (*consumed % erasure::NUM_CODED);
let coding_end = block_start + erasure::NUM_CODED;
// We've received all of this block's data blobs; null out the block's window slots now
for j in block_start..coding_end {
window[j % WINDOW_SIZE] = None;
}
*consumed += erasure::MAX_MISSING;
debug!(
"skipping processing coding blob k: {} consumed: {}",
k, *consumed
);
}
}
}
}
}
print_window(locked_window, *consumed);
trace!("sending contq.len: {}", contq.len());
if !contq.is_empty() {
trace!("sending contq.len: {}", contq.len());
s.send(contq)?;
}
Ok(())
}
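A worked example of the coding-skip arithmetic in the erasure branch above (NUM_CODED = 20, MAX_MISSING = 4, as in the erasure module): once `consumed` reaches a block's first coding slot, the whole block is nulled and `consumed` jumps past the coding tail:
#[test]
fn coding_tail_skip() {
    let (num_coded, max_missing) = (20usize, 4usize);
    let consumed = 16usize; // first coding slot of the block starting at 0
    let block_start = consumed - (consumed % num_coded); // 0
    let coding_end = block_start + num_coded; // slots 0..20 are nulled
    assert_eq!((block_start, coding_end, consumed + max_missing), (0, 20, 20));
}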
fn print_window(locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>, consumed: usize) {
{
let buf: Vec<_> = locked_window
.read()
@ -296,24 +361,25 @@ fn recv_window(
.iter()
.enumerate()
.map(|(i, v)| {
if i == (*consumed % WINDOW_SIZE) {
assert!(v.is_none());
if i == (consumed % WINDOW_SIZE) {
"_"
} else if v.is_none() {
"0"
} else {
"1"
if let &Some(ref cblob) = &v {
if cblob.read().unwrap().is_coding() {
"C"
} else {
"1"
}
} else {
"0"
}
}
})
.collect();
trace!("WINDOW: {}", buf.join(""));
debug!("WINDOW ({}): {}", consumed, buf.join(""));
}
trace!("sending contq.len: {}", contq.len());
if !contq.is_empty() {
trace!("sending contq.len: {}", contq.len());
s.send(contq)?;
}
Ok(())
}
pub fn default_window() -> Arc<RwLock<Vec<Option<SharedBlob>>>> {
@ -353,6 +419,7 @@ pub fn window(
let _ = repair_window(
&window,
&crdt,
&recycler,
&mut last,
&mut times,
&mut consumed,
@ -370,17 +437,26 @@ fn broadcast(
r: &BlobReceiver,
sock: &UdpSocket,
transmit_index: &mut u64,
receive_index: &mut u64,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
let mut blobs = dq.into_iter().collect();
// appends coding blobs to the list, allowing us to reconstruct the stream
let mut blobs: Vec<_> = dq.into_iter().collect();
print_window(window, *receive_index as usize);
// Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")]
erasure::generate_coding(re, blobs, consumed);
Crdt::broadcast(crdt, &blobs, &sock, transmit_index)?;
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
let blobs_len = blobs.len();
info!("broadcast blobs.len: {}", blobs_len);
// Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast
{
let mut win = window.write().unwrap();
@ -407,6 +483,24 @@ fn broadcast(
win[pos] = Some(b);
}
}
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
if erasure::generate_coding(
&mut window.write().unwrap(),
*receive_index as usize,
blobs_len,
).is_err()
{
return Err(Error::GenericError);
}
}
*receive_index += blobs_len as u64;
// Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?;
Ok(())
}
@ -431,11 +525,20 @@ pub fn broadcaster(
.name("solana-broadcaster".to_string())
.spawn(move || {
let mut transmit_index = 0;
let mut receive_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
let _ = broadcast(
&crdt,
&window,
&recycler,
&r,
&sock,
&mut transmit_index,
&mut receive_index,
);
}
})
.unwrap()
@ -602,10 +705,8 @@ mod bench {
#[cfg(test)]
mod test {
use crdt::{Crdt, ReplicatedData};
use crdt::{Crdt, TestNode};
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use signature::KeyPair;
use signature::KeyPairUtil;
use std::collections::VecDeque;
use std::io;
use std::io::Write;
@ -688,29 +789,21 @@ mod test {
#[test]
pub fn window_send_test() {
let pubkey_me = KeyPair::new().pubkey();
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let transaction = UdpSocket::bind("127.0.0.1:0").expect("bind");
let tn = TestNode::new();
let exit = Arc::new(AtomicBool::new(false));
let rep_data = ReplicatedData::new(
pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
let mut crdt_me = Crdt::new(rep_data);
let mut crdt_me = Crdt::new(tn.data.clone());
let me_id = crdt_me.my_data().id;
crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver =
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
let t_receiver = blob_receiver(
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = default_window();
@ -724,7 +817,12 @@ mod test {
s_retransmit,
);
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let t_responder = responder(
tn.sockets.replicate,
exit.clone(),
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
for v in 0..10 {
let i = 9 - v;
@ -735,7 +833,7 @@ mod test {
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
w.meta.set_addr(&tn.data.gossip_addr);
msgs.push_back(b_);
}
s_responder.send(msgs).expect("send");

View File

@ -12,6 +12,7 @@ use std::io;
use std::net::{SocketAddr, UdpSocket};
use transaction::Transaction;
/// An object for querying and sending transactions to the network.
pub struct ThinClient {
requests_addr: SocketAddr,
requests_socket: UdpSocket,
@ -105,7 +106,7 @@ impl ThinClient {
while !done {
let resp = self.recv_response()?;
trace!("recv_response {:?}", resp);
if let &Response::Balance { ref key, .. } = &resp {
if let Response::Balance { key, .. } = &resp {
done = key == pubkey;
}
self.process_response(resp);

View File

@ -1,3 +1,4 @@
//! The `timing` module provides std::time utility functions.
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

View File

@ -1,5 +1,29 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! 5-stage transaction processing pipeline in software.
//!
//! ```text
//! .---------------------------------------------------------------.
//! | TPU .-----. |
//! | | PoH | |
//! | `--+--` |
//! | | |
//! | v |
//! | .-------. .-----------. .---------. .--------. .-------. |
//! .---------. | | Fetch | | SigVerify | | Banking | | Record | | Write | | .------------.
//! | Clients |--->| Stage |->| Stage |->| Stage |->| Stage |->| Stage +--->| Validators |
//! `---------` | | | | | | | | | | | | `------------`
//! | `-------` `-----------` `----+----` `--------` `---+---` |
//! | | | |
//! | | | |
//! | | | |
//! | | | |
//! `---------------------------------|-----------------------|-----`
//! | |
//! v v
//! .------. .--------.
//! | Bank | | Ledger |
//! `------` `--------`
//! ```
use bank::Bank;
use banking_stage::BankingStage;

View File

@ -11,8 +11,10 @@ pub const SIGNED_DATA_OFFSET: usize = 112;
pub const SIG_OFFSET: usize = 8;
pub const PUB_KEY_OFFSET: usize = 80;
/// The type of payment plan. Each item must implement the PaymentPlan trait.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum Plan {
/// The builtin contract language Budget.
Budget(Budget),
}
@ -37,29 +39,49 @@ impl PaymentPlan for Plan {
}
}
/// A smart contract.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct Contract {
/// The number of tokens allocated to the `Plan` and any transaction fees.
pub tokens: i64,
pub plan: Plan,
}
/// An instruction to progress the smart contract.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum Instruction {
/// Declare and instantiate a `Contract`.
NewContract(Contract),
/// Tell the payment plan to acknowledge that the given `DateTime` has passed.
ApplyTimestamp(DateTime<Utc>),
/// Tell the payment plan that the `NewContract` with `Signature` has been
/// signed by the containing transaction's `PublicKey`.
ApplySignature(Signature),
}
/// An instruction signed by a client with `PublicKey`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct Transaction {
/// A digital signature of `instruction`, `last_id` and `fee`, signed by `PublicKey`.
pub sig: Signature,
/// The `PublicKey` of the entity that signed the transaction data.
pub from: PublicKey,
/// The action the server should take.
pub instruction: Instruction,
/// The ID of a recent ledger entry.
pub last_id: Hash,
/// The number of tokens paid for processing and storage of this transaction.
pub fee: i64,
}
impl Transaction {
/// Create a signed transaction from the given `Instruction`.
fn new_from_instruction(
from_keypair: &KeyPair,
instruction: Instruction,
@ -122,7 +144,7 @@ impl Transaction {
last_id: Hash,
) -> Self {
let from = from_keypair.pubkey();
let budget = Budget::Race(
let budget = Budget::Or(
(Condition::Timestamp(dt), Payment { tokens, to }),
(Condition::Signature(from), Payment { tokens, to: from }),
);
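The rename from Race to Or matches the semantics here: whichever arm is witnessed first settles the payment, the timestamp arm paying `to` and the signature arm refunding `from`. A toy model of that first-match rule, with stand-in types rather than the real budget module:
// (met?, payee) pairs stand in for (Condition, Payment).
fn settle<'a>(arms: &[(bool, &'a str)]) -> Option<&'a str> {
    arms.iter().find(|&&(met, _)| met).map(|&(_, payee)| payee)
}

#[test]
fn signature_arm_refunds_sender() {
    assert_eq!(settle(&[(false, "to"), (true, "from")]), Some("from"));
}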
@ -131,6 +153,7 @@ impl Transaction {
Self::new_from_instruction(from_keypair, instruction, last_id, 0)
}
/// Get the transaction data to sign.
fn get_sign_data(&self) -> Vec<u8> {
let mut data = serialize(&(&self.instruction)).expect("serialize Contract");
let last_id_data = serialize(&(&self.last_id)).expect("serialize last_id");
@ -148,11 +171,13 @@ impl Transaction {
self.sig = Signature::clone_from_slice(keypair.sign(&sign_data).as_ref());
}
/// Verify only the transaction signature.
pub fn verify_sig(&self) -> bool {
warn!("transaction signature verification called");
self.sig.verify(&self.from, &self.get_sign_data())
}
/// Verify only the payment plan.
pub fn verify_plan(&self) -> bool {
if let Instruction::NewContract(contract) = &self.instruction {
self.fee >= 0 && self.fee <= contract.tokens

View File

@ -22,7 +22,7 @@
use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use ncp::Ncp;
use packet;
use replicate_stage::ReplicateStage;
use std::net::UdpSocket;
@ -51,6 +51,7 @@ impl Tvu {
me: ReplicatedData,
gossip_listen_socket: UdpSocket,
replicate: UdpSocket,
repair_socket: UdpSocket,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Self {
@ -64,13 +65,13 @@ impl Tvu {
.insert(&leader);
let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
let ncp = Ncp::new(
crdt.clone(),
window.clone(),
gossip_listen_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
).expect("Ncp::new");
// TODO pull this socket out through the public interface
// make sure we are on the same interface
@ -96,6 +97,12 @@ impl Tvu {
blob_recycler.clone(),
retransmit_receiver,
);
let t_repair_receiver = streamer::blob_receiver(
exit.clone(),
blob_recycler.clone(),
repair_socket,
blob_sender.clone(),
).expect("tvu: blob repair receiver fail");
//TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified
@ -122,9 +129,10 @@ impl Tvu {
t_blob_receiver,
t_retransmit,
t_window,
t_repair_receiver,
replicate_stage.thread_hdl,
];
threads.extend(data_replicator.thread_hdls.into_iter());
threads.extend(ncp.thread_hdls.into_iter());
Tvu {
thread_hdls: threads,
}
@ -136,11 +144,11 @@ pub mod tests {
use bank::Bank;
use bincode::serialize;
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use entry::Entry;
use hash::{hash, Hash};
use logger;
use mint::Mint;
use ncp::Ncp;
use packet::BlobRecycler;
use result::Result;
use signature::{KeyPair, KeyPairUtil};
@ -158,10 +166,10 @@ pub mod tests {
crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
) -> Result<Ncp> {
let window = streamer::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
DataReplicator::new(crdt, window, listen, send_sock, exit)
Ncp::new(crdt, window, listen, send_sock, exit)
}
/// Test that message sent from leader to target1 and replicated to target2
#[test]
@ -218,6 +226,7 @@ pub mod tests {
target1.data,
target1.sockets.gossip,
target1.sockets.replicate,
target1.sockets.repair,
leader.data,
exit.clone(),
);

View File

@ -1,4 +1,6 @@
//! The `write_stage` module implements write stage of the RPU.
//! The `write_stage` module implements the TPU's write stage. It
//! writes entries to the given writer, which is typically a file or
//! stdout, and then sends the Entry to its output channel.
use bank::Bank;
use entry::Entry;

View File

@ -5,8 +5,8 @@ extern crate solana;
use rayon::iter::*;
use solana::crdt::{Crdt, TestNode};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::ncp::Ncp;
use solana::packet::Blob;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
@ -14,12 +14,12 @@ use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSocket) {
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
let d = Ncp::new(
c.clone(),
w,
tn.sockets.gossip,
@ -35,7 +35,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSo
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, DataReplicator, UdpSocket)>) -> (),
F: Fn(&Vec<(Arc<RwLock<Crdt>>, Ncp, UdpSocket)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
@ -45,7 +45,7 @@ where
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for &(ref c, _, _) in listen.iter() {
for (c, _, _) in &listen {
if num == c.read().unwrap().convergence() as usize {
done = true;
break;

View File

@ -6,9 +6,9 @@ extern crate solana;
use solana::bank::Bank;
use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window;
@ -37,6 +37,7 @@ fn validator(
validator.sockets.respond,
validator.sockets.replicate,
validator.sockets.gossip,
validator.sockets.repair,
leader.clone(),
exit.clone(),
);
@ -60,7 +61,7 @@ fn converge(
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let dr = DataReplicator::new(
let dr = Ncp::new(
spy_ref.clone(),
spy_window,
spy.sockets.gossip,