Compare commits

...

13 Commits
v0.9 ... v0.8

Author SHA1 Message Date
390af512de Permit testnets without a GPU 2018-09-26 10:38:10 -07:00
2a9be901da Mark --outfile parameter as required 2018-09-26 10:38:06 -07:00
d794fee66f Remove unused variables and imports after cherry-picking from master 2018-09-19 11:49:47 -07:00
9b66d4d363 Read multiple entries in write stage (#1259)
- Also use rayon to parallelize to_blobs() to maximize CPU usage
2018-09-19 11:49:47 -07:00
bff8f2614b Move entry->blob creation out of write stage (#1257)
- The write stage will output vector of entries
- Broadcast stage will create blobs out of the entries
- Helps reduce MIPS requirements for write stage
2018-09-19 11:49:47 -07:00
8f0648e8fc Move register_entry_id() call out of write stage (#1253)
* Move register_entry_id() call out of write stage

- Write stage is MIPS intensive and has become a bottleneck for
  TPU pipeline
- This will reduce the MIPS requirements for the stage

* Fix rust format issues
2018-09-19 11:49:47 -07:00
d81eaf69db Update fetch-perf-libs.sh 2018-09-17 11:54:45 -07:00
b5935a3830 cargo fmt 2018-09-14 20:30:04 -07:00
c1b07d0f21 Upgrade rust stable to 1.29 2018-09-14 20:30:04 -07:00
a1579b5a47 Remove large-network test, it's ignored anyway 2018-09-14 20:11:46 -07:00
77949a4be6 cherry pick readme update 2018-09-13 19:19:48 -07:00
af58940964 Fix missing recycle in recv_from (#1205)
In the error case that i>0 (we have blobs to send)
we break out of the loop and do not push the allocated r
to the v array. We should recycle this blob, otherwise it
will be dropped.
2018-09-13 10:27:24 -07:00
21963b8c82 fix "leak" in Blob::recv_from (#1198)
* fix "leak" in Blob::recv_from

fixes #1199
2018-09-13 10:27:24 -07:00
47 changed files with 321 additions and 322 deletions

View File

@ -62,7 +62,7 @@ your odds of success if you check out the
before proceeding:
```bash
$ git checkout v0.7.2
$ git checkout v0.8.0
```
Configuration Setup
@ -113,7 +113,7 @@ To run a multinode testnet, after starting a leader node, spin up some validator
separate shells:
```bash
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
$ ./multinode-demo/validator.sh
```
To run a performance-enhanced leader or validator (on Linux),
@ -123,22 +123,20 @@ your system:
```bash
$ ./fetch-perf-libs.sh
$ SOLANA_CUDA=1 ./multinode-demo/leader.sh
$ SOLANA_CUDA=1 ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
$ SOLANA_CUDA=1 ./multinode-demo/validator.sh
```
Testnet Client Demo
---
Now that your singlenode or multinode testnet is up and running let's send it some transactions! Note that we pass in
the expected number of nodes in the network. If running singlenode, pass 1; if multinode, pass the number
of validators you started.
Now that your singlenode or multinode testnet is up and running let's send it
some transactions!
In a separate shell start the client:
```bash
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana 1
$ ./multinode-demo/client.sh # runs against localhost by default
```
What just happened? The client demo spins up several threads to send 500,000 transactions
@ -155,7 +153,7 @@ Public Testnet
In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`.
```bash
$ ./multinode-demo/client.sh testnet.solana.com 1 #The minumum number of nodes to discover on the network
$ ./multinode-demo/client.sh --network $(dig +short testnet.solana.com):8001 --identity config-private/client-id.json --duration 60
```
You can observe the effects of your client's transactions on our [dashboard](https://metrics.solana.com:3000/d/testnet/testnet-hud?orgId=2&from=now-30m&to=now&refresh=5s&var-testnet=testnet)

View File

@ -37,8 +37,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
// Finally, return the transaction to the benchmark.
tx
})
.collect();
}).collect();
bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.

View File

@ -116,8 +116,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
i as i64,
mint.last_id(),
)
})
.collect();
}).collect();
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
@ -131,8 +130,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
mint_total / num_src_accounts as i64,
mint.last_id(),
)
})
.collect();
}).collect();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
@ -143,8 +141,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}).collect();
verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
@ -157,8 +154,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
@ -187,8 +183,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
i as i64,
mint.last_id(),
)
})
.collect();
}).collect();
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
@ -201,8 +196,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
.unwrap();

View File

@ -1,5 +1,5 @@
steps:
- command: "ci/docker-run.sh solanalabs/rust:1.28.0 ci/test-stable.sh"
- command: "ci/docker-run.sh solanalabs/rust:1.29.0 ci/test-stable.sh"
name: "stable [public]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
@ -24,13 +24,6 @@ steps:
timeout_in_minutes: 20
agents:
- "queue=cuda"
- command: "ci/test-large-network.sh || true"
name: "large-network [public] [ignored]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
timeout_in_minutes: 20
agents:
- "queue=large"
- command: "ci/pr-snap.sh"
timeout_in_minutes: 20
name: "snap [public]"

View File

@ -1,6 +1,6 @@
# Note: when the rust version (1.28) is changed also modify
# ci/buildkite.yml to pick up the new image tag
FROM rust:1.28
FROM rust:1.29.0
RUN set -x && \
apt update && \

View File

@ -9,6 +9,7 @@ validatorNodeCount=10
publicNetwork=false
snapChannel=edge
delete=false
enableGpu=false
usage() {
exitcode=0
@ -30,6 +31,7 @@ Deploys a CD testnet
-n [number] - Number of validator nodes (default: $validatorNodeCount)
-c [number] - Number of client nodes (default: $clientNodeCount)
-P - Use public network IP addresses (default: $publicNetwork)
-g - Enable GPU (default: $enableGpu)
-a [address] - Set the leader node's external IP address to this GCE address
-d - Delete the network
@ -45,7 +47,7 @@ zone=$2
[[ -n $zone ]] || usage "Zone not specified"
shift 2
while getopts "h?p:Pn:c:s:a:d" opt; do
while getopts "h?p:Pn:c:s:ga:d" opt; do
case $opt in
h | \?)
usage
@ -69,6 +71,9 @@ while getopts "h?p:Pn:c:s:a:d" opt; do
;;
esac
;;
g)
enableGpu=true
;;
a)
leaderAddress=$OPTARG
;;
@ -86,11 +91,14 @@ gce_create_args=(
-a "$leaderAddress"
-c "$clientNodeCount"
-n "$validatorNodeCount"
-g
-p "$netName"
-z "$zone"
)
if $enableGpu; then
gce_create_args+=(-g)
fi
if $publicNetwork; then
gce_create_args+=(-P)
fi

View File

@ -23,8 +23,8 @@ nightly)
require cargo 1.29.[0-9]+-nightly
;;
stable)
require rustc 1.28.[0-9]+
require cargo 1.28.[0-9]+
require rustc 1.29.[0-9]+
require cargo 1.29.[0-9]+
;;
*)
echo Error: unknown argument: "$1"

View File

@ -15,7 +15,7 @@ mkdir -p target/perf-libs
cd target/perf-libs
(
set -x
curl https://solana-perf.s3.amazonaws.com/master/x86_64-unknown-linux-gnu/solana-perf.tgz | tar zxvf -
curl https://solana-perf.s3.amazonaws.com/v0.8.0/x86_64-unknown-linux-gnu/solana-perf.tgz | tar zxvf -
)
if [[ -r /usr/local/cuda/version.txt && -r cuda-version.txt ]]; then

View File

@ -18,4 +18,8 @@ usage() {
exit 1
}
$solana_bench_tps "$@"
if [[ -z $1 ]]; then # default behavior
$solana_bench_tps --identity config-private/client-id.json --network 127.0.0.1:8001 --duration 90
else
$solana_bench_tps "$@"
fi

View File

@ -34,6 +34,7 @@ ip_address_arg=-l
num_tokens=1000000000
node_type_leader=true
node_type_validator=true
node_type_client=true
while getopts "h?n:lpt:" opt; do
case $opt in
h|\?)
@ -55,10 +56,17 @@ while getopts "h?n:lpt:" opt; do
leader)
node_type_leader=true
node_type_validator=false
node_type_client=false
;;
validator)
node_type_leader=false
node_type_validator=true
node_type_client=false
;;
client)
node_type_leader=false
node_type_validator=false
node_type_client=true
;;
*)
usage "Error: unknown node type: $node_type"
@ -74,13 +82,19 @@ done
set -e
if $node_type_leader; then
for i in "$SOLANA_CONFIG_DIR" "$SOLANA_CONFIG_PRIVATE_DIR"; do
echo "Cleaning $i"
rm -rvf "$i"
mkdir -p "$i"
done
for i in "$SOLANA_CONFIG_DIR" "$SOLANA_CONFIG_VALIDATOR_DIR" "$SOLANA_CONFIG_PRIVATE_DIR"; do
echo "Cleaning $i"
rm -rvf "$i"
mkdir -p "$i"
done
if $node_type_client; then
client_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/client-id.json
$solana_keygen -o "$client_id_path"
ls -lhR "$SOLANA_CONFIG_PRIVATE_DIR"/
fi
if $node_type_leader; then
leader_address_args=("$ip_address_arg")
leader_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/leader-id.json
mint_path="$SOLANA_CONFIG_PRIVATE_DIR"/mint.json
@ -102,11 +116,6 @@ fi
if $node_type_validator; then
echo "Cleaning $SOLANA_CONFIG_VALIDATOR_DIR"
rm -rvf "$SOLANA_CONFIG_VALIDATOR_DIR"
mkdir -p "$SOLANA_CONFIG_VALIDATOR_DIR"
validator_address_args=("$ip_address_arg" -b 9000)
validator_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json

View File

@ -449,8 +449,7 @@ impl Bank {
.map(|(acc, tx)| match acc {
Err(e) => Err(e.clone()),
Ok(ref mut accounts) => Self::execute_transaction(tx, accounts),
})
.collect();
}).collect();
let execution_elapsed = now.elapsed();
let now = Instant::now();
Self::store_accounts(&res, &loaded_accounts, &mut accounts);
@ -1010,8 +1009,7 @@ mod tests {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id);
last_id
})
.collect();
}).collect();
assert_eq!(bank.count_valid_ids(&[]).len(), 0);
assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0);
for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() {

View File

@ -52,8 +52,7 @@ impl BankingStage {
_ => error!("{:?}", e),
}
}
})
.unwrap();
}).unwrap();
(BankingStage { thread_hdl }, signal_receiver)
}
@ -66,8 +65,7 @@ impl BankingStage {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}).collect()
}
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
@ -105,8 +103,7 @@ impl BankingStage {
} else {
None
},
})
.collect();
}).collect();
debug!("process_transactions");
let results = bank.process_transactions(transactions);

View File

@ -68,8 +68,7 @@ fn main() -> Result<()> {
.value_name("NUM")
.takes_value(true)
.help("Use NUM receive sockets"),
)
.get_matches();
).get_matches();
if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));

View File

@ -135,8 +135,7 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
.add_tag(
"op",
influxdb::Value::String("send_barrier_transaction".to_string()),
)
.add_field("poll_count", influxdb::Value::Integer(poll_count))
).add_field("poll_count", influxdb::Value::Integer(poll_count))
.add_field("duration", influxdb::Value::Integer(duration_ms as i64))
.to_owned(),
);
@ -147,8 +146,7 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
&id.pubkey(),
&Duration::from_millis(100),
&Duration::from_secs(10),
)
.expect("Failed to get balance");
).expect("Failed to get balance");
if balance != 1 {
panic!("Expected an account balance of 1 (balance: {}", balance);
}
@ -195,8 +193,7 @@ fn generate_txs(
} else {
Transaction::new(keypair, id.pubkey(), 1, *last_id)
}
})
.collect();
}).collect();
let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
@ -214,8 +211,7 @@ fn generate_txs(
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&duration) as i64),
)
.to_owned(),
).to_owned(),
);
let sz = transactions.len() / threads;
@ -267,8 +263,7 @@ fn do_tx_transfers(
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&transfer_start.elapsed()) as i64),
)
.add_field("count", influxdb::Value::Integer(tx_len as i64))
).add_field("count", influxdb::Value::Integer(tx_len as i64))
.to_owned(),
);
}
@ -578,10 +573,8 @@ fn main() {
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
let shared_txs: Arc<RwLock<VecDeque<Vec<Transaction>>>> =
Arc::new(RwLock::new(VecDeque::new()));
@ -606,10 +599,8 @@ fn main() {
&shared_tx_active_thread_count,
&total_tx_sent_count,
);
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
// generate and send transactions for the specified duration
let start = Instant::now();

View File

@ -48,8 +48,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("rendezvous with the network at this gossip entry point"),
)
.arg(
).arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
@ -57,22 +56,19 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("File to read the client's keypair from"),
)
.arg(
).arg(
Arg::with_name("slice")
.long("slice")
.value_name("SECONDS")
.takes_value(true)
.help("Time slice over which to limit requests to drone"),
)
.arg(
).arg(
Arg::with_name("cap")
.long("cap")
.value_name("NUMBER")
.takes_value(true)
.help("Request limit for time slice"),
)
.get_matches();
).get_matches();
let network = matches
.value_of("network")
@ -159,8 +155,7 @@ fn main() -> Result<(), Box<error::Error>> {
io::ErrorKind::Other,
format!("Drone response: {:?}", err),
))
}))
.then(|_| Ok(()));
})).then(|_| Ok(()));
tokio::spawn(server)
});
tokio::run(done);

View File

@ -21,31 +21,27 @@ fn main() {
.long("local")
.takes_value(false)
.help("detect network address from local machine configuration"),
)
.arg(
).arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
.value_name("PATH")
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
).arg(
Arg::with_name("public")
.short("p")
.long("public")
.takes_value(false)
.help("detect public network address using public servers"),
)
.arg(
).arg(
Arg::with_name("bind")
.short("b")
.long("bind")
.value_name("PORT")
.takes_value(true)
.help("bind to port or address"),
)
.get_matches();
).get_matches();
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), FULLNODE_PORT_RANGE.0);

View File

@ -36,16 +36,14 @@ fn main() -> () {
.value_name("FILE")
.takes_value(true)
.help("run with the identity found in FILE"),
)
.arg(
).arg(
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.help("connect/rendezvous with the network at this gossip entry point"),
)
.arg(
).arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
@ -53,8 +51,7 @@ fn main() -> () {
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
).get_matches();
let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
let path = i.to_string();

View File

@ -25,8 +25,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("Number of tokens with which to initialize mint"),
)
.arg(
).arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
@ -34,8 +33,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
).get_matches();
let tokens = value_t_or_exit!(matches, "tokens", i64);
let ledger_path = matches.value_of("ledger").unwrap();

View File

@ -21,9 +21,9 @@ fn main() -> Result<(), Box<error::Error>> {
.long("outfile")
.value_name("PATH")
.takes_value(true)
.required(true)
.help("path to generated file"),
)
.get_matches();
).get_matches();
let rnd = SystemRandom::new();
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8(&rnd)?;

View File

@ -84,23 +84,20 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.value_name("PATH")
.takes_value(true)
.help("/path/to/leader.json"),
)
.arg(
).arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
.value_name("PATH")
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
).arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.subcommand(
).subcommand(
SubCommand::with_name("airdrop")
.about("Request a batch of tokens")
.arg(
@ -111,8 +108,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.required(true)
.help("The number of tokens to request"),
),
)
.subcommand(
).subcommand(
SubCommand::with_name("pay")
.about("Send a payment")
.arg(
@ -122,16 +118,14 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true)
.required(true)
.help("The number of tokens to send"),
)
.arg(
).arg(
Arg::with_name("to")
.long("to")
.value_name("PUBKEY")
.takes_value(true)
.help("The pubkey of recipient"),
),
)
.subcommand(
).subcommand(
SubCommand::with_name("confirm")
.about("Confirm your payment by signature")
.arg(
@ -141,8 +135,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.required(true)
.help("The transaction signature to confirm"),
),
)
.subcommand(SubCommand::with_name("balance").about("Get your balance"))
).subcommand(SubCommand::with_name("balance").about("Get your balance"))
.subcommand(SubCommand::with_name("address").about("Get your public key"))
.get_matches();

View File

@ -32,8 +32,7 @@ impl BlobFetchStage {
.into_iter()
.map(|socket| {
streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone())
})
.collect();
}).collect();
(BlobFetchStage { exit, thread_hdls }, receiver)
}

View File

@ -2,20 +2,23 @@
//!
use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo};
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use ledger::Block;
use log::Level;
use packet::BlobRecycler;
use packet::{BlobRecycler, SharedBlobs};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use std::mem;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use std::time::{Duration, Instant};
use timing::duration_as_ms;
use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
fn broadcast(
@ -23,27 +26,42 @@ fn broadcast(
broadcast_table: &[NodeInfo],
window: &SharedWindow,
recycler: &BlobRecycler,
receiver: &BlobReceiver,
receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
) -> Result<()> {
let id = node_info.id;
let timer = Duration::new(1, 0);
let mut dq = receiver.recv_timeout(timer)?;
while let Ok(mut nq) = receiver.try_recv() {
dq.append(&mut nq);
let entries = receiver.recv_timeout(timer)?;
let mut num_entries = entries.len();
let mut ventries = Vec::new();
ventries.push(entries);
while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len();
ventries.push(entries);
}
let to_blobs_start = Instant::now();
let dq: SharedBlobs = ventries
.into_par_iter()
.flat_map(|p| p.to_blobs(recycler))
.collect();
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
// flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect();
let blobs_chunking = Instant::now();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
trace!("{}", window.read().unwrap().print(&id, *receive_index));
let broadcast_start = Instant::now();
for mut blobs in blobs_chunked {
let blobs_len = blobs.len();
trace!("{}: broadcast blobs.len: {}", id, blobs_len);
@ -115,6 +133,13 @@ fn broadcast(
*receive_index,
)?;
}
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
info!(
"broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
);
Ok(())
}
@ -129,7 +154,7 @@ impl BroadcastStage {
window: &SharedWindow,
entry_height: u64,
recycler: &BlobRecycler,
receiver: &BlobReceiver,
receiver: &Receiver<Vec<Entry>>,
) {
let mut transmit_index = WindowIndex {
data: entry_height,
@ -177,14 +202,13 @@ impl BroadcastStage {
window: SharedWindow,
entry_height: u64,
recycler: BlobRecycler,
receiver: BlobReceiver,
receiver: Receiver<Vec<Entry>>,
) -> Self {
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
})
.unwrap();
}).unwrap();
BroadcastStage { thread_hdl }
}

View File

@ -86,8 +86,7 @@ impl Counter {
.add_field(
"count",
influxdb::Value::Integer(counts as i64 - lastlog as i64),
)
.to_owned(),
).to_owned(),
);
self.lastlog
.compare_and_swap(lastlog, counts, Ordering::Relaxed);

View File

@ -277,8 +277,7 @@ impl Crdt {
node.contact_info.rpu.to_string(),
node.contact_info.tpu.to_string()
)
})
.collect();
}).collect();
format!(
" NodeInfo.contact_info | Node identifier\n\
@ -402,8 +401,7 @@ impl Crdt {
trace!("{} purge skipped {} {} {}", self.id, k, now - v, limit);
None
}
})
.collect();
}).collect();
inc_new_counter_info!("crdt-purge-count", dead_ids.len());
@ -450,8 +448,7 @@ impl Crdt {
trace!("{}:broadcast node {} {}", me.id, v.id, v.contact_info.tvu);
true
}
})
.cloned()
}).cloned()
.collect();
cloned_table
}
@ -552,8 +549,7 @@ impl Crdt {
v.contact_info.tvu
);
e
})
.collect();
}).collect();
trace!("broadcast results {}", errs.len());
for e in errs {
@ -607,8 +603,7 @@ impl Crdt {
} else {
true
}
})
.collect();
}).collect();
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
.par_iter()
@ -623,8 +618,7 @@ impl Crdt {
//TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size <= BLOB_SIZE);
s.send_to(&rblob.data[..rblob.meta.size], &v.contact_info.tvu)
})
.collect();
}).collect();
for e in errs {
if let Err(e) = &e {
inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1);
@ -666,8 +660,7 @@ impl Crdt {
r.id != Pubkey::default()
&& (Self::is_valid_address(&r.contact_info.tpu)
|| Self::is_valid_address(&r.contact_info.tvu))
})
.map(|x| x.ledger_state.last_id)
}).map(|x| x.ledger_state.last_id)
.collect()
}
@ -702,8 +695,7 @@ impl Crdt {
v.id != self.id
&& !v.contact_info.ncp.ip().is_unspecified()
&& !v.contact_info.ncp.ip().is_multicast()
})
.collect();
}).collect();
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
&self.remote,
@ -867,8 +859,7 @@ impl Crdt {
let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
sleep(Duration::from_millis(time_left));
}
})
.unwrap()
}).unwrap()
}
fn run_window_request(
from: &NodeInfo,
@ -1191,8 +1182,7 @@ impl Crdt {
me.table.len()
);
}
})
.unwrap()
}).unwrap()
}
fn is_valid_ip(addr: IpAddr) -> bool {

View File

@ -134,12 +134,10 @@ impl Drone {
.add_field(
"request_amount",
influxdb::Value::Integer(request_amount as i64),
)
.add_field(
).add_field(
"request_current",
influxdb::Value::Integer(self.request_current as i64),
)
.to_owned(),
).to_owned(),
);
client.retry_transfer_signed(&tx, 10)
} else {

View File

@ -112,7 +112,8 @@ impl Entry {
id: Hash::default(),
transactions,
has_more: false,
}).unwrap() <= BLOB_DATA_SIZE as u64
}).unwrap()
<= BLOB_DATA_SIZE as u64
}
/// Creates the next Tick Entry `num_hashes` after `start_hash`.

View File

@ -327,8 +327,7 @@ pub fn generate_coding(
.map(|(i, l)| {
trace!("{} i: {} data: {}", id, i, l.data[0]);
&l.data[..max_data_size]
})
.collect();
}).collect();
let mut coding_locks: Vec<_> = coding_blobs
.iter()
@ -341,8 +340,7 @@ pub fn generate_coding(
.map(|(i, l)| {
trace!("{} i: {} coding: {}", id, i, l.data[0],);
&mut l.data_mut()[..max_data_size]
})
.collect();
}).collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(

View File

@ -33,8 +33,7 @@ impl FetchStage {
.into_iter()
.map(|socket| {
streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone())
})
.collect();
}).collect();
(FetchStage { exit, thread_hdls }, receiver)
}

View File

@ -243,7 +243,7 @@ impl Fullnode {
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let (tpu, blob_receiver) = Tpu::new(
let (tpu, entry_receiver) = Tpu::new(
keypair,
&bank,
&crdt,
@ -262,7 +262,7 @@ impl Fullnode {
shared_window,
entry_height,
blob_recycler.clone(),
blob_receiver,
entry_receiver,
);
thread_hdls.extend(broadcast_stage.thread_hdls());
}
@ -323,8 +323,7 @@ mod tests {
let bank = Bank::new(&alice);
let entry = tn.info.clone();
Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false)
})
.collect();
}).collect();
//each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the

View File

@ -598,8 +598,7 @@ mod tests {
)],
false,
)
})
.collect()
}).collect()
}
fn make_test_entries() -> Vec<Entry> {

View File

@ -342,12 +342,10 @@ mod test {
.add_field(
"random_bool",
influxdb::Value::Boolean(random::<u8>() < 128),
)
.add_field(
).add_field(
"random_int",
influxdb::Value::Integer(random::<u8>() as i64),
)
.to_owned();
).to_owned();
agent.submit(point);
}

View File

@ -441,6 +441,18 @@ impl Blob {
self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap();
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
let (nrecv, from) = socket.recv_from(&mut p.data)?;
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
Ok(())
}
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> {
let mut v = Vec::new();
//DOCUMENTED SIDE-EFFECT
@ -452,29 +464,23 @@ impl Blob {
socket.set_nonblocking(false)?;
for i in 0..NUM_BLOBS {
let r = re.allocate();
{
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e);
}
return Err(Error::IO(e));
}
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
if i == 0 {
socket.set_nonblocking(true)?;
}
}
match Blob::recv_blob(socket, &r) {
Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
re.recycle(r, "Bob::recv_from::i>0");
break;
}
Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e);
}
re.recycle(r, "Blob::recv_from::empty");
return Err(Error::IO(e));
}
Ok(()) => if i == 0 {
socket.set_nonblocking(true)?;
},
}
v.push(r);
}

View File

@ -5,11 +5,12 @@
//! Transaction, the latest hash, and the number of hashes since the last transaction.
//! The resulting stream of entries represents ordered transactions in time.
use bank::Bank;
use entry::Entry;
use hash::Hash;
use recorder::Recorder;
use service::Service;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use transaction::Transaction;
@ -27,20 +28,16 @@ pub struct RecordStage {
impl RecordStage {
/// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
) -> (Self, Receiver<Vec<Entry>>) {
pub fn new(signal_receiver: Receiver<Signal>, bank: Arc<Bank>) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let start_hash = *start_hash;
let start_hash = bank.last_id();
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
})
.unwrap();
let _ = Self::process_signals(&mut recorder, &signal_receiver, bank, &entry_sender);
}).unwrap();
(RecordStage { thread_hdl }, entry_receiver)
}
@ -48,11 +45,11 @@ impl RecordStage {
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
bank: Arc<Bank>,
tick_duration: Duration,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let start_hash = *start_hash;
let start_hash = bank.last_id();
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
@ -65,6 +62,7 @@ impl RecordStage {
start_time,
tick_duration,
&signal_receiver,
bank.clone(),
&entry_sender,
).is_err()
{
@ -72,14 +70,14 @@ impl RecordStage {
}
recorder.hash();
}
})
.unwrap();
}).unwrap();
(RecordStage { thread_hdl }, entry_receiver)
}
fn process_signal(
signal: Signal,
bank: &Arc<Bank>,
recorder: &mut Recorder,
sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> {
@ -89,6 +87,13 @@ impl RecordStage {
vec![]
};
let entries = recorder.record(txs);
for entry in entries.iter() {
if !entry.has_more {
bank.register_entry_id(&entry.id);
}
}
sender.send(entries).or(Err(()))?;
Ok(())
}
@ -96,11 +101,12 @@ impl RecordStage {
fn process_signals(
recorder: &mut Recorder,
receiver: &Receiver<Signal>,
bank: Arc<Bank>,
sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> {
loop {
match receiver.recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?,
Err(RecvError) => return Err(()),
}
}
@ -111,6 +117,7 @@ impl RecordStage {
start_time: Instant,
tick_duration: Duration,
receiver: &Receiver<Signal>,
bank: Arc<Bank>,
sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> {
loop {
@ -118,7 +125,7 @@ impl RecordStage {
sender.send(vec![entry]).or(Err(()))?;
}
match receiver.try_recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()),
};
@ -139,16 +146,21 @@ impl Service for RecordStage {
#[cfg(test)]
mod tests {
use super::*;
use bank::Bank;
use ledger::Block;
use mint::Mint;
use signature::{Keypair, KeypairUtil};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::sleep;
#[test]
fn test_historian() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero);
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
@ -173,8 +185,9 @@ mod tests {
#[test]
fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero);
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
drop(entry_receiver);
tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
@ -183,8 +196,10 @@ mod tests {
#[test]
fn test_transactions() {
let (tx_sender, signal_receiver) = channel();
let zero = Hash::default();
let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, &zero);
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, bank);
let alice_keypair = Keypair::new();
let bob_pubkey = Keypair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
@ -200,9 +215,11 @@ mod tests {
#[test]
fn test_clock() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (_record_stage, entry_receiver) =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
RecordStage::new_with_clock(tx_receiver, bank, Duration::from_millis(20));
sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender);

View File

@ -110,8 +110,7 @@ impl ReplicateStage {
_ => error!("{:?}", e),
}
}
})
.unwrap();
}).unwrap();
let mut thread_hdls = vec![t_responder, t_replicate];
thread_hdls.extend(vote_stage.thread_hdls());

View File

@ -28,8 +28,7 @@ impl RequestStage {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}).collect()
}
pub fn process_request_packets(
@ -103,8 +102,7 @@ impl RequestStage {
_ => error!("{:?}", e),
}
}
})
.unwrap();
}).unwrap();
(
RequestStage {
thread_hdl,

View File

@ -69,8 +69,7 @@ fn retransmitter(
}
}
trace!("exiting retransmitter");
})
.unwrap()
}).unwrap()
}
pub struct RetransmitStage {

View File

@ -96,8 +96,7 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.par_iter()
.map(verify_packet)
.collect()
})
.collect();
}).collect();
inc_new_counter_info!("ed25519_verify_cpu", count);
rv
}
@ -115,8 +114,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.par_iter()
.map(verify_packet_disabled)
.collect()
})
.collect();
}).collect();
inc_new_counter_info!("ed25519_verify_disabled", count);
rv
}

View File

@ -89,8 +89,7 @@ impl SigVerifyStage {
.add_field(
"total_time_ms",
influxdb::Value::Integer(total_time_ms as i64),
)
.to_owned(),
).to_owned(),
);
Ok(())

View File

@ -58,8 +58,7 @@ pub fn receiver(
.spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
()
})
.unwrap()
}).unwrap()
}
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
@ -104,8 +103,7 @@ pub fn responder(
_ => warn!("{} responder error: {:?}", name, e),
}
}
})
.unwrap()
}).unwrap()
}
//TODO, we would need to stick block authentication before we create the
@ -137,8 +135,7 @@ pub fn blob_receiver(
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
})
.unwrap()
}).unwrap()
}
#[cfg(test)]

View File

@ -163,8 +163,7 @@ impl ThinClient {
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
).to_owned(),
);
result
}
@ -285,8 +284,7 @@ impl ThinClient {
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(elapsed) as i64),
)
.to_owned(),
).to_owned(),
);
}
@ -359,8 +357,7 @@ impl ThinClient {
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
).to_owned(),
);
self.signature_status
}

View File

@ -28,6 +28,7 @@
use bank::Bank;
use banking_stage::BankingStage;
use crdt::Crdt;
use entry::Entry;
use fetch_stage::FetchStage;
use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage;
@ -36,10 +37,10 @@ use signature::Keypair;
use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use write_stage::WriteStage;
pub struct Tpu {
@ -61,7 +62,7 @@ impl Tpu {
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
) -> (Self, BlobReceiver) {
) -> (Self, Receiver<Vec<Entry>>) {
let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) =
@ -75,12 +76,12 @@ impl Tpu {
let (record_stage, entry_receiver) = match tick_duration {
Some(tick_duration) => {
RecordStage::new_with_clock(signal_receiver, &bank.last_id(), tick_duration)
RecordStage::new_with_clock(signal_receiver, bank.clone(), tick_duration)
}
None => RecordStage::new(signal_receiver, &bank.last_id()),
None => RecordStage::new(signal_receiver, bank.clone()),
};
let (write_stage, blob_receiver) = WriteStage::new(
let (write_stage, entry_forwarder) = WriteStage::new(
keypair,
bank.clone(),
crdt.clone(),
@ -96,7 +97,7 @@ impl Tpu {
record_stage,
write_stage,
};
(tpu, blob_receiver)
(tpu, entry_forwarder)
}
pub fn close(self) -> thread::Result<()> {

View File

@ -83,8 +83,7 @@ fn get_last_id_to_vote_on(
.add_field(
"valid_peers",
influxdb::Value::Integer(valid_ids.len() as i64),
)
.to_owned(),
).to_owned(),
);
if valid_ids.len() > super_majority_index {
@ -103,8 +102,7 @@ fn get_last_id_to_vote_on(
.add_field(
"duration_ms",
influxdb::Value::Integer((now - *last_valid_validator_timestamp) as i64),
)
.to_owned(),
).to_owned(),
);
}
@ -408,8 +406,7 @@ pub mod tests {
// sleep to get a different timestamp in the bank
sleep(Duration::from_millis(1));
last_id
})
.collect();
}).collect();
// see that we fail to have 2/3rds consensus
assert!(

View File

@ -90,8 +90,7 @@ impl WindowUtil for Window {
}
self[i].clear_data(recycler);
Some(pix)
})
.collect()
}).collect()
}
fn repair(
@ -140,8 +139,7 @@ impl WindowUtil for Window {
} else {
" "
}
})
.collect();
}).collect();
let buf: Vec<_> = self
.iter()
@ -157,8 +155,7 @@ impl WindowUtil for Window {
// data.is_none()
"c"
}
})
.collect();
}).collect();
format!(
"\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
id,

View File

@ -61,7 +61,8 @@ fn add_block_to_retransmit_queue(
leader_id
);
if p.get_id()
.expect("get_id in fn add_block_to_retransmit_queue") == leader_id
.expect("get_id in fn add_block_to_retransmit_queue")
== leader_id
{
//TODO
//need to copy the retransmitted blob
@ -293,8 +294,7 @@ pub fn window_service(
});
}
}
})
.unwrap()
}).unwrap()
}
#[cfg(test)]
@ -561,8 +561,7 @@ mod test {
let rv = repair_backoff(&mut last, &mut times, 1) as usize;
assert_eq!(times, x + 2);
rv
})
.sum();
}).sum();
assert_eq!(times, 128);
assert_eq!(last, 1);
repair_backoff(&mut last, &mut times, 1);
@ -571,8 +570,7 @@ mod test {
assert_eq!(times, 2);
assert_eq!(last, 2);
total
})
.sum();
}).sum();
let avg = res / num_tests;
assert!(avg >= 3);
assert!(avg <= 5);

View File

@ -14,11 +14,12 @@ use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{responder, BlobReceiver, BlobSender};
use std::time::{Duration, Instant};
use streamer::responder;
use timing::{duration_as_ms, duration_as_s};
use vote_stage::send_leader_vote;
pub struct WriteStage {
@ -27,41 +28,72 @@ pub struct WriteStage {
impl WriteStage {
/// Process any Entry items that have been published by the RecordStage.
/// continuosly broadcast blobs of entries out
/// continuosly send entries out
pub fn write_and_send_entries(
crdt: &Arc<RwLock<Crdt>>,
bank: &Arc<Bank>,
ledger_writer: &mut LedgerWriter,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let mut ventries = Vec::new();
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut num_entries = entries.len();
let mut num_txs = 0;
let votes = &entries.votes();
crdt.write().unwrap().insert_votes(&votes);
ventries.push(entries);
while let Ok(more) = entry_receiver.try_recv() {
num_entries += more.len();
ventries.push(more);
}
ledger_writer.write_entries(entries.clone())?;
info!("write_stage entries: {}", num_entries);
for entry in &entries {
if !entry.has_more {
bank.register_entry_id(&entry.id);
let to_blobs_total = 0;
let mut blob_send_total = 0;
let mut register_entry_total = 0;
let mut crdt_votes_total = 0;
let start = Instant::now();
for _ in 0..ventries.len() {
let entries = ventries.pop().unwrap();
for e in entries.iter() {
num_txs += e.transactions.len();
}
let crdt_votes_start = Instant::now();
let votes = &entries.votes();
crdt.write().unwrap().insert_votes(&votes);
crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed());
ledger_writer.write_entries(entries.clone())?;
let register_entry_start = Instant::now();
register_entry_total += duration_as_ms(&register_entry_start.elapsed());
inc_new_counter_info!("write_stage-write_entries", entries.len());
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
trace!("New entries? {}", entries.len());
let blob_send_start = Instant::now();
if !entries.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
trace!("broadcasting {}", entries.len());
entry_sender.send(entries)?;
}
blob_send_total += duration_as_ms(&blob_send_start.elapsed());
}
info!("done write_stage txs: {} time {} ms txs/s: {} to_blobs_total: {} register_entry_total: {} blob_send_total: {} crdt_votes_total: {}",
num_txs, duration_as_ms(&start.elapsed()),
num_txs as f32 / duration_as_s(&start.elapsed()),
to_blobs_total,
register_entry_total,
blob_send_total,
crdt_votes_total);
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
trace!("New blobs? {}", entries.len());
let blobs = entries.to_blobs(blob_recycler);
if !blobs.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len());
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
Ok(())
}
@ -73,7 +105,7 @@ impl WriteStage {
blob_recycler: BlobRecycler,
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, BlobReceiver) {
) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder(
@ -82,7 +114,7 @@ impl WriteStage {
blob_recycler.clone(),
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let (entry_sender, entry_receiver_forward) = channel();
let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
let thread_hdl = Builder::new()
@ -94,10 +126,8 @@ impl WriteStage {
loop {
if let Err(e) = Self::write_and_send_entries(
&crdt,
&bank,
&mut ledger_writer,
&blob_sender,
&blob_recycler,
&entry_sender,
&entry_receiver,
) {
match e {
@ -126,11 +156,10 @@ impl WriteStage {
error!("{:?}", e);
}
}
})
.unwrap();
}).unwrap();
let thread_hdls = vec![t_responder, thread_hdl];
(WriteStage { thread_hdls }, blob_receiver)
(WriteStage { thread_hdls }, entry_receiver_forward)
}
}

View File

@ -176,8 +176,7 @@ pub fn crdt_retransmit() -> result::Result<()> {
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
}).collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);

View File

@ -593,10 +593,8 @@ fn test_multi_node_dynamic_network() {
assert_eq!(bal, Some(500));
info!("sent balance to[{}/{}] {}", n, num_nodes, keypair.pubkey());
keypair
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
info!("Waiting for keypairs to be created");
let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect();
@ -622,10 +620,8 @@ fn test_multi_node_dynamic_network() {
true,
);
(rd, val)
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
let mut validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect();
@ -645,8 +641,7 @@ fn test_multi_node_dynamic_network() {
&alice_arc.read().unwrap().keypair(),
bob_pubkey,
&last_id,
)
.unwrap();
).unwrap();
expected_balance += 500;