Move the writer stage's utilities to its own module
This commit is contained in:
94
src/entry_writer.rs
Normal file
94
src/entry_writer.rs
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
//! The `entry_writer` module helps implement the TPU's write stage.
|
||||||
|
|
||||||
|
use accounting_stage::AccountingStage;
|
||||||
|
use entry::Entry;
|
||||||
|
use ledger;
|
||||||
|
use packet;
|
||||||
|
use result::Result;
|
||||||
|
use serde_json;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::io::sink;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
use streamer;
|
||||||
|
use thin_client_service::ThinClientService;
|
||||||
|
|
||||||
|
pub struct EntryWriter<'a> {
|
||||||
|
accounting_stage: &'a AccountingStage,
|
||||||
|
thin_client_service: &'a ThinClientService,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> EntryWriter<'a> {
|
||||||
|
/// Create a new Tpu that wraps the given Accountant.
|
||||||
|
pub fn new(
|
||||||
|
accounting_stage: &'a AccountingStage,
|
||||||
|
thin_client_service: &'a ThinClientService,
|
||||||
|
) -> Self {
|
||||||
|
EntryWriter {
|
||||||
|
accounting_stage,
|
||||||
|
thin_client_service,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
||||||
|
trace!("write_entry entry");
|
||||||
|
self.accounting_stage
|
||||||
|
.accountant
|
||||||
|
.register_entry_id(&entry.id);
|
||||||
|
writeln!(
|
||||||
|
writer.lock().expect("'writer' lock in fn fn write_entry"),
|
||||||
|
"{}",
|
||||||
|
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
|
||||||
|
).expect("writeln! in fn write_entry");
|
||||||
|
self.thin_client_service
|
||||||
|
.notify_entry_info_subscribers(&entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
|
||||||
|
//TODO implement a serialize for channel that does this without allocations
|
||||||
|
let mut l = vec![];
|
||||||
|
let entry = self.accounting_stage
|
||||||
|
.output
|
||||||
|
.lock()
|
||||||
|
.expect("'ouput' lock in fn receive_all")
|
||||||
|
.recv_timeout(Duration::new(1, 0))?;
|
||||||
|
self.write_entry(writer, &entry);
|
||||||
|
l.push(entry);
|
||||||
|
while let Ok(entry) = self.accounting_stage
|
||||||
|
.output
|
||||||
|
.lock()
|
||||||
|
.expect("'output' lock in fn write_entries")
|
||||||
|
.try_recv()
|
||||||
|
{
|
||||||
|
self.write_entry(writer, &entry);
|
||||||
|
l.push(entry);
|
||||||
|
}
|
||||||
|
Ok(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process any Entry items that have been published by the Historian.
|
||||||
|
/// continuosly broadcast blobs of entries out
|
||||||
|
pub fn write_and_send_entries<W: Write>(
|
||||||
|
&self,
|
||||||
|
broadcast: &streamer::BlobSender,
|
||||||
|
blob_recycler: &packet::BlobRecycler,
|
||||||
|
writer: &Mutex<W>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut q = VecDeque::new();
|
||||||
|
let list = self.write_entries(writer)?;
|
||||||
|
trace!("New blobs? {}", list.len());
|
||||||
|
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
||||||
|
if !q.is_empty() {
|
||||||
|
broadcast.send(q)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process any Entry items that have been published by the Historian.
|
||||||
|
/// continuosly broadcast blobs of entries out
|
||||||
|
pub fn drain_entries(&self) -> Result<()> {
|
||||||
|
self.write_entries(&Arc::new(Mutex::new(sink())))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@ pub mod accounting_stage;
|
|||||||
pub mod crdt;
|
pub mod crdt;
|
||||||
pub mod ecdsa;
|
pub mod ecdsa;
|
||||||
pub mod entry;
|
pub mod entry;
|
||||||
|
pub mod entry_writer;
|
||||||
#[cfg(feature = "erasure")]
|
#[cfg(feature = "erasure")]
|
||||||
pub mod erasure;
|
pub mod erasure;
|
||||||
pub mod event;
|
pub mod event;
|
||||||
|
76
src/tpu.rs
76
src/tpu.rs
@ -3,16 +3,13 @@
|
|||||||
|
|
||||||
use accounting_stage::AccountingStage;
|
use accounting_stage::AccountingStage;
|
||||||
use crdt::{Crdt, ReplicatedData};
|
use crdt::{Crdt, ReplicatedData};
|
||||||
use entry::Entry;
|
use entry_writer::EntryWriter;
|
||||||
use ledger;
|
use ledger;
|
||||||
use packet;
|
use packet;
|
||||||
use packet::SharedPackets;
|
use packet::SharedPackets;
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use serde_json;
|
|
||||||
use sig_verify_stage::SigVerifyStage;
|
use sig_verify_stage::SigVerifyStage;
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::io::sink;
|
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Receiver};
|
use std::sync::mpsc::{channel, Receiver};
|
||||||
@ -39,60 +36,6 @@ impl Tpu {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
|
||||||
trace!("write_entry entry");
|
|
||||||
self.accounting_stage
|
|
||||||
.accountant
|
|
||||||
.register_entry_id(&entry.id);
|
|
||||||
writeln!(
|
|
||||||
writer.lock().expect("'writer' lock in fn fn write_entry"),
|
|
||||||
"{}",
|
|
||||||
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
|
|
||||||
).expect("writeln! in fn write_entry");
|
|
||||||
self.thin_client_service
|
|
||||||
.notify_entry_info_subscribers(&entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
|
|
||||||
//TODO implement a serialize for channel that does this without allocations
|
|
||||||
let mut l = vec![];
|
|
||||||
let entry = self.accounting_stage
|
|
||||||
.output
|
|
||||||
.lock()
|
|
||||||
.expect("'ouput' lock in fn receive_all")
|
|
||||||
.recv_timeout(Duration::new(1, 0))?;
|
|
||||||
self.write_entry(writer, &entry);
|
|
||||||
l.push(entry);
|
|
||||||
while let Ok(entry) = self.accounting_stage
|
|
||||||
.output
|
|
||||||
.lock()
|
|
||||||
.expect("'output' lock in fn write_entries")
|
|
||||||
.try_recv()
|
|
||||||
{
|
|
||||||
self.write_entry(writer, &entry);
|
|
||||||
l.push(entry);
|
|
||||||
}
|
|
||||||
Ok(l)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process any Entry items that have been published by the Historian.
|
|
||||||
/// continuosly broadcast blobs of entries out
|
|
||||||
fn write_and_send_entries<W: Write>(
|
|
||||||
&self,
|
|
||||||
broadcast: &streamer::BlobSender,
|
|
||||||
blob_recycler: &packet::BlobRecycler,
|
|
||||||
writer: &Mutex<W>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let mut q = VecDeque::new();
|
|
||||||
let list = self.write_entries(writer)?;
|
|
||||||
trace!("New blobs? {}", list.len());
|
|
||||||
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
|
||||||
if !q.is_empty() {
|
|
||||||
broadcast.send(q)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_service<W: Write + Send + 'static>(
|
pub fn write_service<W: Write + Send + 'static>(
|
||||||
obj: SharedTpu,
|
obj: SharedTpu,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
@ -101,7 +44,8 @@ impl Tpu {
|
|||||||
writer: Mutex<W>,
|
writer: Mutex<W>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let _ = obj.write_and_send_entries(&broadcast, &blob_recycler, &writer);
|
let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.thin_client_service);
|
||||||
|
let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer);
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
info!("broadcat_service exiting");
|
info!("broadcat_service exiting");
|
||||||
break;
|
break;
|
||||||
@ -109,20 +53,16 @@ impl Tpu {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process any Entry items that have been published by the Historian.
|
|
||||||
/// continuosly broadcast blobs of entries out
|
|
||||||
fn drain_entries(&self) -> Result<()> {
|
|
||||||
self.write_entries(&Arc::new(Mutex::new(sink())))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn drain_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
pub fn drain_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || {
|
||||||
let _ = obj.drain_entries();
|
let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.thin_client_service);
|
||||||
|
loop {
|
||||||
|
let _ = entry_writer.drain_entries();
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
info!("drain_service exiting");
|
info!("drain_service exiting");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user