Revoke API access to first_id

This commit is contained in:
Greg Fitzgerald
2018-04-02 09:30:10 -06:00
parent e683c34a89
commit 46e8c09bd8
5 changed files with 35 additions and 44 deletions

View File

@ -15,7 +15,7 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied; use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::result; use std::result;
use std::sync::mpsc::SendError; use std::sync::mpsc::{Receiver, SendError};
use transaction::Transaction; use transaction::Transaction;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -36,9 +36,8 @@ fn complete_transaction(balances: &mut HashMap<PublicKey, i64>, plan: &Plan) {
} }
pub struct Accountant { pub struct Accountant {
pub historian: Historian, historian: Historian,
pub balances: HashMap<PublicKey, i64>, balances: HashMap<PublicKey, i64>,
pub first_id: Hash,
pending: HashMap<Signature, Plan>, pending: HashMap<Signature, Plan>,
time_sources: HashSet<PublicKey>, time_sources: HashSet<PublicKey>,
last_time: DateTime<Utc>, last_time: DateTime<Utc>,
@ -46,7 +45,7 @@ pub struct Accountant {
impl Accountant { impl Accountant {
/// Create an Accountant using an existing ledger. /// Create an Accountant using an existing ledger.
pub fn new_from_entries<I>(entries: I, ms_per_tick: Option<u64>) -> Self pub fn new_from_entries<I>(entries: I, ms_per_tick: Option<u64>) -> (Self, Hash)
where where
I: IntoIterator<Item = Entry>, I: IntoIterator<Item = Entry>,
{ {
@ -61,7 +60,6 @@ impl Accountant {
let mut acc = Accountant { let mut acc = Accountant {
historian: hist, historian: hist,
balances: HashMap::new(), balances: HashMap::new(),
first_id: start_hash,
pending: HashMap::new(), pending: HashMap::new(),
time_sources: HashSet::new(), time_sources: HashSet::new(),
last_time: Utc.timestamp(0, 0), last_time: Utc.timestamp(0, 0),
@ -73,17 +71,19 @@ impl Accountant {
let entry1 = entries.next().unwrap(); let entry1 = entries.next().unwrap();
acc.process_verified_event(&entry1.events[0], true).unwrap(); acc.process_verified_event(&entry1.events[0], true).unwrap();
let mut last_id = entry1.id;
for entry in entries { for entry in entries {
last_id = entry.id;
for event in entry.events { for event in entry.events {
acc.process_verified_event(&event, false).unwrap(); acc.process_verified_event(&event, false).unwrap();
} }
} }
acc (acc, last_id)
} }
/// Create an Accountant with only a Mint. Typically used by unit tests. /// Create an Accountant with only a Mint. Typically used by unit tests.
pub fn new(mint: &Mint, ms_per_tick: Option<u64>) -> Self { pub fn new(mint: &Mint, ms_per_tick: Option<u64>) -> Self {
Self::new_from_entries(mint.create_entries(), ms_per_tick) Self::new_from_entries(mint.create_entries(), ms_per_tick).0
} }
fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool { fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool {
@ -94,6 +94,10 @@ impl Accountant {
} }
} }
pub fn receiver(&self) -> &Receiver<Entry> {
&self.historian.receiver
}
/// Process and log the given Transaction. /// Process and log the given Transaction.
pub fn log_verified_transaction(&mut self, tr: Transaction) -> Result<()> { pub fn log_verified_transaction(&mut self, tr: Transaction) -> Result<()> {
if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens { if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens {

View File

@ -22,8 +22,8 @@ use transaction::Transaction;
use rayon::prelude::*; use rayon::prelude::*;
pub struct AccountantSkel<W: Write + Send + 'static> { pub struct AccountantSkel<W: Write + Send + 'static> {
pub acc: Accountant, acc: Accountant,
pub last_id: Hash, last_id: Hash,
writer: W, writer: W,
} }
@ -32,7 +32,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
pub enum Request { pub enum Request {
Transaction(Transaction), Transaction(Transaction),
GetBalance { key: PublicKey }, GetBalance { key: PublicKey },
GetId { is_last: bool }, GetLastId,
} }
impl Request { impl Request {
@ -54,23 +54,22 @@ fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, Sock
pub enum Response { pub enum Response {
Balance { key: PublicKey, val: Option<i64> }, Balance { key: PublicKey, val: Option<i64> },
Entries { entries: Vec<Entry> }, Entries { entries: Vec<Entry> },
Id { id: Hash, is_last: bool }, LastId { id: Hash },
} }
impl<W: Write + Send + 'static> AccountantSkel<W> { impl<W: Write + Send + 'static> AccountantSkel<W> {
/// Create a new AccountantSkel that wraps the given Accountant. /// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(acc: Accountant, w: W) -> Self { pub fn new(acc: Accountant, last_id: Hash, writer: W) -> Self {
let last_id = acc.first_id;
AccountantSkel { AccountantSkel {
acc, acc,
last_id, last_id,
writer: w, writer,
} }
} }
/// Process any Entry items that have been published by the Historian. /// Process any Entry items that have been published by the Historian.
pub fn sync(&mut self) -> Hash { pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.acc.historian.receiver.try_recv() { while let Ok(entry) = self.acc.receiver().try_recv() {
self.last_id = entry.id; self.last_id = entry.id;
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
} }
@ -90,14 +89,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let val = self.acc.get_balance(&key); let val = self.acc.get_balance(&key);
Some(Response::Balance { key, val }) Some(Response::Balance { key, val })
} }
Request::GetId { is_last } => Some(Response::Id { Request::GetLastId => Some(Response::LastId { id: self.sync() }),
id: if is_last {
self.sync()
} else {
self.acc.first_id
},
is_last,
}),
} }
} }

View File

@ -65,26 +65,21 @@ impl AccountantStub {
Ok(None) Ok(None)
} }
/// Request the first or last Entry ID from the server.
fn get_id(&self, is_last: bool) -> io::Result<Hash> {
let req = Request::GetId { is_last };
let data = serialize(&req).expect("serialize GetId");
self.socket.send_to(&data, &self.addr)?;
let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize Id");
if let Response::Id { id, .. } = resp {
return Ok(id);
}
Ok(Default::default())
}
/// Request the last Entry ID from the server. This method blocks /// Request the last Entry ID from the server. This method blocks
/// until the server sends a response. At the time of this writing, /// until the server sends a response. At the time of this writing,
/// it also has the side-effect of causing the server to log any /// it also has the side-effect of causing the server to log any
/// entries that have been published by the Historian. /// entries that have been published by the Historian.
pub fn get_last_id(&self) -> io::Result<Hash> { pub fn get_last_id(&self) -> io::Result<Hash> {
self.get_id(true) let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetId");
self.socket.send_to(&data, &self.addr)?;
let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize Id");
if let Response::LastId { id } = resp {
return Ok(id);
}
Ok(Default::default())
} }
} }
@ -110,7 +105,7 @@ mod tests {
let acc = Accountant::new(&alice, Some(30)); let acc = Accountant::new(&alice, Some(30));
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, sink()))); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, alice.seed(), sink())));
let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));

View File

@ -14,9 +14,9 @@ fn main() {
.lock() .lock()
.lines() .lines()
.map(|line| serde_json::from_str(&line.unwrap()).unwrap()); .map(|line| serde_json::from_str(&line.unwrap()).unwrap());
let acc = Accountant::new_from_entries(entries, Some(1000)); let (acc, last_id) = Accountant::new_from_entries(entries, Some(1000));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, stdout()))); let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, last_id, stdout())));
eprintln!("Listening on {}", addr); eprintln!("Listening on {}", addr);
let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap();
for t in threads { for t in threads {

View File

@ -34,11 +34,11 @@ pub struct Recorder {
} }
impl Recorder { impl Recorder {
pub fn new(receiver: Receiver<Signal>, sender: SyncSender<Entry>, start_hash: Hash) -> Self { pub fn new(receiver: Receiver<Signal>, sender: SyncSender<Entry>, last_hash: Hash) -> Self {
Recorder { Recorder {
receiver, receiver,
sender, sender,
last_hash: start_hash, last_hash,
events: vec![], events: vec![],
num_hashes: 0, num_hashes: 0,
num_ticks: 0, num_ticks: 0,