From e920191de0eb781dcd398b74fe6aa84e596f9dc0 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 16 May 2019 22:27:05 -0700 Subject: [PATCH] Rate limit metrics per log level (#4313) * Rate limit metrics per log level * fix tests --- Cargo.lock | 2 + core/src/blocktree.rs | 1 + metrics/Cargo.toml | 1 + metrics/src/counter.rs | 13 ++- metrics/src/metrics.rs | 164 +++++++++++++++++++++------------ upload-perf/Cargo.toml | 1 + upload-perf/src/upload-perf.rs | 1 + 7 files changed, 124 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d00494a6d..4edb27529e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2568,6 +2568,7 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.17 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-logger 0.15.0", "solana-sdk 0.15.0", "sys-info 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2734,6 +2735,7 @@ dependencies = [ name = "solana-upload-perf" version = "0.15.0" dependencies = [ + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "solana-metrics 0.15.0", ] diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 0d00c9d6de..d8b1140d1f 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -408,6 +408,7 @@ impl Blocktree { ), ) .to_owned(), + log::Level::Error, ); } } diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 17b9186f61..d1bf689d74 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -14,6 +14,7 @@ log = "0.4.2" reqwest = "0.9.17" lazy_static = "1.3.0" sys-info = "0.5.6" +solana-logger = { path = "../logger", version = "0.15.0" } solana-sdk = { path = "../sdk", version = "0.15.0" } [dev-dependencies] diff --git a/metrics/src/counter.rs b/metrics/src/counter.rs index cbe6be8007..027571804a 100644 --- a/metrics/src/counter.rs +++ b/metrics/src/counter.rs @@ -95,6 +95,10 @@ impl Counter { ); } pub fn inc(&mut self, level: log::Level, events: usize) { + if !log_enabled!(level) { + return; + } + let counts = self.counts.fetch_add(events, Ordering::Relaxed); let times = self.times.fetch_add(1, Ordering::Relaxed); let mut lograte = self.lograte.load(Ordering::Relaxed); @@ -134,7 +138,7 @@ impl Counter { .or_insert(influxdb::Value::Integer(0)); } if let Some(ref mut point) = self.point { - submit(point.to_owned()); + submit(point.to_owned(), level); } } } @@ -144,6 +148,7 @@ impl Counter { mod tests { use crate::counter::{Counter, DEFAULT_LOG_RATE}; use log::Level; + use solana_logger; use std::env; use std::sync::atomic::Ordering; use std::sync::{Once, RwLock, ONCE_INIT}; @@ -162,6 +167,8 @@ mod tests { #[test] fn test_counter() { + env::set_var("RUST_LOG", "info"); + solana_logger::setup(); let _readlock = get_env_lock().read(); static mut COUNTER: Counter = create_counter!("test", 1000, 1); let count = 1; @@ -195,6 +202,8 @@ mod tests { } #[test] fn test_lograte() { + env::set_var("RUST_LOG", "info"); + solana_logger::setup(); let _readlock = get_env_lock().read(); assert_eq!( Counter::default_log_rate(), @@ -212,6 +221,8 @@ mod tests { #[test] fn test_lograte_env() { + env::set_var("RUST_LOG", "info"); + solana_logger::setup(); assert_ne!(DEFAULT_LOG_RATE, 0); let _writelock = get_env_lock().write(); static mut COUNTER: Counter = create_counter!("test_lograte_env", 0, 1); diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index f8f0220636..ee7adaefa4 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -1,15 +1,17 @@ //! The `metrics` module enables sending measurements to an `InfluxDB` instance use influx_db_client as influxdb; +use influx_db_client::Point; use lazy_static::lazy_static; use log::*; use solana_sdk::hash::hash; use solana_sdk::timing; -use std::env; +use std::collections::HashMap; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Barrier, Mutex, Once, ONCE_INIT}; use std::thread; use std::time::{Duration, Instant}; +use std::{cmp, env}; use sys_info::hostname; #[macro_export] @@ -53,7 +55,7 @@ macro_rules! datapoint { }; ($name:expr, $($fields:tt)+) => { - $crate::submit($crate::datapoint!(@point $name, $($fields)+)); + $crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Info); }; } @@ -73,7 +75,7 @@ lazy_static! { #[derive(Debug)] enum MetricsCommand { - Submit(influxdb::Point), + Submit(influxdb::Point, log::Level), Flush(Arc), } @@ -152,6 +154,53 @@ impl MetricsAgent { Self { sender } } + fn write( + points: &[Point], + last_write_time: Instant, + max_points: usize, + writer: &Arc, + max_points_per_sec: usize, + ) -> usize { + if points.is_empty() { + return 0; + } + + let now = Instant::now(); + let num_points = points.len(); + debug!("run: attempting to write {} points", num_points); + if num_points > max_points { + warn!( + "max submission rate of {} datapoints per second exceeded. only the + first {} of {} points will be submitted", + max_points_per_sec, max_points, num_points + ); + } + let points_written = cmp::min(num_points, max_points - 1); + + let extra = influxdb::Point::new("metrics") + .add_timestamp(timing::timestamp() as i64) + .add_field("host_id", influxdb::Value::String(HOST_INFO.to_string())) + .add_field( + "points_written", + influxdb::Value::Integer(points_written as i64), + ) + .add_field("num_points", influxdb::Value::Integer(num_points as i64)) + .add_field( + "secs_since_last_write", + influxdb::Value::Integer(now.duration_since(last_write_time).as_secs() as i64), + ) + .add_field( + "points_rate_exceeded", + influxdb::Value::Boolean(num_points > max_points), + ) + .to_owned(); + + writer.write(points[0..points_written].to_vec()); + writer.write([extra].to_vec()); + + points_written + } + fn run( receiver: &Receiver, writer: &Arc, @@ -160,7 +209,7 @@ impl MetricsAgent { ) { trace!("run: enter"); let mut last_write_time = Instant::now(); - let mut points = Vec::new(); + let mut points_map = HashMap::)>::new(); let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec; loop { @@ -168,15 +217,18 @@ impl MetricsAgent { Ok(cmd) => match cmd { MetricsCommand::Flush(barrier) => { debug!("metrics_thread: flush"); - if !points.is_empty() { - writer.write(points); - points = Vec::new(); + points_map.retain(|_, (_, points)| { + writer.write(points.to_vec()); last_write_time = Instant::now(); - } + false + }); barrier.wait(); } - MetricsCommand::Submit(point) => { + MetricsCommand::Submit(point, level) => { debug!("run: submit {:?}", point); + let (_, points) = points_map + .entry(level) + .or_insert((last_write_time, Vec::new())); points.push(point); } }, @@ -189,60 +241,49 @@ impl MetricsAgent { } } + let mut num_max_writes = max_points; + let now = Instant::now(); - if now.duration_since(last_write_time) >= write_frequency_secs && !points.is_empty() { - let num_points = points.len(); - let points_written; - debug!("run: attempting to write {} points", points.len()); - if points.len() > max_points { - warn!( - "max submission rate of {} datapoints per second exceeded. only the - first {} of {} points will be submitted", - max_points_per_sec, - max_points, - points.len() - ); - points.truncate(max_points - 1); - } - points_written = points.len(); + if now.duration_since(last_write_time) >= write_frequency_secs { + vec![ + Level::Error, + Level::Warn, + Level::Info, + Level::Debug, + Level::Trace, + ] + .iter() + .for_each(|x| { + if let Some((last_time, points)) = points_map.remove(x) { + let num_written = Self::write( + &points, + last_time, + num_max_writes, + writer, + max_points_per_sec, + ); - points.push( - influxdb::Point::new("metrics") - .add_timestamp(timing::timestamp() as i64) - .add_field("host_id", influxdb::Value::String(HOST_INFO.to_string())) - .add_field( - "points_written", - influxdb::Value::Integer(points_written as i64), - ) - .add_field("num_points", influxdb::Value::Integer(num_points as i64)) - .add_field( - "secs_since_last_write", - influxdb::Value::Integer( - now.duration_since(last_write_time).as_secs() as i64 - ), - ) - .add_field( - "points_rate_exceeded", - influxdb::Value::Boolean(num_points > max_points), - ) - .to_owned(), - ); + if num_written > 0 { + last_write_time = Instant::now(); + } - writer.write(points); - points = Vec::new(); - last_write_time = now; + num_max_writes = num_max_writes.saturating_sub(num_written); + } + }); } } trace!("run: exit"); } - pub fn submit(&self, mut point: influxdb::Point) { + pub fn submit(&self, mut point: influxdb::Point, level: log::Level) { point.add_field("host_id", influxdb::Value::String(HOST_INFO.to_string())); if point.timestamp.is_none() { point.timestamp = Some(timing::timestamp() as i64); } debug!("Submitting point: {:?}", point); - self.sender.send(MetricsCommand::Submit(point)).unwrap(); + self.sender + .send(MetricsCommand::Submit(point, level)) + .unwrap(); } pub fn flush(&self) { @@ -276,10 +317,10 @@ fn get_singleton_agent() -> Arc> { /// Submits a new point from any thread. Note that points are internally queued /// and transmitted periodically in batches. -pub fn submit(point: influxdb::Point) { +pub fn submit(point: influxdb::Point, level: log::Level) { let agent_mutex = get_singleton_agent(); let agent = agent_mutex.lock().unwrap(); - agent.submit(point); + agent.submit(point, level); } fn get_env_settings() -> Result<(String, String, String, String), env::VarError> { @@ -347,6 +388,7 @@ pub fn set_panic_hook(program: &'static str) { ) .add_field("host_id", influxdb::Value::String(HOST_INFO.to_string())) .to_owned(), + Level::Error, ); // Flush metrics immediately in case the process exits immediately // upon return @@ -396,7 +438,10 @@ mod test { let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000); for i in 0..42 { - agent.submit(influxdb::Point::new(&format!("measurement {}", i))); + agent.submit( + influxdb::Point::new(&format!("measurement {}", i)), + Level::Info, + ); } agent.flush(); @@ -408,7 +453,7 @@ mod test { let writer = Arc::new(MockMetricsWriter::new()); let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000); - agent.submit(influxdb::Point::new("point 1")); + agent.submit(influxdb::Point::new("point 1"), Level::Info); thread::sleep(Duration::from_secs(2)); assert_eq!(writer.points_written(), 2); } @@ -419,7 +464,10 @@ mod test { let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100); for i in 0..102 { - agent.submit(influxdb::Point::new(&format!("measurement {}", i))); + agent.submit( + influxdb::Point::new(&format!("measurement {}", i)), + Level::Info, + ); } thread::sleep(Duration::from_secs(2)); @@ -445,7 +493,7 @@ mod test { let point = influxdb::Point::new(&format!("measurement {}", i)); let agent = Arc::clone(&agent); threads.push(thread::spawn(move || { - agent.lock().unwrap().submit(point); + agent.lock().unwrap().submit(point, Level::Info); })); } @@ -462,7 +510,7 @@ mod test { let writer = Arc::new(MockMetricsWriter::new()); { let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999), 1000); - agent.submit(influxdb::Point::new("point 1")); + agent.submit(influxdb::Point::new("point 1"), Level::Info); } assert_eq!(writer.points_written(), 1); @@ -483,7 +531,7 @@ mod test { influxdb::Value::Integer(rand::random::() as i64), ) .to_owned(); - agent.submit(point); + agent.submit(point, Level::Info); } #[test] diff --git a/upload-perf/Cargo.toml b/upload-perf/Cargo.toml index 05f94cce8b..7ba9ca0ad5 100644 --- a/upload-perf/Cargo.toml +++ b/upload-perf/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" homepage = "https://solana.com/" [dependencies] +log = "0.4.2" serde_json = "1.0.39" solana-metrics = { path = "../metrics", version = "0.15.0" } diff --git a/upload-perf/src/upload-perf.rs b/upload-perf/src/upload-perf.rs index 33f742903b..2501a4ad59 100644 --- a/upload-perf/src/upload-perf.rs +++ b/upload-perf/src/upload-perf.rs @@ -77,6 +77,7 @@ fn main() { influxdb::Value::String(git_commit_hash.trim().to_string()), ) .to_owned(), + log::Level::Info, ); } let last_median = get_last_metrics(&"median".to_string(), &db, &name, &branch)