diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index f04949cd9e..c18b04c413 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -1,12 +1,12 @@ use accountant::Accountant; -use bincode::{deserialize, serialize, serialize_into}; +use bincode::{deserialize, serialize}; use entry::Entry; use hash::Hash; use result::Result; use serde_json; use signature::PublicKey; use std::default::Default; -use std::io::Write; +use std::io::{ErrorKind, Write}; use std::net::{TcpListener, TcpStream, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; @@ -52,12 +52,14 @@ impl AccountantSkel { pub fn sync(&mut self) -> Hash { while let Ok(entry) = self.acc.historian.receiver.try_recv() { self.last_id = entry.id; - write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); - for mut subscriber in &self.subscribers { - // TODO: Handle errors. If TCP stream is closed, remove it. - serialize_into(subscriber, &entry).unwrap(); - } + let buf = serialize(&entry).expect("serialize"); + self.subscribers + .retain(|ref mut subscriber| match subscriber.write(&buf) { + Err(err) => err.kind() != ErrorKind::BrokenPipe, + _ => true, + }); } self.last_id } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index c7b835198a..3f607b20ef 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -84,7 +84,7 @@ impl AccountantStub { let mut buf = vec![0u8; 65_535]; let mut buf_offset = 0; let mut found = false; - if let Ok(bytes) = self.stream.read(&mut buf) { + if let Ok(_bytes) = self.stream.read(&mut buf) { loop { match deserialize::(&buf[buf_offset..]) { Ok(entry) => { @@ -100,10 +100,7 @@ impl AccountantStub { } } } - Err(_) => { - println!("read {} of {} in buf", buf_offset, bytes); - break; - } + Err(_) => break, } } } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index bde0551296..8c075fa4aa 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -18,14 +18,17 @@ fn main() { let mint_pubkey = mint.pubkey(); let socket = UdpSocket::bind(send_addr).unwrap(); - let stream = TcpStream::connect(send_addr).unwrap(); + let stream = TcpStream::connect(addr).unwrap(); + stream.set_nonblocking(true).expect("nonblocking"); + let mut acc = AccountantStub::new(addr, socket, stream); let last_id = acc.get_last_id().unwrap(); - let txs = acc.get_balance(&mint_pubkey).unwrap().unwrap(); - println!("Mint's Initial Balance {}", txs); + let mint_balance = acc.get_balance(&mint_pubkey).unwrap().unwrap(); + println!("Mint's Initial Balance {}", mint_balance); println!("Signing transactions..."); + let txs = mint_balance; let now = Instant::now(); let transactions: Vec<_> = (0..txs) .map(|_| { @@ -66,7 +69,9 @@ fn main() { acc.transfer_signed(tr).unwrap(); } println!("Waiting for last transaction to be confirmed...",); - acc.wait_on_signature(&sig, &last_id).unwrap(); + if txs > 0 { + acc.wait_on_signature(&sig, &last_id).unwrap(); + } let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); @@ -74,5 +79,5 @@ fn main() { println!("Done. {} tps!", tps); let val = acc.get_balance(&mint_pubkey).unwrap().unwrap(); println!("Mint's Final Balance {}", val); - assert_eq!(val, 0); + assert_eq!(val, mint_balance - txs); }