Compare commits

...

13 Commits

Author SHA1 Message Date
390af512de Permit testnets without a GPU 2018-09-26 10:38:10 -07:00
2a9be901da Mark --outfile parameter as required 2018-09-26 10:38:06 -07:00
d794fee66f Remove unused variables and imports after cherry-picking from master 2018-09-19 11:49:47 -07:00
9b66d4d363 Read multiple entries in write stage (#1259)
- Also use rayon to parallelize to_blobs() to maximize CPU usage
2018-09-19 11:49:47 -07:00
bff8f2614b Move entry->blob creation out of write stage (#1257)
- The write stage will output vector of entries
- Broadcast stage will create blobs out of the entries
- Helps reduce MIPS requirements for write stage
2018-09-19 11:49:47 -07:00
8f0648e8fc Move register_entry_id() call out of write stage (#1253)
* Move register_entry_id() call out of write stage

- Write stage is MIPS intensive and has become a bottleneck for
  TPU pipeline
- This will reduce the MIPS requirements for the stage

* Fix rust format issues
2018-09-19 11:49:47 -07:00
d81eaf69db Update fetch-perf-libs.sh 2018-09-17 11:54:45 -07:00
b5935a3830 cargo fmt 2018-09-14 20:30:04 -07:00
c1b07d0f21 Upgrade rust stable to 1.29 2018-09-14 20:30:04 -07:00
a1579b5a47 Remove large-network test, it's ignored anyway 2018-09-14 20:11:46 -07:00
77949a4be6 cherry pick readme update 2018-09-13 19:19:48 -07:00
af58940964 Fix missing recycle in recv_from (#1205)
In the error case that i>0 (we have blobs to send)
we break out of the loop and do not push the allocated r
to the v array. We should recycle this blob, otherwise it
will be dropped.
2018-09-13 10:27:24 -07:00
21963b8c82 fix "leak" in Blob::recv_from (#1198)
* fix "leak" in Blob::recv_from

fixes #1199
2018-09-13 10:27:24 -07:00
47 changed files with 321 additions and 322 deletions

View File

@ -62,7 +62,7 @@ your odds of success if you check out the
before proceeding: before proceeding:
```bash ```bash
$ git checkout v0.7.2 $ git checkout v0.8.0
``` ```
Configuration Setup Configuration Setup
@ -113,7 +113,7 @@ To run a multinode testnet, after starting a leader node, spin up some validator
separate shells: separate shells:
```bash ```bash
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51 $ ./multinode-demo/validator.sh
``` ```
To run a performance-enhanced leader or validator (on Linux), To run a performance-enhanced leader or validator (on Linux),
@ -123,22 +123,20 @@ your system:
```bash ```bash
$ ./fetch-perf-libs.sh $ ./fetch-perf-libs.sh
$ SOLANA_CUDA=1 ./multinode-demo/leader.sh $ SOLANA_CUDA=1 ./multinode-demo/leader.sh
$ SOLANA_CUDA=1 ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51 $ SOLANA_CUDA=1 ./multinode-demo/validator.sh
``` ```
Testnet Client Demo Testnet Client Demo
--- ---
Now that your singlenode or multinode testnet is up and running let's send it some transactions! Note that we pass in Now that your singlenode or multinode testnet is up and running let's send it
the expected number of nodes in the network. If running singlenode, pass 1; if multinode, pass the number some transactions!
of validators you started.
In a separate shell start the client: In a separate shell start the client:
```bash ```bash
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana 1 $ ./multinode-demo/client.sh # runs against localhost by default
``` ```
What just happened? The client demo spins up several threads to send 500,000 transactions What just happened? The client demo spins up several threads to send 500,000 transactions
@ -155,7 +153,7 @@ Public Testnet
In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`. In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`.
```bash ```bash
$ ./multinode-demo/client.sh testnet.solana.com 1 #The minumum number of nodes to discover on the network $ ./multinode-demo/client.sh --network $(dig +short testnet.solana.com):8001 --identity config-private/client-id.json --duration 60
``` ```
You can observe the effects of your client's transactions on our [dashboard](https://metrics.solana.com:3000/d/testnet/testnet-hud?orgId=2&from=now-30m&to=now&refresh=5s&var-testnet=testnet) You can observe the effects of your client's transactions on our [dashboard](https://metrics.solana.com:3000/d/testnet/testnet-hud?orgId=2&from=now-30m&to=now&refresh=5s&var-testnet=testnet)

View File

@ -37,8 +37,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
// Finally, return the transaction to the benchmark. // Finally, return the transaction to the benchmark.
tx tx
}) }).collect();
.collect();
bencher.iter(|| { bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures. // Since benchmarker runs this multiple times, we need to clear the signatures.

View File

@ -116,8 +116,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
i as i64, i as i64,
mint.last_id(), mint.last_id(),
) )
}) }).collect();
.collect();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
@ -131,8 +130,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
mint_total / num_src_accounts as i64, mint_total / num_src_accounts as i64,
mint.last_id(), mint.last_id(),
) )
}) }).collect();
.collect();
bencher.iter(move || { bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint)); let bank = Arc::new(Bank::new(&mint));
@ -143,8 +141,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
.map(|x| { .map(|x| {
let len = (*x).read().unwrap().packets.len(); let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) }).collect();
.collect();
verified_sender.send(verified_setup).unwrap(); verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
@ -157,8 +154,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
.map(|x| { .map(|x| {
let len = (*x).read().unwrap().packets.len(); let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) }).collect();
.collect();
verified_sender.send(verified).unwrap(); verified_sender.send(verified).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
@ -187,8 +183,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
i as i64, i as i64,
mint.last_id(), mint.last_id(),
) )
}) }).collect();
.collect();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
@ -201,8 +196,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
.map(|x| { .map(|x| {
let len = (*x).read().unwrap().packets.len(); let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) }).collect();
.collect();
verified_sender.send(verified).unwrap(); verified_sender.send(verified).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
.unwrap(); .unwrap();

View File

@ -1,5 +1,5 @@
steps: steps:
- command: "ci/docker-run.sh solanalabs/rust:1.28.0 ci/test-stable.sh" - command: "ci/docker-run.sh solanalabs/rust:1.29.0 ci/test-stable.sh"
name: "stable [public]" name: "stable [public]"
env: env:
CARGO_TARGET_CACHE_NAME: "stable" CARGO_TARGET_CACHE_NAME: "stable"
@ -24,13 +24,6 @@ steps:
timeout_in_minutes: 20 timeout_in_minutes: 20
agents: agents:
- "queue=cuda" - "queue=cuda"
- command: "ci/test-large-network.sh || true"
name: "large-network [public] [ignored]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
timeout_in_minutes: 20
agents:
- "queue=large"
- command: "ci/pr-snap.sh" - command: "ci/pr-snap.sh"
timeout_in_minutes: 20 timeout_in_minutes: 20
name: "snap [public]" name: "snap [public]"

View File

@ -1,6 +1,6 @@
# Note: when the rust version (1.28) is changed also modify # Note: when the rust version (1.28) is changed also modify
# ci/buildkite.yml to pick up the new image tag # ci/buildkite.yml to pick up the new image tag
FROM rust:1.28 FROM rust:1.29.0
RUN set -x && \ RUN set -x && \
apt update && \ apt update && \

View File

@ -9,6 +9,7 @@ validatorNodeCount=10
publicNetwork=false publicNetwork=false
snapChannel=edge snapChannel=edge
delete=false delete=false
enableGpu=false
usage() { usage() {
exitcode=0 exitcode=0
@ -30,6 +31,7 @@ Deploys a CD testnet
-n [number] - Number of validator nodes (default: $validatorNodeCount) -n [number] - Number of validator nodes (default: $validatorNodeCount)
-c [number] - Number of client nodes (default: $clientNodeCount) -c [number] - Number of client nodes (default: $clientNodeCount)
-P - Use public network IP addresses (default: $publicNetwork) -P - Use public network IP addresses (default: $publicNetwork)
-g - Enable GPU (default: $enableGpu)
-a [address] - Set the leader node's external IP address to this GCE address -a [address] - Set the leader node's external IP address to this GCE address
-d - Delete the network -d - Delete the network
@ -45,7 +47,7 @@ zone=$2
[[ -n $zone ]] || usage "Zone not specified" [[ -n $zone ]] || usage "Zone not specified"
shift 2 shift 2
while getopts "h?p:Pn:c:s:a:d" opt; do while getopts "h?p:Pn:c:s:ga:d" opt; do
case $opt in case $opt in
h | \?) h | \?)
usage usage
@ -69,6 +71,9 @@ while getopts "h?p:Pn:c:s:a:d" opt; do
;; ;;
esac esac
;; ;;
g)
enableGpu=true
;;
a) a)
leaderAddress=$OPTARG leaderAddress=$OPTARG
;; ;;
@ -86,11 +91,14 @@ gce_create_args=(
-a "$leaderAddress" -a "$leaderAddress"
-c "$clientNodeCount" -c "$clientNodeCount"
-n "$validatorNodeCount" -n "$validatorNodeCount"
-g
-p "$netName" -p "$netName"
-z "$zone" -z "$zone"
) )
if $enableGpu; then
gce_create_args+=(-g)
fi
if $publicNetwork; then if $publicNetwork; then
gce_create_args+=(-P) gce_create_args+=(-P)
fi fi

