logs
poll both endpoints in client logs logs logs names verify plan not sig log set udp buffer to max drop output more verbose about window requests log the leader load leader identity readme for single node demo update asserts update replay all rsync dynamic file read in testnode fix cleanup readme sum fix scripts cleanup cleanup readme
This commit is contained in:
committed by
Greg Fitzgerald
parent
6e35f54738
commit
fe93bba457
60
README.md
60
README.md
@ -47,46 +47,52 @@ used later in this demo.
|
|||||||
$ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log
|
$ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log
|
||||||
```
|
```
|
||||||
|
|
||||||
Now you can start the server:
|
Before you start the server, make sure you know the IP address of the machine ou want to be the leader for the demo, and make sure that udp ports 8000-10000 are open on all the machines you wan to test with. Now you can start the server:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$ cat genesis.log | cargo run --release --bin solana-fullnode > transactions0.log
|
$ cat ./multinode-demo/leader.sh
|
||||||
|
#!/bin/bash
|
||||||
|
export RUST_LOG=solana=info
|
||||||
|
sudo sysctl -w net.core.rmem_max=26214400
|
||||||
|
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
|
||||||
|
$ ./multinode-demo/leader.sh
|
||||||
```
|
```
|
||||||
|
|
||||||
Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
|
Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
|
||||||
to start sending it transactions.
|
to start sending it transactions.
|
||||||
|
|
||||||
|
Now you can start some validators:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
$ cat ./multinode-demo/validator.sh
|
||||||
|
#!/bin/bash
|
||||||
|
rsync -v -e ssh $1:~/solana/mint-demo.json .
|
||||||
|
rsync -v -e ssh $1:~/solana/leader.json .
|
||||||
|
rsync -v -e ssh $1:~/solana/genesis.log .
|
||||||
|
rsync -v -e ssh $1:~/solana/leader.log .
|
||||||
|
rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a .
|
||||||
|
export RUST_LOG=solana=info
|
||||||
|
sudo sysctl -w net.core.rmem_max=26214400
|
||||||
|
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
|
||||||
|
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51 #The leader machine
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
Then, in a separate shell, let's execute some transactions. Note we pass in
|
Then, in a separate shell, let's execute some transactions. Note we pass in
|
||||||
the JSON configuration file here, not the genesis ledger.
|
the JSON configuration file here, not the genesis ledger.
|
||||||
|
>>>>>>> logs
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$ cat mint-demo.json | cargo run --release --bin solana-client-demo
|
$ cat ./multinode-demo/client.sh
|
||||||
|
#!/bin/bash
|
||||||
|
export RUST_LOG=solana=info
|
||||||
|
rsync -v -e ssh $1:~/solana/leader.json .
|
||||||
|
rsync -v -e ssh $1:~/solana/mint-demo.json .
|
||||||
|
cat mint-demo.json | cargo run --release --bin solana-full-node -- -l leader.json -c 8100 -n 1
|
||||||
|
$ ./multinode-demo/client.sh ubuntu@10.0.1.51 #The leader machine
|
||||||
```
|
```
|
||||||
|
|
||||||
Now kill the server with Ctrl-C, and take a look at the ledger. You should
|
Try starting a more validators and reruning the client demo!
|
||||||
see something similar to:
|
|
||||||
|
|
||||||
```json
|
|
||||||
{"num_hashes":27,"id":[0, "..."],"event":"Tick"}
|
|
||||||
{"num_hashes":3,"id":[67, "..."],"event":{"Transaction":{"tokens":42}}}
|
|
||||||
{"num_hashes":27,"id":[0, "..."],"event":"Tick"}
|
|
||||||
```
|
|
||||||
|
|
||||||
Now restart the server from where we left off. Pass it both the genesis ledger, and
|
|
||||||
the transaction ledger.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
$ cat genesis.log transactions0.log | cargo run --release --bin solana-fullnode > transactions1.log
|
|
||||||
```
|
|
||||||
|
|
||||||
Lastly, run the client demo again, and verify that all funds were spent in the
|
|
||||||
previous round, and so no additional transactions are added.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
$ cat mint-demo.json | cargo run --release --bin solana-client-demo
|
|
||||||
```
|
|
||||||
|
|
||||||
Stop the server again, and verify there are only Tick entries, and no Transaction entries.
|
|
||||||
|
|
||||||
Developing
|
Developing
|
||||||
===
|
===
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
cd /home/ubuntu/solana
|
export RUST_LOG=solana=info
|
||||||
#git pull
|
rsync -v -e ssh $1:~/solana/leader.json .
|
||||||
export RUST_LOG=solana::crdt=trace
|
rsync -v -e ssh $1:~/solana/mint-demo.json .
|
||||||
# scp ubuntu@18.206.1.146:~/solana/leader.json .
|
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json -c 8100 -n 1
|
||||||
# scp ubuntu@18.206.1.146:~/solana/mint-demo.json .
|
|
||||||
cat mint-demo.json | cargo run --release --bin solana-multinode-demo -- -l leader.json -c 10.0.5.179:8100 -n 3
|
|
||||||
|
@ -1,6 +1,4 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
cd /home/ubuntu/solana
|
|
||||||
git pull
|
|
||||||
export RUST_LOG=solana=info
|
export RUST_LOG=solana=info
|
||||||
cat genesis.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -b 8000 -d | grep INFO
|
sudo sysctl -w net.core.rmem_max=26214400
|
||||||
#cat genesis.log | cargo run --release --bin solana-testnode -- -s leader.json -b 8000 -d
|
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
cd /home/ubuntu/solana
|
rsync -v -e ssh $1:~/solana/mint-demo.json .
|
||||||
git pull
|
rsync -v -e ssh $1:~/solana/leader.json .
|
||||||
scp ubuntu@18.206.1.146:~/solana/mint-demo.json .
|
rsync -v -e ssh $1:~/solana/genesis.log .
|
||||||
scp ubuntu@18.206.1.146:~/solana/leader.json .
|
rsync -v -e ssh $1:~/solana/leader.log .
|
||||||
scp ubuntu@18.206.1.146:~/solana/genesis.log .
|
rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a .
|
||||||
scp ubuntu@18.206.1.146:~/solana/libcuda_verify_ed25519.a .
|
|
||||||
export RUST_LOG=solana=info
|
export RUST_LOG=solana=info
|
||||||
cat genesis.log | cargo run --release --features cuda --bin solana-testnode -- -s replicator.json -v leader.json -b 9000 -d 2>&1 | tee validator.log
|
sudo sysctl -w net.core.rmem_max=26214400
|
||||||
|
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate getopts;
|
extern crate getopts;
|
||||||
extern crate isatty;
|
extern crate isatty;
|
||||||
|
extern crate pnet;
|
||||||
extern crate rayon;
|
extern crate rayon;
|
||||||
extern crate serde_json;
|
extern crate serde_json;
|
||||||
extern crate solana;
|
extern crate solana;
|
||||||
@ -8,6 +9,7 @@ extern crate solana;
|
|||||||
use futures::Future;
|
use futures::Future;
|
||||||
use getopts::Options;
|
use getopts::Options;
|
||||||
use isatty::stdin_isatty;
|
use isatty::stdin_isatty;
|
||||||
|
use pnet::datalink;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use solana::crdt::{Crdt, ReplicatedData};
|
use solana::crdt::{Crdt, ReplicatedData};
|
||||||
use solana::mint::MintDemo;
|
use solana::mint::MintDemo;
|
||||||
@ -18,7 +20,7 @@ use solana::transaction::Transaction;
|
|||||||
use std::env;
|
use std::env;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{stdin, Read};
|
use std::io::{stdin, Read};
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
@ -36,6 +38,17 @@ fn print_usage(program: &str, opts: Options) {
|
|||||||
print!("{}", opts.usage(&brief));
|
print!("{}", opts.usage(&brief));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_ip_addr() -> Option<IpAddr> {
|
||||||
|
for iface in datalink::interfaces() {
|
||||||
|
for p in iface.ips {
|
||||||
|
if !p.ip().is_loopback() && !p.ip().is_multicast() {
|
||||||
|
return Some(p.ip());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let mut threads = 4usize;
|
let mut threads = 4usize;
|
||||||
let mut num_nodes = 10usize;
|
let mut num_nodes = 10usize;
|
||||||
@ -43,7 +56,7 @@ fn main() {
|
|||||||
|
|
||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
opts.optopt("l", "", "leader", "leader.json");
|
opts.optopt("l", "", "leader", "leader.json");
|
||||||
opts.optopt("c", "", "client address", "host:port");
|
opts.optopt("c", "", "client port", "port");
|
||||||
opts.optopt("t", "", "number of threads", &format!("{}", threads));
|
opts.optopt("t", "", "number of threads", &format!("{}", threads));
|
||||||
opts.optopt(
|
opts.optopt(
|
||||||
"n",
|
"n",
|
||||||
@ -69,12 +82,13 @@ fn main() {
|
|||||||
if matches.opt_present("l") {
|
if matches.opt_present("l") {
|
||||||
leader = matches.opt_str("l").unwrap();
|
leader = matches.opt_str("l").unwrap();
|
||||||
}
|
}
|
||||||
let client_addr: Arc<RwLock<SocketAddr>> = if matches.opt_present("c") {
|
let mut addr: SocketAddr = "127.0.0.1:8010".parse().unwrap();
|
||||||
let addr = matches.opt_str("c").unwrap().parse().unwrap();
|
if matches.opt_present("c") {
|
||||||
Arc::new(RwLock::new(addr))
|
let port = matches.opt_str("c").unwrap().parse().unwrap();
|
||||||
} else {
|
addr.set_port(port);
|
||||||
Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap()))
|
}
|
||||||
};
|
addr.set_ip(get_ip_addr().unwrap());
|
||||||
|
let client_addr: Arc<RwLock<SocketAddr>> = Arc::new(RwLock::new(addr));
|
||||||
if matches.opt_present("t") {
|
if matches.opt_present("t") {
|
||||||
threads = matches.opt_str("t").unwrap().parse().expect("integer");
|
threads = matches.opt_str("t").unwrap().parse().expect("integer");
|
||||||
}
|
}
|
||||||
@ -230,7 +244,6 @@ fn converge(
|
|||||||
let mut spy_crdt = Crdt::new(spy);
|
let mut spy_crdt = Crdt::new(spy);
|
||||||
spy_crdt.insert(&leader);
|
spy_crdt.insert(&leader);
|
||||||
spy_crdt.set_leader(leader.id);
|
spy_crdt.set_leader(leader.id);
|
||||||
|
|
||||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||||
let spy_window = default_window();
|
let spy_window = default_window();
|
||||||
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
|
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
|
||||||
|
@ -16,12 +16,12 @@ use solana::signature::{KeyPair, KeyPairUtil};
|
|||||||
use solana::transaction::Instruction;
|
use solana::transaction::Instruction;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{stdin, stdout, Read};
|
use std::io::{stdin, Read};
|
||||||
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::time::Duration;
|
//use std::time::Duration;
|
||||||
|
|
||||||
fn print_usage(program: &str, opts: Options) {
|
fn print_usage(program: &str, opts: Options) {
|
||||||
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
|
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
|
||||||
@ -38,6 +38,7 @@ fn main() {
|
|||||||
opts.optopt("b", "", "bind", "bind to port or address");
|
opts.optopt("b", "", "bind", "bind to port or address");
|
||||||
opts.optflag("d", "dyn", "detect network address dynamically");
|
opts.optflag("d", "dyn", "detect network address dynamically");
|
||||||
opts.optopt("s", "", "save", "save my identity to path.json");
|
opts.optopt("s", "", "save", "save my identity to path.json");
|
||||||
|
opts.optopt("l", "", "load", "load my identity to path.json");
|
||||||
opts.optflag("h", "help", "print help");
|
opts.optflag("h", "help", "print help");
|
||||||
opts.optopt(
|
opts.optopt(
|
||||||
"v",
|
"v",
|
||||||
@ -130,6 +131,12 @@ fn main() {
|
|||||||
// we need all the receiving sockets to be bound within the expected
|
// we need all the receiving sockets to be bound within the expected
|
||||||
// port range that we open on aws
|
// port range that we open on aws
|
||||||
let mut repl_data = make_repl_data(&bind_addr);
|
let mut repl_data = make_repl_data(&bind_addr);
|
||||||
|
if matches.opt_present("l") {
|
||||||
|
let path = matches.opt_str("l").unwrap();
|
||||||
|
if let Ok(file) = File::open(path) {
|
||||||
|
repl_data = serde_json::from_reader(file).expect("parse");
|
||||||
|
}
|
||||||
|
}
|
||||||
let threads = if matches.opt_present("v") {
|
let threads = if matches.opt_present("v") {
|
||||||
eprintln!("starting validator... {}", repl_data.requests_addr);
|
eprintln!("starting validator... {}", repl_data.requests_addr);
|
||||||
let path = matches.opt_str("v").unwrap();
|
let path = matches.opt_str("v").unwrap();
|
||||||
@ -149,10 +156,12 @@ fn main() {
|
|||||||
} else {
|
} else {
|
||||||
eprintln!("starting leader... {}", repl_data.requests_addr);
|
eprintln!("starting leader... {}", repl_data.requests_addr);
|
||||||
repl_data.current_leader_id = repl_data.id.clone();
|
repl_data.current_leader_id = repl_data.id.clone();
|
||||||
|
let file = File::create("leader.log").expect("leader.log create");
|
||||||
let server = Server::new_leader(
|
let server = Server::new_leader(
|
||||||
bank,
|
bank,
|
||||||
last_id,
|
last_id,
|
||||||
Some(Duration::from_millis(1000)),
|
//Some(Duration::from_millis(1000)),
|
||||||
|
None,
|
||||||
repl_data.clone(),
|
repl_data.clone(),
|
||||||
UdpSocket::bind(repl_data.requests_addr).unwrap(),
|
UdpSocket::bind(repl_data.requests_addr).unwrap(),
|
||||||
UdpSocket::bind(repl_data.transactions_addr).unwrap(),
|
UdpSocket::bind(repl_data.transactions_addr).unwrap(),
|
||||||
@ -160,7 +169,7 @@ fn main() {
|
|||||||
UdpSocket::bind("0.0.0.0:0").unwrap(),
|
UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
|
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
stdout(),
|
file,
|
||||||
);
|
);
|
||||||
server.thread_hdls
|
server.thread_hdls
|
||||||
};
|
};
|
||||||
@ -169,7 +178,7 @@ fn main() {
|
|||||||
let file = File::create(path).expect("file");
|
let file = File::create(path).expect("file");
|
||||||
serde_json::to_writer(file, &repl_data).expect("serialize");
|
serde_json::to_writer(file, &repl_data).expect("serialize");
|
||||||
}
|
}
|
||||||
eprintln!("Ready. Listening on {}", bind_addr);
|
eprintln!("Ready. Listening on {}", repl_data.events_addr);
|
||||||
|
|
||||||
for t in threads {
|
for t in threads {
|
||||||
t.join().expect("join");
|
t.join().expect("join");
|
||||||
|
26
src/crdt.rs
26
src/crdt.rs
@ -16,7 +16,7 @@
|
|||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize};
|
||||||
use byteorder::{LittleEndian, ReadBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use packet::SharedBlob;
|
use packet::{SharedBlob, BLOB_SIZE};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
use ring::rand::{SecureRandom, SystemRandom};
|
use ring::rand::{SecureRandom, SystemRandom};
|
||||||
@ -226,6 +226,7 @@ impl Crdt {
|
|||||||
.expect("set_index in pub fn broadcast");
|
.expect("set_index in pub fn broadcast");
|
||||||
//TODO profile this, may need multiple sockets for par_iter
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
||||||
|
assert!(blob.meta.size < BLOB_SIZE);
|
||||||
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
|
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
|
||||||
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
||||||
e
|
e
|
||||||
@ -285,6 +286,7 @@ impl Crdt {
|
|||||||
v.replicate_addr
|
v.replicate_addr
|
||||||
);
|
);
|
||||||
//TODO profile this, may need multiple sockets for par_iter
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
|
assert!(rblob.meta.size < BLOB_SIZE);
|
||||||
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
|
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
@ -327,14 +329,16 @@ impl Crdt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
||||||
if self.table.len() <= 1 {
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let valid: Vec<_> = self.table
|
||||||
|
.values()
|
||||||
|
.filter(|r| r.id != self.me && r.replicate_addr != daddr)
|
||||||
|
.collect();
|
||||||
|
if valid.is_empty() {
|
||||||
return Err(Error::CrdtTooSmall);
|
return Err(Error::CrdtTooSmall);
|
||||||
}
|
}
|
||||||
let mut n = (Self::random() as usize) % self.table.len();
|
let n = (Self::random() as usize) % valid.len();
|
||||||
while self.table.values().nth(n).unwrap().id == self.me {
|
let addr = valid[n].gossip_addr.clone();
|
||||||
n = (Self::random() as usize) % self.table.len();
|
|
||||||
}
|
|
||||||
let addr = self.table.values().nth(n).unwrap().gossip_addr.clone();
|
|
||||||
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
|
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
|
||||||
let out = serialize(&req)?;
|
let out = serialize(&req)?;
|
||||||
Ok((addr, out))
|
Ok((addr, out))
|
||||||
@ -431,6 +435,7 @@ impl Crdt {
|
|||||||
"responding RequestWindowIndex {} {}",
|
"responding RequestWindowIndex {} {}",
|
||||||
ix, from.replicate_addr
|
ix, from.replicate_addr
|
||||||
);
|
);
|
||||||
|
assert!(outblob.len() < BLOB_SIZE);
|
||||||
sock.send_to(&outblob, from.replicate_addr)?;
|
sock.send_to(&outblob, from.replicate_addr)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -442,7 +447,7 @@ impl Crdt {
|
|||||||
sock: &UdpSocket,
|
sock: &UdpSocket,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
//TODO cache connections
|
//TODO cache connections
|
||||||
let mut buf = vec![0u8; 1024 * 64];
|
let mut buf = vec![0u8; BLOB_SIZE];
|
||||||
trace!("recv_from on {}", sock.local_addr().unwrap());
|
trace!("recv_from on {}", sock.local_addr().unwrap());
|
||||||
let (amt, src) = sock.recv_from(&mut buf)?;
|
let (amt, src) = sock.recv_from(&mut buf)?;
|
||||||
trace!("got request from {}", src);
|
trace!("got request from {}", src);
|
||||||
@ -451,7 +456,7 @@ impl Crdt {
|
|||||||
match r {
|
match r {
|
||||||
// TODO sigverify these
|
// TODO sigverify these
|
||||||
Protocol::RequestUpdates(v, reqdata) => {
|
Protocol::RequestUpdates(v, reqdata) => {
|
||||||
trace!("RequestUpdates {}", v);
|
trace!("RequestUpdates {} from {}", v, src);
|
||||||
let addr = reqdata.gossip_addr;
|
let addr = reqdata.gossip_addr;
|
||||||
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
|
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||||
let (from, ups, data) = obj.read()
|
let (from, ups, data) = obj.read()
|
||||||
@ -464,12 +469,13 @@ impl Crdt {
|
|||||||
obj.write()
|
obj.write()
|
||||||
.expect("'obj' write lock in RequestUpdates")
|
.expect("'obj' write lock in RequestUpdates")
|
||||||
.insert(&reqdata);
|
.insert(&reqdata);
|
||||||
|
assert!(rsp.len() < BLOB_SIZE);
|
||||||
sock.send_to(&rsp, addr)
|
sock.send_to(&rsp, addr)
|
||||||
.expect("'sock.send_to' in RequestUpdates");
|
.expect("'sock.send_to' in RequestUpdates");
|
||||||
trace!("send_to done!");
|
trace!("send_to done!");
|
||||||
}
|
}
|
||||||
Protocol::ReceiveUpdates(from, ups, data) => {
|
Protocol::ReceiveUpdates(from, ups, data) => {
|
||||||
trace!("ReceivedUpdates");
|
trace!("ReceivedUpdates {} from {}", ups, src);
|
||||||
obj.write()
|
obj.write()
|
||||||
.expect("'obj' write lock in ReceiveUpdates")
|
.expect("'obj' write lock in ReceiveUpdates")
|
||||||
.apply_updates(from, ups, &data);
|
.apply_updates(from, ups, &data);
|
||||||
|
@ -55,7 +55,7 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
|
|||||||
batches
|
batches
|
||||||
.iter()
|
.iter()
|
||||||
.map(|p| p.read().unwrap().packets.len())
|
.map(|p| p.read().unwrap().packets.len())
|
||||||
.fold(0, |x, y| x + y)
|
.sum()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "cuda"))]
|
#[cfg(not(feature = "cuda"))]
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
use crdt::Crdt;
|
use crdt::Crdt;
|
||||||
#[cfg(feature = "erasure")]
|
#[cfg(feature = "erasure")]
|
||||||
use erasure;
|
use erasure;
|
||||||
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets};
|
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE};
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
@ -177,10 +177,11 @@ fn repair_window(
|
|||||||
trace!("repair_window counter {} {}", *times, *consumed);
|
trace!("repair_window counter {} {}", *times, *consumed);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
info!("repair_window request {} {}", *consumed, *received);
|
|
||||||
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
||||||
for (to, req) in reqs {
|
for (to, req) in reqs {
|
||||||
//todo cache socket
|
//todo cache socket
|
||||||
|
info!("repair_window request {} {} {}", *consumed, *received, to);
|
||||||
|
assert!(req.len() < BLOB_SIZE);
|
||||||
sock.send_to(&req, to)?;
|
sock.send_to(&req, to)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -510,6 +511,7 @@ mod bench {
|
|||||||
let mut num = 0;
|
let mut num = 0;
|
||||||
for p in msgs_.read().unwrap().packets.iter() {
|
for p in msgs_.read().unwrap().packets.iter() {
|
||||||
let a = p.meta.addr();
|
let a = p.meta.addr();
|
||||||
|
assert!(p.meta.size < packet::BLOB_SIZE);
|
||||||
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
||||||
num += 1;
|
num += 1;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user