diff --git a/src/tpu.rs b/src/tpu.rs index d42c3bcf13..0badedf3d5 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -42,30 +42,32 @@ impl Tpu { } } - fn update_entry(obj: &Tpu, writer: &Mutex, entry: &Entry) { + fn update_entry(&self, writer: &Mutex, entry: &Entry) { trace!("update_entry entry"); - obj.accounting_stage.accountant.register_entry_id(&entry.id); + self.accounting_stage + .accountant + .register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", serde_json::to_string(&entry).unwrap() ).unwrap(); - obj.thin_client_service + self.thin_client_service .notify_entry_info_subscribers(&entry); } - fn receive_all(obj: &Tpu, writer: &Mutex) -> Result> { + fn receive_all(&self, writer: &Mutex) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = obj.accounting_stage + let entry = self.accounting_stage .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; - Self::update_entry(obj, writer, &entry); + self.update_entry(writer, &entry); l.push(entry); - while let Ok(entry) = obj.accounting_stage.output.lock().unwrap().try_recv() { - Self::update_entry(obj, writer, &entry); + while let Ok(entry) = self.accounting_stage.output.lock().unwrap().try_recv() { + self.update_entry(writer, &entry); l.push(entry); } Ok(l) @@ -74,13 +76,13 @@ impl Tpu { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync( - obj: SharedTpu, + &self, broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, writer: &Mutex, ) -> Result<()> { let mut q = VecDeque::new(); - let list = Self::receive_all(&obj, writer)?; + let list = self.receive_all(writer)?; trace!("New blobs? {}", list.len()); ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q); if !q.is_empty() { @@ -97,7 +99,7 @@ impl Tpu { writer: Mutex, ) -> JoinHandle<()> { spawn(move || loop { - let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer); + let _ = obj.run_sync(&broadcast, &blob_recycler, &writer); if exit.load(Ordering::Relaxed) { info!("sync_service exiting"); break; @@ -107,14 +109,14 @@ impl Tpu { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out - fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> { - Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?; + fn run_sync_no_broadcast(&self) -> Result<()> { + self.receive_all(&Arc::new(Mutex::new(sink())))?; Ok(()) } pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { spawn(move || loop { - let _ = Self::run_sync_no_broadcast(obj.clone()); + let _ = obj.run_sync_no_broadcast(); if exit.load(Ordering::Relaxed) { info!("sync_no_broadcast_service exiting"); break;