View File

@ -23,8 +23,8 @@ nightly)
require cargo 1.29.[0-9]+-nightly require cargo 1.29.[0-9]+-nightly
;; ;;
stable) stable)
require rustc 1.28.[0-9]+ require rustc 1.29.[0-9]+
require cargo 1.28.[0-9]+ require cargo 1.29.[0-9]+
;; ;;
*) *)
echo Error: unknown argument: "$1" echo Error: unknown argument: "$1"

View File

@ -15,7 +15,7 @@ mkdir -p target/perf-libs
cd target/perf-libs cd target/perf-libs
( (
set -x set -x
curl https://solana-perf.s3.amazonaws.com/master/x86_64-unknown-linux-gnu/solana-perf.tgz | tar zxvf - curl https://solana-perf.s3.amazonaws.com/v0.8.0/x86_64-unknown-linux-gnu/solana-perf.tgz | tar zxvf -
) )
if [[ -r /usr/local/cuda/version.txt && -r cuda-version.txt ]]; then if [[ -r /usr/local/cuda/version.txt && -r cuda-version.txt ]]; then

View File

@ -18,4 +18,8 @@ usage() {
exit 1 exit 1
} }
if [[ -z $1 ]]; then # default behavior
$solana_bench_tps --identity config-private/client-id.json --network 127.0.0.1:8001 --duration 90
else
$solana_bench_tps "$@" $solana_bench_tps "$@"
fi

View File

@ -34,6 +34,7 @@ ip_address_arg=-l
num_tokens=1000000000 num_tokens=1000000000
node_type_leader=true node_type_leader=true
node_type_validator=true node_type_validator=true
node_type_client=true
while getopts "h?n:lpt:" opt; do while getopts "h?n:lpt:" opt; do
case $opt in case $opt in
h|\?) h|\?)
@ -55,10 +56,17 @@ while getopts "h?n:lpt:" opt; do
leader) leader)
node_type_leader=true node_type_leader=true
node_type_validator=false node_type_validator=false
node_type_client=false
;; ;;
validator) validator)
node_type_leader=false node_type_leader=false
node_type_validator=true node_type_validator=true
node_type_client=false
;;
client)
node_type_leader=false
node_type_validator=false
node_type_client=true
;; ;;
*) *)
usage "Error: unknown node type: $node_type" usage "Error: unknown node type: $node_type"
@ -74,13 +82,19 @@ done
set -e set -e
if $node_type_leader; then for i in "$SOLANA_CONFIG_DIR" "$SOLANA_CONFIG_VALIDATOR_DIR" "$SOLANA_CONFIG_PRIVATE_DIR"; do
for i in "$SOLANA_CONFIG_DIR" "$SOLANA_CONFIG_PRIVATE_DIR"; do
echo "Cleaning $i" echo "Cleaning $i"
rm -rvf "$i" rm -rvf "$i"
mkdir -p "$i" mkdir -p "$i"
done done
if $node_type_client; then
client_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/client-id.json
$solana_keygen -o "$client_id_path"
ls -lhR "$SOLANA_CONFIG_PRIVATE_DIR"/
fi
if $node_type_leader; then
leader_address_args=("$ip_address_arg") leader_address_args=("$ip_address_arg")
leader_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/leader-id.json leader_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/leader-id.json
mint_path="$SOLANA_CONFIG_PRIVATE_DIR"/mint.json mint_path="$SOLANA_CONFIG_PRIVATE_DIR"/mint.json
@ -102,11 +116,6 @@ fi
if $node_type_validator; then if $node_type_validator; then
echo "Cleaning $SOLANA_CONFIG_VALIDATOR_DIR"
rm -rvf "$SOLANA_CONFIG_VALIDATOR_DIR"
mkdir -p "$SOLANA_CONFIG_VALIDATOR_DIR"
validator_address_args=("$ip_address_arg" -b 9000) validator_address_args=("$ip_address_arg" -b 9000)
validator_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json validator_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json

View File

@ -449,8 +449,7 @@ impl Bank {
.map(|(acc, tx)| match acc { .map(|(acc, tx)| match acc {
Err(e) => Err(e.clone()), Err(e) => Err(e.clone()),
Ok(ref mut accounts) => Self::execute_transaction(tx, accounts), Ok(ref mut accounts) => Self::execute_transaction(tx, accounts),
}) }).collect();
.collect();
let execution_elapsed = now.elapsed(); let execution_elapsed = now.elapsed();
let now = Instant::now(); let now = Instant::now();
Self::store_accounts(&res, &loaded_accounts, &mut accounts); Self::store_accounts(&res, &loaded_accounts, &mut accounts);
@ -1010,8 +1009,7 @@ mod tests {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id); bank.register_entry_id(&last_id);
last_id last_id
}) }).collect();
.collect();
assert_eq!(bank.count_valid_ids(&[]).len(), 0); assert_eq!(bank.count_valid_ids(&[]).len(), 0);
assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0); assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0);
for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() { for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() {

View File

@ -52,8 +52,7 @@ impl BankingStage {
_ => error!("{:?}", e), _ => error!("{:?}", e),
} }
} }
}) }).unwrap();
.unwrap();
(BankingStage { thread_hdl }, signal_receiver) (BankingStage { thread_hdl }, signal_receiver)
} }
@ -66,8 +65,7 @@ impl BankingStage {
deserialize(&x.data[0..x.meta.size]) deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr())) .map(|req| (req, x.meta.addr()))
.ok() .ok()
}) }).collect()
.collect()
} }
/// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Process the incoming packets and send output `Signal` messages to `signal_sender`.
@ -105,8 +103,7 @@ impl BankingStage {
} else { } else {
None None
}, },
}) }).collect();
.collect();
debug!("process_transactions"); debug!("process_transactions");
let results = bank.process_transactions(transactions); let results = bank.process_transactions(transactions);

