Reference count the accountant

So that the thin client can reference the AccountingStage's accountant
from separate threads.
This commit is contained in:
Greg Fitzgerald
2018-05-09 14:32:12 -06:00
parent 4223aff840
commit bc824c1a6c
2 changed files with 8 additions and 7 deletions

View File

@ -11,13 +11,13 @@ use result::Result;
use signature::PublicKey; use signature::PublicKey;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex; use std::sync::{Arc, Mutex};
use transaction::Transaction; use transaction::Transaction;
pub struct AccountingStage { pub struct AccountingStage {
pub output: Mutex<Receiver<Entry>>, pub output: Mutex<Receiver<Entry>>,
entry_sender: Mutex<Sender<Entry>>, entry_sender: Mutex<Sender<Entry>>,
pub acc: Accountant, pub acc: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>, historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>, historian: Mutex<Historian>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>, entry_info_subscribers: Mutex<Vec<SocketAddr>>,
@ -32,7 +32,7 @@ impl AccountingStage {
AccountingStage { AccountingStage {
output: Mutex::new(output), output: Mutex::new(output),
entry_sender: Mutex::new(entry_sender), entry_sender: Mutex::new(entry_sender),
acc, acc: Arc::new(acc),
entry_info_subscribers: Mutex::new(vec![]), entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input), historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian), historian: Mutex::new(historian),

View File

@ -1,6 +1,7 @@
//! The `tpu` module implements the Transaction Processing Unit, a //! The `tpu` module implements the Transaction Processing Unit, a
//! 5-stage transaction processing pipeline in software. //! 5-stage transaction processing pipeline in software.
use accountant::Accountant;
use accounting_stage::{AccountingStage, Request, Response}; use accounting_stage::{AccountingStage, Request, Response};
use bincode::{deserialize, serialize, serialize_into}; use bincode::{deserialize, serialize, serialize_into};
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
@ -140,17 +141,17 @@ impl Tpu {
}) })
} }
fn process_thin_client_requests(_obj: SharedTpu, _socket: &UdpSocket) -> Result<()> { fn process_thin_client_requests(_acc: &Arc<Accountant>, _socket: &UdpSocket) -> Result<()> {
Ok(()) Ok(())
} }
fn thin_client_service( fn thin_client_service(
obj: SharedTpu, acc: Arc<Accountant>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
socket: UdpSocket, socket: UdpSocket,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let _ = Self::process_thin_client_requests(obj.clone(), &socket); let _ = Self::process_thin_client_requests(&acc, &socket);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
info!("sync_service exiting"); info!("sync_service exiting");
break; break;
@ -455,7 +456,7 @@ impl Tpu {
Arc::new(Mutex::new(writer)), Arc::new(Mutex::new(writer)),
); );
let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny); let t_skinny = Self::thin_client_service(obj.accounting.acc.clone(), exit.clone(), skinny);
let tpu = obj.clone(); let tpu = obj.clone();
let t_server = spawn(move || loop { let t_server = spawn(move || loop {