diff --git a/src/bank.rs b/src/bank.rs index d9565577d5..3ab7866a25 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -11,6 +11,7 @@ use entry::Entry; use hash::Hash; use itertools::Itertools; use ledger::Block; +use log::Level; use mint::Mint; use payment_plan::{Payment, PaymentPlan, Witness}; use signature::{KeyPair, PublicKey, Signature}; @@ -224,9 +225,9 @@ impl Bank { let option = bals.get_mut(&tx.from); if option.is_none() { if let Instruction::NewVote(_) = &tx.instruction { - inc_new_counter!("bank-appy_debits-vote_account_not_found", 1); + inc_new_counter_info!("bank-appy_debits-vote_account_not_found", 1); } else { - inc_new_counter!("bank-appy_debits-generic_account_not_found", 1); + inc_new_counter_info!("bank-appy_debits-generic_account_not_found", 1); } return Err(BankError::AccountNotFound(tx.from)); } diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 55851c0126..aec0cf65b6 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -5,6 +5,7 @@ use bank::Bank; use bincode::deserialize; use counter::Counter; +use log::Level; use packet::{PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use record_stage::Signal; @@ -43,7 +44,7 @@ fn recv_multiple_packets( mms.append(&mut nq); if recv_tries >= max_tries { - inc_new_counter!("banking_stage-max_packets_coalesced", 1); + inc_new_counter_info!("banking_stage-max_packets_coalesced", 1); break; } } @@ -165,8 +166,8 @@ impl BankingStage { reqs_len, (reqs_len as f32) / (total_time_s) ); - inc_new_counter!("banking_stage-process_packets", count); - inc_new_counter!( + inc_new_counter_info!("banking_stage-process_packets", count); + inc_new_counter_info!( "banking_stage-process_transactions", bank.transaction_count() - bank_starting_tx_count ); diff --git a/src/counter.rs b/src/counter.rs index 50fd63ae6f..31d61748e9 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -34,14 +34,21 @@ macro_rules! inc_counter { }; } -macro_rules! inc_new_counter { +macro_rules! inc_new_counter_info { ($name:expr, $count:expr) => {{ - static mut INC_NEW_COUNTER: Counter = create_counter!($name, 0); - inc_counter!(INC_NEW_COUNTER, $count); + inc_new_counter!($name, $count, Level::Info, 0); }}; ($name:expr, $count:expr, $lograte:expr) => {{ - static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte); - inc_counter!(INC_NEW_COUNTER, $count); + inc_new_counter!($name, $count, Level::Info, $lograte); + }}; +} + +macro_rules! inc_new_counter { + ($name:expr, $count:expr, $level:expr, $lograte:expr) => {{ + if log_enabled!($level) { + static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte); + inc_counter!(INC_NEW_COUNTER, $count); + } }}; } @@ -89,6 +96,7 @@ impl Counter { #[cfg(test)] mod tests { use counter::{Counter, DEFAULT_METRICS_RATE}; + use log::Level; use std::env; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Once, RwLock, ONCE_INIT}; @@ -134,8 +142,8 @@ mod tests { let _readlock = get_env_lock().read(); //make sure that macros are syntactically correct //the variable is internal to the macro scope so there is no way to introspect it - inc_new_counter!("counter-1", 1); - inc_new_counter!("counter-2", 1, 2); + inc_new_counter_info!("counter-1", 1); + inc_new_counter_info!("counter-2", 1, 2); } #[test] fn test_lograte() { diff --git a/src/crdt.rs b/src/crdt.rs index 09550ce7ef..ca677c1782 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -19,6 +19,7 @@ use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerSt use counter::Counter; use hash::Hash; use ledger::LedgerWindow; +use log::Level; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; use rand::{thread_rng, RngCore}; @@ -382,7 +383,7 @@ impl Crdt { self.debug_id(), make_debug_id(&pubkey) ); - inc_new_counter!("crdt-insert_vote-leader_voted", 1); + inc_new_counter_info!("crdt-insert_vote-leader_voted", 1); } if v.version <= self.table[pubkey].version { @@ -408,7 +409,7 @@ impl Crdt { } } pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) { - inc_new_counter!("crdt-vote-count", votes.len()); + inc_new_counter_info!("crdt-vote-count", votes.len()); if !votes.is_empty() { info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); } @@ -429,13 +430,13 @@ impl Crdt { v.version ); if self.table.get(&v.id).is_none() { - inc_new_counter!("crdt-insert-new_entry", 1, 1); + inc_new_counter_info!("crdt-insert-new_entry", 1, 1); } self.update_index += 1; let _ = self.table.insert(v.id, v.clone()); let _ = self.local.insert(v.id, self.update_index); - inc_new_counter!("crdt-update-count", 1); + inc_new_counter_info!("crdt-update-count", 1); self.update_liveness(v.id); } else { trace!( @@ -493,7 +494,7 @@ impl Crdt { }) .collect(); - inc_new_counter!("crdt-purge-count", dead_ids.len()); + inc_new_counter_info!("crdt-purge-count", dead_ids.len()); for id in &dead_ids { self.alive.remove(id); @@ -511,7 +512,7 @@ impl Crdt { self.debug_id(), make_debug_id(id), ); - inc_new_counter!("crdt-purge-purged_leader", 1, 1); + inc_new_counter_info!("crdt-purge-purged_leader", 1, 1); self.set_leader(PublicKey::default()); } } @@ -565,7 +566,7 @@ impl Crdt { ) -> Result<()> { if broadcast_table.is_empty() { warn!("{:x}:not enough peers in crdt table", me.debug_id()); - inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1); + inc_new_counter_info!("crdt-broadcast-not_enough_peers_error", 1); Err(CrdtError::NoPeers)?; } trace!( @@ -661,7 +662,7 @@ impl Crdt { transmit_index.data += 1; } } - inc_new_counter!( + inc_new_counter_info!( "crdt-broadcast-max_idx", (transmit_index.data - old_transmit_index) as usize ); @@ -717,7 +718,7 @@ impl Crdt { .collect(); for e in errs { if let Err(e) = &e { - inc_new_counter!("crdt-retransmit-send_to_error", 1, 1); + inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1); error!("retransmit result {:?}", e); } e?; @@ -999,11 +1000,11 @@ impl Crdt { outblob.meta.set_addr(&from.contact_info.tvu_window); outblob.set_id(sender_id).expect("blob set_id"); } - inc_new_counter!("crdt-window-request-pass", 1); + inc_new_counter_info!("crdt-window-request-pass", 1); return Some(out); } else { - inc_new_counter!("crdt-window-request-outside", 1); + inc_new_counter_info!("crdt-window-request-outside", 1); info!( "requested ix {} != blob_ix {}, outside window!", ix, blob_ix @@ -1014,7 +1015,7 @@ impl Crdt { if let Some(ledger_window) = ledger_window { if let Ok(entry) = ledger_window.get_entry(ix) { - inc_new_counter!("crdt-window-request-ledger", 1); + inc_new_counter_info!("crdt-window-request-ledger", 1); let out = entry.to_blob( blob_recycler, @@ -1027,7 +1028,7 @@ impl Crdt { } } - inc_new_counter!("crdt-window-request-fail", 1); + inc_new_counter_info!("crdt-window-request-fail", 1); info!( "{:x}: failed RequestWindowIndex {:x} {} {}", me.debug_id(), @@ -1077,7 +1078,7 @@ impl Crdt { me.debug_id(), make_debug_id(&from_rd.id) ); - inc_new_counter!("crdt-window-request-loopback", 1); + inc_new_counter_info!("crdt-window-request-loopback", 1); return None; } // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` @@ -1133,7 +1134,7 @@ impl Crdt { //TODO verify from is signed obj.write().unwrap().insert(&from); let me = obj.read().unwrap().my_data().clone(); - inc_new_counter!("crdt-window-request-recv", 1); + inc_new_counter_info!("crdt-window-request-recv", 1); trace!( "{:x}:received RequestWindowIndex {:x} {} ", me.debug_id(), @@ -1147,7 +1148,7 @@ impl Crdt { from.debug_id(), ix, ); - inc_new_counter!("crdt-window-request-address-eq", 1); + inc_new_counter_info!("crdt-window-request-address-eq", 1); return None; } Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler) diff --git a/src/packet.rs b/src/packet.rs index f80b37ac2f..52cab8866f 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -2,6 +2,7 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use counter::Counter; +use log::Level; use result::{Error, Result}; use serde::Serialize; use signature::PublicKey; @@ -208,7 +209,7 @@ impl Packets { trace!("receiving on {}", socket.local_addr().unwrap()); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { - inc_new_counter!("packets-recv_count", 1); + inc_new_counter_info!("packets-recv_count", 1); debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); break; } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index afe706ae03..d80753c5aa 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -4,6 +4,7 @@ use bank::Bank; use counter::Counter; use crdt::Crdt; use ledger::{reconstruct_entries_from_blobs, LedgerWriter}; +use log::Level; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; @@ -53,7 +54,7 @@ impl ReplicateStage { wcrdt.insert_votes(&votes); } - inc_new_counter!( + inc_new_counter_info!( "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); diff --git a/src/sigverify.rs b/src/sigverify.rs index 6317a5f148..5ebad8cbef 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -5,6 +5,7 @@ //! use counter::Counter; +use log::Level; use packet::{Packet, SharedPackets}; use std::mem::size_of; use std::sync::atomic::AtomicUsize; @@ -97,7 +98,7 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec> { .collect() }) .collect(); - inc_new_counter!("ed25519_verify", count); + inc_new_counter_info!("ed25519_verify", count); rv } @@ -116,7 +117,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec> { .collect() }) .collect(); - inc_new_counter!("ed25519_verify", count); + inc_new_counter_info!("ed25519_verify", count); rv } @@ -203,7 +204,7 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { num += 1; } } - inc_new_counter!("ed25519_verify", count); + inc_new_counter_info!("ed25519_verify", count); rvs } diff --git a/src/streamer.rs b/src/streamer.rs index 1ad6bf2ecd..a5949db9ec 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -4,7 +4,7 @@ use counter::Counter; use crdt::{Crdt, CrdtError, NodeInfo}; #[cfg(feature = "erasure")] use erasure; -use log::Level::Trace; +use log::Level; use packet::{ Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, }; @@ -250,7 +250,7 @@ fn repair_window( let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?; trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); if !reqs.is_empty() { - inc_new_counter!("streamer-repair_window-repair", reqs.len()); + inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); debug!( "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", debug_id, @@ -325,7 +325,7 @@ fn retransmit_all_leader_blocks( received, retransmit_queue.len(), ); - inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len()); + inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len()); retransmit.send(retransmit_queue)?; } Ok(()) @@ -500,7 +500,7 @@ fn recv_window( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } - inc_new_counter!("streamer-recv_window-recv", dq.len()); + inc_new_counter_info!("streamer-recv_window-recv", dq.len()); debug!( "{:x}: RECV_WINDOW {} {}: got packets {}", debug_id, @@ -548,7 +548,7 @@ fn recv_window( consumed, ); } - if log_enabled!(Trace) { + if log_enabled!(Level::Trace) { trace!("{}", print_window(debug_id, window, *consumed)); } trace!( @@ -569,7 +569,7 @@ fn recv_window( debug_id, consume_queue.len() ); - inc_new_counter!("streamer-recv_window-consume", consume_queue.len()); + inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len()); s.send(consume_queue)?; } Ok(()) @@ -720,7 +720,7 @@ pub fn window( Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { - inc_new_counter!("streamer-window-error", 1, 1); + inc_new_counter_info!("streamer-window-error", 1, 1); error!("window error: {:?}", e); } } @@ -757,7 +757,7 @@ fn broadcast( // break them up into window-sized chunks to process let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); - if log_enabled!(Trace) { + if log_enabled!(Level::Trace) { trace!("{}", print_window(debug_id, window, *receive_index)); } @@ -769,7 +769,7 @@ fn broadcast( index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window"); // keep the cache of blobs that are broadcast - inc_new_counter!("streamer-broadcast-sent", blobs.len()); + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); { let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len()); @@ -877,7 +877,7 @@ pub fn broadcaster( Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? _ => { - inc_new_counter!("streamer-broadcaster-error", 1, 1); + inc_new_counter_info!("streamer-broadcaster-error", 1, 1); error!("broadcaster error: {:?}", e); } } @@ -933,7 +933,7 @@ pub fn retransmitter( Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { - inc_new_counter!("streamer-retransmit-error", 1, 1); + inc_new_counter_info!("streamer-retransmit-error", 1, 1); error!("retransmitter error: {:?}", e); } } diff --git a/src/vote_stage.rs b/src/vote_stage.rs index eab9a0c427..cd9affc355 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -6,6 +6,7 @@ use counter::Counter; use crdt::Crdt; use hash::Hash; use influx_db_client as influxdb; +use log::Level; use metrics; use packet::{BlobRecycler, SharedBlob}; use result::Result; @@ -150,7 +151,7 @@ pub fn send_leader_vote( "{:x} leader_sent_vote finality: {} ms", debug_id, finality_ms ); - inc_new_counter!("vote_stage-leader_sent_vote", 1); + inc_new_counter_info!("vote_stage-leader_sent_vote", 1); metrics::submit( influxdb::Point::new(&"leader-finality") @@ -172,7 +173,7 @@ fn send_validator_vote( ) -> Result<()> { let last_id = bank.last_id(); if let Ok((_, shared_blob)) = create_vote_tx_and_blob(&last_id, keypair, crdt, blob_recycler) { - inc_new_counter!("replicate-vote_sent", 1); + inc_new_counter_info!("replicate-vote_sent", 1); vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; } diff --git a/src/write_stage.rs b/src/write_stage.rs index f8703d943e..b666496beb 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -7,6 +7,7 @@ use counter::Counter; use crdt::Crdt; use entry::Entry; use ledger::{Block, LedgerWriter}; +use log::Level; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; @@ -58,8 +59,8 @@ impl WriteStage { entries.to_blobs(blob_recycler, &mut blobs); if !blobs.is_empty() { - inc_new_counter!("write_stage-recv_vote", votes.len()); - inc_new_counter!("write_stage-broadcast_blobs", blobs.len()); + inc_new_counter_info!("write_stage-recv_vote", votes.len()); + inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len()); trace!("broadcasting {}", blobs.len()); blob_sender.send(blobs)?; } @@ -105,7 +106,10 @@ impl WriteStage { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { - inc_new_counter!("write_stage-write_and_send_entries-error", 1); + inc_new_counter_info!( + "write_stage-write_and_send_entries-error", + 1 + ); error!("{:?}", e); } } @@ -120,7 +124,7 @@ impl WriteStage { &mut last_vote, &mut last_valid_validator_timestamp, ) { - inc_new_counter!("write_stage-leader_vote-error", 1); + inc_new_counter_info!("write_stage-leader_vote-error", 1); error!("{:?}", e); } } diff --git a/tests/multinode.rs b/tests/multinode.rs index 956215256b..3467109ac0 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -623,12 +623,12 @@ fn test_multi_node_dynamic_network() { } } } - assert_eq!(consecutive_success, 10); info!( "Took {} s to converge total failures: {}", duration_as_s(&now.elapsed()), failures ); + assert_eq!(consecutive_success, 10); for (_, node) in &validators { node.exit(); }