View File

@ -68,8 +68,7 @@ fn main() -> Result<()> {
.value_name("NUM") .value_name("NUM")
.takes_value(true) .takes_value(true)
.help("Use NUM receive sockets"), .help("Use NUM receive sockets"),
) ).get_matches();
.get_matches();
if let Some(n) = matches.value_of("num-recv-sockets") { if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer")); num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));

View File

@ -135,8 +135,7 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
.add_tag( .add_tag(
"op", "op",
influxdb::Value::String("send_barrier_transaction".to_string()), influxdb::Value::String("send_barrier_transaction".to_string()),
) ).add_field("poll_count", influxdb::Value::Integer(poll_count))
.add_field("poll_count", influxdb::Value::Integer(poll_count))
.add_field("duration", influxdb::Value::Integer(duration_ms as i64)) .add_field("duration", influxdb::Value::Integer(duration_ms as i64))
.to_owned(), .to_owned(),
); );
@ -147,8 +146,7 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
&id.pubkey(), &id.pubkey(),
&Duration::from_millis(100), &Duration::from_millis(100),
&Duration::from_secs(10), &Duration::from_secs(10),
) ).expect("Failed to get balance");
.expect("Failed to get balance");
if balance != 1 { if balance != 1 {
panic!("Expected an account balance of 1 (balance: {}", balance); panic!("Expected an account balance of 1 (balance: {}", balance);
} }
@ -195,8 +193,7 @@ fn generate_txs(
} else { } else {
Transaction::new(keypair, id.pubkey(), 1, *last_id) Transaction::new(keypair, id.pubkey(), 1, *last_id)
} }
}) }).collect();
.collect();
let duration = signing_start.elapsed(); let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
@ -214,8 +211,7 @@ fn generate_txs(
.add_field( .add_field(
"duration", "duration",
influxdb::Value::Integer(duration_as_ms(&duration) as i64), influxdb::Value::Integer(duration_as_ms(&duration) as i64),
) ).to_owned(),
.to_owned(),
); );
let sz = transactions.len() / threads; let sz = transactions.len() / threads;
@ -267,8 +263,7 @@ fn do_tx_transfers(
.add_field( .add_field(
"duration", "duration",
influxdb::Value::Integer(duration_as_ms(&transfer_start.elapsed()) as i64), influxdb::Value::Integer(duration_as_ms(&transfer_start.elapsed()) as i64),
) ).add_field("count", influxdb::Value::Integer(tx_len as i64))
.add_field("count", influxdb::Value::Integer(tx_len as i64))
.to_owned(), .to_owned(),
); );
} }
@ -578,10 +573,8 @@ fn main() {
.name("solana-client-sample".to_string()) .name("solana-client-sample".to_string())
.spawn(move || { .spawn(move || {
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period); sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
}) }).unwrap()
.unwrap() }).collect();
})
.collect();
let shared_txs: Arc<RwLock<VecDeque<Vec<Transaction>>>> = let shared_txs: Arc<RwLock<VecDeque<Vec<Transaction>>>> =
Arc::new(RwLock::new(VecDeque::new())); Arc::new(RwLock::new(VecDeque::new()));
@ -606,10 +599,8 @@ fn main() {
&shared_tx_active_thread_count, &shared_tx_active_thread_count,
&total_tx_sent_count, &total_tx_sent_count,
); );
}) }).unwrap()
.unwrap() }).collect();
})
.collect();
// generate and send transactions for the specified duration // generate and send transactions for the specified duration
let start = Instant::now(); let start = Instant::now();

View File

@ -48,8 +48,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("rendezvous with the network at this gossip entry point"), .help("rendezvous with the network at this gossip entry point"),
) ).arg(
.arg(
Arg::with_name("keypair") Arg::with_name("keypair")
.short("k") .short("k")
.long("keypair") .long("keypair")
@ -57,22 +56,19 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("File to read the client's keypair from"), .help("File to read the client's keypair from"),
) ).arg(
.arg(
Arg::with_name("slice") Arg::with_name("slice")
.long("slice") .long("slice")
.value_name("SECONDS") .value_name("SECONDS")
.takes_value(true) .takes_value(true)
.help("Time slice over which to limit requests to drone"), .help("Time slice over which to limit requests to drone"),
) ).arg(
.arg(
Arg::with_name("cap") Arg::with_name("cap")
.long("cap") .long("cap")
.value_name("NUMBER") .value_name("NUMBER")
.takes_value(true) .takes_value(true)
.help("Request limit for time slice"), .help("Request limit for time slice"),
) ).get_matches();
.get_matches();
let network = matches let network = matches
.value_of("network") .value_of("network")
@ -159,8 +155,7 @@ fn main() -> Result<(), Box<error::Error>> {
io::ErrorKind::Other, io::ErrorKind::Other,
format!("Drone response: {:?}", err), format!("Drone response: {:?}", err),
)) ))
})) })).then(|_| Ok(()));
.then(|_| Ok(()));
tokio::spawn(server) tokio::spawn(server)
}); });
tokio::run(done); tokio::run(done);

View File

@ -21,31 +21,27 @@ fn main() {
.long("local") .long("local")
.takes_value(false) .takes_value(false)
.help("detect network address from local machine configuration"), .help("detect network address from local machine configuration"),
) ).arg(
.arg(
Arg::with_name("keypair") Arg::with_name("keypair")
.short("k") .short("k")
.long("keypair") .long("keypair")
.value_name("PATH") .value_name("PATH")
.takes_value(true) .takes_value(true)
.help("/path/to/id.json"), .help("/path/to/id.json"),
) ).arg(
.arg(
Arg::with_name("public") Arg::with_name("public")
.short("p") .short("p")
.long("public") .long("public")
.takes_value(false) .takes_value(false)
.help("detect public network address using public servers"), .help("detect public network address using public servers"),
) ).arg(
.arg(
Arg::with_name("bind") Arg::with_name("bind")
.short("b") .short("b")
.long("bind") .long("bind")
.value_name("PORT") .value_name("PORT")
.takes_value(true) .takes_value(true)
.help("bind to port or address"), .help("bind to port or address"),
) ).get_matches();
.get_matches();
let bind_addr: SocketAddr = { let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), FULLNODE_PORT_RANGE.0); let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), FULLNODE_PORT_RANGE.0);

View File

@ -36,16 +36,14 @@ fn main() -> () {
.value_name("FILE") .value_name("FILE")
.takes_value(true) .takes_value(true)
.help("run with the identity found in FILE"), .help("run with the identity found in FILE"),
) ).arg(
.arg(
Arg::with_name("network") Arg::with_name("network")
.short("n") .short("n")
.long("network") .long("network")
.value_name("HOST:PORT") .value_name("HOST:PORT")
.takes_value(true) .takes_value(true)
.help("connect/rendezvous with the network at this gossip entry point"), .help("connect/rendezvous with the network at this gossip entry point"),
) ).arg(
.arg(
Arg::with_name("ledger") Arg::with_name("ledger")
.short("l") .short("l")
.long("ledger") .long("ledger")
@ -53,8 +51,7 @@ fn main() -> () {
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("use DIR as persistent ledger location"), .help("use DIR as persistent ledger location"),
) ).get_matches();
.get_matches();
let (keypair, ncp) = if let Some(i) = matches.value_of("identity") { let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
let path = i.to_string(); let path = i.to_string();

View File

@ -25,8 +25,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("Number of tokens with which to initialize mint"), .help("Number of tokens with which to initialize mint"),
) ).arg(
.arg(
Arg::with_name("ledger") Arg::with_name("ledger")
.short("l") .short("l")
.long("ledger") .long("ledger")
@ -34,8 +33,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("use DIR as persistent ledger location"), .help("use DIR as persistent ledger location"),
) ).get_matches();
.get_matches();
let tokens = value_t_or_exit!(matches, "tokens", i64); let tokens = value_t_or_exit!(matches, "tokens", i64);
let ledger_path = matches.value_of("ledger").unwrap(); let ledger_path = matches.value_of("ledger").unwrap();

