Multinode fixes and test
* Replace magic numbers for 64k event size * Fix gossip, dont ping yourself * Retransmit only to listening nodes * Multinode test in stub marked unstable
This commit is contained in:
committed by
Anatoly Yakovenko
parent
6af27669b0
commit
e8f5fb35ac
@ -19,6 +19,7 @@ use serde_json;
|
|||||||
use signature::PublicKey;
|
use signature::PublicKey;
|
||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
use std::io::sink;
|
||||||
use std::io::{Cursor, Write};
|
use std::io::{Cursor, Write};
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
@ -100,6 +101,8 @@ impl AccountantSkel {
|
|||||||
num_events: entry.events.len() as u64,
|
num_events: entry.events.len() as u64,
|
||||||
};
|
};
|
||||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
||||||
|
trace!("sending {} to {}", data.len(), addr);
|
||||||
|
//TODO dont do IO here, this needs to be on a separate channel
|
||||||
let res = socket.send_to(&data, addr);
|
let res = socket.send_to(&data, addr);
|
||||||
if res.is_err() {
|
if res.is_err() {
|
||||||
eprintln!("couldn't send response: {:?}", res);
|
eprintln!("couldn't send response: {:?}", res);
|
||||||
@ -182,16 +185,11 @@ impl AccountantSkel {
|
|||||||
broadcast: &streamer::BlobSender,
|
broadcast: &streamer::BlobSender,
|
||||||
blob_recycler: &packet::BlobRecycler,
|
blob_recycler: &packet::BlobRecycler,
|
||||||
writer: &Arc<Mutex<W>>,
|
writer: &Arc<Mutex<W>>,
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut q = VecDeque::new();
|
let mut q = VecDeque::new();
|
||||||
while let Ok(list) = Self::receive_all(&obj, writer) {
|
let list = Self::receive_all(&obj, writer)?;
|
||||||
trace!("New blobs? {}", list.len());
|
trace!("New blobs? {}", list.len());
|
||||||
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !q.is_empty() {
|
if !q.is_empty() {
|
||||||
broadcast.send(q)?;
|
broadcast.send(q)?;
|
||||||
}
|
}
|
||||||
@ -206,14 +204,26 @@ impl AccountantSkel {
|
|||||||
writer: Arc<Mutex<W>>,
|
writer: Arc<Mutex<W>>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let e = Self::run_sync(
|
let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer);
|
||||||
obj.clone(),
|
if exit.load(Ordering::Relaxed) {
|
||||||
&broadcast,
|
info!("sync_service exiting");
|
||||||
&blob_recycler,
|
break;
|
||||||
&writer,
|
}
|
||||||
exit.clone(),
|
})
|
||||||
);
|
}
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
|
||||||
|
/// Process any Entry items that have been published by the Historian.
|
||||||
|
/// continuosly broadcast blobs of entries out
|
||||||
|
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
|
||||||
|
Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
||||||
|
spawn(move || loop {
|
||||||
|
let _ = Self::run_sync_no_broadcast(obj.clone());
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
info!("sync_no_broadcast_service exiting");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -228,7 +238,9 @@ impl AccountantSkel {
|
|||||||
match msg {
|
match msg {
|
||||||
Request::GetBalance { key } => {
|
Request::GetBalance { key } => {
|
||||||
let val = self.acc.lock().unwrap().get_balance(&key);
|
let val = self.acc.lock().unwrap().get_balance(&key);
|
||||||
Some((Response::Balance { key, val }, rsp_addr))
|
let rsp = (Response::Balance { key, val }, rsp_addr);
|
||||||
|
info!("Response::Balance {:?}", rsp);
|
||||||
|
Some(rsp)
|
||||||
}
|
}
|
||||||
Request::Transaction(_) => unreachable!(),
|
Request::Transaction(_) => unreachable!(),
|
||||||
Request::Subscribe { subscriptions } => {
|
Request::Subscribe { subscriptions } => {
|
||||||
@ -247,10 +259,10 @@ impl AccountantSkel {
|
|||||||
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
|
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = recvr.recv_timeout(timer)?;
|
let msgs = recvr.recv_timeout(timer)?;
|
||||||
trace!("got msgs");
|
debug!("got msgs");
|
||||||
let mut batch = vec![msgs];
|
let mut batch = vec![msgs];
|
||||||
while let Ok(more) = recvr.try_recv() {
|
while let Ok(more) = recvr.try_recv() {
|
||||||
trace!("got more msgs");
|
debug!("got more msgs");
|
||||||
batch.push(more);
|
batch.push(more);
|
||||||
}
|
}
|
||||||
info!("batch len {}", batch.len());
|
info!("batch len {}", batch.len());
|
||||||
@ -275,6 +287,7 @@ impl AccountantSkel {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let batch = Self::recv_batch(recvr)?;
|
let batch = Self::recv_batch(recvr)?;
|
||||||
let verified_batches = Self::verify_batch(batch);
|
let verified_batches = Self::verify_batch(batch);
|
||||||
|
debug!("verified batches: {}", verified_batches.len());
|
||||||
for xs in verified_batches {
|
for xs in verified_batches {
|
||||||
sendr.send(xs)?;
|
sendr.send(xs)?;
|
||||||
}
|
}
|
||||||
@ -315,8 +328,9 @@ impl AccountantSkel {
|
|||||||
&self,
|
&self,
|
||||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||||
) -> Result<Vec<(Response, SocketAddr)>> {
|
) -> Result<Vec<(Response, SocketAddr)>> {
|
||||||
trace!("partitioning");
|
debug!("partitioning");
|
||||||
let (trs, reqs) = Self::partition_requests(req_vers);
|
let (trs, reqs) = Self::partition_requests(req_vers);
|
||||||
|
debug!("trs: {} reqs: {}", trs.len(), reqs.len());
|
||||||
|
|
||||||
// Process the transactions in parallel and then log the successful ones.
|
// Process the transactions in parallel and then log the successful ones.
|
||||||
for result in self.acc.lock().unwrap().process_verified_transactions(trs) {
|
for result in self.acc.lock().unwrap().process_verified_transactions(trs) {
|
||||||
@ -328,15 +342,21 @@ impl AccountantSkel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug!("processing verified");
|
||||||
|
|
||||||
// Let validators know they should not attempt to process additional
|
// Let validators know they should not attempt to process additional
|
||||||
// transactions in parallel.
|
// transactions in parallel.
|
||||||
self.historian_input.lock().unwrap().send(Signal::Tick)?;
|
self.historian_input.lock().unwrap().send(Signal::Tick)?;
|
||||||
|
|
||||||
|
debug!("after historian_input");
|
||||||
|
|
||||||
// Process the remaining requests serially.
|
// Process the remaining requests serially.
|
||||||
let rsps = reqs.into_iter()
|
let rsps = reqs.into_iter()
|
||||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
debug!("returning rsps");
|
||||||
|
|
||||||
Ok(rsps)
|
Ok(rsps)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -377,7 +397,7 @@ impl AccountantSkel {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let mms = verified_receiver.recv_timeout(timer)?;
|
let mms = verified_receiver.recv_timeout(timer)?;
|
||||||
trace!("got some messages: {}", mms.len());
|
debug!("got some messages: {}", mms.len());
|
||||||
for (msgs, vers) in mms {
|
for (msgs, vers) in mms {
|
||||||
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
|
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
|
||||||
let req_vers = reqs.into_iter()
|
let req_vers = reqs.into_iter()
|
||||||
@ -389,18 +409,18 @@ impl AccountantSkel {
|
|||||||
v
|
v
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
trace!("process_packets");
|
debug!("process_packets");
|
||||||
let rsps = obj.process_packets(req_vers)?;
|
let rsps = obj.process_packets(req_vers)?;
|
||||||
trace!("done process_packets");
|
debug!("done process_packets");
|
||||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||||
trace!("sending blobs: {}", blobs.len());
|
|
||||||
if !blobs.is_empty() {
|
if !blobs.is_empty() {
|
||||||
|
info!("process: sending blobs: {}", blobs.len());
|
||||||
//don't wake up the other side if there is nothing
|
//don't wake up the other side if there is nothing
|
||||||
responder_sender.send(blobs)?;
|
responder_sender.send(blobs)?;
|
||||||
}
|
}
|
||||||
packet_recycler.recycle(msgs);
|
packet_recycler.recycle(msgs);
|
||||||
}
|
}
|
||||||
trace!("done responding");
|
debug!("done responding");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
/// Process verified blobs, already in order
|
/// Process verified blobs, already in order
|
||||||
@ -412,6 +432,7 @@ impl AccountantSkel {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||||
|
trace!("replicating blobs {}", blobs.len());
|
||||||
for msgs in &blobs {
|
for msgs in &blobs {
|
||||||
let blob = msgs.read().unwrap();
|
let blob = msgs.read().unwrap();
|
||||||
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||||
@ -541,10 +562,12 @@ impl AccountantSkel {
|
|||||||
obj: &SharedSkel,
|
obj: &SharedSkel,
|
||||||
me: ReplicatedData,
|
me: ReplicatedData,
|
||||||
gossip: UdpSocket,
|
gossip: UdpSocket,
|
||||||
|
serve: UdpSocket,
|
||||||
replicate: UdpSocket,
|
replicate: UdpSocket,
|
||||||
leader: ReplicatedData,
|
leader: ReplicatedData,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Result<Vec<JoinHandle<()>>> {
|
) -> Result<Vec<JoinHandle<()>>> {
|
||||||
|
//replicate pipeline
|
||||||
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
||||||
crdt.write().unwrap().set_leader(leader.id);
|
crdt.write().unwrap().set_leader(leader.id);
|
||||||
crdt.write().unwrap().insert(leader);
|
crdt.write().unwrap().insert(leader);
|
||||||
@ -580,7 +603,7 @@ impl AccountantSkel {
|
|||||||
//then sent to the window, which does the erasure coding reconstruction
|
//then sent to the window, which does the erasure coding reconstruction
|
||||||
let t_window = streamer::window(
|
let t_window = streamer::window(
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
crdt,
|
crdt.clone(),
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
blob_receiver,
|
blob_receiver,
|
||||||
window_sender,
|
window_sender,
|
||||||
@ -588,19 +611,76 @@ impl AccountantSkel {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let skel = obj.clone();
|
let skel = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let s_exit = exit.clone();
|
||||||
|
let t_replicator = spawn(move || loop {
|
||||||
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
|
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
if e.is_err() && s_exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
//serve pipeline
|
||||||
|
// make sure we are on the same interface
|
||||||
|
let mut local = serve.local_addr()?;
|
||||||
|
local.set_port(0);
|
||||||
|
let respond_socket = UdpSocket::bind(local.clone())?;
|
||||||
|
|
||||||
|
let packet_recycler = packet::PacketRecycler::default();
|
||||||
|
let blob_recycler = packet::BlobRecycler::default();
|
||||||
|
let (packet_sender, packet_receiver) = channel();
|
||||||
|
let t_packet_receiver =
|
||||||
|
streamer::receiver(serve, exit.clone(), packet_recycler.clone(), packet_sender)?;
|
||||||
|
let (responder_sender, responder_receiver) = channel();
|
||||||
|
let t_responder = streamer::responder(
|
||||||
|
respond_socket,
|
||||||
|
exit.clone(),
|
||||||
|
blob_recycler.clone(),
|
||||||
|
responder_receiver,
|
||||||
|
);
|
||||||
|
let (verified_sender, verified_receiver) = channel();
|
||||||
|
|
||||||
|
let exit_ = exit.clone();
|
||||||
|
let t_verifier = spawn(move || loop {
|
||||||
|
let e = Self::verifier(&packet_receiver, &verified_sender);
|
||||||
|
if e.is_err() && exit_.load(Ordering::Relaxed) {
|
||||||
|
trace!("verifier exiting");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());
|
||||||
|
|
||||||
|
let skel = obj.clone();
|
||||||
|
let s_exit = exit.clone();
|
||||||
|
let t_server = spawn(move || loop {
|
||||||
|
let e = Self::process(
|
||||||
|
&mut skel.clone(),
|
||||||
|
&verified_receiver,
|
||||||
|
&responder_sender,
|
||||||
|
&packet_recycler,
|
||||||
|
&blob_recycler,
|
||||||
|
);
|
||||||
|
if e.is_err() {
|
||||||
|
if s_exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Ok(vec![
|
Ok(vec![
|
||||||
|
//replicate threads
|
||||||
t_blob_receiver,
|
t_blob_receiver,
|
||||||
t_retransmit,
|
t_retransmit,
|
||||||
t_window,
|
t_window,
|
||||||
t_server,
|
t_replicator,
|
||||||
t_gossip,
|
t_gossip,
|
||||||
t_listen,
|
t_listen,
|
||||||
|
//serve threads
|
||||||
|
t_packet_receiver,
|
||||||
|
t_responder,
|
||||||
|
t_server,
|
||||||
|
t_verifier,
|
||||||
|
t_sync,
|
||||||
])
|
])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -769,7 +849,7 @@ mod tests {
|
|||||||
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
|
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
|
||||||
let _sig = acc_stub.transfer_signed(tr2).unwrap();
|
let _sig = acc_stub.transfer_signed(tr2).unwrap();
|
||||||
|
|
||||||
assert_eq!(acc_stub.get_balance(&bob_pubkey).wait().unwrap(), 500);
|
assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500);
|
||||||
trace!("exiting");
|
trace!("exiting");
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
trace!("joining threads");
|
trace!("joining threads");
|
||||||
@ -797,7 +877,7 @@ mod tests {
|
|||||||
fn test_replicate() {
|
fn test_replicate() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let (leader_data, leader_gossip, _, leader_serve) = test_node();
|
let (leader_data, leader_gossip, _, leader_serve) = test_node();
|
||||||
let (target1_data, target1_gossip, target1_replicate, _) = test_node();
|
let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node();
|
||||||
let (target2_data, target2_gossip, target2_replicate, _) = test_node();
|
let (target2_data, target2_gossip, target2_replicate, _) = test_node();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
@ -851,6 +931,7 @@ mod tests {
|
|||||||
&acc,
|
&acc,
|
||||||
target1_data,
|
target1_data,
|
||||||
target1_gossip,
|
target1_gossip,
|
||||||
|
target1_serve,
|
||||||
target1_replicate,
|
target1_replicate,
|
||||||
leader_data,
|
leader_data,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
|
@ -11,6 +11,7 @@ use signature::{KeyPair, PublicKey, Signature};
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
use std::time::Duration;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub struct AccountantStub {
|
pub struct AccountantStub {
|
||||||
@ -47,7 +48,10 @@ impl AccountantStub {
|
|||||||
|
|
||||||
pub fn recv_response(&self) -> io::Result<Response> {
|
pub fn recv_response(&self) -> io::Result<Response> {
|
||||||
let mut buf = vec![0u8; 1024];
|
let mut buf = vec![0u8; 1024];
|
||||||
|
self.socket.set_read_timeout(Some(Duration::new(1, 0)))?;
|
||||||
|
info!("start recv_from");
|
||||||
self.socket.recv_from(&mut buf)?;
|
self.socket.recv_from(&mut buf)?;
|
||||||
|
info!("end recv_from");
|
||||||
let resp = deserialize(&buf).expect("deserialize balance");
|
let resp = deserialize(&buf).expect("deserialize balance");
|
||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
@ -55,9 +59,11 @@ impl AccountantStub {
|
|||||||
pub fn process_response(&mut self, resp: Response) {
|
pub fn process_response(&mut self, resp: Response) {
|
||||||
match resp {
|
match resp {
|
||||||
Response::Balance { key, val } => {
|
Response::Balance { key, val } => {
|
||||||
|
info!("Response balance {:?} {:?}", key, val);
|
||||||
self.balances.insert(key, val);
|
self.balances.insert(key, val);
|
||||||
}
|
}
|
||||||
Response::EntryInfo(entry_info) => {
|
Response::EntryInfo(entry_info) => {
|
||||||
|
trace!("Response entry_info {:?}", entry_info.id);
|
||||||
self.last_id = Some(entry_info.id);
|
self.last_id = Some(entry_info.id);
|
||||||
self.num_events += entry_info.num_events;
|
self.num_events += entry_info.num_events;
|
||||||
}
|
}
|
||||||
@ -88,7 +94,8 @@ impl AccountantStub {
|
|||||||
/// Request the balance of the user holding `pubkey`. This method blocks
|
/// Request the balance of the user holding `pubkey`. This method blocks
|
||||||
/// until the server sends a response. If the response packet is dropped
|
/// until the server sends a response. If the response packet is dropped
|
||||||
/// by the network, this method will hang indefinitely.
|
/// by the network, this method will hang indefinitely.
|
||||||
pub fn get_balance(&mut self, pubkey: &PublicKey) -> FutureResult<i64, i64> {
|
pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
|
||||||
|
info!("get_balance");
|
||||||
let req = Request::GetBalance { key: *pubkey };
|
let req = Request::GetBalance { key: *pubkey };
|
||||||
let data = serialize(&req).expect("serialize GetBalance");
|
let data = serialize(&req).expect("serialize GetBalance");
|
||||||
self.socket
|
self.socket
|
||||||
@ -96,13 +103,14 @@ impl AccountantStub {
|
|||||||
.expect("buffer error");
|
.expect("buffer error");
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
while !done {
|
while !done {
|
||||||
let resp = self.recv_response().expect("recv response");
|
let resp = self.recv_response()?;
|
||||||
|
info!("recv_response {:?}", resp);
|
||||||
if let &Response::Balance { ref key, .. } = &resp {
|
if let &Response::Balance { ref key, .. } = &resp {
|
||||||
done = key == pubkey;
|
done = key == pubkey;
|
||||||
}
|
}
|
||||||
self.process_response(resp);
|
self.process_response(resp);
|
||||||
}
|
}
|
||||||
ok(self.balances[pubkey].unwrap())
|
self.balances[pubkey].ok_or(io::Error::new(io::ErrorKind::Other, "nokey"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request the last Entry ID from the server. This method blocks
|
/// Request the last Entry ID from the server. This method blocks
|
||||||
@ -146,6 +154,7 @@ mod tests {
|
|||||||
use crdt::ReplicatedData;
|
use crdt::ReplicatedData;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use historian::Historian;
|
use historian::Historian;
|
||||||
|
use logger;
|
||||||
use mint::Mint;
|
use mint::Mint;
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
@ -158,6 +167,7 @@ mod tests {
|
|||||||
// TODO: Figure out why this test sometimes hangs on TravisCI.
|
// TODO: Figure out why this test sometimes hangs on TravisCI.
|
||||||
#[test]
|
#[test]
|
||||||
fn test_accountant_stub() {
|
fn test_accountant_stub() {
|
||||||
|
logger::setup();
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let addr = serve.local_addr().unwrap();
|
let addr = serve.local_addr().unwrap();
|
||||||
@ -186,10 +196,168 @@ mod tests {
|
|||||||
let last_id = acc.get_last_id().wait().unwrap();
|
let last_id = acc.get_last_id().wait().unwrap();
|
||||||
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(acc.get_balance(&bob_pubkey).wait().unwrap(), 500);
|
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
for t in threads {
|
for t in threads {
|
||||||
t.join().unwrap();
|
t.join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(all(feature = "unstable", test))]
|
||||||
|
mod unstsable_tests {
|
||||||
|
use super::*;
|
||||||
|
use accountant::Accountant;
|
||||||
|
use accountant_skel::AccountantSkel;
|
||||||
|
use crdt::{Crdt, ReplicatedData};
|
||||||
|
use futures::Future;
|
||||||
|
use historian::Historian;
|
||||||
|
use logger;
|
||||||
|
use mint::Mint;
|
||||||
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
|
use std::io::sink;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::sync_channel;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) {
|
||||||
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let pubkey = KeyPair::new().pubkey();
|
||||||
|
let leader = ReplicatedData::new(
|
||||||
|
pubkey,
|
||||||
|
gossip.local_addr().unwrap(),
|
||||||
|
replicate.local_addr().unwrap(),
|
||||||
|
serve.local_addr().unwrap(),
|
||||||
|
);
|
||||||
|
(leader, gossip, serve, replicate)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multi_accountant_stub() {
|
||||||
|
logger::setup();
|
||||||
|
info!("test_multi_accountant_stub");
|
||||||
|
let leader = test_node();
|
||||||
|
let replicant = test_node();
|
||||||
|
let alice = Mint::new(10_000);
|
||||||
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let leader_acc = {
|
||||||
|
let (input, event_receiver) = sync_channel(10);
|
||||||
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
|
let acc = Accountant::new(&alice);
|
||||||
|
Arc::new(AccountantSkel::new(acc, input, historian))
|
||||||
|
};
|
||||||
|
|
||||||
|
let replicant_acc = {
|
||||||
|
let (input, event_receiver) = sync_channel(10);
|
||||||
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
|
let acc = Accountant::new(&alice);
|
||||||
|
Arc::new(AccountantSkel::new(acc, input, historian))
|
||||||
|
};
|
||||||
|
|
||||||
|
let leader_threads = AccountantSkel::serve(
|
||||||
|
&leader_acc,
|
||||||
|
leader.0.clone(),
|
||||||
|
leader.2,
|
||||||
|
leader.1,
|
||||||
|
exit.clone(),
|
||||||
|
sink(),
|
||||||
|
).unwrap();
|
||||||
|
let replicant_threads = AccountantSkel::replicate(
|
||||||
|
&replicant_acc,
|
||||||
|
replicant.0.clone(),
|
||||||
|
replicant.1,
|
||||||
|
replicant.2,
|
||||||
|
replicant.3,
|
||||||
|
leader.0.clone(),
|
||||||
|
exit.clone(),
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
//lets spy on the network
|
||||||
|
let (mut spy, spy_gossip, _, _) = test_node();
|
||||||
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
spy.replicate_addr = daddr;
|
||||||
|
spy.serve_addr = daddr;
|
||||||
|
let mut spy_crdt = Crdt::new(spy);
|
||||||
|
spy_crdt.insert(leader.0.clone());
|
||||||
|
spy_crdt.set_leader(leader.0.id);
|
||||||
|
|
||||||
|
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||||
|
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_gossip, exit.clone());
|
||||||
|
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||||
|
//wait for the network to converge
|
||||||
|
for _ in 0..20 {
|
||||||
|
let ix = spy_ref.read().unwrap().update_index;
|
||||||
|
info!("my update index is {}", ix);
|
||||||
|
let len = spy_ref.read().unwrap().remote.values().len();
|
||||||
|
let mut done = false;
|
||||||
|
info!("remote len {}", len);
|
||||||
|
if len > 1 && ix > 2 {
|
||||||
|
done = true;
|
||||||
|
//check if everyones remote index is greater or equal to ours
|
||||||
|
let vs: Vec<u64> = spy_ref.read().unwrap().remote.values().cloned().collect();
|
||||||
|
for t in vs.into_iter() {
|
||||||
|
info!("remote update index is {} vs {}", t, ix);
|
||||||
|
if t < 3 {
|
||||||
|
done = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if done == true {
|
||||||
|
info!("converged!");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sleep(Duration::new(1, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
//verify leader can do transfer
|
||||||
|
let leader_balance = {
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||||
|
|
||||||
|
let mut acc = AccountantStub::new(leader.0.serve_addr, socket);
|
||||||
|
info!("getting leader last_id");
|
||||||
|
let last_id = acc.get_last_id().wait().unwrap();
|
||||||
|
info!("executing leader transer");
|
||||||
|
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||||
|
.unwrap();
|
||||||
|
info!("getting leader balance");
|
||||||
|
acc.get_balance(&bob_pubkey).unwrap()
|
||||||
|
};
|
||||||
|
assert_eq!(leader_balance, 500);
|
||||||
|
//verify replicant has the same balance
|
||||||
|
let mut replicant_balance = 0;
|
||||||
|
for _ in 0..10 {
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||||
|
|
||||||
|
let mut acc = AccountantStub::new(replicant.0.serve_addr, socket);
|
||||||
|
info!("getting replicant balance");
|
||||||
|
if let Ok(bal) = acc.get_balance(&bob_pubkey) {
|
||||||
|
replicant_balance = bal;
|
||||||
|
}
|
||||||
|
info!("replicant balance {}", replicant_balance);
|
||||||
|
if replicant_balance == leader_balance {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sleep(Duration::new(1, 0));
|
||||||
|
}
|
||||||
|
assert_eq!(replicant_balance, leader_balance);
|
||||||
|
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
for t in leader_threads {
|
||||||
|
t.join().unwrap();
|
||||||
|
}
|
||||||
|
for t in replicant_threads {
|
||||||
|
t.join().unwrap();
|
||||||
|
}
|
||||||
|
for t in vec![t_spy_listen, t_spy_gossip] {
|
||||||
|
t.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,6 +18,8 @@ use std::env;
|
|||||||
use std::io::{stdin, Read};
|
use std::io::{stdin, Read};
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use untrusted::Input;
|
use untrusted::Input;
|
||||||
|
|
||||||
@ -38,7 +40,7 @@ fn main() {
|
|||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
opts.optopt("s", "", "server address", "host:port");
|
opts.optopt("s", "", "server address", "host:port");
|
||||||
opts.optopt("c", "", "client address", "host:port");
|
opts.optopt("c", "", "client address", "host:port");
|
||||||
opts.optopt("t", "", "number of threads", "4");
|
opts.optopt("t", "", "number of threads", &format!("{}", threads));
|
||||||
opts.optflag("h", "help", "print help");
|
opts.optflag("h", "help", "print help");
|
||||||
let args: Vec<String> = env::args().collect();
|
let args: Vec<String> = env::args().collect();
|
||||||
let matches = match opts.parse(&args[1..]) {
|
let matches = match opts.parse(&args[1..]) {
|
||||||
@ -84,6 +86,7 @@ fn main() {
|
|||||||
|
|
||||||
println!("Binding to {}", client_addr);
|
println!("Binding to {}", client_addr);
|
||||||
let socket = UdpSocket::bind(&client_addr).unwrap();
|
let socket = UdpSocket::bind(&client_addr).unwrap();
|
||||||
|
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
|
||||||
let mut acc = AccountantStub::new(addr.parse().unwrap(), socket);
|
let mut acc = AccountantStub::new(addr.parse().unwrap(), socket);
|
||||||
|
|
||||||
println!("Get last ID...");
|
println!("Get last ID...");
|
||||||
@ -104,7 +107,7 @@ fn main() {
|
|||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
|
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
|
||||||
.collect();
|
.collect();
|
||||||
let duration = now.elapsed();
|
let mut duration = now.elapsed();
|
||||||
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
||||||
let bsps = txs as f64 / ns as f64;
|
let bsps = txs as f64 / ns as f64;
|
||||||
let nsps = ns as f64 / txs as f64;
|
let nsps = ns as f64 / txs as f64;
|
||||||
@ -115,6 +118,7 @@ fn main() {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let initial_tx_count = acc.transaction_count();
|
let initial_tx_count = acc.transaction_count();
|
||||||
|
println!("initial count {}", initial_tx_count);
|
||||||
|
|
||||||
println!("Transfering {} transactions in {} batches", txs, threads);
|
println!("Transfering {} transactions in {} batches", txs, threads);
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
@ -131,16 +135,16 @@ fn main() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
println!("Waiting for half the transactions to complete...",);
|
println!("Waiting for transactions to complete...",);
|
||||||
let mut tx_count = acc.transaction_count();
|
let mut tx_count;
|
||||||
while tx_count < transactions.len() as u64 / 2 {
|
for _ in 0..5 {
|
||||||
tx_count = acc.transaction_count();
|
tx_count = acc.transaction_count();
|
||||||
}
|
duration = now.elapsed();
|
||||||
let txs = tx_count - initial_tx_count;
|
let txs = tx_count - initial_tx_count;
|
||||||
println!("Transactions processed {}", txs);
|
println!("Transactions processed {}", txs);
|
||||||
|
|
||||||
let duration = now.elapsed();
|
|
||||||
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
||||||
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
|
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
|
||||||
println!("Done. {} tps", tps);
|
println!("{} tps", tps);
|
||||||
|
sleep(Duration::new(1, 0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -76,6 +76,8 @@ fn main() {
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
eprintln!("done parsing...");
|
||||||
|
|
||||||
// The first item in the ledger is required to be an entry with zero num_hashes,
|
// The first item in the ledger is required to be an entry with zero num_hashes,
|
||||||
// which implies its id can be used as the ledger's seed.
|
// which implies its id can be used as the ledger's seed.
|
||||||
let entry0 = entries.next().unwrap();
|
let entry0 = entries.next().unwrap();
|
||||||
@ -90,10 +92,14 @@ fn main() {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
eprintln!("creating accountant...");
|
||||||
|
|
||||||
let acc = Accountant::new_from_deposit(&deposit.unwrap());
|
let acc = Accountant::new_from_deposit(&deposit.unwrap());
|
||||||
acc.register_entry_id(&entry0.id);
|
acc.register_entry_id(&entry0.id);
|
||||||
acc.register_entry_id(&entry1.id);
|
acc.register_entry_id(&entry1.id);
|
||||||
|
|
||||||
|
eprintln!("processing entries...");
|
||||||
|
|
||||||
let mut last_id = entry1.id;
|
let mut last_id = entry1.id;
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
last_id = entry.id;
|
last_id = entry.id;
|
||||||
@ -101,6 +107,8 @@ fn main() {
|
|||||||
acc.register_entry_id(&last_id);
|
acc.register_entry_id(&last_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eprintln!("creating networking stack...");
|
||||||
|
|
||||||
let (input, event_receiver) = sync_channel(10_000);
|
let (input, event_receiver) = sync_channel(10_000);
|
||||||
let historian = Historian::new(event_receiver, &last_id, Some(1000));
|
let historian = Historian::new(event_receiver, &last_id, Some(1000));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
@ -115,6 +123,7 @@ fn main() {
|
|||||||
replicate_sock.local_addr().unwrap(),
|
replicate_sock.local_addr().unwrap(),
|
||||||
serve_sock.local_addr().unwrap(),
|
serve_sock.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
|
eprintln!("starting server...");
|
||||||
let threads =
|
let threads =
|
||||||
AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap();
|
AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap();
|
||||||
eprintln!("Ready. Listening on {}", serve_addr);
|
eprintln!("Ready. Listening on {}", serve_addr);
|
||||||
|
69
src/crdt.rs
69
src/crdt.rs
@ -91,7 +91,7 @@ pub struct Crdt {
|
|||||||
local: HashMap<PublicKey, u64>,
|
local: HashMap<PublicKey, u64>,
|
||||||
/// The value of the remote update index that i have last seen
|
/// The value of the remote update index that i have last seen
|
||||||
/// This Node will ask external nodes for updates since the value in this list
|
/// This Node will ask external nodes for updates since the value in this list
|
||||||
remote: HashMap<PublicKey, u64>,
|
pub remote: HashMap<PublicKey, u64>,
|
||||||
pub update_index: u64,
|
pub update_index: u64,
|
||||||
me: PublicKey,
|
me: PublicKey,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
@ -172,20 +172,33 @@ impl Crdt {
|
|||||||
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
|
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
|
||||||
(robj.table[&robj.me].clone(), cloned_table)
|
(robj.table[&robj.me].clone(), cloned_table)
|
||||||
};
|
};
|
||||||
let errs: Vec<_> = table
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let items: Vec<(usize, &ReplicatedData)> = table
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.filter(|v| {
|
||||||
.cycle()
|
|
||||||
.zip(blobs.iter())
|
|
||||||
.map(|((i, v), b)| {
|
|
||||||
if me.id == v.id {
|
if me.id == v.id {
|
||||||
return Ok(0);
|
//filter myself
|
||||||
|
false
|
||||||
|
} else if v.replicate_addr == daddr {
|
||||||
|
//filter nodes that are not listening
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
.enumerate()
|
||||||
|
.collect();
|
||||||
|
let orders: Vec<_> = items.into_iter().cycle().zip(blobs.iter()).collect();
|
||||||
|
let errs: Vec<_> = orders
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|((i, v), b)| {
|
||||||
// only leader should be broadcasting
|
// only leader should be broadcasting
|
||||||
assert!(me.current_leader_id != v.id);
|
assert!(me.current_leader_id != v.id);
|
||||||
let mut blob = b.write().unwrap();
|
let mut blob = b.write().unwrap();
|
||||||
|
blob.set_id(me.id).expect("set_id");
|
||||||
blob.set_index(*transmit_index + i as u64)
|
blob.set_index(*transmit_index + i as u64)
|
||||||
.expect("set_index");
|
.expect("set_index");
|
||||||
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
|
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
@ -210,17 +223,28 @@ impl Crdt {
|
|||||||
(s.table[&s.me].clone(), s.table.values().cloned().collect())
|
(s.table[&s.me].clone(), s.table.values().cloned().collect())
|
||||||
};
|
};
|
||||||
let rblob = blob.read().unwrap();
|
let rblob = blob.read().unwrap();
|
||||||
let errs: Vec<_> = table
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let orders: Vec<_> = table
|
||||||
|
.iter()
|
||||||
|
.filter(|v| {
|
||||||
|
if me.id == v.id {
|
||||||
|
false
|
||||||
|
} else if me.current_leader_id == v.id {
|
||||||
|
trace!("skip retransmit to leader {:?}", v.id);
|
||||||
|
false
|
||||||
|
} else if v.replicate_addr == daddr {
|
||||||
|
trace!("skip nodes that are not listening {:?}", v.id);
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let errs: Vec<_> = orders
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.map(|v| {
|
.map(|v| {
|
||||||
if me.id == v.id {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
if me.current_leader_id == v.id {
|
|
||||||
trace!("skip retransmit to leader{:?}", v.id);
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
trace!("retransmit blob to {}", v.replicate_addr);
|
trace!("retransmit blob to {}", v.replicate_addr);
|
||||||
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
|
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
@ -258,13 +282,18 @@ impl Crdt {
|
|||||||
/// (A,B)
|
/// (A,B)
|
||||||
/// * A - Address to send to
|
/// * A - Address to send to
|
||||||
/// * B - RequestUpdates protocol message
|
/// * B - RequestUpdates protocol message
|
||||||
fn gossip_request(&self) -> (SocketAddr, Protocol) {
|
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
||||||
let n = (Self::random() as usize) % self.table.len();
|
if self.table.len() <= 1 {
|
||||||
trace!("random {:?} {}", &self.me[0..1], n);
|
return Err(Error::GeneralError);
|
||||||
|
}
|
||||||
|
let mut n = (Self::random() as usize) % self.table.len();
|
||||||
|
while self.table.values().nth(n).unwrap().id == self.me {
|
||||||
|
n = (Self::random() as usize) % self.table.len();
|
||||||
|
}
|
||||||
let v = self.table.values().nth(n).unwrap().clone();
|
let v = self.table.values().nth(n).unwrap().clone();
|
||||||
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
||||||
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
||||||
(v.gossip_addr, req)
|
Ok((v.gossip_addr, req))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// At random pick a node and try to get updated changes from them
|
/// At random pick a node and try to get updated changes from them
|
||||||
@ -274,7 +303,7 @@ impl Crdt {
|
|||||||
|
|
||||||
// Lock the object only to do this operation and not for any longer
|
// Lock the object only to do this operation and not for any longer
|
||||||
// especially not when doing the `sock.send_to`
|
// especially not when doing the `sock.send_to`
|
||||||
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request();
|
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?;
|
||||||
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
||||||
// TODO this will get chatty, so we need to first ask for number of updates since
|
// TODO this will get chatty, so we need to first ask for number of updates since
|
||||||
// then only ask for specific data that we dont have
|
// then only ask for specific data that we dont have
|
||||||
|
@ -176,18 +176,26 @@ impl Packets {
|
|||||||
// * read until it fails
|
// * read until it fails
|
||||||
// * set it back to blocking before returning
|
// * set it back to blocking before returning
|
||||||
socket.set_nonblocking(false)?;
|
socket.set_nonblocking(false)?;
|
||||||
|
let mut error_msgs = 0;
|
||||||
for p in &mut self.packets {
|
for p in &mut self.packets {
|
||||||
p.meta.size = 0;
|
p.meta.size = 0;
|
||||||
|
trace!("receiving");
|
||||||
match socket.recv_from(&mut p.data) {
|
match socket.recv_from(&mut p.data) {
|
||||||
Err(_) if i > 0 => {
|
Err(_) if i > 0 => {
|
||||||
trace!("got {:?} messages", i);
|
debug!("got {:?} messages", i);
|
||||||
|
error_msgs += 1;
|
||||||
|
if error_msgs > 30 {
|
||||||
break;
|
break;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
info!("recv_from err {:?}", e);
|
trace!("recv_from err {:?}", e);
|
||||||
return Err(Error::IO(e));
|
return Err(Error::IO(e));
|
||||||
}
|
}
|
||||||
Ok((nrecv, from)) => {
|
Ok((nrecv, from)) => {
|
||||||
|
error_msgs = 0;
|
||||||
p.meta.size = nrecv;
|
p.meta.size = nrecv;
|
||||||
p.meta.set_addr(&from);
|
p.meta.set_addr(&from);
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
@ -202,6 +210,7 @@ impl Packets {
|
|||||||
pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> {
|
pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> {
|
||||||
let sz = self.run_read_from(socket)?;
|
let sz = self.run_read_from(socket)?;
|
||||||
self.packets.resize(sz, Packet::default());
|
self.packets.resize(sz, Packet::default());
|
||||||
|
debug!("recv_from: {}", sz);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn send_to(&self, socket: &UdpSocket) -> Result<()> {
|
pub fn send_to(&self, socket: &UdpSocket) -> Result<()> {
|
||||||
@ -233,6 +242,7 @@ impl Blob {
|
|||||||
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
|
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
|
||||||
Ok(e)
|
Ok(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_id(&mut self, id: PublicKey) -> Result<()> {
|
pub fn set_id(&mut self, id: PublicKey) -> Result<()> {
|
||||||
let wtr = serialize(&id)?;
|
let wtr = serialize(&id)?;
|
||||||
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
|
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
use entry::{create_entry_mut, Entry};
|
use entry::{create_entry_mut, Entry};
|
||||||
use event::Event;
|
use event::Event;
|
||||||
use hash::{hash, Hash};
|
use hash::{hash, Hash};
|
||||||
|
use packet::BLOB_SIZE;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
|
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@ -83,7 +84,7 @@ impl Recorder {
|
|||||||
// Record an entry early if we anticipate its serialized size will
|
// Record an entry early if we anticipate its serialized size will
|
||||||
// be larger than 64kb. At the time of this writing, we assume each
|
// be larger than 64kb. At the time of this writing, we assume each
|
||||||
// event will be well under 256 bytes.
|
// event will be well under 256 bytes.
|
||||||
if self.events.len() >= 65_536 / 256 {
|
if self.events.len() >= BLOB_SIZE / (2 * mem::size_of::<Event>()) {
|
||||||
self.record_entry()?;
|
self.record_entry()?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -100,8 +101,8 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
use transaction::Transaction;
|
|
||||||
use std::sync::mpsc::sync_channel;
|
use std::sync::mpsc::sync_channel;
|
||||||
|
use transaction::Transaction;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sub64k_entry_size() {
|
fn test_sub64k_entry_size() {
|
||||||
|
@ -18,6 +18,7 @@ pub enum Error {
|
|||||||
AccountingError(accountant::AccountingError),
|
AccountingError(accountant::AccountingError),
|
||||||
SendError,
|
SendError,
|
||||||
Services,
|
Services,
|
||||||
|
GeneralError,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
Reference in New Issue
Block a user