Check for log level before doing perf counter work
Perf counters, especially when running the dynamic test can cause functions like crdt::apply_updates to be really slow (>500ms).
This commit is contained in:
parent
c64e2acf8b
commit
38be61bd22
@ -11,6 +11,7 @@ use entry::Entry;
|
||||
use hash::Hash;
|
||||
use itertools::Itertools;
|
||||
use ledger::Block;
|
||||
use log::Level;
|
||||
use mint::Mint;
|
||||
use payment_plan::{Payment, PaymentPlan, Witness};
|
||||
use signature::{KeyPair, PublicKey, Signature};
|
||||
@ -224,9 +225,9 @@ impl Bank {
|
||||
let option = bals.get_mut(&tx.from);
|
||||
if option.is_none() {
|
||||
if let Instruction::NewVote(_) = &tx.instruction {
|
||||
inc_new_counter!("bank-appy_debits-vote_account_not_found", 1);
|
||||
inc_new_counter_info!("bank-appy_debits-vote_account_not_found", 1);
|
||||
} else {
|
||||
inc_new_counter!("bank-appy_debits-generic_account_not_found", 1);
|
||||
inc_new_counter_info!("bank-appy_debits-generic_account_not_found", 1);
|
||||
}
|
||||
return Err(BankError::AccountNotFound(tx.from));
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
use bank::Bank;
|
||||
use bincode::deserialize;
|
||||
use counter::Counter;
|
||||
use log::Level;
|
||||
use packet::{PacketRecycler, Packets, SharedPackets};
|
||||
use rayon::prelude::*;
|
||||
use record_stage::Signal;
|
||||
@ -43,7 +44,7 @@ fn recv_multiple_packets(
|
||||
mms.append(&mut nq);
|
||||
|
||||
if recv_tries >= max_tries {
|
||||
inc_new_counter!("banking_stage-max_packets_coalesced", 1);
|
||||
inc_new_counter_info!("banking_stage-max_packets_coalesced", 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -165,8 +166,8 @@ impl BankingStage {
|
||||
reqs_len,
|
||||
(reqs_len as f32) / (total_time_s)
|
||||
);
|
||||
inc_new_counter!("banking_stage-process_packets", count);
|
||||
inc_new_counter!(
|
||||
inc_new_counter_info!("banking_stage-process_packets", count);
|
||||
inc_new_counter_info!(
|
||||
"banking_stage-process_transactions",
|
||||
bank.transaction_count() - bank_starting_tx_count
|
||||
);
|
||||
|
@ -34,14 +34,21 @@ macro_rules! inc_counter {
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! inc_new_counter {
|
||||
macro_rules! inc_new_counter_info {
|
||||
($name:expr, $count:expr) => {{
|
||||
static mut INC_NEW_COUNTER: Counter = create_counter!($name, 0);
|
||||
inc_counter!(INC_NEW_COUNTER, $count);
|
||||
inc_new_counter!($name, $count, Level::Info, 0);
|
||||
}};
|
||||
($name:expr, $count:expr, $lograte:expr) => {{
|
||||
static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte);
|
||||
inc_counter!(INC_NEW_COUNTER, $count);
|
||||
inc_new_counter!($name, $count, Level::Info, $lograte);
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! inc_new_counter {
|
||||
($name:expr, $count:expr, $level:expr, $lograte:expr) => {{
|
||||
if log_enabled!($level) {
|
||||
static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte);
|
||||
inc_counter!(INC_NEW_COUNTER, $count);
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
@ -89,6 +96,7 @@ impl Counter {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use counter::{Counter, DEFAULT_METRICS_RATE};
|
||||
use log::Level;
|
||||
use std::env;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Once, RwLock, ONCE_INIT};
|
||||
@ -134,8 +142,8 @@ mod tests {
|
||||
let _readlock = get_env_lock().read();
|
||||
//make sure that macros are syntactically correct
|
||||
//the variable is internal to the macro scope so there is no way to introspect it
|
||||
inc_new_counter!("counter-1", 1);
|
||||
inc_new_counter!("counter-2", 1, 2);
|
||||
inc_new_counter_info!("counter-1", 1);
|
||||
inc_new_counter_info!("counter-2", 1, 2);
|
||||
}
|
||||
#[test]
|
||||
fn test_lograte() {
|
||||
|
33
src/crdt.rs
33
src/crdt.rs
@ -19,6 +19,7 @@ use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerSt
|
||||
use counter::Counter;
|
||||
use hash::Hash;
|
||||
use ledger::LedgerWindow;
|
||||
use log::Level;
|
||||
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
||||
use pnet_datalink as datalink;
|
||||
use rand::{thread_rng, RngCore};
|
||||
@ -382,7 +383,7 @@ impl Crdt {
|
||||
self.debug_id(),
|
||||
make_debug_id(&pubkey)
|
||||
);
|
||||
inc_new_counter!("crdt-insert_vote-leader_voted", 1);
|
||||
inc_new_counter_info!("crdt-insert_vote-leader_voted", 1);
|
||||
}
|
||||
|
||||
if v.version <= self.table[pubkey].version {
|
||||
@ -408,7 +409,7 @@ impl Crdt {
|
||||
}
|
||||
}
|
||||
pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) {
|
||||
inc_new_counter!("crdt-vote-count", votes.len());
|
||||
inc_new_counter_info!("crdt-vote-count", votes.len());
|
||||
if !votes.is_empty() {
|
||||
info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
|
||||
}
|
||||
@ -429,13 +430,13 @@ impl Crdt {
|
||||
v.version
|
||||
);
|
||||
if self.table.get(&v.id).is_none() {
|
||||
inc_new_counter!("crdt-insert-new_entry", 1, 1);
|
||||
inc_new_counter_info!("crdt-insert-new_entry", 1, 1);
|
||||
}
|
||||
|
||||
self.update_index += 1;
|
||||
let _ = self.table.insert(v.id, v.clone());
|
||||
let _ = self.local.insert(v.id, self.update_index);
|
||||
inc_new_counter!("crdt-update-count", 1);
|
||||
inc_new_counter_info!("crdt-update-count", 1);
|
||||
self.update_liveness(v.id);
|
||||
} else {
|
||||
trace!(
|
||||
@ -493,7 +494,7 @@ impl Crdt {
|
||||
})
|
||||
.collect();
|
||||
|
||||
inc_new_counter!("crdt-purge-count", dead_ids.len());
|
||||
inc_new_counter_info!("crdt-purge-count", dead_ids.len());
|
||||
|
||||
for id in &dead_ids {
|
||||
self.alive.remove(id);
|
||||
@ -511,7 +512,7 @@ impl Crdt {
|
||||
self.debug_id(),
|
||||
make_debug_id(id),
|
||||
);
|
||||
inc_new_counter!("crdt-purge-purged_leader", 1, 1);
|
||||
inc_new_counter_info!("crdt-purge-purged_leader", 1, 1);
|
||||
self.set_leader(PublicKey::default());
|
||||
}
|
||||
}
|
||||
@ -565,7 +566,7 @@ impl Crdt {
|
||||
) -> Result<()> {
|
||||
if broadcast_table.is_empty() {
|
||||
warn!("{:x}:not enough peers in crdt table", me.debug_id());
|
||||
inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1);
|
||||
inc_new_counter_info!("crdt-broadcast-not_enough_peers_error", 1);
|
||||
Err(CrdtError::NoPeers)?;
|
||||
}
|
||||
trace!(
|
||||
@ -661,7 +662,7 @@ impl Crdt {
|
||||
transmit_index.data += 1;
|
||||
}
|
||||
}
|
||||
inc_new_counter!(
|
||||
inc_new_counter_info!(
|
||||
"crdt-broadcast-max_idx",
|
||||
(transmit_index.data - old_transmit_index) as usize
|
||||
);
|
||||
@ -717,7 +718,7 @@ impl Crdt {
|
||||
.collect();
|
||||
for e in errs {
|
||||
if let Err(e) = &e {
|
||||
inc_new_counter!("crdt-retransmit-send_to_error", 1, 1);
|
||||
inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1);
|
||||
error!("retransmit result {:?}", e);
|
||||
}
|
||||
e?;
|
||||
@ -999,11 +1000,11 @@ impl Crdt {
|
||||
outblob.meta.set_addr(&from.contact_info.tvu_window);
|
||||
outblob.set_id(sender_id).expect("blob set_id");
|
||||
}
|
||||
inc_new_counter!("crdt-window-request-pass", 1);
|
||||
inc_new_counter_info!("crdt-window-request-pass", 1);
|
||||
|
||||
return Some(out);
|
||||
} else {
|
||||
inc_new_counter!("crdt-window-request-outside", 1);
|
||||
inc_new_counter_info!("crdt-window-request-outside", 1);
|
||||
info!(
|
||||
"requested ix {} != blob_ix {}, outside window!",
|
||||
ix, blob_ix
|
||||
@ -1014,7 +1015,7 @@ impl Crdt {
|
||||
|
||||
if let Some(ledger_window) = ledger_window {
|
||||
if let Ok(entry) = ledger_window.get_entry(ix) {
|
||||
inc_new_counter!("crdt-window-request-ledger", 1);
|
||||
inc_new_counter_info!("crdt-window-request-ledger", 1);
|
||||
|
||||
let out = entry.to_blob(
|
||||
blob_recycler,
|
||||
@ -1027,7 +1028,7 @@ impl Crdt {
|
||||
}
|
||||
}
|
||||
|
||||
inc_new_counter!("crdt-window-request-fail", 1);
|
||||
inc_new_counter_info!("crdt-window-request-fail", 1);
|
||||
info!(
|
||||
"{:x}: failed RequestWindowIndex {:x} {} {}",
|
||||
me.debug_id(),
|
||||
@ -1077,7 +1078,7 @@ impl Crdt {
|
||||
me.debug_id(),
|
||||
make_debug_id(&from_rd.id)
|
||||
);
|
||||
inc_new_counter!("crdt-window-request-loopback", 1);
|
||||
inc_new_counter_info!("crdt-window-request-loopback", 1);
|
||||
return None;
|
||||
}
|
||||
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||
@ -1133,7 +1134,7 @@ impl Crdt {
|
||||
//TODO verify from is signed
|
||||
obj.write().unwrap().insert(&from);
|
||||
let me = obj.read().unwrap().my_data().clone();
|
||||
inc_new_counter!("crdt-window-request-recv", 1);
|
||||
inc_new_counter_info!("crdt-window-request-recv", 1);
|
||||
trace!(
|
||||
"{:x}:received RequestWindowIndex {:x} {} ",
|
||||
me.debug_id(),
|
||||
@ -1147,7 +1148,7 @@ impl Crdt {
|
||||
from.debug_id(),
|
||||
ix,
|
||||
);
|
||||
inc_new_counter!("crdt-window-request-address-eq", 1);
|
||||
inc_new_counter_info!("crdt-window-request-address-eq", 1);
|
||||
return None;
|
||||
}
|
||||
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler)
|
||||
|
@ -2,6 +2,7 @@
|
||||
use bincode::{deserialize, serialize};
|
||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||
use counter::Counter;
|
||||
use log::Level;
|
||||
use result::{Error, Result};
|
||||
use serde::Serialize;
|
||||
use signature::PublicKey;
|
||||
@ -208,7 +209,7 @@ impl Packets {
|
||||
trace!("receiving on {}", socket.local_addr().unwrap());
|
||||
match socket.recv_from(&mut p.data) {
|
||||
Err(_) if i > 0 => {
|
||||
inc_new_counter!("packets-recv_count", 1);
|
||||
inc_new_counter_info!("packets-recv_count", 1);
|
||||
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
|
||||
break;
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ use bank::Bank;
|
||||
use counter::Counter;
|
||||
use crdt::Crdt;
|
||||
use ledger::{reconstruct_entries_from_blobs, LedgerWriter};
|
||||
use log::Level;
|
||||
use packet::BlobRecycler;
|
||||
use result::{Error, Result};
|
||||
use service::Service;
|
||||
@ -53,7 +54,7 @@ impl ReplicateStage {
|
||||
wcrdt.insert_votes(&votes);
|
||||
}
|
||||
|
||||
inc_new_counter!(
|
||||
inc_new_counter_info!(
|
||||
"replicate-transactions",
|
||||
entries.iter().map(|x| x.transactions.len()).sum()
|
||||
);
|
||||
|
@ -5,6 +5,7 @@
|
||||
//!
|
||||
|
||||
use counter::Counter;
|
||||
use log::Level;
|
||||
use packet::{Packet, SharedPackets};
|
||||
use std::mem::size_of;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
@ -97,7 +98,7 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
|
||||
.collect()
|
||||
})
|
||||
.collect();
|
||||
inc_new_counter!("ed25519_verify", count);
|
||||
inc_new_counter_info!("ed25519_verify", count);
|
||||
rv
|
||||
}
|
||||
|
||||
@ -116,7 +117,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
|
||||
.collect()
|
||||
})
|
||||
.collect();
|
||||
inc_new_counter!("ed25519_verify", count);
|
||||
inc_new_counter_info!("ed25519_verify", count);
|
||||
rv
|
||||
}
|
||||
|
||||
@ -203,7 +204,7 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
|
||||
num += 1;
|
||||
}
|
||||
}
|
||||
inc_new_counter!("ed25519_verify", count);
|
||||
inc_new_counter_info!("ed25519_verify", count);
|
||||
rvs
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ use counter::Counter;
|
||||
use crdt::{Crdt, CrdtError, NodeInfo};
|
||||
#[cfg(feature = "erasure")]
|
||||
use erasure;
|
||||
use log::Level::Trace;
|
||||
use log::Level;
|
||||
use packet::{
|
||||
Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
|
||||
};
|
||||
@ -250,7 +250,7 @@ fn repair_window(
|
||||
let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?;
|
||||
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
|
||||
if !reqs.is_empty() {
|
||||
inc_new_counter!("streamer-repair_window-repair", reqs.len());
|
||||
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
|
||||
debug!(
|
||||
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
|
||||
debug_id,
|
||||
@ -325,7 +325,7 @@ fn retransmit_all_leader_blocks(
|
||||
received,
|
||||
retransmit_queue.len(),
|
||||
);
|
||||
inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len());
|
||||
inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
|
||||
retransmit.send(retransmit_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
@ -500,7 +500,7 @@ fn recv_window(
|
||||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq)
|
||||
}
|
||||
inc_new_counter!("streamer-recv_window-recv", dq.len());
|
||||
inc_new_counter_info!("streamer-recv_window-recv", dq.len());
|
||||
debug!(
|
||||
"{:x}: RECV_WINDOW {} {}: got packets {}",
|
||||
debug_id,
|
||||
@ -548,7 +548,7 @@ fn recv_window(
|
||||
consumed,
|
||||
);
|
||||
}
|
||||
if log_enabled!(Trace) {
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", print_window(debug_id, window, *consumed));
|
||||
}
|
||||
trace!(
|
||||
@ -569,7 +569,7 @@ fn recv_window(
|
||||
debug_id,
|
||||
consume_queue.len()
|
||||
);
|
||||
inc_new_counter!("streamer-recv_window-consume", consume_queue.len());
|
||||
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
|
||||
s.send(consume_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
@ -720,7 +720,7 @@ pub fn window(
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter!("streamer-window-error", 1, 1);
|
||||
inc_new_counter_info!("streamer-window-error", 1, 1);
|
||||
error!("window error: {:?}", e);
|
||||
}
|
||||
}
|
||||
@ -757,7 +757,7 @@ fn broadcast(
|
||||
// break them up into window-sized chunks to process
|
||||
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
|
||||
|
||||
if log_enabled!(Trace) {
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", print_window(debug_id, window, *receive_index));
|
||||
}
|
||||
|
||||
@ -769,7 +769,7 @@ fn broadcast(
|
||||
index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window");
|
||||
|
||||
// keep the cache of blobs that are broadcast
|
||||
inc_new_counter!("streamer-broadcast-sent", blobs.len());
|
||||
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
assert!(blobs.len() <= win.len());
|
||||
@ -877,7 +877,7 @@ pub fn broadcaster(
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
||||
_ => {
|
||||
inc_new_counter!("streamer-broadcaster-error", 1, 1);
|
||||
inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
|
||||
error!("broadcaster error: {:?}", e);
|
||||
}
|
||||
}
|
||||
@ -933,7 +933,7 @@ pub fn retransmitter(
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter!("streamer-retransmit-error", 1, 1);
|
||||
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
|
||||
error!("retransmitter error: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ use counter::Counter;
|
||||
use crdt::Crdt;
|
||||
use hash::Hash;
|
||||
use influx_db_client as influxdb;
|
||||
use log::Level;
|
||||
use metrics;
|
||||
use packet::{BlobRecycler, SharedBlob};
|
||||
use result::Result;
|
||||
@ -150,7 +151,7 @@ pub fn send_leader_vote(
|
||||
"{:x} leader_sent_vote finality: {} ms",
|
||||
debug_id, finality_ms
|
||||
);
|
||||
inc_new_counter!("vote_stage-leader_sent_vote", 1);
|
||||
inc_new_counter_info!("vote_stage-leader_sent_vote", 1);
|
||||
|
||||
metrics::submit(
|
||||
influxdb::Point::new(&"leader-finality")
|
||||
@ -172,7 +173,7 @@ fn send_validator_vote(
|
||||
) -> Result<()> {
|
||||
let last_id = bank.last_id();
|
||||
if let Ok((_, shared_blob)) = create_vote_tx_and_blob(&last_id, keypair, crdt, blob_recycler) {
|
||||
inc_new_counter!("replicate-vote_sent", 1);
|
||||
inc_new_counter_info!("replicate-vote_sent", 1);
|
||||
|
||||
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ use counter::Counter;
|
||||
use crdt::Crdt;
|
||||
use entry::Entry;
|
||||
use ledger::{Block, LedgerWriter};
|
||||
use log::Level;
|
||||
use packet::BlobRecycler;
|
||||
use result::{Error, Result};
|
||||
use service::Service;
|
||||
@ -58,8 +59,8 @@ impl WriteStage {
|
||||
entries.to_blobs(blob_recycler, &mut blobs);
|
||||
|
||||
if !blobs.is_empty() {
|
||||
inc_new_counter!("write_stage-recv_vote", votes.len());
|
||||
inc_new_counter!("write_stage-broadcast_blobs", blobs.len());
|
||||
inc_new_counter_info!("write_stage-recv_vote", votes.len());
|
||||
inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len());
|
||||
trace!("broadcasting {}", blobs.len());
|
||||
blob_sender.send(blobs)?;
|
||||
}
|
||||
@ -105,7 +106,10 @@ impl WriteStage {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter!("write_stage-write_and_send_entries-error", 1);
|
||||
inc_new_counter_info!(
|
||||
"write_stage-write_and_send_entries-error",
|
||||
1
|
||||
);
|
||||
error!("{:?}", e);
|
||||
}
|
||||
}
|
||||
@ -120,7 +124,7 @@ impl WriteStage {
|
||||
&mut last_vote,
|
||||
&mut last_valid_validator_timestamp,
|
||||
) {
|
||||
inc_new_counter!("write_stage-leader_vote-error", 1);
|
||||
inc_new_counter_info!("write_stage-leader_vote-error", 1);
|
||||
error!("{:?}", e);
|
||||
}
|
||||
}
|
||||
|
@ -623,12 +623,12 @@ fn test_multi_node_dynamic_network() {
|
||||
}
|
||||
}
|
||||
}
|
||||
assert_eq!(consecutive_success, 10);
|
||||
info!(
|
||||
"Took {} s to converge total failures: {}",
|
||||
duration_as_s(&now.elapsed()),
|
||||
failures
|
||||
);
|
||||
assert_eq!(consecutive_success, 10);
|
||||
for (_, node) in &validators {
|
||||
node.exit();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user