diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 8c2d8c087f..1cb627101d 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,7 +2,7 @@ use bank::Bank; use bincode::serialize; -use counter::Counter; +use counter::{Counter, DEFAULT_LOG_RATE}; use crdt::Crdt; use ledger; use packet::BlobRecycler; @@ -27,7 +27,6 @@ pub struct ReplicateStage { } const VOTE_TIMEOUT_MS: u64 = 1000; -const LOG_RATE: usize = 10; impl ReplicateStage { /// Process entry blobs, already in order @@ -48,13 +47,19 @@ impl ReplicateStage { } let blobs_len = blobs.len(); let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?; - let votes = entries_to_votes(&entries); - - static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE); - inc_counter!( - COUNTER_REPLICATE, - entries.iter().map(|x| x.transactions.len()).sum() - ); + { + let votes = entries_to_votes(&entries); + let mut wcrdt = crdt.write().unwrap(); + wcrdt.insert_votes(&votes); + }; + { + static mut COUNTER_REPLICATE: Counter = + create_counter!("replicate-transactions", DEFAULT_LOG_RATE); + inc_counter!( + COUNTER_REPLICATE, + entries.iter().map(|x| x.transactions.len()).sum() + ); + } let res = bank.process_entries(entries); if res.is_err() { error!("process_entries {} {:?}", blobs_len, res); @@ -66,7 +71,6 @@ impl ReplicateStage { let shared_blob = blob_recycler.allocate(); let (vote, addr) = { let mut wcrdt = crdt.write().unwrap(); - wcrdt.insert_votes(&votes); //TODO: doesn't seem like there is a synchronous call to get height and id info!("replicate_stage {} {:?}", height, &last_id[..8]); wcrdt.new_vote(height, last_id) @@ -80,7 +84,13 @@ impl ReplicateStage { blob.meta.set_addr(&addr); blob.meta.size = len; } + { + static mut COUNTER_REPLICATE_VOTE: Counter = + create_counter!("replicate-vote_sent", DEFAULT_LOG_RATE); + inc_counter!(COUNTER_REPLICATE_VOTE, 1); + } *last_vote = now; + vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; } while let Some(blob) = blobs.pop_front() {