View File

@ -21,9 +21,9 @@ fn main() -> Result<(), Box<error::Error>> {
.long("outfile") .long("outfile")
.value_name("PATH") .value_name("PATH")
.takes_value(true) .takes_value(true)
.required(true)
.help("path to generated file"), .help("path to generated file"),
) ).get_matches();
.get_matches();
let rnd = SystemRandom::new(); let rnd = SystemRandom::new();
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8(&rnd)?; let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8(&rnd)?;

View File

@ -84,23 +84,20 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.value_name("PATH") .value_name("PATH")
.takes_value(true) .takes_value(true)
.help("/path/to/leader.json"), .help("/path/to/leader.json"),
) ).arg(
.arg(
Arg::with_name("keypair") Arg::with_name("keypair")
.short("k") .short("k")
.long("keypair") .long("keypair")
.value_name("PATH") .value_name("PATH")
.takes_value(true) .takes_value(true)
.help("/path/to/id.json"), .help("/path/to/id.json"),
) ).arg(
.arg(
Arg::with_name("timeout") Arg::with_name("timeout")
.long("timeout") .long("timeout")
.value_name("SECONDS") .value_name("SECONDS")
.takes_value(true) .takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"), .help("Max SECONDS to wait to get necessary gossip from the network"),
) ).subcommand(
.subcommand(
SubCommand::with_name("airdrop") SubCommand::with_name("airdrop")
.about("Request a batch of tokens") .about("Request a batch of tokens")
.arg( .arg(
@ -111,8 +108,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.required(true) .required(true)
.help("The number of tokens to request"), .help("The number of tokens to request"),
), ),
) ).subcommand(
.subcommand(
SubCommand::with_name("pay") SubCommand::with_name("pay")
.about("Send a payment") .about("Send a payment")
.arg( .arg(
@ -122,16 +118,14 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("The number of tokens to send"), .help("The number of tokens to send"),
) ).arg(
.arg(
Arg::with_name("to") Arg::with_name("to")
.long("to") .long("to")
.value_name("PUBKEY") .value_name("PUBKEY")
.takes_value(true) .takes_value(true)
.help("The pubkey of recipient"), .help("The pubkey of recipient"),
), ),
) ).subcommand(
.subcommand(
SubCommand::with_name("confirm") SubCommand::with_name("confirm")
.about("Confirm your payment by signature") .about("Confirm your payment by signature")
.arg( .arg(
@ -141,8 +135,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.required(true) .required(true)
.help("The transaction signature to confirm"), .help("The transaction signature to confirm"),
), ),
) ).subcommand(SubCommand::with_name("balance").about("Get your balance"))
.subcommand(SubCommand::with_name("balance").about("Get your balance"))
.subcommand(SubCommand::with_name("address").about("Get your public key")) .subcommand(SubCommand::with_name("address").about("Get your public key"))
.get_matches(); .get_matches();

View File

@ -32,8 +32,7 @@ impl BlobFetchStage {
.into_iter() .into_iter()
.map(|socket| { .map(|socket| {
streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone()) streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone())
}) }).collect();
.collect();
(BlobFetchStage { exit, thread_hdls }, receiver) (BlobFetchStage { exit, thread_hdls }, receiver)
} }

View File

@ -2,20 +2,23 @@
//! //!
use counter::Counter; use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo}; use crdt::{Crdt, CrdtError, NodeInfo};
use entry::Entry;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use ledger::Block;
use log::Level; use log::Level;
use packet::BlobRecycler; use packet::{BlobRecycler, SharedBlobs};
use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use std::mem; use std::mem;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::{Duration, Instant};
use streamer::BlobReceiver; use timing::duration_as_ms;
use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
fn broadcast( fn broadcast(
@ -23,27 +26,42 @@ fn broadcast(
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
window: &SharedWindow, window: &SharedWindow,
recycler: &BlobRecycler, recycler: &BlobRecycler,
receiver: &BlobReceiver, receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket, sock: &UdpSocket,
transmit_index: &mut WindowIndex, transmit_index: &mut WindowIndex,
receive_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let id = node_info.id; let id = node_info.id;
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut dq = receiver.recv_timeout(timer)?; let entries = receiver.recv_timeout(timer)?;
while let Ok(mut nq) = receiver.try_recv() { let mut num_entries = entries.len();
dq.append(&mut nq); let mut ventries = Vec::new();
ventries.push(entries);
while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len();
ventries.push(entries);
} }
let to_blobs_start = Instant::now();
let dq: SharedBlobs = ventries
.into_par_iter()
.flat_map(|p| p.to_blobs(recycler))
.collect();
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
// flatten deque to vec // flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect(); let blobs_vec: Vec<_> = dq.into_iter().collect();
let blobs_chunking = Instant::now();
// We could receive more blobs than window slots so // We could receive more blobs than window slots so
// break them up into window-sized chunks to process // break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
trace!("{}", window.read().unwrap().print(&id, *receive_index)); trace!("{}", window.read().unwrap().print(&id, *receive_index));
let broadcast_start = Instant::now();
for mut blobs in blobs_chunked { for mut blobs in blobs_chunked {
let blobs_len = blobs.len(); let blobs_len = blobs.len();
trace!("{}: broadcast blobs.len: {}", id, blobs_len); trace!("{}: broadcast blobs.len: {}", id, blobs_len);
@ -115,6 +133,13 @@ fn broadcast(
*receive_index, *receive_index,
)?; )?;
} }
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
info!(
"broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
);
Ok(()) Ok(())
} }
@ -129,7 +154,7 @@ impl BroadcastStage {
window: &SharedWindow, window: &SharedWindow,
entry_height: u64, entry_height: u64,
recycler: &BlobRecycler, recycler: &BlobRecycler,
receiver: &BlobReceiver, receiver: &Receiver<Vec<Entry>>,
) { ) {
let mut transmit_index = WindowIndex { let mut transmit_index = WindowIndex {
data: entry_height, data: entry_height,
@ -177,14 +202,13 @@ impl BroadcastStage {
window: SharedWindow, window: SharedWindow,
entry_height: u64, entry_height: u64,
recycler: BlobRecycler, recycler: BlobRecycler,
receiver: BlobReceiver, receiver: Receiver<Vec<Entry>>,
) -> Self { ) -> Self {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver); Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
}) }).unwrap();
.unwrap();
BroadcastStage { thread_hdl } BroadcastStage { thread_hdl }
} }

View File

@ -86,8 +86,7 @@ impl Counter {
.add_field( .add_field(
"count", "count",
influxdb::Value::Integer(counts as i64 - lastlog as i64), influxdb::Value::Integer(counts as i64 - lastlog as i64),
) ).to_owned(),
.to_owned(),
); );
self.lastlog self.lastlog
.compare_and_swap(lastlog, counts, Ordering::Relaxed); .compare_and_swap(lastlog, counts, Ordering::Relaxed);

