diff --git a/src/ledger_write_stage.rs b/src/ledger_write_stage.rs index 6703c905cd..c55965689e 100644 --- a/src/ledger_write_stage.rs +++ b/src/ledger_write_stage.rs @@ -10,7 +10,8 @@ use service::Service; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::RecvTimeoutError; use std::thread::{self, Builder, JoinHandle}; -use std::time::Duration; +use std::time::{Duration, Instant}; +use timing::duration_as_ms; pub struct LedgerWriteStage { write_thread: JoinHandle<()>, @@ -24,8 +25,11 @@ impl LedgerWriteStage { ) -> Result<()> { let mut ventries = Vec::new(); let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; + let mut num_new_entries = 0; + let now = Instant::now(); loop { + num_new_entries += received_entries.len(); ventries.push(received_entries); if let Ok(n) = entry_receiver.try_recv() { @@ -39,11 +43,16 @@ impl LedgerWriteStage { ledger_writer.write_entries(ventries.iter().flatten())?; } + inc_new_counter_info!("ledger_writer_stage-entries_received", num_new_entries); if let Some(forwarder) = forwarder { for entries in ventries { forwarder.send(entries)?; } } + inc_new_counter_info!( + "ledger_writer_stage-time_ms", + duration_as_ms(&now.elapsed()) as usize + ); Ok(()) } @@ -65,7 +74,7 @@ impl LedgerWriteStage { Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { inc_new_counter_info!( - "ledger-write_stage-write_and_send_entries-error", + "ledger_writer_stage-write_and_send_entries-error", 1 ); error!("{:?}", e);