Add a way to subscribe for new entry metadata

This commit is contained in:
Greg Fitzgerald
2018-04-17 17:31:52 -04:00
parent 58860ed19f
commit ea8bfb46ce

View File

@ -33,6 +33,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
last_id: Hash,
writer: W,
historian: Historian,
entry_info_subscribers: Vec<SocketAddr>,
}
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
@ -41,6 +42,19 @@ pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
GetLastId,
Subscribe { subscriptions: Vec<Subscription> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
id: Hash,
num_hashes: u64,
num_events: u64,
}
impl Request {
@ -68,6 +82,22 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
last_id,
writer,
historian,
entry_info_subscribers: vec![],
}
}
fn notify_entry_info_subscribers(&mut self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
for addr in &self.entry_info_subscribers {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&entry_info).expect("serialize EntryInfo");
let _res = socket.send_to(&data, addr);
}
}
@ -77,6 +107,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id);
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
self.notify_entry_info_subscribers(&entry);
}
self.last_id
}
@ -94,6 +125,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)),
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => self.entry_info_subscribers.push(rsp_addr),
}
}
None
}
}
}
@ -242,6 +281,9 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
// Write new entries to the ledger and notify subscribers.
obj.lock().unwrap().sync();
}
Ok(())
}