Add ledger write stage counters (#1713)
This commit is contained in:
@ -10,7 +10,8 @@ use service::Service;
|
|||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::mpsc::RecvTimeoutError;
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
use timing::duration_as_ms;
|
||||||
|
|
||||||
pub struct LedgerWriteStage {
|
pub struct LedgerWriteStage {
|
||||||
write_thread: JoinHandle<()>,
|
write_thread: JoinHandle<()>,
|
||||||
@ -24,8 +25,11 @@ impl LedgerWriteStage {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut ventries = Vec::new();
|
let mut ventries = Vec::new();
|
||||||
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
|
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
|
||||||
|
let mut num_new_entries = 0;
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
num_new_entries += received_entries.len();
|
||||||
ventries.push(received_entries);
|
ventries.push(received_entries);
|
||||||
|
|
||||||
if let Ok(n) = entry_receiver.try_recv() {
|
if let Ok(n) = entry_receiver.try_recv() {
|
||||||
@ -39,11 +43,16 @@ impl LedgerWriteStage {
|
|||||||
ledger_writer.write_entries(ventries.iter().flatten())?;
|
ledger_writer.write_entries(ventries.iter().flatten())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inc_new_counter_info!("ledger_writer_stage-entries_received", num_new_entries);
|
||||||
if let Some(forwarder) = forwarder {
|
if let Some(forwarder) = forwarder {
|
||||||
for entries in ventries {
|
for entries in ventries {
|
||||||
forwarder.send(entries)?;
|
forwarder.send(entries)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
inc_new_counter_info!(
|
||||||
|
"ledger_writer_stage-time_ms",
|
||||||
|
duration_as_ms(&now.elapsed()) as usize
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,7 +74,7 @@ impl LedgerWriteStage {
|
|||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
_ => {
|
_ => {
|
||||||
inc_new_counter_info!(
|
inc_new_counter_info!(
|
||||||
"ledger-write_stage-write_and_send_entries-error",
|
"ledger_writer_stage-write_and_send_entries-error",
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
error!("{:?}", e);
|
error!("{:?}", e);
|
||||||
|
Reference in New Issue
Block a user