Compare commits

...

7 Commits

Author SHA1 Message Date
d81eaf69db Update fetch-perf-libs.sh 2018-09-17 11:54:45 -07:00
b5935a3830 cargo fmt 2018-09-14 20:30:04 -07:00
c1b07d0f21 Upgrade rust stable to 1.29 2018-09-14 20:30:04 -07:00
a1579b5a47 Remove large-network test, it's ignored anyway 2018-09-14 20:11:46 -07:00
77949a4be6 cherry pick readme update 2018-09-13 19:19:48 -07:00
af58940964 Fix missing recycle in recv_from (#1205)
In the error case that i>0 (we have blobs to send)
we break out of the loop and do not push the allocated r
to the v array. We should recycle this blob, otherwise it
will be dropped.
2018-09-13 10:27:24 -07:00
21963b8c82 fix "leak" in Blob::recv_from (#1198)
* fix "leak" in Blob::recv_from

fixes #1199
2018-09-13 10:27:24 -07:00
45 changed files with 166 additions and 251 deletions

View File

@ -62,7 +62,7 @@ your odds of success if you check out the
before proceeding:
```bash
$ git checkout v0.7.2
$ git checkout v0.8.0
```
Configuration Setup
@ -113,7 +113,7 @@ To run a multinode testnet, after starting a leader node, spin up some validator
separate shells:
```bash
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
$ ./multinode-demo/validator.sh
```
To run a performance-enhanced leader or validator (on Linux),
@ -123,22 +123,20 @@ your system:
```bash
$ ./fetch-perf-libs.sh
$ SOLANA_CUDA=1 ./multinode-demo/leader.sh
$ SOLANA_CUDA=1 ./multinode-demo/validator.sh ubuntu@10.0.1.51:~/solana 10.0.1.51
$ SOLANA_CUDA=1 ./multinode-demo/validator.sh
```
Testnet Client Demo
---
Now that your singlenode or multinode testnet is up and running let's send it some transactions! Note that we pass in
the expected number of nodes in the network. If running singlenode, pass 1; if multinode, pass the number
of validators you started.
Now that your singlenode or multinode testnet is up and running let's send it
some transactions!
In a separate shell start the client:
```bash
$ ./multinode-demo/client.sh ubuntu@10.0.1.51:~/solana 1
$ ./multinode-demo/client.sh # runs against localhost by default
```
What just happened? The client demo spins up several threads to send 500,000 transactions
@ -155,7 +153,7 @@ Public Testnet
In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`.
```bash
$ ./multinode-demo/client.sh testnet.solana.com 1 #The minumum number of nodes to discover on the network
$ ./multinode-demo/client.sh --network $(dig +short testnet.solana.com):8001 --identity config-private/client-id.json --duration 60
```
You can observe the effects of your client's transactions on our [dashboard](https://metrics.solana.com:3000/d/testnet/testnet-hud?orgId=2&from=now-30m&to=now&refresh=5s&var-testnet=testnet)

View File

@ -37,8 +37,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
// Finally, return the transaction to the benchmark.
tx
})
.collect();
}).collect();
bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.

View File

@ -116,8 +116,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
i as i64,
mint.last_id(),
)
})
.collect();
}).collect();
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
@ -131,8 +130,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
mint_total / num_src_accounts as i64,
mint.last_id(),
)
})
.collect();
}).collect();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
@ -143,8 +141,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}).collect();
verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
@ -157,8 +154,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
@ -187,8 +183,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
i as i64,
mint.last_id(),
)
})
.collect();
}).collect();
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
@ -201,8 +196,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
.unwrap();

View File

@ -1,5 +1,5 @@
steps:
- command: "ci/docker-run.sh solanalabs/rust:1.28.0 ci/test-stable.sh"
- command: "ci/docker-run.sh solanalabs/rust:1.29.0 ci/test-stable.sh"
name: "stable [public]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
@ -24,13 +24,6 @@ steps:
timeout_in_minutes: 20
agents:
- "queue=cuda"
- command: "ci/test-large-network.sh || true"
name: "large-network [public] [ignored]"
env:
CARGO_TARGET_CACHE_NAME: "stable"
timeout_in_minutes: 20
agents:
- "queue=large"
- command: "ci/pr-snap.sh"
timeout_in_minutes: 20
name: "snap [public]"

View File

@ -1,6 +1,6 @@
# Note: when the rust version (1.28) is changed also modify
# ci/buildkite.yml to pick up the new image tag
FROM rust:1.28
FROM rust:1.29.0
RUN set -x && \
apt update && \

View File

@ -23,8 +23,8 @@ nightly)
require cargo 1.29.[0-9]+-nightly
;;
stable)
require rustc 1.28.[0-9]+
require cargo 1.28.[0-9]+
require rustc 1.29.[0-9]+
require cargo 1.29.[0-9]+
;;
*)
echo Error: unknown argument: "$1"

View File

@ -15,7 +15,7 @@ mkdir -p target/perf-libs
cd target/perf-libs
(
set -x
curl https://solana-perf.s3.amazonaws.com/master/x86_64-unknown-linux-gnu/solana-perf.tgz | tar zxvf -
curl https://solana-perf.s3.amazonaws.com/v0.8.0/x86_64-unknown-linux-gnu/solana-perf.tgz | tar zxvf -
)
if [[ -r /usr/local/cuda/version.txt && -r cuda-version.txt ]]; then

View File

@ -18,4 +18,8 @@ usage() {
exit 1
}
$solana_bench_tps "$@"
if [[ -z $1 ]]; then # default behavior
$solana_bench_tps --identity config-private/client-id.json --network 127.0.0.1:8001 --duration 90
else
$solana_bench_tps "$@"
fi

View File

@ -34,6 +34,7 @@ ip_address_arg=-l
num_tokens=1000000000
node_type_leader=true
node_type_validator=true
node_type_client=true
while getopts "h?n:lpt:" opt; do
case $opt in
h|\?)
@ -55,10 +56,17 @@ while getopts "h?n:lpt:" opt; do
leader)
node_type_leader=true
node_type_validator=false
node_type_client=false
;;
validator)
node_type_leader=false
node_type_validator=true
node_type_client=false
;;
client)
node_type_leader=false
node_type_validator=false
node_type_client=true
;;
*)
usage "Error: unknown node type: $node_type"
@ -74,13 +82,19 @@ done
set -e
if $node_type_leader; then
for i in "$SOLANA_CONFIG_DIR" "$SOLANA_CONFIG_PRIVATE_DIR"; do
echo "Cleaning $i"
rm -rvf "$i"
mkdir -p "$i"
done
for i in "$SOLANA_CONFIG_DIR" "$SOLANA_CONFIG_VALIDATOR_DIR" "$SOLANA_CONFIG_PRIVATE_DIR"; do
echo "Cleaning $i"
rm -rvf "$i"
mkdir -p "$i"
done
if $node_type_client; then
client_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/client-id.json
$solana_keygen -o "$client_id_path"
ls -lhR "$SOLANA_CONFIG_PRIVATE_DIR"/
fi
if $node_type_leader; then
leader_address_args=("$ip_address_arg")
leader_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/leader-id.json
mint_path="$SOLANA_CONFIG_PRIVATE_DIR"/mint.json
@ -102,11 +116,6 @@ fi
if $node_type_validator; then
echo "Cleaning $SOLANA_CONFIG_VALIDATOR_DIR"
rm -rvf "$SOLANA_CONFIG_VALIDATOR_DIR"
mkdir -p "$SOLANA_CONFIG_VALIDATOR_DIR"
validator_address_args=("$ip_address_arg" -b 9000)
validator_id_path="$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json

View File

@ -449,8 +449,7 @@ impl Bank {
.map(|(acc, tx)| match acc {
Err(e) => Err(e.clone()),
Ok(ref mut accounts) => Self::execute_transaction(tx, accounts),
})
.collect();
}).collect();
let execution_elapsed = now.elapsed();
let now = Instant::now();
Self::store_accounts(&res, &loaded_accounts, &mut accounts);
@ -1010,8 +1009,7 @@ mod tests {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id);
last_id
})
.collect();
}).collect();
assert_eq!(bank.count_valid_ids(&[]).len(), 0);
assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0);
for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() {

View File

@ -52,8 +52,7 @@ impl BankingStage {
_ => error!("{:?}", e),
}
}
})
.unwrap();
}).unwrap();
(BankingStage { thread_hdl }, signal_receiver)
}
@ -66,8 +65,7 @@ impl BankingStage {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}).collect()
}
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
@ -105,8 +103,7 @@ impl BankingStage {
} else {
None
},
})
.collect();
}).collect();
debug!("process_transactions");
let results = bank.process_transactions(transactions);

View File

@ -68,8 +68,7 @@ fn main() -> Result<()> {
.value_name("NUM")
.takes_value(true)
.help("Use NUM receive sockets"),
)
.get_matches();
).get_matches();
if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));

View File

@ -135,8 +135,7 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
.add_tag(
"op",
influxdb::Value::String("send_barrier_transaction".to_string()),
)
.add_field("poll_count", influxdb::Value::Integer(poll_count))
).add_field("poll_count", influxdb::Value::Integer(poll_count))
.add_field("duration", influxdb::Value::Integer(duration_ms as i64))
.to_owned(),
);
@ -147,8 +146,7 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
&id.pubkey(),
&Duration::from_millis(100),
&Duration::from_secs(10),
)
.expect("Failed to get balance");
).expect("Failed to get balance");
if balance != 1 {
panic!("Expected an account balance of 1 (balance: {}", balance);
}
@ -195,8 +193,7 @@ fn generate_txs(
} else {
Transaction::new(keypair, id.pubkey(), 1, *last_id)
}
})
.collect();
}).collect();
let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
@ -214,8 +211,7 @@ fn generate_txs(
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&duration) as i64),
)
.to_owned(),
).to_owned(),
);
let sz = transactions.len() / threads;
@ -267,8 +263,7 @@ fn do_tx_transfers(
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&transfer_start.elapsed()) as i64),
)
.add_field("count", influxdb::Value::Integer(tx_len as i64))
).add_field("count", influxdb::Value::Integer(tx_len as i64))
.to_owned(),
);
}
@ -578,10 +573,8 @@ fn main() {
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
let shared_txs: Arc<RwLock<VecDeque<Vec<Transaction>>>> =
Arc::new(RwLock::new(VecDeque::new()));
@ -606,10 +599,8 @@ fn main() {
&shared_tx_active_thread_count,
&total_tx_sent_count,
);
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
// generate and send transactions for the specified duration
let start = Instant::now();

View File

@ -48,8 +48,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("rendezvous with the network at this gossip entry point"),
)
.arg(
).arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
@ -57,22 +56,19 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("File to read the client's keypair from"),
)
.arg(
).arg(
Arg::with_name("slice")
.long("slice")
.value_name("SECONDS")
.takes_value(true)
.help("Time slice over which to limit requests to drone"),
)
.arg(
).arg(
Arg::with_name("cap")
.long("cap")
.value_name("NUMBER")
.takes_value(true)
.help("Request limit for time slice"),
)
.get_matches();
).get_matches();
let network = matches
.value_of("network")
@ -159,8 +155,7 @@ fn main() -> Result<(), Box<error::Error>> {
io::ErrorKind::Other,
format!("Drone response: {:?}", err),
))
}))
.then(|_| Ok(()));
})).then(|_| Ok(()));
tokio::spawn(server)
});
tokio::run(done);

View File

@ -21,31 +21,27 @@ fn main() {
.long("local")
.takes_value(false)
.help("detect network address from local machine configuration"),
)
.arg(
).arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
.value_name("PATH")
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
).arg(
Arg::with_name("public")
.short("p")
.long("public")
.takes_value(false)
.help("detect public network address using public servers"),
)
.arg(
).arg(
Arg::with_name("bind")
.short("b")
.long("bind")
.value_name("PORT")
.takes_value(true)
.help("bind to port or address"),
)
.get_matches();
).get_matches();
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), FULLNODE_PORT_RANGE.0);

View File

@ -36,16 +36,14 @@ fn main() -> () {
.value_name("FILE")
.takes_value(true)
.help("run with the identity found in FILE"),
)
.arg(
).arg(
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.help("connect/rendezvous with the network at this gossip entry point"),
)
.arg(
).arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
@ -53,8 +51,7 @@ fn main() -> () {
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
).get_matches();
let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
let path = i.to_string();

View File

@ -25,8 +25,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("Number of tokens with which to initialize mint"),
)
.arg(
).arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
@ -34,8 +33,7 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
).get_matches();
let tokens = value_t_or_exit!(matches, "tokens", i64);
let ledger_path = matches.value_of("ledger").unwrap();

View File

@ -22,8 +22,7 @@ fn main() -> Result<(), Box<error::Error>> {
.value_name("PATH")
.takes_value(true)
.help("path to generated file"),
)
.get_matches();
).get_matches();
let rnd = SystemRandom::new();
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8(&rnd)?;

View File

@ -84,23 +84,20 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.value_name("PATH")
.takes_value(true)
.help("/path/to/leader.json"),
)
.arg(
).arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
.value_name("PATH")
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
).arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.subcommand(
).subcommand(
SubCommand::with_name("airdrop")
.about("Request a batch of tokens")
.arg(
@ -111,8 +108,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.required(true)
.help("The number of tokens to request"),
),
)
.subcommand(
).subcommand(
SubCommand::with_name("pay")
.about("Send a payment")
.arg(
@ -122,16 +118,14 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true)
.required(true)
.help("The number of tokens to send"),
)
.arg(
).arg(
Arg::with_name("to")
.long("to")
.value_name("PUBKEY")
.takes_value(true)
.help("The pubkey of recipient"),
),
)
.subcommand(
).subcommand(
SubCommand::with_name("confirm")
.about("Confirm your payment by signature")
.arg(
@ -141,8 +135,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.required(true)
.help("The transaction signature to confirm"),
),
)
.subcommand(SubCommand::with_name("balance").about("Get your balance"))
).subcommand(SubCommand::with_name("balance").about("Get your balance"))
.subcommand(SubCommand::with_name("address").about("Get your public key"))
.get_matches();

View File

@ -32,8 +32,7 @@ impl BlobFetchStage {
.into_iter()
.map(|socket| {
streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone())
})
.collect();
}).collect();
(BlobFetchStage { exit, thread_hdls }, receiver)
}

View File

@ -183,8 +183,7 @@ impl BroadcastStage {
.name("solana-broadcaster".to_string())
.spawn(move || {
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
})
.unwrap();
}).unwrap();
BroadcastStage { thread_hdl }
}

View File

@ -86,8 +86,7 @@ impl Counter {
.add_field(
"count",
influxdb::Value::Integer(counts as i64 - lastlog as i64),
)
.to_owned(),
).to_owned(),
);
self.lastlog
.compare_and_swap(lastlog, counts, Ordering::Relaxed);

View File

@ -277,8 +277,7 @@ impl Crdt {
node.contact_info.rpu.to_string(),
node.contact_info.tpu.to_string()
)
})
.collect();
}).collect();
format!(
" NodeInfo.contact_info | Node identifier\n\
@ -402,8 +401,7 @@ impl Crdt {
trace!("{} purge skipped {} {} {}", self.id, k, now - v, limit);
None
}
})
.collect();
}).collect();
inc_new_counter_info!("crdt-purge-count", dead_ids.len());
@ -450,8 +448,7 @@ impl Crdt {
trace!("{}:broadcast node {} {}", me.id, v.id, v.contact_info.tvu);
true
}
})
.cloned()
}).cloned()
.collect();
cloned_table
}
@ -552,8 +549,7 @@ impl Crdt {
v.contact_info.tvu
);
e
})
.collect();
}).collect();
trace!("broadcast results {}", errs.len());
for e in errs {
@ -607,8 +603,7 @@ impl Crdt {
} else {
true
}
})
.collect();
}).collect();
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
.par_iter()
@ -623,8 +618,7 @@ impl Crdt {
//TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size <= BLOB_SIZE);
s.send_to(&rblob.data[..rblob.meta.size], &v.contact_info.tvu)
})
.collect();
}).collect();
for e in errs {
if let Err(e) = &e {
inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1);
@ -666,8 +660,7 @@ impl Crdt {
r.id != Pubkey::default()
&& (Self::is_valid_address(&r.contact_info.tpu)
|| Self::is_valid_address(&r.contact_info.tvu))
})
.map(|x| x.ledger_state.last_id)
}).map(|x| x.ledger_state.last_id)
.collect()
}
@ -702,8 +695,7 @@ impl Crdt {
v.id != self.id
&& !v.contact_info.ncp.ip().is_unspecified()
&& !v.contact_info.ncp.ip().is_multicast()
})
.collect();
}).collect();
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
&self.remote,
@ -867,8 +859,7 @@ impl Crdt {
let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
sleep(Duration::from_millis(time_left));
}
})
.unwrap()
}).unwrap()
}
fn run_window_request(
from: &NodeInfo,
@ -1191,8 +1182,7 @@ impl Crdt {
me.table.len()
);
}
})
.unwrap()
}).unwrap()
}
fn is_valid_ip(addr: IpAddr) -> bool {

View File

@ -134,12 +134,10 @@ impl Drone {
.add_field(
"request_amount",
influxdb::Value::Integer(request_amount as i64),
)
.add_field(
).add_field(
"request_current",
influxdb::Value::Integer(self.request_current as i64),
)
.to_owned(),
).to_owned(),
);
client.retry_transfer_signed(&tx, 10)
} else {

View File

@ -112,7 +112,8 @@ impl Entry {
id: Hash::default(),
transactions,
has_more: false,
}).unwrap() <= BLOB_DATA_SIZE as u64
}).unwrap()
<= BLOB_DATA_SIZE as u64
}
/// Creates the next Tick Entry `num_hashes` after `start_hash`.

View File

@ -327,8 +327,7 @@ pub fn generate_coding(
.map(|(i, l)| {
trace!("{} i: {} data: {}", id, i, l.data[0]);
&l.data[..max_data_size]
})
.collect();
}).collect();
let mut coding_locks: Vec<_> = coding_blobs
.iter()
@ -341,8 +340,7 @@ pub fn generate_coding(
.map(|(i, l)| {
trace!("{} i: {} coding: {}", id, i, l.data[0],);
&mut l.data_mut()[..max_data_size]
})
.collect();
}).collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(

View File

@ -33,8 +33,7 @@ impl FetchStage {
.into_iter()
.map(|socket| {
streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone())
})
.collect();
}).collect();
(FetchStage { exit, thread_hdls }, receiver)
}

View File

@ -323,8 +323,7 @@ mod tests {
let bank = Bank::new(&alice);
let entry = tn.info.clone();
Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false)
})
.collect();
}).collect();
//each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the

View File

@ -598,8 +598,7 @@ mod tests {
)],
false,
)
})
.collect()
}).collect()
}
fn make_test_entries() -> Vec<Entry> {

View File

@ -342,12 +342,10 @@ mod test {
.add_field(
"random_bool",
influxdb::Value::Boolean(random::<u8>() < 128),
)
.add_field(
).add_field(
"random_int",
influxdb::Value::Integer(random::<u8>() as i64),
)
.to_owned();
).to_owned();
agent.submit(point);
}

View File

@ -441,6 +441,18 @@ impl Blob {
self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap();
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
let (nrecv, from) = socket.recv_from(&mut p.data)?;
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
Ok(())
}
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> {
let mut v = Vec::new();
//DOCUMENTED SIDE-EFFECT
@ -452,29 +464,23 @@ impl Blob {
socket.set_nonblocking(false)?;
for i in 0..NUM_BLOBS {
let r = re.allocate();
{
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e);
}
return Err(Error::IO(e));
}
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
if i == 0 {
socket.set_nonblocking(true)?;
}
}
match Blob::recv_blob(socket, &r) {
Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
re.recycle(r, "Bob::recv_from::i>0");
break;
}
Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e);
}
re.recycle(r, "Blob::recv_from::empty");
return Err(Error::IO(e));
}
Ok(()) => if i == 0 {
socket.set_nonblocking(true)?;
},
}
v.push(r);
}

View File

@ -39,8 +39,7 @@ impl RecordStage {
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
})
.unwrap();
}).unwrap();
(RecordStage { thread_hdl }, entry_receiver)
}
@ -72,8 +71,7 @@ impl RecordStage {
}
recorder.hash();
}
})
.unwrap();
}).unwrap();
(RecordStage { thread_hdl }, entry_receiver)
}

View File

@ -110,8 +110,7 @@ impl ReplicateStage {
_ => error!("{:?}", e),
}
}
})
.unwrap();
}).unwrap();
let mut thread_hdls = vec![t_responder, t_replicate];
thread_hdls.extend(vote_stage.thread_hdls());

View File

@ -28,8 +28,7 @@ impl RequestStage {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}).collect()
}
pub fn process_request_packets(
@ -103,8 +102,7 @@ impl RequestStage {
_ => error!("{:?}", e),
}
}
})
.unwrap();
}).unwrap();
(
RequestStage {
thread_hdl,

View File

@ -69,8 +69,7 @@ fn retransmitter(
}
}
trace!("exiting retransmitter");
})
.unwrap()
}).unwrap()
}
pub struct RetransmitStage {

View File

@ -96,8 +96,7 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.par_iter()
.map(verify_packet)
.collect()
})
.collect();
}).collect();
inc_new_counter_info!("ed25519_verify_cpu", count);
rv
}
@ -115,8 +114,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.par_iter()
.map(verify_packet_disabled)
.collect()
})
.collect();
}).collect();
inc_new_counter_info!("ed25519_verify_disabled", count);
rv
}

View File

@ -89,8 +89,7 @@ impl SigVerifyStage {
.add_field(
"total_time_ms",
influxdb::Value::Integer(total_time_ms as i64),
)
.to_owned(),
).to_owned(),
);
Ok(())

View File

@ -58,8 +58,7 @@ pub fn receiver(
.spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
()
})
.unwrap()
}).unwrap()
}
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
@ -104,8 +103,7 @@ pub fn responder(
_ => warn!("{} responder error: {:?}", name, e),
}
}
})
.unwrap()
}).unwrap()
}
//TODO, we would need to stick block authentication before we create the
@ -137,8 +135,7 @@ pub fn blob_receiver(
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
})
.unwrap()
}).unwrap()
}
#[cfg(test)]

View File

@ -163,8 +163,7 @@ impl ThinClient {
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
).to_owned(),
);
result
}
@ -285,8 +284,7 @@ impl ThinClient {
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(elapsed) as i64),
)
.to_owned(),
).to_owned(),
);
}
@ -359,8 +357,7 @@ impl ThinClient {
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
).to_owned(),
);
self.signature_status
}

View File

@ -83,8 +83,7 @@ fn get_last_id_to_vote_on(
.add_field(
"valid_peers",
influxdb::Value::Integer(valid_ids.len() as i64),
)
.to_owned(),
).to_owned(),
);
if valid_ids.len() > super_majority_index {
@ -103,8 +102,7 @@ fn get_last_id_to_vote_on(
.add_field(
"duration_ms",
influxdb::Value::Integer((now - *last_valid_validator_timestamp) as i64),
)
.to_owned(),
).to_owned(),
);
}
@ -408,8 +406,7 @@ pub mod tests {
// sleep to get a different timestamp in the bank
sleep(Duration::from_millis(1));
last_id
})
.collect();
}).collect();
// see that we fail to have 2/3rds consensus
assert!(

View File

@ -90,8 +90,7 @@ impl WindowUtil for Window {
}
self[i].clear_data(recycler);
Some(pix)
})
.collect()
}).collect()
}
fn repair(
@ -140,8 +139,7 @@ impl WindowUtil for Window {
} else {
" "
}
})
.collect();
}).collect();
let buf: Vec<_> = self
.iter()
@ -157,8 +155,7 @@ impl WindowUtil for Window {
// data.is_none()
"c"
}
})
.collect();
}).collect();
format!(
"\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
id,

View File

@ -61,7 +61,8 @@ fn add_block_to_retransmit_queue(
leader_id
);
if p.get_id()
.expect("get_id in fn add_block_to_retransmit_queue") == leader_id
.expect("get_id in fn add_block_to_retransmit_queue")
== leader_id
{
//TODO
//need to copy the retransmitted blob
@ -293,8 +294,7 @@ pub fn window_service(
});
}
}
})
.unwrap()
}).unwrap()
}
#[cfg(test)]
@ -561,8 +561,7 @@ mod test {
let rv = repair_backoff(&mut last, &mut times, 1) as usize;
assert_eq!(times, x + 2);
rv
})
.sum();
}).sum();
assert_eq!(times, 128);
assert_eq!(last, 1);
repair_backoff(&mut last, &mut times, 1);
@ -571,8 +570,7 @@ mod test {
assert_eq!(times, 2);
assert_eq!(last, 2);
total
})
.sum();
}).sum();
let avg = res / num_tests;
assert!(avg >= 3);
assert!(avg <= 5);

View File

@ -126,8 +126,7 @@ impl WriteStage {
error!("{:?}", e);
}
}
})
.unwrap();
}).unwrap();
let thread_hdls = vec![t_responder, thread_hdl];
(WriteStage { thread_hdls }, blob_receiver)

View File

@ -176,8 +176,7 @@ pub fn crdt_retransmit() -> result::Result<()> {
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
}).collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);

View File

@ -593,10 +593,8 @@ fn test_multi_node_dynamic_network() {
assert_eq!(bal, Some(500));
info!("sent balance to[{}/{}] {}", n, num_nodes, keypair.pubkey());
keypair
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
info!("Waiting for keypairs to be created");
let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect();
@ -622,10 +620,8 @@ fn test_multi_node_dynamic_network() {
true,
);
(rd, val)
})
.unwrap()
})
.collect();
}).unwrap()
}).collect();
let mut validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect();
@ -645,8 +641,7 @@ fn test_multi_node_dynamic_network() {
&alice_arc.read().unwrap().keypair(),
bob_pubkey,
&last_id,
)
.unwrap();
).unwrap();
expected_balance += 500;