View File

@ -277,8 +277,7 @@ impl Crdt {
node.contact_info.rpu.to_string(), node.contact_info.rpu.to_string(),
node.contact_info.tpu.to_string() node.contact_info.tpu.to_string()
) )
}) }).collect();
.collect();
format!( format!(
" NodeInfo.contact_info | Node identifier\n\ " NodeInfo.contact_info | Node identifier\n\
@ -402,8 +401,7 @@ impl Crdt {
trace!("{} purge skipped {} {} {}", self.id, k, now - v, limit); trace!("{} purge skipped {} {} {}", self.id, k, now - v, limit);
None None
} }
}) }).collect();
.collect();
inc_new_counter_info!("crdt-purge-count", dead_ids.len()); inc_new_counter_info!("crdt-purge-count", dead_ids.len());
@ -450,8 +448,7 @@ impl Crdt {
trace!("{}:broadcast node {} {}", me.id, v.id, v.contact_info.tvu); trace!("{}:broadcast node {} {}", me.id, v.id, v.contact_info.tvu);
true true
} }
}) }).cloned()
.cloned()
.collect(); .collect();
cloned_table cloned_table
} }
@ -552,8 +549,7 @@ impl Crdt {
v.contact_info.tvu v.contact_info.tvu
); );
e e
}) }).collect();
.collect();
trace!("broadcast results {}", errs.len()); trace!("broadcast results {}", errs.len());
for e in errs { for e in errs {
@ -607,8 +603,7 @@ impl Crdt {
} else { } else {
true true
} }
}) }).collect();
.collect();
trace!("retransmit orders {}", orders.len()); trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders let errs: Vec<_> = orders
.par_iter() .par_iter()
@ -623,8 +618,7 @@ impl Crdt {
//TODO profile this, may need multiple sockets for par_iter //TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size <= BLOB_SIZE); assert!(rblob.meta.size <= BLOB_SIZE);
s.send_to(&rblob.data[..rblob.meta.size], &v.contact_info.tvu) s.send_to(&rblob.data[..rblob.meta.size], &v.contact_info.tvu)
}) }).collect();
.collect();
for e in errs { for e in errs {
if let Err(e) = &e { if let Err(e) = &e {
inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1); inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1);
@ -666,8 +660,7 @@ impl Crdt {
r.id != Pubkey::default() r.id != Pubkey::default()
&& (Self::is_valid_address(&r.contact_info.tpu) && (Self::is_valid_address(&r.contact_info.tpu)
|| Self::is_valid_address(&r.contact_info.tvu)) || Self::is_valid_address(&r.contact_info.tvu))
}) }).map(|x| x.ledger_state.last_id)
.map(|x| x.ledger_state.last_id)
.collect() .collect()
} }
@ -702,8 +695,7 @@ impl Crdt {
v.id != self.id v.id != self.id
&& !v.contact_info.ncp.ip().is_unspecified() && !v.contact_info.ncp.ip().is_unspecified()
&& !v.contact_info.ncp.ip().is_multicast() && !v.contact_info.ncp.ip().is_multicast()
}) }).collect();
.collect();
let choose_peer_strategy = ChooseWeightedPeerStrategy::new( let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
&self.remote, &self.remote,
@ -867,8 +859,7 @@ impl Crdt {
let time_left = GOSSIP_SLEEP_MILLIS - elapsed; let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
sleep(Duration::from_millis(time_left)); sleep(Duration::from_millis(time_left));
} }
}) }).unwrap()
.unwrap()
} }
fn run_window_request( fn run_window_request(
from: &NodeInfo, from: &NodeInfo,
@ -1191,8 +1182,7 @@ impl Crdt {
me.table.len() me.table.len()
); );
} }
}) }).unwrap()
.unwrap()
} }
fn is_valid_ip(addr: IpAddr) -> bool { fn is_valid_ip(addr: IpAddr) -> bool {

View File

@ -134,12 +134,10 @@ impl Drone {
.add_field( .add_field(
"request_amount", "request_amount",
influxdb::Value::Integer(request_amount as i64), influxdb::Value::Integer(request_amount as i64),
) ).add_field(
.add_field(
"request_current", "request_current",
influxdb::Value::Integer(self.request_current as i64), influxdb::Value::Integer(self.request_current as i64),
) ).to_owned(),
.to_owned(),
); );
client.retry_transfer_signed(&tx, 10) client.retry_transfer_signed(&tx, 10)
} else { } else {

View File

@ -112,7 +112,8 @@ impl Entry {
id: Hash::default(), id: Hash::default(),
transactions, transactions,
has_more: false, has_more: false,
}).unwrap() <= BLOB_DATA_SIZE as u64 }).unwrap()
<= BLOB_DATA_SIZE as u64
} }
/// Creates the next Tick Entry `num_hashes` after `start_hash`. /// Creates the next Tick Entry `num_hashes` after `start_hash`.

View File

@ -327,8 +327,7 @@ pub fn generate_coding(
.map(|(i, l)| { .map(|(i, l)| {
trace!("{} i: {} data: {}", id, i, l.data[0]); trace!("{} i: {} data: {}", id, i, l.data[0]);
&l.data[..max_data_size] &l.data[..max_data_size]
}) }).collect();
.collect();
let mut coding_locks: Vec<_> = coding_blobs let mut coding_locks: Vec<_> = coding_blobs
.iter() .iter()
@ -341,8 +340,7 @@ pub fn generate_coding(
.map(|(i, l)| { .map(|(i, l)| {
trace!("{} i: {} coding: {}", id, i, l.data[0],); trace!("{} i: {} coding: {}", id, i, l.data[0],);
&mut l.data_mut()[..max_data_size] &mut l.data_mut()[..max_data_size]
}) }).collect();
.collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!( debug!(

View File

@ -33,8 +33,7 @@ impl FetchStage {
.into_iter() .into_iter()
.map(|socket| { .map(|socket| {
streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone()) streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone())
}) }).collect();
.collect();
(FetchStage { exit, thread_hdls }, receiver) (FetchStage { exit, thread_hdls }, receiver)
} }

View File

@ -243,7 +243,7 @@ impl Fullnode {
// TODO: To light up PoH, uncomment the following line: // TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000)); //let tick_duration = Some(Duration::from_millis(1000));
let (tpu, blob_receiver) = Tpu::new( let (tpu, entry_receiver) = Tpu::new(
keypair, keypair,
&bank, &bank,
&crdt, &crdt,
@ -262,7 +262,7 @@ impl Fullnode {
shared_window, shared_window,
entry_height, entry_height,
blob_recycler.clone(), blob_recycler.clone(),
blob_receiver, entry_receiver,
); );
thread_hdls.extend(broadcast_stage.thread_hdls()); thread_hdls.extend(broadcast_stage.thread_hdls());
} }
@ -323,8 +323,7 @@ mod tests {
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let entry = tn.info.clone(); let entry = tn.info.clone();
Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false) Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false)
}) }).collect();
.collect();
//each validator can exit in parallel to speed many sequential calls to `join` //each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit()); vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the //while join is called sequentially, the above exit call notified all the

View File

