diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b4bf805c63..b10b2d7321 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -33,6 +33,7 @@ pub struct AccountantSkel { last_id: Hash, writer: W, historian: Historian, + entry_info_subscribers: Vec, } #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -41,6 +42,19 @@ pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, GetLastId, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + id: Hash, + num_hashes: u64, + num_events: u64, } impl Request { @@ -68,6 +82,22 @@ impl AccountantSkel { last_id, writer, historian, + entry_info_subscribers: vec![], + } + } + + fn notify_entry_info_subscribers(&mut self, entry: &Entry) { + // TODO: No need to bind(). + let socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + + for addr in &self.entry_info_subscribers { + let entry_info = EntryInfo { + id: entry.id, + num_hashes: entry.num_hashes, + num_events: entry.events.len() as u64, + }; + let data = serialize(&entry_info).expect("serialize EntryInfo"); + let _res = socket.send_to(&data, addr); } } @@ -77,6 +107,7 @@ impl AccountantSkel { self.last_id = entry.id; self.acc.register_entry_id(&self.last_id); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + self.notify_entry_info_subscribers(&entry); } self.last_id } @@ -94,6 +125,14 @@ impl AccountantSkel { } Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)), Request::Transaction(_) => unreachable!(), + Request::Subscribe { subscriptions } => { + for subscription in subscriptions { + match subscription { + Subscription::EntryInfo => self.entry_info_subscribers.push(rsp_addr), + } + } + None + } } } @@ -242,6 +281,9 @@ impl AccountantSkel { blob_sender.send(blobs)?; } packet_recycler.recycle(msgs); + + // Write new entries to the ledger and notify subscribers. + obj.lock().unwrap().sync(); } Ok(()) }