From 4751e459ccfea19b7b67b4ac1190c962ee7b7cd3 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 6 May 2018 22:25:05 -0700 Subject: [PATCH] fixed! --- src/accountant_stub.rs | 24 +++--------------------- src/packet.rs | 10 ++-------- src/recorder.rs | 4 ++-- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index dd853ca649..a3715cf837 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -149,7 +149,7 @@ mod tests { use super::*; use accountant::Accountant; use accountant_skel::AccountantSkel; - use crdt::ReplicatedData; + use crdt::{Crdt, ReplicatedData}; use futures::Future; use historian::Historian; use logger; @@ -158,7 +158,7 @@ mod tests { use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::sync_channel; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -199,25 +199,6 @@ mod tests { t.join().unwrap(); } } -} - -#[cfg(all(feature = "unstable", test))] -mod unstsable_tests { - use super::*; - use accountant::Accountant; - use accountant_skel::AccountantSkel; - use crdt::{Crdt, ReplicatedData}; - use futures::Future; - use historian::Historian; - use logger; - use mint::Mint; - use signature::{KeyPair, KeyPairUtil}; - use std::io::sink; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::sync_channel; - use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -234,6 +215,7 @@ mod unstsable_tests { } #[test] + // #[ignore] fn test_multi_accountant_stub() { logger::setup(); info!("test_multi_accountant_stub"); diff --git a/src/packet.rs b/src/packet.rs index 471fe67fa5..713a166f68 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -17,6 +17,7 @@ pub type BlobRecycler = Recycler; pub const NUM_PACKETS: usize = 1024 * 8; pub const BLOB_SIZE: usize = 64 * 1024; +pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_ID_END; pub const PACKET_DATA_SIZE: usize = 256; pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; @@ -176,26 +177,19 @@ impl Packets { // * read until it fails // * set it back to blocking before returning socket.set_nonblocking(false)?; - let mut error_msgs = 0; for p in &mut self.packets { p.meta.size = 0; trace!("receiving"); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { debug!("got {:?} messages", i); - error_msgs += 1; - if error_msgs > 30 { - break; - } else { - continue; - } + break; } Err(e) => { trace!("recv_from err {:?}", e); return Err(Error::IO(e)); } Ok((nrecv, from)) => { - error_msgs = 0; p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { diff --git a/src/recorder.rs b/src/recorder.rs index cb6f81f09c..68a8cf8dae 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -8,7 +8,7 @@ use entry::{create_entry_mut, Entry}; use event::Event; use hash::{hash, Hash}; -use packet::BLOB_SIZE; +use packet::BLOB_DATA_SIZE; use std::mem; use std::sync::mpsc::{Receiver, SyncSender, TryRecvError}; use std::time::{Duration, Instant}; @@ -84,7 +84,7 @@ impl Recorder { // Record an entry early if we anticipate its serialized size will // be larger than 64kb. At the time of this writing, we assume each // event will be well under 256 bytes. - if self.events.len() >= BLOB_SIZE / 256 { + if self.events.len() >= BLOB_DATA_SIZE / 256 { self.record_entry()?; } }