@ -598,8 +598,7 @@ mod tests {
)], )],
false, false,
) )
}) }).collect()
.collect()
} }
fn make_test_entries() -> Vec<Entry> { fn make_test_entries() -> Vec<Entry> {

View File

@ -342,12 +342,10 @@ mod test {
.add_field( .add_field(
"random_bool", "random_bool",
influxdb::Value::Boolean(random::<u8>() < 128), influxdb::Value::Boolean(random::<u8>() < 128),
) ).add_field(
.add_field(
"random_int", "random_int",
influxdb::Value::Integer(random::<u8>() as i64), influxdb::Value::Integer(random::<u8>() as i64),
) ).to_owned();
.to_owned();
agent.submit(point); agent.submit(point);
} }

View File

@ -441,6 +441,18 @@ impl Blob {
self.meta.size = new_size; self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap(); self.set_data_size(new_size as u64).unwrap();
} }
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
let (nrecv, from) = socket.recv_from(&mut p.data)?;
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
Ok(())
}
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> { pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> {
let mut v = Vec::new(); let mut v = Vec::new();
//DOCUMENTED SIDE-EFFECT //DOCUMENTED SIDE-EFFECT
@ -452,29 +464,23 @@ impl Blob {
socket.set_nonblocking(false)?; socket.set_nonblocking(false)?;
for i in 0..NUM_BLOBS { for i in 0..NUM_BLOBS {
let r = re.allocate(); let r = re.allocate();
{
let mut p = r.write().expect("'r' write lock in pub fn recv_from"); match Blob::recv_blob(socket, &r) {
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
re.recycle(r, "Bob::recv_from::i>0");
break; break;
} }
Err(e) => { Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock { if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e); info!("recv_from err {:?}", e);
} }
re.recycle(r, "Blob::recv_from::empty");
return Err(Error::IO(e)); return Err(Error::IO(e));
} }
Ok((nrecv, from)) => { Ok(()) => if i == 0 {
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
if i == 0 {
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
} },
}
}
} }
v.push(r); v.push(r);
} }

View File

