Let clients subscribe to the ledger over TCP
TODO: Add more tests Fixes #27
This commit is contained in:
@ -1,5 +1,5 @@
|
|||||||
use accountant::Accountant;
|
use accountant::Accountant;
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize, serialize_into};
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use result::Result;
|
use result::Result;
|
||||||
@ -7,7 +7,7 @@ use serde_json;
|
|||||||
use signature::PublicKey;
|
use signature::PublicKey;
|
||||||
use std::default::Default;
|
use std::default::Default;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::UdpSocket;
|
use std::net::{TcpListener, TcpStream, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
@ -21,6 +21,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
|
|||||||
pub last_id: Hash,
|
pub last_id: Hash,
|
||||||
pub ledger: Vec<Entry>,
|
pub ledger: Vec<Entry>,
|
||||||
writer: W,
|
writer: W,
|
||||||
|
subscribers: Vec<TcpStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
||||||
@ -47,6 +48,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||||||
last_id,
|
last_id,
|
||||||
ledger: vec![],
|
ledger: vec![],
|
||||||
writer: w,
|
writer: w,
|
||||||
|
subscribers: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,6 +56,12 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||||||
while let Ok(entry) = self.acc.historian.receiver.try_recv() {
|
while let Ok(entry) = self.acc.historian.receiver.try_recv() {
|
||||||
self.last_id = entry.id;
|
self.last_id = entry.id;
|
||||||
write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
|
write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
|
||||||
|
|
||||||
|
for mut subscriber in &self.subscribers {
|
||||||
|
// TODO: Handle errors. If TCP stream is closed, remove it.
|
||||||
|
serialize_into(subscriber, &entry).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
self.ledger.push(entry);
|
self.ledger.push(entry);
|
||||||
}
|
}
|
||||||
self.last_id
|
self.last_id
|
||||||
@ -92,8 +100,9 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process(
|
fn process(
|
||||||
&mut self,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
r_reader: &streamer::Receiver,
|
r_reader: &streamer::Receiver,
|
||||||
s_responder: &streamer::Responder,
|
s_responder: &streamer::Responder,
|
||||||
packet_recycler: &streamer::PacketRecycler,
|
packet_recycler: &streamer::PacketRecycler,
|
||||||
@ -110,7 +119,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||||||
for packet in &msgs.read().unwrap().packets {
|
for packet in &msgs.read().unwrap().packets {
|
||||||
let sz = packet.meta.size;
|
let sz = packet.meta.size;
|
||||||
let req = deserialize(&packet.data[0..sz])?;
|
let req = deserialize(&packet.data[0..sz])?;
|
||||||
if let Some(resp) = self.process_request(req) {
|
if let Some(resp) = obj.lock().unwrap().process_request(req) {
|
||||||
if ursps.responses.len() <= num {
|
if ursps.responses.len() <= num {
|
||||||
ursps
|
ursps
|
||||||
.responses
|
.responses
|
||||||
@ -153,21 +162,30 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||||||
let t_responder =
|
let t_responder =
|
||||||
streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);
|
streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);
|
||||||
|
|
||||||
let t_server = spawn(move || {
|
let skel = obj.clone();
|
||||||
if let Ok(me) = Arc::try_unwrap(obj) {
|
let t_server = spawn(move || loop {
|
||||||
loop {
|
let e = AccountantSkel::process(
|
||||||
let e = me.lock().unwrap().process(
|
&skel,
|
||||||
&r_reader,
|
&r_reader,
|
||||||
&s_responder,
|
&s_responder,
|
||||||
&packet_recycler,
|
&packet_recycler,
|
||||||
&response_recycler,
|
&response_recycler,
|
||||||
);
|
);
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let listener = TcpListener::bind(addr)?;
|
||||||
|
let t_listener = spawn(move || {
|
||||||
|
for stream in listener.incoming() {
|
||||||
|
match stream {
|
||||||
|
Ok(stream) => obj.lock().unwrap().subscribers.push(stream),
|
||||||
|
Err(_) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(vec![t_receiver, t_responder, t_server])
|
|
||||||
|
Ok(vec![t_receiver, t_responder, t_server, t_listener])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,24 +3,26 @@
|
|||||||
//! transfer funds to other users.
|
//! transfer funds to other users.
|
||||||
|
|
||||||
use accountant_skel::{Request, Response};
|
use accountant_skel::{Request, Response};
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize, serialized_size};
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use signature::{KeyPair, PublicKey, Signature};
|
use signature::{KeyPair, PublicKey, Signature};
|
||||||
use std::io;
|
use std::io::{self, Read};
|
||||||
use std::net::UdpSocket;
|
use std::net::{TcpStream, UdpSocket};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub struct AccountantStub {
|
pub struct AccountantStub {
|
||||||
pub addr: String,
|
pub addr: String,
|
||||||
pub socket: UdpSocket,
|
pub socket: UdpSocket,
|
||||||
|
pub stream: TcpStream,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AccountantStub {
|
impl AccountantStub {
|
||||||
pub fn new(addr: &str, socket: UdpSocket) -> Self {
|
pub fn new(addr: &str, socket: UdpSocket, stream: TcpStream) -> Self {
|
||||||
AccountantStub {
|
AccountantStub {
|
||||||
addr: addr.to_string(),
|
addr: addr.to_string(),
|
||||||
socket,
|
socket,
|
||||||
|
stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,25 +81,29 @@ impl AccountantStub {
|
|||||||
last_id: &Hash,
|
last_id: &Hash,
|
||||||
) -> io::Result<(bool, Hash)> {
|
) -> io::Result<(bool, Hash)> {
|
||||||
let mut last_id = *last_id;
|
let mut last_id = *last_id;
|
||||||
let req = Request::GetEntries { last_id };
|
|
||||||
let data = serialize(&req).unwrap();
|
|
||||||
self.socket.send_to(&data, &self.addr).map(|_| ())?;
|
|
||||||
|
|
||||||
let mut buf = vec![0u8; 65_535];
|
let mut buf = vec![0u8; 65_535];
|
||||||
self.socket.recv_from(&mut buf)?;
|
let mut buf_offset = 0;
|
||||||
let resp = deserialize(&buf).expect("deserialize signature");
|
|
||||||
let mut found = false;
|
let mut found = false;
|
||||||
if let Response::Entries { entries } = resp {
|
if let Ok(bytes) = self.stream.read(&mut buf) {
|
||||||
for Entry { id, events, .. } in entries {
|
loop {
|
||||||
last_id = id;
|
match deserialize::<Entry>(&buf[buf_offset..]) {
|
||||||
if !found {
|
Ok(entry) => {
|
||||||
for event in events {
|
buf_offset += serialized_size(&entry).unwrap() as usize;
|
||||||
if let Some(sig) = event.get_signature() {
|
last_id = entry.id;
|
||||||
if sig == *wait_sig {
|
if !found {
|
||||||
found = true;
|
for event in entry.events {
|
||||||
|
if let Some(sig) = event.get_signature() {
|
||||||
|
if sig == *wait_sig {
|
||||||
|
found = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(_) => {
|
||||||
|
println!("read {} of {} in buf", buf_offset, bytes);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -112,6 +118,9 @@ impl AccountantStub {
|
|||||||
let ret = self.check_on_signature(wait_sig, &last_id)?;
|
let ret = self.check_on_signature(wait_sig, &last_id)?;
|
||||||
found = ret.0;
|
found = ret.0;
|
||||||
last_id = ret.1;
|
last_id = ret.1;
|
||||||
|
|
||||||
|
// Clunky way to force a sync in the skel.
|
||||||
|
self.get_last_id()?;
|
||||||
}
|
}
|
||||||
Ok(last_id)
|
Ok(last_id)
|
||||||
}
|
}
|
||||||
@ -139,19 +148,19 @@ mod tests {
|
|||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, sink())));
|
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, sink())));
|
||||||
let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
|
let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
|
|
||||||
let socket = UdpSocket::bind(send_addr).unwrap();
|
let socket = UdpSocket::bind(send_addr).unwrap();
|
||||||
let mut acc = AccountantStub::new(addr, socket);
|
let stream = TcpStream::connect(addr).expect("tcp connect");
|
||||||
|
stream.set_nonblocking(true).expect("nonblocking");
|
||||||
|
|
||||||
|
let mut acc = AccountantStub::new(addr, socket, stream);
|
||||||
let last_id = acc.get_last_id().unwrap();
|
let last_id = acc.get_last_id().unwrap();
|
||||||
let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
acc.wait_on_signature(&sig, &last_id).unwrap();
|
acc.wait_on_signature(&sig, &last_id).unwrap();
|
||||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
|
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
for t in threads {
|
|
||||||
t.join().expect("join");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ use silk::mint::Mint;
|
|||||||
use silk::signature::{KeyPair, KeyPairUtil};
|
use silk::signature::{KeyPair, KeyPairUtil};
|
||||||
use silk::transaction::Transaction;
|
use silk::transaction::Transaction;
|
||||||
use std::io::stdin;
|
use std::io::stdin;
|
||||||
use std::net::UdpSocket;
|
use std::net::{TcpStream, UdpSocket};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@ -18,7 +18,8 @@ fn main() {
|
|||||||
let mint_pubkey = mint.pubkey();
|
let mint_pubkey = mint.pubkey();
|
||||||
|
|
||||||
let socket = UdpSocket::bind(send_addr).unwrap();
|
let socket = UdpSocket::bind(send_addr).unwrap();
|
||||||
let mut acc = AccountantStub::new(addr, socket);
|
let stream = TcpStream::connect(send_addr).unwrap();
|
||||||
|
let mut acc = AccountantStub::new(addr, socket, stream);
|
||||||
let last_id = acc.get_last_id().unwrap();
|
let last_id = acc.get_last_id().unwrap();
|
||||||
|
|
||||||
let txs = acc.get_balance(&mint_pubkey).unwrap().unwrap();
|
let txs = acc.get_balance(&mint_pubkey).unwrap().unwrap();
|
||||||
|
Reference in New Issue
Block a user