More idiomatic Rust
This commit is contained in:
30
src/tpu.rs
30
src/tpu.rs
@ -42,30 +42,32 @@ impl Tpu {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_entry<W: Write>(obj: &Tpu, writer: &Mutex<W>, entry: &Entry) {
|
fn update_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
||||||
trace!("update_entry entry");
|
trace!("update_entry entry");
|
||||||
obj.accounting_stage.accountant.register_entry_id(&entry.id);
|
self.accounting_stage
|
||||||
|
.accountant
|
||||||
|
.register_entry_id(&entry.id);
|
||||||
writeln!(
|
writeln!(
|
||||||
writer.lock().unwrap(),
|
writer.lock().unwrap(),
|
||||||
"{}",
|
"{}",
|
||||||
serde_json::to_string(&entry).unwrap()
|
serde_json::to_string(&entry).unwrap()
|
||||||
).unwrap();
|
).unwrap();
|
||||||
obj.thin_client_service
|
self.thin_client_service
|
||||||
.notify_entry_info_subscribers(&entry);
|
.notify_entry_info_subscribers(&entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn receive_all<W: Write>(obj: &Tpu, writer: &Mutex<W>) -> Result<Vec<Entry>> {
|
fn receive_all<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
|
||||||
//TODO implement a serialize for channel that does this without allocations
|
//TODO implement a serialize for channel that does this without allocations
|
||||||
let mut l = vec![];
|
let mut l = vec![];
|
||||||
let entry = obj.accounting_stage
|
let entry = self.accounting_stage
|
||||||
.output
|
.output
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.recv_timeout(Duration::new(1, 0))?;
|
.recv_timeout(Duration::new(1, 0))?;
|
||||||
Self::update_entry(obj, writer, &entry);
|
self.update_entry(writer, &entry);
|
||||||
l.push(entry);
|
l.push(entry);
|
||||||
while let Ok(entry) = obj.accounting_stage.output.lock().unwrap().try_recv() {
|
while let Ok(entry) = self.accounting_stage.output.lock().unwrap().try_recv() {
|
||||||
Self::update_entry(obj, writer, &entry);
|
self.update_entry(writer, &entry);
|
||||||
l.push(entry);
|
l.push(entry);
|
||||||
}
|
}
|
||||||
Ok(l)
|
Ok(l)
|
||||||
@ -74,13 +76,13 @@ impl Tpu {
|
|||||||
/// Process any Entry items that have been published by the Historian.
|
/// Process any Entry items that have been published by the Historian.
|
||||||
/// continuosly broadcast blobs of entries out
|
/// continuosly broadcast blobs of entries out
|
||||||
fn run_sync<W: Write>(
|
fn run_sync<W: Write>(
|
||||||
obj: SharedTpu,
|
&self,
|
||||||
broadcast: &streamer::BlobSender,
|
broadcast: &streamer::BlobSender,
|
||||||
blob_recycler: &packet::BlobRecycler,
|
blob_recycler: &packet::BlobRecycler,
|
||||||
writer: &Mutex<W>,
|
writer: &Mutex<W>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut q = VecDeque::new();
|
let mut q = VecDeque::new();
|
||||||
let list = Self::receive_all(&obj, writer)?;
|
let list = self.receive_all(writer)?;
|
||||||
trace!("New blobs? {}", list.len());
|
trace!("New blobs? {}", list.len());
|
||||||
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
||||||
if !q.is_empty() {
|
if !q.is_empty() {
|
||||||
@ -97,7 +99,7 @@ impl Tpu {
|
|||||||
writer: Mutex<W>,
|
writer: Mutex<W>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer);
|
let _ = obj.run_sync(&broadcast, &blob_recycler, &writer);
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
info!("sync_service exiting");
|
info!("sync_service exiting");
|
||||||
break;
|
break;
|
||||||
@ -107,14 +109,14 @@ impl Tpu {
|
|||||||
|
|
||||||
/// Process any Entry items that have been published by the Historian.
|
/// Process any Entry items that have been published by the Historian.
|
||||||
/// continuosly broadcast blobs of entries out
|
/// continuosly broadcast blobs of entries out
|
||||||
fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> {
|
fn run_sync_no_broadcast(&self) -> Result<()> {
|
||||||
Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?;
|
self.receive_all(&Arc::new(Mutex::new(sink())))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let _ = Self::run_sync_no_broadcast(obj.clone());
|
let _ = obj.run_sync_no_broadcast();
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
info!("sync_no_broadcast_service exiting");
|
info!("sync_no_broadcast_service exiting");
|
||||||
break;
|
break;
|
||||||
|
Reference in New Issue
Block a user