Rate limit metrics per log level (#4313)

* Rate limit metrics per log level

* fix tests
Authored by Pankaj Garg, 2019-05-16 22:27:05 -07:00, committed by GitHub
parent 39e85a3e53
commit e920191de0
7 changed files with 124 additions and 59 deletions
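The change threads a log::Level through every metrics submission: counters bail out early when their level is disabled by the logger, queued points are bucketed per level, and the buckets are flushed in severity order (Error first) against a shared per-interval write budget. Below is a minimal usage sketch of the new two-argument submit, assuming the solana-metrics and influx_db_client crates as used in this diff; the measurement name and field are made up for illustration.

    use influx_db_client as influxdb;
    use log::Level;

    // Hypothetical caller: report an error-level datapoint. After this change the
    // point is queued in the Error bucket and flushed ahead of lower-level points.
    fn report_slot_error(slot: u64) {
        solana_metrics::submit(
            influxdb::Point::new("blocktree_error")
                .add_field("slot", influxdb::Value::Integer(slot as i64))
                .to_owned(),
            Level::Error,
        );
    }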

Cargo.lock generated
View File

@@ -2568,6 +2568,7 @@ dependencies = [
 "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "reqwest 0.9.17 (registry+https://github.com/rust-lang/crates.io-index)",
+"solana-logger 0.15.0",
 "solana-sdk 0.15.0",
 "sys-info 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
@@ -2734,6 +2735,7 @@ dependencies = [
 name = "solana-upload-perf"
 version = "0.15.0"
 dependencies = [
+"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)",
 "solana-metrics 0.15.0",
 ]

View File

@@ -408,6 +408,7 @@ impl Blocktree {
 ),
 )
 .to_owned(),
+log::Level::Error,
 );
 }
 }

View File

@@ -14,6 +14,7 @@ log = "0.4.2"
 reqwest = "0.9.17"
 lazy_static = "1.3.0"
 sys-info = "0.5.6"
+solana-logger = { path = "../logger", version = "0.15.0" }
 solana-sdk = { path = "../sdk", version = "0.15.0" }
 
 [dev-dependencies]

View File

@@ -95,6 +95,10 @@ impl Counter {
 );
 }
 pub fn inc(&mut self, level: log::Level, events: usize) {
+if !log_enabled!(level) {
+return;
+}
+
 let counts = self.counts.fetch_add(events, Ordering::Relaxed);
 let times = self.times.fetch_add(1, Ordering::Relaxed);
 let mut lograte = self.lograte.load(Ordering::Relaxed);
@@ -134,7 +138,7 @@ impl Counter {
 .or_insert(influxdb::Value::Integer(0));
 }
 if let Some(ref mut point) = self.point {
-submit(point.to_owned());
+submit(point.to_owned(), level);
 }
 }
 }
@@ -144,6 +148,7 @@
 mod tests {
 use crate::counter::{Counter, DEFAULT_LOG_RATE};
 use log::Level;
+use solana_logger;
 use std::env;
 use std::sync::atomic::Ordering;
 use std::sync::{Once, RwLock, ONCE_INIT};
@@ -162,6 +167,8 @@ mod tests {
 #[test]
 fn test_counter() {
+env::set_var("RUST_LOG", "info");
+solana_logger::setup();
 let _readlock = get_env_lock().read();
 static mut COUNTER: Counter = create_counter!("test", 1000, 1);
 let count = 1;
@@ -195,6 +202,8 @@
 }
 #[test]
 fn test_lograte() {
+env::set_var("RUST_LOG", "info");
+solana_logger::setup();
 let _readlock = get_env_lock().read();
 assert_eq!(
 Counter::default_log_rate(),
@@ -212,6 +221,8 @@
 #[test]
 fn test_lograte_env() {
+env::set_var("RUST_LOG", "info");
+solana_logger::setup();
 assert_ne!(DEFAULT_LOG_RATE, 0);
 let _writelock = get_env_lock().write();
 static mut COUNTER: Counter = create_counter!("test_lograte_env", 0, 1);
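With this change Counter::inc exits before touching any atomics when the requested level is filtered out by the logger, so counters attached to disabled levels cost little more than the log_enabled! check; that is presumably why the tests above now set RUST_LOG and call solana_logger::setup(). A standalone sketch of the same gating pattern, using only the log crate; record_event is a hypothetical helper, not part of this crate:

    use log::{log_enabled, Level};
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Hypothetical helper mirroring the early return added to Counter::inc:
    // skip the bookkeeping entirely when the logger would drop this level anyway.
    fn record_event(level: Level, events: usize, total: &AtomicUsize) {
        if !log_enabled!(level) {
            return;
        }
        total.fetch_add(events, Ordering::Relaxed);
    }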

View File

@@ -1,15 +1,17 @@
 //! The `metrics` module enables sending measurements to an `InfluxDB` instance
 
 use influx_db_client as influxdb;
+use influx_db_client::Point;
 use lazy_static::lazy_static;
 use log::*;
 use solana_sdk::hash::hash;
 use solana_sdk::timing;
-use std::env;
+use std::collections::HashMap;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
 use std::sync::{Arc, Barrier, Mutex, Once, ONCE_INIT};
 use std::thread;
 use std::time::{Duration, Instant};
+use std::{cmp, env};
 use sys_info::hostname;
 
 #[macro_export]
@@ -53,7 +55,7 @@ macro_rules! datapoint {
 };
 ($name:expr, $($fields:tt)+) => {
-$crate::submit($crate::datapoint!(@point $name, $($fields)+));
+$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Info);
 };
 }
@@ -73,7 +75,7 @@
 #[derive(Debug)]
 enum MetricsCommand {
-Submit(influxdb::Point),
+Submit(influxdb::Point, log::Level),
 Flush(Arc<Barrier>),
 }
@@ -152,6 +154,53 @@ impl MetricsAgent {
 Self { sender }
 }
 
+fn write(
+points: &[Point],
+last_write_time: Instant,
+max_points: usize,
+writer: &Arc<MetricsWriter + Send + Sync>,
+max_points_per_sec: usize,
+) -> usize {
+if points.is_empty() {
+return 0;
+}
+
+let now = Instant::now();
+let num_points = points.len();
+debug!("run: attempting to write {} points", num_points);
+if num_points > max_points {
+warn!(
+"max submission rate of {} datapoints per second exceeded. only the
+first {} of {} points will be submitted",
+max_points_per_sec, max_points, num_points
+);
+}
+let points_written = cmp::min(num_points, max_points - 1);
+
+let extra = influxdb::Point::new("metrics")
+.add_timestamp(timing::timestamp() as i64)
+.add_field("host_id", influxdb::Value::String(HOST_INFO.to_string()))
+.add_field(
+"points_written",
+influxdb::Value::Integer(points_written as i64),
+)
+.add_field("num_points", influxdb::Value::Integer(num_points as i64))
+.add_field(
+"secs_since_last_write",
+influxdb::Value::Integer(now.duration_since(last_write_time).as_secs() as i64),
+)
+.add_field(
+"points_rate_exceeded",
+influxdb::Value::Boolean(num_points > max_points),
+)
+.to_owned();
+
+writer.write(points[0..points_written].to_vec());
+writer.write([extra].to_vec());
+
+points_written
+}
+
 fn run(
 receiver: &Receiver<MetricsCommand>,
 writer: &Arc<MetricsWriter + Send + Sync>,
@@ -160,7 +209,7 @@
 ) {
 trace!("run: enter");
 let mut last_write_time = Instant::now();
-let mut points = Vec::new();
+let mut points_map = HashMap::<log::Level, (Instant, Vec<Point>)>::new();
 let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec;
 
 loop {
@@ -168,15 +217,18 @@
 Ok(cmd) => match cmd {
 MetricsCommand::Flush(barrier) => {
 debug!("metrics_thread: flush");
-if !points.is_empty() {
-writer.write(points);
-points = Vec::new();
+points_map.retain(|_, (_, points)| {
+writer.write(points.to_vec());
 last_write_time = Instant::now();
-}
+false
+});
 barrier.wait();
 }
-MetricsCommand::Submit(point) => {
+MetricsCommand::Submit(point, level) => {
 debug!("run: submit {:?}", point);
+let (_, points) = points_map
+.entry(level)
+.or_insert((last_write_time, Vec::new()));
 points.push(point);
 }
 },
@@ -189,60 +241,49 @@
 }
 }
+let mut num_max_writes = max_points;
 let now = Instant::now();
-if now.duration_since(last_write_time) >= write_frequency_secs && !points.is_empty() {
-let num_points = points.len();
-let points_written;
-debug!("run: attempting to write {} points", points.len());
-if points.len() > max_points {
-warn!(
-"max submission rate of {} datapoints per second exceeded. only the
-first {} of {} points will be submitted",
-max_points_per_sec,
-max_points,
-points.len()
-);
-points.truncate(max_points - 1);
-}
-points_written = points.len();
-points.push(
-influxdb::Point::new("metrics")
-.add_timestamp(timing::timestamp() as i64)
-.add_field("host_id", influxdb::Value::String(HOST_INFO.to_string()))
-.add_field(
-"points_written",
-influxdb::Value::Integer(points_written as i64),
-)
-.add_field("num_points", influxdb::Value::Integer(num_points as i64))
-.add_field(
-"secs_since_last_write",
-influxdb::Value::Integer(
-now.duration_since(last_write_time).as_secs() as i64
-),
-)
-.add_field(
-"points_rate_exceeded",
-influxdb::Value::Boolean(num_points > max_points),
-)
-.to_owned(),
-);
-writer.write(points);
-points = Vec::new();
-last_write_time = now;
+if now.duration_since(last_write_time) >= write_frequency_secs {
+vec![
+Level::Error,
+Level::Warn,
+Level::Info,
+Level::Debug,
+Level::Trace,
+]
+.iter()
+.for_each(|x| {
+if let Some((last_time, points)) = points_map.remove(x) {
+let num_written = Self::write(
+&points,
+last_time,
+num_max_writes,
+writer,
+max_points_per_sec,
+);
+if num_written > 0 {
+last_write_time = Instant::now();
+}
+num_max_writes = num_max_writes.saturating_sub(num_written);
+}
+});
 }
 }
 trace!("run: exit");
 }
 
-pub fn submit(&self, mut point: influxdb::Point) {
+pub fn submit(&self, mut point: influxdb::Point, level: log::Level) {
 point.add_field("host_id", influxdb::Value::String(HOST_INFO.to_string()));
 if point.timestamp.is_none() {
 point.timestamp = Some(timing::timestamp() as i64);
 }
 debug!("Submitting point: {:?}", point);
-self.sender.send(MetricsCommand::Submit(point)).unwrap();
+self.sender
+.send(MetricsCommand::Submit(point, level))
+.unwrap();
 }
 
 pub fn flush(&self) {
@@ -276,10 +317,10 @@
 /// Submits a new point from any thread. Note that points are internally queued
 /// and transmitted periodically in batches.
-pub fn submit(point: influxdb::Point) {
+pub fn submit(point: influxdb::Point, level: log::Level) {
 let agent_mutex = get_singleton_agent();
 let agent = agent_mutex.lock().unwrap();
-agent.submit(point);
+agent.submit(point, level);
 }
 
 fn get_env_settings() -> Result<(String, String, String, String), env::VarError> {
@@ -347,6 +388,7 @@ pub fn set_panic_hook(program: &'static str) {
 )
 .add_field("host_id", influxdb::Value::String(HOST_INFO.to_string()))
 .to_owned(),
+Level::Error,
 );
 // Flush metrics immediately in case the process exits immediately
 // upon return
@@ -396,7 +438,10 @@
 let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);
 for i in 0..42 {
-agent.submit(influxdb::Point::new(&format!("measurement {}", i)));
+agent.submit(
+influxdb::Point::new(&format!("measurement {}", i)),
+Level::Info,
+);
 }
 agent.flush();
@@ -408,7 +453,7 @@
 let writer = Arc::new(MockMetricsWriter::new());
 let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000);
-agent.submit(influxdb::Point::new("point 1"));
+agent.submit(influxdb::Point::new("point 1"), Level::Info);
 thread::sleep(Duration::from_secs(2));
 assert_eq!(writer.points_written(), 2);
 }
@@ -419,7 +464,10 @@
 let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100);
 for i in 0..102 {
-agent.submit(influxdb::Point::new(&format!("measurement {}", i)));
+agent.submit(
+influxdb::Point::new(&format!("measurement {}", i)),
+Level::Info,
+);
 }
 thread::sleep(Duration::from_secs(2));
@@ -445,7 +493,7 @@
 let point = influxdb::Point::new(&format!("measurement {}", i));
 let agent = Arc::clone(&agent);
 threads.push(thread::spawn(move || {
-agent.lock().unwrap().submit(point);
+agent.lock().unwrap().submit(point, Level::Info);
 }));
 }
@@ -462,7 +510,7 @@
 let writer = Arc::new(MockMetricsWriter::new());
 {
 let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999), 1000);
-agent.submit(influxdb::Point::new("point 1"));
+agent.submit(influxdb::Point::new("point 1"), Level::Info);
 }
 assert_eq!(writer.points_written(), 1);
@@ -483,7 +531,7 @@
 influxdb::Value::Integer(rand::random::<u8>() as i64),
 )
 .to_owned();
-agent.submit(point);
+agent.submit(point, Level::Info);
 }
 
 #[test]
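MetricsAgent::run now keeps one queue per log level (points_map) and, when the write interval elapses, drains the queues in severity order against a single num_max_writes budget; the intent appears to be that a flood of Info points can no longer crowd out Error points. A simplified, self-contained sketch of that drain order, assuming only the log crate; drain_in_priority_order and the String points are illustrative stand-ins, not the module's API:

    use log::Level;
    use std::collections::HashMap;

    // Illustrative only: flush per-level queues highest severity first,
    // charging each batch against one shared write budget, mirroring the
    // Error..Trace order used in MetricsAgent::run.
    fn drain_in_priority_order(
        mut queues: HashMap<Level, Vec<String>>,
        mut budget: usize,
    ) -> Vec<String> {
        let mut flushed = Vec::new();
        for level in &[Level::Error, Level::Warn, Level::Info, Level::Debug, Level::Trace] {
            if let Some(points) = queues.remove(level) {
                let take = budget.min(points.len());
                flushed.extend(points.into_iter().take(take));
                budget -= take;
            }
        }
        flushed
    }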

View File

@@ -9,6 +9,7 @@ edition = "2018"
 homepage = "https://solana.com/"
 
 [dependencies]
+log = "0.4.2"
 serde_json = "1.0.39"
 solana-metrics = { path = "../metrics", version = "0.15.0" }

View File

@@ -77,6 +77,7 @@ fn main() {
 influxdb::Value::String(git_commit_hash.trim().to_string()),
 )
 .to_owned(),
+log::Level::Info,
 );
 }
 let last_median = get_last_metrics(&"median".to_string(), &db, &name, &branch)