@ -5,11 +5,12 @@
//! Transaction, the latest hash, and the number of hashes since the last transaction. //! Transaction, the latest hash, and the number of hashes since the last transaction.
//! The resulting stream of entries represents ordered transactions in time. //! The resulting stream of entries represents ordered transactions in time.
use bank::Bank;
use entry::Entry; use entry::Entry;
use hash::Hash;
use recorder::Recorder; use recorder::Recorder;
use service::Service; use service::Service;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use transaction::Transaction; use transaction::Transaction;
@ -27,20 +28,16 @@ pub struct RecordStage {
impl RecordStage { impl RecordStage {
/// A background thread that will continue tagging received Transaction messages and /// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new( pub fn new(signal_receiver: Receiver<Signal>, bank: Arc<Bank>) -> (Self, Receiver<Vec<Entry>>) {
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let start_hash = *start_hash; let start_hash = bank.last_id();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-record-stage".to_string()) .name("solana-record-stage".to_string())
.spawn(move || { .spawn(move || {
let mut recorder = Recorder::new(start_hash); let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender); let _ = Self::process_signals(&mut recorder, &signal_receiver, bank, &entry_sender);
}) }).unwrap();
.unwrap();
(RecordStage { thread_hdl }, entry_receiver) (RecordStage { thread_hdl }, entry_receiver)
} }
@ -48,11 +45,11 @@ impl RecordStage {
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`. /// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock( pub fn new_with_clock(
signal_receiver: Receiver<Signal>, signal_receiver: Receiver<Signal>,
start_hash: &Hash, bank: Arc<Bank>,
tick_duration: Duration, tick_duration: Duration,
) -> (Self, Receiver<Vec<Entry>>) { ) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let start_hash = *start_hash; let start_hash = bank.last_id();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-record-stage".to_string()) .name("solana-record-stage".to_string())
@ -65,6 +62,7 @@ impl RecordStage {
start_time, start_time,
tick_duration, tick_duration,
&signal_receiver, &signal_receiver,
bank.clone(),
&entry_sender, &entry_sender,
).is_err() ).is_err()
{ {
@ -72,14 +70,14 @@ impl RecordStage {
} }
recorder.hash(); recorder.hash();
} }
}) }).unwrap();
.unwrap();
(RecordStage { thread_hdl }, entry_receiver) (RecordStage { thread_hdl }, entry_receiver)
} }
fn process_signal( fn process_signal(
signal: Signal, signal: Signal,
bank: &Arc<Bank>,
recorder: &mut Recorder, recorder: &mut Recorder,
sender: &Sender<Vec<Entry>>, sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> { ) -> Result<(), ()> {
@ -89,6 +87,13 @@ impl RecordStage {
vec![] vec![]
}; };
let entries = recorder.record(txs); let entries = recorder.record(txs);
for entry in entries.iter() {
if !entry.has_more {
bank.register_entry_id(&entry.id);
}
}
sender.send(entries).or(Err(()))?; sender.send(entries).or(Err(()))?;
Ok(()) Ok(())
} }
@ -96,11 +101,12 @@ impl RecordStage {
fn process_signals( fn process_signals(
recorder: &mut Recorder, recorder: &mut Recorder,
receiver: &Receiver<Signal>, receiver: &Receiver<Signal>,
bank: Arc<Bank>,
sender: &Sender<Vec<Entry>>, sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> { ) -> Result<(), ()> {
loop { loop {
match receiver.recv() { match receiver.recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?, Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?,
Err(RecvError) => return Err(()), Err(RecvError) => return Err(()),
} }
} }
@ -111,6 +117,7 @@ impl RecordStage {
start_time: Instant, start_time: Instant,
tick_duration: Duration, tick_duration: Duration,
receiver: &Receiver<Signal>, receiver: &Receiver<Signal>,
bank: Arc<Bank>,
sender: &Sender<Vec<Entry>>, sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> { ) -> Result<(), ()> {
loop { loop {
@ -118,7 +125,7 @@ impl RecordStage {
sender.send(vec![entry]).or(Err(()))?; sender.send(vec![entry]).or(Err(()))?;
} }
match receiver.try_recv() { match receiver.try_recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?, Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?,
Err(TryRecvError::Empty) => return Ok(()), Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()), Err(TryRecvError::Disconnected) => return Err(()),
}; };
@ -139,16 +146,21 @@ impl Service for RecordStage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bank::Bank;
use ledger::Block; use ledger::Block;
use mint::Mint;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::sleep; use std::thread::sleep;
#[test] #[test]
fn test_historian() { fn test_historian() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let mint = Mint::new(1234);
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero); let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
@ -173,8 +185,9 @@ mod tests {
#[test] #[test]
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let mint = Mint::new(1234);
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero); let bank = Arc::new(Bank::new(&mint));
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
drop(entry_receiver); drop(entry_receiver);
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
@ -183,8 +196,10 @@ mod tests {
#[test] #[test]
fn test_transactions() { fn test_transactions() {
let (tx_sender, signal_receiver) = channel(); let (tx_sender, signal_receiver) = channel();
let zero = Hash::default(); let mint = Mint::new(1234);
let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, &zero); let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, bank);
let alice_keypair = Keypair::new(); let alice_keypair = Keypair::new();
let bob_pubkey = Keypair::new().pubkey(); let bob_pubkey = Keypair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
@ -200,9 +215,11 @@ mod tests {
#[test] #[test]
fn test_clock() { fn test_clock() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (_record_stage, entry_receiver) = let (_record_stage, entry_receiver) =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20)); RecordStage::new_with_clock(tx_receiver, bank, Duration::from_millis(20));
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender); drop(tx_sender);

View File

@ -110,8 +110,7 @@ impl ReplicateStage {
_ => error!("{:?}", e), _ => error!("{:?}", e),
} }
} }
}) }).unwrap();
.unwrap();
let mut thread_hdls = vec![t_responder, t_replicate]; let mut thread_hdls = vec![t_responder, t_replicate];
thread_hdls.extend(vote_stage.thread_hdls()); thread_hdls.extend(vote_stage.thread_hdls());

View File

@ -28,8 +28,7 @@ impl RequestStage {
deserialize(&x.data[0..x.meta.size]) deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr())) .map(|req| (req, x.meta.addr()))
.ok() .ok()
}) }).collect()
.collect()
} }
pub fn process_request_packets( pub fn process_request_packets(
@ -103,8 +102,7 @@ impl RequestStage {
_ => error!("{:?}", e), _ => error!("{:?}", e),
} }
} }
}) }).unwrap();
.unwrap();
( (
RequestStage { RequestStage {
thread_hdl, thread_hdl,

View File

@ -69,8 +69,7 @@ fn retransmitter(
} }
} }
trace!("exiting retransmitter"); trace!("exiting retransmitter");
}) }).unwrap()
.unwrap()
} }
pub struct RetransmitStage { pub struct RetransmitStage {

View File

@ -96,8 +96,7 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.par_iter() .par_iter()
.map(verify_packet) .map(verify_packet)
.collect() .collect()
}) }).collect();
.collect();
inc_new_counter_info!("ed25519_verify_cpu", count); inc_new_counter_info!("ed25519_verify_cpu", count);
rv rv
} }
@ -115,8 +114,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.par_iter() .par_iter()
.map(verify_packet_disabled) .map(verify_packet_disabled)
.collect() .collect()
}) }).collect();
.collect();
inc_new_counter_info!("ed25519_verify_disabled", count); inc_new_counter_info!("ed25519_verify_disabled", count);
rv rv
} }

View File

@ -89,8 +89,7 @@ impl SigVerifyStage {
.add_field( .add_field(
"total_time_ms", "total_time_ms",
influxdb::Value::Integer(total_time_ms as i64), influxdb::Value::Integer(total_time_ms as i64),
) ).to_owned(),
.to_owned(),
); );
Ok(()) Ok(())

View File

@ -58,8 +58,7 @@ pub fn receiver(
.spawn(move || { .spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
() ()
}) }).unwrap()
.unwrap()
} }
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
@ -104,8 +103,7 @@ pub fn responder(
_ => warn!("{} responder error: {:?}", name, e), _ => warn!("{} responder error: {:?}", name, e),
} }
} }
}) }).unwrap()
.unwrap()
} }
//TODO, we would need to stick block authentication before we create the //TODO, we would need to stick block authentication before we create the
@ -137,8 +135,7 @@ pub fn blob_receiver(
break; break;
} }
let _ = recv_blobs(&recycler, &sock, &s); let _ = recv_blobs(&recycler, &sock, &s);
}) }).unwrap()
.unwrap()
} }
#[cfg(test)] #[cfg(test)]

View File

@ -163,8 +163,7 @@ impl ThinClient {
.add_field( .add_field(
"duration_ms", "duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
) ).to_owned(),
.to_owned(),
); );
result result
} }
@ -285,8 +284,7 @@ impl ThinClient {
.add_field( .add_field(
"duration_ms", "duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(elapsed) as i64), influxdb::Value::Integer(timing::duration_as_ms(elapsed) as i64),
) ).to_owned(),
.to_owned(),
); );
} }
@ -359,8 +357,7 @@ impl ThinClient {
.add_field( .add_field(
"duration_ms", "duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
) ).to_owned(),
.to_owned(),
); );
self.signature_status self.signature_status
} }

View File

@ -28,6 +28,7 @@
use bank::Bank; use bank::Bank;
use banking_stage::BankingStage; use banking_stage::BankingStage;
use crdt::Crdt; use crdt::Crdt;
use entry::Entry;
use fetch_stage::FetchStage; use fetch_stage::FetchStage;
use packet::{BlobRecycler, PacketRecycler}; use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage; use record_stage::RecordStage;
@ -36,10 +37,10 @@ use signature::Keypair;
use sigverify_stage::SigVerifyStage; use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver;
use write_stage::WriteStage; use write_stage::WriteStage;
pub struct Tpu { pub struct Tpu {
@ -61,7 +62,7 @@ impl Tpu {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
ledger_path: &str, ledger_path: &str,
sigverify_disabled: bool, sigverify_disabled: bool,
) -> (Self, BlobReceiver) { ) -> (Self, Receiver<Vec<Entry>>) {
let packet_recycler = PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) = let (fetch_stage, packet_receiver) =
@ -75,12 +76,12 @@ impl Tpu {
let (record_stage, entry_receiver) = match tick_duration { let (record_stage, entry_receiver) = match tick_duration {
Some(tick_duration) => { Some(tick_duration) => {
RecordStage::new_with_clock(signal_receiver, &bank.last_id(), tick_duration) RecordStage::new_with_clock(signal_receiver, bank.clone(), tick_duration)
} }
None => RecordStage::new(signal_receiver, &bank.last_id()), None => RecordStage::new(signal_receiver, bank.clone()),
}; };
let (write_stage, blob_receiver) = WriteStage::new( let (write_stage, entry_forwarder) = WriteStage::new(
keypair, keypair,
bank.clone(), bank.clone(),
crdt.clone(), crdt.clone(),
@ -96,7 +97,7 @@ impl Tpu {
record_stage, record_stage,
write_stage, write_stage,
}; };
(tpu, blob_receiver) (tpu, entry_forwarder)
} }
pub fn close(self) -> thread::Result<()> { pub fn close(self) -> thread::Result<()> {

View File

@ -83,8 +83,7 @@ fn get_last_id_to_vote_on(
.add_field( .add_field(
"valid_peers", "valid_peers",
influxdb::Value::Integer(valid_ids.len() as i64), influxdb::Value::Integer(valid_ids.len() as i64),
) ).to_owned(),
.to_owned(),
); );
if valid_ids.len() > super_majority_index { if valid_ids.len() > super_majority_index {
@ -103,8 +102,7 @@ fn get_last_id_to_vote_on(
.add_field( .add_field(
"duration_ms", "duration_ms",
influxdb::Value::Integer((now - *last_valid_validator_timestamp) as i64), influxdb::Value::Integer((now - *last_valid_validator_timestamp) as i64),
) ).to_owned(),
.to_owned(),
); );
} }
@ -408,8 +406,7 @@ pub mod tests {
// sleep to get a different timestamp in the bank // sleep to get a different timestamp in the bank
sleep(Duration::from_millis(1)); sleep(Duration::from_millis(1));
last_id last_id
}) }).collect();
.collect();
// see that we fail to have 2/3rds consensus // see that we fail to have 2/3rds consensus
assert!( assert!(

View File

@ -90,8 +90,7 @@ impl WindowUtil for Window {
} }
self[i].clear_data(recycler); self[i].clear_data(recycler);
Some(pix) Some(pix)
}) }).collect()
.collect()
} }
fn repair( fn repair(
@ -140,8 +139,7 @@ impl WindowUtil for Window {
} else { } else {
" " " "
} }
}) }).collect();
.collect();
let buf: Vec<_> = self let buf: Vec<_> = self
.iter() .iter()
@ -157,8 +155,7 @@ impl WindowUtil for Window {
// data.is_none() // data.is_none()
"c" "c"
} }
}) }).collect();
.collect();
format!( format!(
"\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}", "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
id, id,

View File

@ -61,7 +61,8 @@ fn add_block_to_retransmit_queue(
leader_id leader_id
); );
if p.get_id() if p.get_id()
.expect("get_id in fn add_block_to_retransmit_queue") == leader_id .expect("get_id in fn add_block_to_retransmit_queue")
== leader_id
{ {
//TODO //TODO
//need to copy the retransmitted blob //need to copy the retransmitted blob
@ -293,8 +294,7 @@ pub fn window_service(
}); });
} }
} }
}) }).unwrap()
.unwrap()
} }
#[cfg(test)] #[cfg(test)]
@ -561,8 +561,7 @@ mod test {
let rv = repair_backoff(&mut last, &mut times, 1) as usize; let rv = repair_backoff(&mut last, &mut times, 1) as usize;
assert_eq!(times, x + 2); assert_eq!(times, x + 2);
rv rv
}) }).sum();
.sum();
assert_eq!(times, 128); assert_eq!(times, 128);
assert_eq!(last, 1); assert_eq!(last, 1);
repair_backoff(&mut last, &mut times, 1); repair_backoff(&mut last, &mut times, 1);
@ -571,8 +570,7 @@ mod test {
assert_eq!(times, 2); assert_eq!(times, 2);
assert_eq!(last, 2); assert_eq!(last, 2);
total total
}) }).sum();
.sum();
let avg = res / num_tests; let avg = res / num_tests;
assert!(avg >= 3); assert!(avg >= 3);
assert!(avg <= 5); assert!(avg <= 5);

View File

@ -14,11 +14,12 @@ use service::Service;
use signature::Keypair; use signature::Keypair;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::{Duration, Instant};
use streamer::{responder, BlobReceiver, BlobSender}; use streamer::responder;
use timing::{duration_as_ms, duration_as_s};
use vote_stage::send_leader_vote; use vote_stage::send_leader_vote;
pub struct WriteStage { pub struct WriteStage {
@ -27,41 +28,72 @@ pub struct WriteStage {
impl WriteStage { impl WriteStage {
/// Process any Entry items that have been published by the RecordStage. /// Process any Entry items that have been published by the RecordStage.
/// continuosly broadcast blobs of entries out /// continuosly send entries out
pub fn write_and_send_entries( pub fn write_and_send_entries(
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
bank: &Arc<Bank>,
ledger_writer: &mut LedgerWriter, ledger_writer: &mut LedgerWriter,
blob_sender: &BlobSender, entry_sender: &Sender<Vec<Entry>>,
blob_recycler: &BlobRecycler,
entry_receiver: &Receiver<Vec<Entry>>, entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> { ) -> Result<()> {
let mut ventries = Vec::new();
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut num_entries = entries.len();
let mut num_txs = 0;
ventries.push(entries);
while let Ok(more) = entry_receiver.try_recv() {
num_entries += more.len();
ventries.push(more);
}
info!("write_stage entries: {}", num_entries);
let to_blobs_total = 0;
let mut blob_send_total = 0;
let mut register_entry_total = 0;
let mut crdt_votes_total = 0;
let start = Instant::now();
for _ in 0..ventries.len() {
let entries = ventries.pop().unwrap();
for e in entries.iter() {
num_txs += e.transactions.len();
}
let crdt_votes_start = Instant::now();
let votes = &entries.votes(); let votes = &entries.votes();
crdt.write().unwrap().insert_votes(&votes); crdt.write().unwrap().insert_votes(&votes);
crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed());
ledger_writer.write_entries(entries.clone())?; ledger_writer.write_entries(entries.clone())?;
for entry in &entries { let register_entry_start = Instant::now();
if !entry.has_more { register_entry_total += duration_as_ms(&register_entry_start.elapsed());
bank.register_entry_id(&entry.id);
} inc_new_counter_info!("write_stage-write_entries", entries.len());
}
//TODO(anatoly): real stake based voting needs to change this //TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted //leader simply votes if the current set of validators have voted
//on a valid last id //on a valid last id
trace!("New blobs? {}", entries.len()); trace!("New entries? {}", entries.len());
let blobs = entries.to_blobs(blob_recycler); let blob_send_start = Instant::now();
if !entries.is_empty() {
if !blobs.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len()); inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len()); inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
trace!("broadcasting {}", blobs.len()); trace!("broadcasting {}", entries.len());
blob_sender.send(blobs)?; entry_sender.send(entries)?;
} }
blob_send_total += duration_as_ms(&blob_send_start.elapsed());
}
info!("done write_stage txs: {} time {} ms txs/s: {} to_blobs_total: {} register_entry_total: {} blob_send_total: {} crdt_votes_total: {}",
num_txs, duration_as_ms(&start.elapsed()),
num_txs as f32 / duration_as_s(&start.elapsed()),
to_blobs_total,
register_entry_total,
blob_send_total,
crdt_votes_total);
Ok(()) Ok(())
} }
@ -73,7 +105,7 @@ impl WriteStage {
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
ledger_path: &str, ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>, entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, BlobReceiver) { ) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel(); let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder( let t_responder = responder(
@ -82,7 +114,7 @@ impl WriteStage {
blob_recycler.clone(), blob_recycler.clone(),
vote_blob_receiver, vote_blob_receiver,
); );
let (blob_sender, blob_receiver) = channel(); let (entry_sender, entry_receiver_forward) = channel();
let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap(); let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
@ -94,10 +126,8 @@ impl WriteStage {
loop { loop {
if let Err(e) = Self::write_and_send_entries( if let Err(e) = Self::write_and_send_entries(
&crdt, &crdt,
&bank,
&mut ledger_writer, &mut ledger_writer,
&blob_sender, &entry_sender,
&blob_recycler,
&entry_receiver, &entry_receiver,
) { ) {
match e { match e {
@ -126,11 +156,10 @@ impl WriteStage {
error!("{:?}", e); error!("{:?}", e);
} }
} }
}) }).unwrap();
.unwrap();
let thread_hdls = vec![t_responder, thread_hdl]; let thread_hdls = vec![t_responder, thread_hdl];
(WriteStage { thread_hdls }, blob_receiver) (WriteStage { thread_hdls }, entry_receiver_forward)
} }
} }

View File

@ -176,8 +176,7 @@ pub fn crdt_retransmit() -> result::Result<()> {
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data); let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet res.is_err() //true if failed to receive the retransmit packet
}) }).collect();
.collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed //true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet //r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]); assert_eq!(res, [true, false, false]);

View File

@ -593,10 +593,8 @@ fn test_multi_node_dynamic_network() {
assert_eq!(bal, Some(500)); assert_eq!(bal, Some(500));
info!("sent balance to[{}/{}] {}", n, num_nodes, keypair.pubkey()); info!("sent balance to[{}/{}] {}", n, num_nodes, keypair.pubkey());
keypair keypair
}) }).unwrap()
.unwrap() }).collect();
})
.collect();
info!("Waiting for keypairs to be created"); info!("Waiting for keypairs to be created");
let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect(); let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect();
@ -622,10 +620,8 @@ fn test_multi_node_dynamic_network() {
true, true,
); );
(rd, val) (rd, val)
}) }).unwrap()
.unwrap() }).collect();
})
.collect();
let mut validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect(); let mut validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect();
@ -645,8 +641,7 @@ fn test_multi_node_dynamic_network() {
&alice_arc.read().unwrap().keypair(), &alice_arc.read().unwrap().keypair(),
bob_pubkey, bob_pubkey,
&last_id, &last_id,
) ).unwrap();
.unwrap();
expected_balance += 500; expected_balance += 500;