Move Accounting stage functionality into its own object
This commit is contained in:
225
src/tpu.rs
225
src/tpu.rs
@ -32,13 +32,98 @@ use streamer;
|
|||||||
use timing;
|
use timing;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub struct Tpu {
|
struct AccountingStage {
|
||||||
acc: Mutex<Accountant>,
|
acc: Mutex<Accountant>,
|
||||||
historian_input: Mutex<SyncSender<Signal>>,
|
historian_input: Mutex<SyncSender<Signal>>,
|
||||||
historian: Historian,
|
historian: Historian,
|
||||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AccountingStage {
|
||||||
|
/// Create a new Tpu that wraps the given Accountant.
|
||||||
|
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
||||||
|
AccountingStage {
|
||||||
|
acc: Mutex::new(acc),
|
||||||
|
entry_info_subscribers: Mutex::new(vec![]),
|
||||||
|
historian_input: Mutex::new(historian_input),
|
||||||
|
historian,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process the transactions in parallel and then log the successful ones.
|
||||||
|
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
|
||||||
|
let results = self.acc.lock().unwrap().process_verified_events(events);
|
||||||
|
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
||||||
|
let sender = self.historian_input.lock().unwrap();
|
||||||
|
sender.send(Signal::Events(events))?;
|
||||||
|
debug!("after historian_input");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process Request items sent by clients.
|
||||||
|
fn process_request(
|
||||||
|
&self,
|
||||||
|
msg: Request,
|
||||||
|
rsp_addr: SocketAddr,
|
||||||
|
) -> Option<(Response, SocketAddr)> {
|
||||||
|
match msg {
|
||||||
|
Request::GetBalance { key } => {
|
||||||
|
let val = self.acc.lock().unwrap().get_balance(&key);
|
||||||
|
let rsp = (Response::Balance { key, val }, rsp_addr);
|
||||||
|
info!("Response::Balance {:?}", rsp);
|
||||||
|
Some(rsp)
|
||||||
|
}
|
||||||
|
Request::Transaction(_) => unreachable!(),
|
||||||
|
Request::Subscribe { subscriptions } => {
|
||||||
|
for subscription in subscriptions {
|
||||||
|
match subscription {
|
||||||
|
Subscription::EntryInfo => {
|
||||||
|
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_requests(
|
||||||
|
&self,
|
||||||
|
reqs: Vec<(Request, SocketAddr)>,
|
||||||
|
) -> Vec<(Response, SocketAddr)> {
|
||||||
|
reqs.into_iter()
|
||||||
|
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
|
||||||
|
// TODO: No need to bind().
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
||||||
|
|
||||||
|
// copy subscribers to avoid taking lock while doing io
|
||||||
|
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
|
||||||
|
trace!("Sending to {} addrs", addrs.len());
|
||||||
|
for addr in addrs {
|
||||||
|
let entry_info = EntryInfo {
|
||||||
|
id: entry.id,
|
||||||
|
num_hashes: entry.num_hashes,
|
||||||
|
num_events: entry.events.len() as u64,
|
||||||
|
};
|
||||||
|
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
||||||
|
trace!("sending {} to {}", data.len(), addr);
|
||||||
|
//TODO dont do IO here, this needs to be on a separate channel
|
||||||
|
let res = socket.send_to(&data, addr);
|
||||||
|
if res.is_err() {
|
||||||
|
eprintln!("couldn't send response: {:?}", res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Tpu {
|
||||||
|
accounting: AccountingStage,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
@ -80,59 +165,37 @@ pub enum Response {
|
|||||||
impl Tpu {
|
impl Tpu {
|
||||||
/// Create a new Tpu that wraps the given Accountant.
|
/// Create a new Tpu that wraps the given Accountant.
|
||||||
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
||||||
Tpu {
|
let accounting = AccountingStage::new(acc, historian_input, historian);
|
||||||
acc: Mutex::new(acc),
|
Tpu { accounting }
|
||||||
entry_info_subscribers: Mutex::new(vec![]),
|
|
||||||
historian_input: Mutex::new(historian_input),
|
|
||||||
historian,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) {
|
|
||||||
// TODO: No need to bind().
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
|
||||||
|
|
||||||
// copy subscribers to avoid taking lock while doing io
|
|
||||||
let addrs = obj.entry_info_subscribers.lock().unwrap().clone();
|
|
||||||
trace!("Sending to {} addrs", addrs.len());
|
|
||||||
for addr in addrs {
|
|
||||||
let entry_info = EntryInfo {
|
|
||||||
id: entry.id,
|
|
||||||
num_hashes: entry.num_hashes,
|
|
||||||
num_events: entry.events.len() as u64,
|
|
||||||
};
|
|
||||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
|
||||||
trace!("sending {} to {}", data.len(), addr);
|
|
||||||
//TODO dont do IO here, this needs to be on a separate channel
|
|
||||||
let res = socket.send_to(&data, addr);
|
|
||||||
if res.is_err() {
|
|
||||||
eprintln!("couldn't send response: {:?}", res);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
|
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
|
||||||
trace!("update_entry entry");
|
trace!("update_entry entry");
|
||||||
obj.acc.lock().unwrap().register_entry_id(&entry.id);
|
obj.accounting
|
||||||
|
.acc
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.register_entry_id(&entry.id);
|
||||||
writeln!(
|
writeln!(
|
||||||
writer.lock().unwrap(),
|
writer.lock().unwrap(),
|
||||||
"{}",
|
"{}",
|
||||||
serde_json::to_string(&entry).unwrap()
|
serde_json::to_string(&entry).unwrap()
|
||||||
).unwrap();
|
).unwrap();
|
||||||
Self::notify_entry_info_subscribers(obj, &entry);
|
obj.accounting.notify_entry_info_subscribers(&entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
|
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
|
||||||
//TODO implement a serialize for channel that does this without allocations
|
//TODO implement a serialize for channel that does this without allocations
|
||||||
let mut l = vec![];
|
let mut l = vec![];
|
||||||
let entry = obj.historian
|
let entry = obj.accounting
|
||||||
|
.historian
|
||||||
.output
|
.output
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.recv_timeout(Duration::new(1, 0))?;
|
.recv_timeout(Duration::new(1, 0))?;
|
||||||
Self::update_entry(obj, writer, &entry);
|
Self::update_entry(obj, writer, &entry);
|
||||||
l.push(entry);
|
l.push(entry);
|
||||||
while let Ok(entry) = obj.historian.receive() {
|
while let Ok(entry) = obj.accounting.historian.receive() {
|
||||||
Self::update_entry(obj, writer, &entry);
|
Self::update_entry(obj, writer, &entry);
|
||||||
l.push(entry);
|
l.push(entry);
|
||||||
}
|
}
|
||||||
@ -247,33 +310,6 @@ impl Tpu {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process Request items sent by clients.
|
|
||||||
pub fn process_request(
|
|
||||||
&self,
|
|
||||||
msg: Request,
|
|
||||||
rsp_addr: SocketAddr,
|
|
||||||
) -> Option<(Response, SocketAddr)> {
|
|
||||||
match msg {
|
|
||||||
Request::GetBalance { key } => {
|
|
||||||
let val = self.acc.lock().unwrap().get_balance(&key);
|
|
||||||
let rsp = (Response::Balance { key, val }, rsp_addr);
|
|
||||||
info!("Response::Balance {:?}", rsp);
|
|
||||||
Some(rsp)
|
|
||||||
}
|
|
||||||
Request::Transaction(_) => unreachable!(),
|
|
||||||
Request::Subscribe { subscriptions } => {
|
|
||||||
for subscription in subscriptions {
|
|
||||||
match subscription {
|
|
||||||
Subscription::EntryInfo => {
|
|
||||||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = recvr.recv_timeout(timer)?;
|
let msgs = recvr.recv_timeout(timer)?;
|
||||||
@ -365,22 +401,6 @@ impl Tpu {
|
|||||||
(events, reqs)
|
(events, reqs)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process the transactions in parallel and then log the successful ones.
|
|
||||||
fn process_events(&self, events: Vec<Event>) -> Result<()> {
|
|
||||||
let results = self.acc.lock().unwrap().process_verified_events(events);
|
|
||||||
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
|
||||||
let sender = self.historian_input.lock().unwrap();
|
|
||||||
sender.send(Signal::Events(events))?;
|
|
||||||
debug!("after historian_input");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
|
|
||||||
reqs.into_iter()
|
|
||||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_response(
|
fn serialize_response(
|
||||||
resp: Response,
|
resp: Response,
|
||||||
rsp_addr: SocketAddr,
|
rsp_addr: SocketAddr,
|
||||||
@ -445,11 +465,11 @@ impl Tpu {
|
|||||||
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
||||||
|
|
||||||
debug!("process_events");
|
debug!("process_events");
|
||||||
obj.process_events(events)?;
|
obj.accounting.process_events(events)?;
|
||||||
debug!("done process_events");
|
debug!("done process_events");
|
||||||
|
|
||||||
debug!("process_requests");
|
debug!("process_requests");
|
||||||
let rsps = obj.process_requests(reqs);
|
let rsps = obj.accounting.process_requests(reqs);
|
||||||
debug!("done process_requests");
|
debug!("done process_requests");
|
||||||
|
|
||||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||||
@ -485,7 +505,7 @@ impl Tpu {
|
|||||||
for msgs in &blobs {
|
for msgs in &blobs {
|
||||||
let blob = msgs.read().unwrap();
|
let blob = msgs.read().unwrap();
|
||||||
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||||
let acc = obj.acc.lock().unwrap();
|
let acc = obj.accounting.acc.lock().unwrap();
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
acc.register_entry_id(&entry.id);
|
acc.register_entry_id(&entry.id);
|
||||||
for result in acc.process_verified_events(entry.events) {
|
for result in acc.process_verified_events(entry.events) {
|
||||||
@ -807,7 +827,7 @@ mod tests {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer;
|
use streamer;
|
||||||
use thin_client::ThinClient;
|
use thin_client::ThinClient;
|
||||||
use tpu::Tpu;
|
use tpu::{AccountingStage, Tpu};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -845,22 +865,22 @@ mod tests {
|
|||||||
let acc = Accountant::new(&mint);
|
let acc = Accountant::new(&mint);
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
let tpu = Tpu::new(acc, input, historian);
|
let stage = AccountingStage::new(acc, input, historian);
|
||||||
|
|
||||||
// Process a batch that includes a transaction that receives two tokens.
|
// Process a batch that includes a transaction that receives two tokens.
|
||||||
let alice = KeyPair::new();
|
let alice = KeyPair::new();
|
||||||
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||||
let events = vec![Event::Transaction(tr)];
|
let events = vec![Event::Transaction(tr)];
|
||||||
assert!(tpu.process_events(events).is_ok());
|
assert!(stage.process_events(events).is_ok());
|
||||||
|
|
||||||
// Process a second batch that spends one of those tokens.
|
// Process a second batch that spends one of those tokens.
|
||||||
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||||
let events = vec![Event::Transaction(tr)];
|
let events = vec![Event::Transaction(tr)];
|
||||||
assert!(tpu.process_events(events).is_ok());
|
assert!(stage.process_events(events).is_ok());
|
||||||
|
|
||||||
// Collect the ledger and feed it to a new accountant.
|
// Collect the ledger and feed it to a new accountant.
|
||||||
drop(tpu.historian_input);
|
drop(stage.historian_input);
|
||||||
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
|
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect();
|
||||||
|
|
||||||
// Assert the user holds one token, not two. If the server only output one
|
// Assert the user holds one token, not two. If the server only output one
|
||||||
// entry, then the second transaction will be rejected, because it drives
|
// entry, then the second transaction will be rejected, because it drives
|
||||||
@ -993,10 +1013,10 @@ mod tests {
|
|||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(Tpu::new(acc, input, historian));
|
let tpu = Arc::new(Tpu::new(acc, input, historian));
|
||||||
let replicate_addr = target1_data.replicate_addr;
|
let replicate_addr = target1_data.replicate_addr;
|
||||||
let threads = Tpu::replicate(
|
let threads = Tpu::replicate(
|
||||||
&acc,
|
&tpu,
|
||||||
target1_data,
|
target1_data,
|
||||||
target1_gossip,
|
target1_gossip,
|
||||||
target1_serve,
|
target1_serve,
|
||||||
@ -1018,9 +1038,11 @@ mod tests {
|
|||||||
w.set_index(i).unwrap();
|
w.set_index(i).unwrap();
|
||||||
w.set_id(leader_id).unwrap();
|
w.set_id(leader_id).unwrap();
|
||||||
|
|
||||||
|
let acc = tpu.accounting.acc.lock().unwrap();
|
||||||
|
|
||||||
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
|
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
|
||||||
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
|
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
|
||||||
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
|
acc.register_entry_id(&cur_hash);
|
||||||
cur_hash = hash(&cur_hash);
|
cur_hash = hash(&cur_hash);
|
||||||
|
|
||||||
let tr1 = Transaction::new(
|
let tr1 = Transaction::new(
|
||||||
@ -1029,11 +1051,11 @@ mod tests {
|
|||||||
transfer_amount,
|
transfer_amount,
|
||||||
cur_hash,
|
cur_hash,
|
||||||
);
|
);
|
||||||
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
|
acc.register_entry_id(&cur_hash);
|
||||||
cur_hash = hash(&cur_hash);
|
cur_hash = hash(&cur_hash);
|
||||||
let entry1 =
|
let entry1 =
|
||||||
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
|
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
|
||||||
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
|
acc.register_entry_id(&cur_hash);
|
||||||
cur_hash = hash(&cur_hash);
|
cur_hash = hash(&cur_hash);
|
||||||
|
|
||||||
alice_ref_balance -= transfer_amount;
|
alice_ref_balance -= transfer_amount;
|
||||||
@ -1058,18 +1080,11 @@ mod tests {
|
|||||||
msgs.push(msg);
|
msgs.push(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
let alice_balance = acc.acc
|
let acc = tpu.accounting.acc.lock().unwrap();
|
||||||
.lock()
|
let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap();
|
||||||
.unwrap()
|
|
||||||
.get_balance(&alice.keypair().pubkey())
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(alice_balance, alice_ref_balance);
|
assert_eq!(alice_balance, alice_ref_balance);
|
||||||
|
|
||||||
let bob_balance = acc.acc
|
let bob_balance = acc.get_balance(&bob_keypair.pubkey()).unwrap();
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.get_balance(&bob_keypair.pubkey())
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
||||||
|
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
@ -1164,17 +1179,17 @@ mod bench {
|
|||||||
|
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
let tpu = Tpu::new(acc, input, historian);
|
let stage = AccountingStage::new(acc, input, historian);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
assert!(tpu.process_events(req_vers).is_ok());
|
assert!(stage.process_events(req_vers).is_ok());
|
||||||
let duration = now.elapsed();
|
let duration = now.elapsed();
|
||||||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
||||||
let tps = txs as f64 / sec;
|
let tps = txs as f64 / sec;
|
||||||
|
|
||||||
// Ensure that all transactions were successfully logged.
|
// Ensure that all transactions were successfully logged.
|
||||||
drop(tpu.historian_input);
|
drop(stage.historian_input);
|
||||||
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
|
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect();
|
||||||
assert_eq!(entries.len(), 1);
|
assert_eq!(entries.len(), 1);
|
||||||
assert_eq!(entries[0].events.len(), txs as usize);
|
assert_eq!(entries[0].events.len(), txs as usize);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user