Rate limit counter metrics points to one per second (#5496)

* Rate limit counter metrics points to one per second

* Remove old env var

* Test that metrics counter is incrementing

* Fix typo
This commit is contained in:
Justin Starry
2019-08-12 18:15:34 -04:00
committed by GitHub
parent 771d1a78fd
commit 0fde19239b
12 changed files with 209 additions and 163 deletions

View File

@@ -19,6 +19,7 @@ sys-info = "0.5.7"
[dev-dependencies]
rand = "0.6.5"
serde_json = "1.0"
serial_test = "0.2.0"
serial_test_derive = "0.2.0"

View File

@@ -1,15 +1,12 @@
use crate::{influxdb, submit};
use crate::metrics::{submit_counter, CounterPoint};
use log::*;
use solana_sdk::timing;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
const DEFAULT_LOG_RATE: usize = 1000;
const DEFAULT_METRICS_RATE: usize = 1;
const DEFAULT_METRICS_HIGH_RATE: usize = 10;
/// Use default metrics high rate
pub const HIGH_RATE: usize = 999_999;
// Submit a datapoint every second by default
const DEFAULT_METRICS_RATE: u64 = 1000;
pub struct Counter {
pub name: &'static str,
@@ -19,8 +16,7 @@ pub struct Counter {
/// last accumulated value logged
pub lastlog: AtomicUsize,
pub lograte: AtomicUsize,
pub metricsrate: AtomicUsize,
pub point: Option<influxdb::Point>,
pub metricsrate: AtomicU64,
}
#[macro_export]
@@ -32,8 +28,7 @@ macro_rules! create_counter {
times: std::sync::atomic::AtomicUsize::new(0),
lastlog: std::sync::atomic::AtomicUsize::new(0),
lograte: std::sync::atomic::AtomicUsize::new($lograte),
metricsrate: std::sync::atomic::AtomicUsize::new($metricsrate),
point: None,
metricsrate: std::sync::atomic::AtomicU64::new($metricsrate),
}
};
}
@@ -129,65 +124,13 @@ macro_rules! inc_new_counter_debug {
}};
}
/// Forwards `$name` and `$count` to `inc_new_counter!` with `log::Level::Error`,
/// a literal `0`, and `$crate::counter::HIGH_RATE`.
///
/// NOTE(review): the last two arguments are presumably the log rate and the
/// metrics rate (mirroring `create_counter!`'s `$lograte` / `$metricsrate`) —
/// confirm against the definition of `inc_new_counter!`.
#[macro_export]
macro_rules! inc_new_high_rate_counter_error {
($name:expr, $count:expr) => {{
inc_new_counter!(
$name,
$count,
log::Level::Error,
0,
$crate::counter::HIGH_RATE
);
}};
}
/// Forwards `$name` and `$count` to `inc_new_counter!` with `log::Level::Warn`,
/// a literal `0`, and `$crate::counter::HIGH_RATE`.
///
/// NOTE(review): the last two arguments are presumably the log rate and the
/// metrics rate (mirroring `create_counter!`'s `$lograte` / `$metricsrate`) —
/// confirm against the definition of `inc_new_counter!`.
#[macro_export]
macro_rules! inc_new_high_rate_counter_warn {
($name:expr, $count:expr) => {{
inc_new_counter!(
$name,
$count,
log::Level::Warn,
0,
$crate::counter::HIGH_RATE
);
}};
}
/// Forwards `$name` and `$count` to `inc_new_counter!` with `log::Level::Info`,
/// a literal `0`, and `$crate::counter::HIGH_RATE`.
///
/// NOTE(review): the last two arguments are presumably the log rate and the
/// metrics rate (mirroring `create_counter!`'s `$lograte` / `$metricsrate`) —
/// confirm against the definition of `inc_new_counter!`.
#[macro_export]
macro_rules! inc_new_high_rate_counter_info {
($name:expr, $count:expr) => {{
inc_new_counter!(
$name,
$count,
log::Level::Info,
0,
$crate::counter::HIGH_RATE
);
}};
}
/// Forwards `$name` and `$count` to `inc_new_counter!` with `log::Level::Debug`,
/// a literal `0`, and `$crate::counter::HIGH_RATE`.
///
/// NOTE(review): the last two arguments are presumably the log rate and the
/// metrics rate (mirroring `create_counter!`'s `$lograte` / `$metricsrate`) —
/// confirm against the definition of `inc_new_counter!`.
#[macro_export]
macro_rules! inc_new_high_rate_counter_debug {
($name:expr, $count:expr) => {{
inc_new_counter!(
$name,
$count,
log::Level::Debug,
0,
$crate::counter::HIGH_RATE
);
}};
}
impl Counter {
fn default_metrics_high_rate() -> usize {
let v = env::var("SOLANA_METRICS_HIGH_RATE")
fn default_metrics_rate() -> u64 {
let v = env::var("SOLANA_DEFAULT_METRICS_RATE")
.map(|x| x.parse().unwrap_or(0))
.unwrap_or(0);
if v == 0 {
DEFAULT_METRICS_HIGH_RATE
DEFAULT_METRICS_RATE
} else {
v
}
@@ -203,22 +146,13 @@ impl Counter {
}
}
pub fn init(&mut self) {
self.point = Some(
influxdb::Point::new(&self.name)
.add_field("count", influxdb::Value::Integer(0))
.to_owned(),
);
self.lograte
.compare_and_swap(0, Self::default_log_rate(), Ordering::Relaxed);
self.metricsrate.compare_and_swap(
HIGH_RATE,
Self::default_metrics_high_rate(),
Ordering::Relaxed,
);
self.metricsrate
.compare_and_swap(0, DEFAULT_METRICS_RATE, Ordering::Relaxed);
.compare_and_swap(0, Self::default_metrics_rate(), Ordering::Relaxed);
}
pub fn inc(&mut self, level: log::Level, events: usize) {
let now = timing::timestamp();
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed);
let lograte = self.lograte.load(Ordering::Relaxed);
@@ -230,36 +164,29 @@ impl Counter {
self.name,
counts + events,
times,
timing::timestamp(),
now,
events,
);
}
if times % metricsrate == 0 && times > 0 {
let lastlog = self.lastlog.load(Ordering::Relaxed);
let prev = self
.lastlog
.compare_and_swap(lastlog, counts, Ordering::Relaxed);
if prev == lastlog {
if let Some(ref mut point) = self.point {
point
.fields
.entry("count".to_string())
.and_modify(|v| {
*v = influxdb::Value::Integer(counts as i64 - lastlog as i64)
})
.or_insert(influxdb::Value::Integer(0));
}
if let Some(ref mut point) = self.point {
submit(point.to_owned(), level);
}
}
let lastlog = self.lastlog.load(Ordering::Relaxed);
let prev = self
.lastlog
.compare_and_swap(lastlog, counts, Ordering::Relaxed);
if prev == lastlog {
let bucket = now / metricsrate;
let counter = CounterPoint {
name: self.name,
count: counts as i64 - lastlog as i64,
timestamp: now,
};
submit_counter(counter, level, bucket);
}
}
}
#[cfg(test)]
mod tests {
use crate::counter::{Counter, DEFAULT_LOG_RATE, DEFAULT_METRICS_HIGH_RATE, HIGH_RATE};
use crate::counter::{Counter, DEFAULT_LOG_RATE, DEFAULT_METRICS_RATE};
use log::Level;
use log::*;
use serial_test_derive::serial;
@@ -313,31 +240,31 @@ mod tests {
#[test]
#[serial]
fn test_high_rate_counter() {
fn test_metricsrate() {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("solana=info"))
.try_init()
.ok();
let _readlock = get_env_lock().read();
static mut COUNTER: Counter = create_counter!("test", 1000, HIGH_RATE);
env::remove_var("SOLANA_METRICS_HIGH_RATE");
env::remove_var("SOLANA_DEFAULT_METRICS_RATE");
static mut COUNTER: Counter = create_counter!("test", 1000, 0);
unsafe {
COUNTER.init();
assert_eq!(
COUNTER.metricsrate.load(Ordering::Relaxed),
DEFAULT_METRICS_HIGH_RATE
DEFAULT_METRICS_RATE
);
}
}
#[test]
#[serial]
fn test_high_rate_counter_env() {
fn test_metricsrate_env() {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("solana=info"))
.try_init()
.ok();
let _writelock = get_env_lock().write();
static mut COUNTER: Counter = create_counter!("test", 1000, HIGH_RATE);
env::set_var("SOLANA_METRICS_HIGH_RATE", "50");
env::set_var("SOLANA_DEFAULT_METRICS_RATE", "50");
static mut COUNTER: Counter = create_counter!("test", 1000, 0);
unsafe {
COUNTER.init();
assert_eq!(COUNTER.metricsrate.load(Ordering::Relaxed), 50);

View File

@@ -7,6 +7,7 @@ use log::*;
use solana_sdk::hash::hash;
use solana_sdk::timing;
use std::collections::HashMap;
use std::convert::Into;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Barrier, Mutex, Once};
use std::thread;
@@ -128,10 +129,41 @@ lazy_static! {
};
}
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
/// One counter sample handed to the metrics agent.
#[derive(Clone, Debug)]
pub struct CounterPoint {
    /// Measurement name the sample is reported under.
    pub name: &'static str,
    /// Amount counted by this sample.
    pub count: i64,
    /// When the sample was taken (presumably milliseconds since the epoch — TODO confirm).
    pub timestamp: u64,
}

impl CounterPoint {
    /// Test-only constructor: a sample named `name` whose count and timestamp are both zero.
    #[cfg(test)]
    fn new(name: &'static str) -> Self {
        Self {
            name,
            count: 0,
            timestamp: 0,
        }
    }
}
/// Converts a counter sample into an InfluxDB point.
///
/// The resulting point is named after the counter, tagged with this host's
/// `host_id`, carries the sample's value in an integer `count` field, and is
/// stamped with the sample's own timestamp.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<influxdb::Point> for CounterPoint`, so `.into()`
/// keeps working, and `influxdb::Point::from(counter)` becomes available too.
impl From<CounterPoint> for influxdb::Point {
    fn from(counter: CounterPoint) -> Self {
        let mut point = influxdb::Point::new(counter.name);
        point.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string()));
        point.add_field("count", influxdb::Value::Integer(counter.count));
        // The point API takes a signed timestamp; the sample stores it unsigned.
        point.add_timestamp(counter.timestamp as i64);
        point
    }
}
#[derive(Debug)]
enum MetricsCommand {
Submit(influxdb::Point, log::Level),
Flush(Arc<Barrier>),
Submit(influxdb::Point, log::Level),
SubmitCounter(CounterPoint, log::Level, u64),
}
struct MetricsAgent {
@@ -270,7 +302,7 @@ impl MetricsAgent {
) {
trace!("run: enter");
let mut last_write_time = Instant::now();
let mut points_map = HashMap::<log::Level, (Instant, Vec<Point>)>::new();
let mut points_map = HashMap::<log::Level, (Instant, CounterMap, Vec<Point>)>::new();
let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec;
loop {
@@ -278,20 +310,38 @@ impl MetricsAgent {
Ok(cmd) => match cmd {
MetricsCommand::Flush(barrier) => {
debug!("metrics_thread: flush");
points_map.retain(|_, (_, points)| {
writer.write(points.to_vec());
points_map.drain().for_each(|(_, (_, counters, points))| {
let counter_points = counters.into_iter().map(|(_, v)| v.into());
let points: Vec<_> = points.into_iter().chain(counter_points).collect();
writer.write(points);
last_write_time = Instant::now();
false
});
barrier.wait();
}
MetricsCommand::Submit(point, level) => {
debug!("run: submit {:?}", point);
let (_, points) = points_map
.entry(level)
.or_insert((last_write_time, Vec::new()));
let (_, _, points) = points_map.entry(level).or_insert((
last_write_time,
HashMap::new(),
Vec::new(),
));
points.push(point);
}
MetricsCommand::SubmitCounter(counter, level, bucket) => {
debug!("run: submit counter {:?}", counter);
let (_, counters, _) = points_map.entry(level).or_insert((
last_write_time,
HashMap::new(),
Vec::new(),
));
let key = (counter.name, bucket);
if let Some(value) = counters.get_mut(&key) {
value.count += counter.count;
} else {
counters.insert(key, counter);
}
}
},
Err(RecvTimeoutError::Timeout) => {
trace!("run: receive timeout");
@@ -315,7 +365,9 @@ impl MetricsAgent {
]
.iter()
.for_each(|x| {
if let Some((last_time, points)) = points_map.remove(x) {
if let Some((last_time, counters, points)) = points_map.remove(x) {
let counter_points = counters.into_iter().map(|(_, v)| v.into());
let points: Vec<_> = points.into_iter().chain(counter_points).collect();
let num_written = Self::write(
&points,
last_time,
@@ -338,7 +390,7 @@ impl MetricsAgent {
pub fn submit(&self, mut point: influxdb::Point, level: log::Level) {
if point.timestamp.is_none() {
point.timestamp = Some(timing::timestamp() as i64);
point.add_timestamp(timing::timestamp() as i64);
}
debug!("Submitting point: {:?}", point);
self.sender
@@ -346,6 +398,13 @@ impl MetricsAgent {
.unwrap();
}
/// Hands a counter sample to the agent over its command channel.
///
/// The sample, its log `level`, and its `bucket` are wrapped in a
/// `MetricsCommand::SubmitCounter` and sent as-is.
///
/// # Panics
///
/// Panics if sending on the channel fails.
pub fn submit_counter(&self, point: CounterPoint, level: log::Level, bucket: u64) {
    debug!("Submitting counter point: {:?}", point);
    let command = MetricsCommand::SubmitCounter(point, level, bucket);
    self.sender.send(command).unwrap();
}
pub fn flush(&self) {
debug!("Flush");
let barrier = Arc::new(Barrier::new(2));
@@ -384,6 +443,14 @@ pub fn submit(mut point: influxdb::Point, level: log::Level) {
agent.submit(point, level);
}
/// Submits a counter sample through the singleton metrics agent; callable from any thread.
/// A sample either starts a new counter or is folded into an existing one. Samples are
/// queued internally and transmitted periodically in batches rather than sent immediately.
pub fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
    // The lock guard lives only for this one statement.
    get_singleton_agent()
        .lock()
        .unwrap()
        .submit_counter(point, level, bucket);
}
fn get_env_settings() -> Result<(String, String, String, String), env::VarError> {
let host =
env::var("INFLUX_HOST").unwrap_or_else(|_| "https://metrics.solana.com:8086".to_string());
@@ -461,20 +528,20 @@ pub fn set_panic_hook(program: &'static str) {
#[cfg(test)]
mod test {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use serde_json;
struct MockMetricsWriter {
points_written: AtomicUsize,
points_written: Arc<Mutex<Vec<influxdb::Point>>>,
}
impl MockMetricsWriter {
fn new() -> Self {
MockMetricsWriter {
points_written: AtomicUsize::new(0),
points_written: Arc::new(Mutex::new(Vec::new())),
}
}
fn points_written(&self) -> usize {
return self.points_written.load(Ordering::Relaxed);
self.points_written.lock().unwrap().len()
}
}
@@ -482,13 +549,16 @@ mod test {
fn write(&self, points: Vec<influxdb::Point>) {
assert!(!points.is_empty());
let new_points = points.len();
self.points_written
.fetch_add(points.len(), Ordering::Relaxed);
.lock()
.unwrap()
.extend(points.into_iter());
info!(
"Writing {} points ({} total)",
points.len(),
self.points_written.load(Ordering::Relaxed)
new_points,
self.points_written()
);
}
}
@@ -509,6 +579,63 @@ mod test {
assert_eq!(writer.points_written(), 42);
}
/// Counter samples that differ in name or bucket must each be written as their own point.
#[test]
fn test_submit_counter() {
    let writer = Arc::new(MockMetricsWriter::new());
    let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

    // Ten buckets, two counter names per bucket: every (name, bucket) key is unique.
    for bucket in 0..10 {
        for &name in ["counter - 1", "counter - 2"].iter() {
            agent.submit_counter(CounterPoint::new(name), Level::Info, bucket);
        }
    }

    agent.flush();
    assert_eq!(writer.points_written(), 20);
}
/// Samples sharing a name and a bucket must be merged into one point whose count is their sum.
#[test]
fn test_submit_counter_increment() {
    let writer = Arc::new(MockMetricsWriter::new());
    let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

    let sample = CounterPoint {
        name: "counter",
        count: 10,
        timestamp: 0,
    };
    for _ in 0..10 {
        // use the same bucket
        agent.submit_counter(sample.clone(), Level::Info, 0);
    }
    agent.flush();

    // Ten submissions collapse into a single written point...
    assert_eq!(writer.points_written(), 1);

    // ...whose `count` field is 10 * 10. Values are compared through their JSON form.
    let recorded = writer.points_written.lock().unwrap()[0].clone();
    let actual = serde_json::to_string(recorded.fields.get("count").unwrap()).unwrap();
    let expected = serde_json::to_string(&influxdb::Value::Integer(100)).unwrap();
    assert_eq!(actual, expected);
}
/// Samples are grouped per (name, bucket): fifty submissions per name spread over five
/// buckets must produce five points per name.
#[test]
fn test_submit_bucketed_counter() {
    let writer = Arc::new(MockMetricsWriter::new());
    let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

    for i in 0..50 {
        // Integer division maps each run of ten consecutive submissions onto one bucket.
        let bucket = i / 10;
        for &name in ["counter - 1", "counter - 2"].iter() {
            agent.submit_counter(CounterPoint::new(name), Level::Info, bucket);
        }
    }

    agent.flush();
    // 5 buckets * 2 counter names.
    assert_eq!(writer.points_written(), 10);
}
#[test]
fn test_submit_with_delay() {
let writer = Arc::new(MockMetricsWriter::new());