Add channel pressure for validator TVU stages (#1509)

This commit is contained in:
Pankaj Garg
2018-10-16 12:54:23 -07:00
committed by GitHub
parent 2bd877528f
commit f6c10d8a2e
3 changed files with 48 additions and 0 deletions

View File

@ -5,9 +5,11 @@ use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use entry::EntryReceiver; use entry::EntryReceiver;
use hash::Hash; use hash::Hash;
use influx_db_client as influxdb;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use ledger::{Block, LedgerWriter}; use ledger::{Block, LedgerWriter};
use log::Level; use log::Level;
use metrics;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
@ -20,6 +22,7 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use streamer::{responder, BlobSender}; use streamer::{responder, BlobSender};
use sys_info::hostname;
use vote_stage::send_validator_vote; use vote_stage::send_validator_vote;
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
@ -69,6 +72,15 @@ impl ReplicateStage {
entries.append(&mut more); entries.append(&mut more);
} }
metrics::submit(
influxdb::Point::new("replicate-stage")
.add_field(
"host",
influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())),
).add_field("count", influxdb::Value::Integer(entries.len() as i64))
.to_owned(),
);
let mut res = Ok(()); let mut res = Ok(());
let last_entry_id = { let last_entry_id = {
let mut num_entries_to_write = entries.len(); let mut num_entries_to_write = entries.len();

View File

@ -3,8 +3,10 @@
use cluster_info::ClusterInfo; use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use entry::Entry; use entry::Entry;
use influx_db_client as influxdb;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use log::Level; use log::Level;
use metrics;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -15,6 +17,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver; use streamer::BlobReceiver;
use sys_info::hostname;
use window::SharedWindow; use window::SharedWindow;
use window_service::window_service; use window_service::window_service;
@ -28,6 +31,16 @@ fn retransmit(
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq); dq.append(&mut nq);
} }
metrics::submit(
influxdb::Point::new("retransmit-stage")
.add_field(
"host",
influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())),
).add_field("count", influxdb::Value::Integer(dq.len() as i64))
.to_owned(),
);
for b in &mut dq { for b in &mut dq {
ClusterInfo::retransmit(&cluster_info, b, sock)?; ClusterInfo::retransmit(&cluster_info, b, sock)?;
} }

View File

@ -3,8 +3,10 @@
use cluster_info::{ClusterInfo, NodeInfo}; use cluster_info::{ClusterInfo, NodeInfo};
use counter::Counter; use counter::Counter;
use entry::EntrySender; use entry::EntrySender;
use influx_db_client as influxdb;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use log::Level; use log::Level;
use metrics;
use packet::SharedBlob; use packet::SharedBlob;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use result::{Error, Result}; use result::{Error, Result};
@ -16,6 +18,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use streamer::{BlobReceiver, BlobSender}; use streamer::{BlobReceiver, BlobSender};
use sys_info::hostname;
use timing::duration_as_ms; use timing::duration_as_ms;
use window::{blob_idx_in_window, SharedWindow, WindowUtil}; use window::{blob_idx_in_window, SharedWindow, WindowUtil};
@ -116,6 +119,16 @@ fn retransmit_all_leader_blocks(
} }
} }
} }
metrics::submit(
influxdb::Point::new("retransmit-queue")
.add_field(
"host",
influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())),
).add_field(
"count",
influxdb::Value::Integer(retransmit_queue.len() as i64),
).to_owned(),
);
} else { } else {
warn!("{}: no leader to retransmit from", id); warn!("{}: no leader to retransmit from", id);
} }
@ -160,6 +173,16 @@ fn recv_window(
} }
let now = Instant::now(); let now = Instant::now();
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100); inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
metrics::submit(
influxdb::Point::new("recv-window")
.add_field(
"host",
influxdb::Value::String(hostname().unwrap_or_else(|_| "?".to_string())),
).add_field("count", influxdb::Value::Integer(dq.len() as i64))
.to_owned(),
);
trace!( trace!(
"{}: RECV_WINDOW {} {}: got packets {}", "{}: RECV_WINDOW {} {}: got packets {}",
id, id,