From f6c10d8a2e2806f6e5cb12d7738664433b4f6939 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 16 Oct 2018 12:54:23 -0700 Subject: [PATCH] Add channel pressure for validator TVU stages (#1509) --- src/replicate_stage.rs | 12 ++++++++++++ src/retransmit_stage.rs | 13 +++++++++++++ src/window_service.rs | 23 +++++++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index c22bb67245..86a4891005 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -5,9 +5,11 @@ use cluster_info::ClusterInfo; use counter::Counter; use entry::EntryReceiver; use hash::Hash; +use influx_db_client as influxdb; use leader_scheduler::LeaderScheduler; use ledger::{Block, LedgerWriter}; use log::Level; +use metrics; use result::{Error, Result}; use service::Service; use signature::{Keypair, KeypairUtil}; @@ -20,6 +22,7 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer::{responder, BlobSender}; +use sys_info::hostname; use vote_stage::send_validator_vote; #[derive(Debug, PartialEq, Eq, Clone)] @@ -69,6 +72,15 @@ impl ReplicateStage { entries.append(&mut more); } + metrics::submit( + influxdb::Point::new("replicate-stage") + .add_field( + "host", + influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())), + ).add_field("count", influxdb::Value::Integer(entries.len() as i64)) + .to_owned(), + ); + let mut res = Ok(()); let last_entry_id = { let mut num_entries_to_write = entries.len(); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index ec3dca1530..8f788586a4 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -3,8 +3,10 @@ use cluster_info::ClusterInfo; use counter::Counter; use entry::Entry; +use influx_db_client as influxdb; use leader_scheduler::LeaderScheduler; use log::Level; +use metrics; use result::{Error, Result}; use service::Service; use std::net::UdpSocket; @@ -15,6 +17,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; +use sys_info::hostname; use window::SharedWindow; use window_service::window_service; @@ -28,6 +31,16 @@ fn retransmit( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } + + metrics::submit( + influxdb::Point::new("retransmit-stage") + .add_field( + "host", + influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())), + ).add_field("count", influxdb::Value::Integer(dq.len() as i64)) + .to_owned(), + ); + for b in &mut dq { ClusterInfo::retransmit(&cluster_info, b, sock)?; } diff --git a/src/window_service.rs b/src/window_service.rs index 3796a032dd..51c929f1ce 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -3,8 +3,10 @@ use cluster_info::{ClusterInfo, NodeInfo}; use counter::Counter; use entry::EntrySender; +use influx_db_client as influxdb; use leader_scheduler::LeaderScheduler; use log::Level; +use metrics; use packet::SharedBlob; use rand::{thread_rng, Rng}; use result::{Error, Result}; @@ -16,6 +18,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use streamer::{BlobReceiver, BlobSender}; +use sys_info::hostname; use timing::duration_as_ms; use window::{blob_idx_in_window, SharedWindow, WindowUtil}; @@ -116,6 +119,16 @@ fn retransmit_all_leader_blocks( } } } + metrics::submit( + influxdb::Point::new("retransmit-queue") + .add_field( + "host", + influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())), + ).add_field( + "count", + influxdb::Value::Integer(retransmit_queue.len() as i64), + ).to_owned(), + ); } else { warn!("{}: no leader to retransmit from", id); } @@ -160,6 +173,16 @@ fn recv_window( } let now = Instant::now(); inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100); + + metrics::submit( + influxdb::Point::new("recv-window") + .add_field( + "host", + influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())), + ).add_field("count", influxdb::Value::Integer(dq.len() as i64)) + .to_owned(), + ); + trace!( "{}: RECV_WINDOW {} {}: got packets {}", id,