Add rate limit to metrics datapoint submission (#4237)

Cleanup

Raise limit on submission threshold

Pick nits and add metrics point

fmt

Fixup compiler warning

Cleanup if-else

Append new point to vec rather than submit
This commit is contained in:
Dan Albert
2019-05-13 14:17:25 -06:00
committed by GitHub
parent a2e3a92b01
commit e2830f5b0e

View File

@ -133,28 +133,38 @@ impl Default for MetricsAgent {
Self::new( Self::new(
Arc::new(InfluxDbMetricsWriter::new()), Arc::new(InfluxDbMetricsWriter::new()),
Duration::from_secs(10), Duration::from_secs(10),
//max per-second datapoint submission limit
4000,
) )
} }
} }
impl MetricsAgent { impl MetricsAgent {
fn new(writer: Arc<MetricsWriter + Send + Sync>, write_frequency: Duration) -> Self { fn new(
writer: Arc<MetricsWriter + Send + Sync>,
write_frequency_secs: Duration,
max_points_per_sec: usize,
) -> Self {
let (sender, receiver) = channel::<MetricsCommand>(); let (sender, receiver) = channel::<MetricsCommand>();
thread::spawn(move || Self::run(&receiver, &writer, write_frequency)); thread::spawn(move || {
Self::run(&receiver, &writer, write_frequency_secs, max_points_per_sec)
});
Self { sender } Self { sender }
} }
fn run( fn run(
receiver: &Receiver<MetricsCommand>, receiver: &Receiver<MetricsCommand>,
writer: &Arc<MetricsWriter + Send + Sync>, writer: &Arc<MetricsWriter + Send + Sync>,
write_frequency: Duration, write_frequency_secs: Duration,
max_points_per_sec: usize,
) { ) {
trace!("run: enter"); trace!("run: enter");
let mut last_write_time = Instant::now(); let mut last_write_time = Instant::now();
let mut points = Vec::new(); let mut points = Vec::new();
let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec;
loop { loop {
match receiver.recv_timeout(write_frequency / 2) { match receiver.recv_timeout(write_frequency_secs / 2) {
Ok(cmd) => match cmd { Ok(cmd) => match cmd {
MetricsCommand::Flush(barrier) => { MetricsCommand::Flush(barrier) => {
debug!("metrics_thread: flush"); debug!("metrics_thread: flush");
@ -180,8 +190,44 @@ impl MetricsAgent {
} }
let now = Instant::now(); let now = Instant::now();
if now.duration_since(last_write_time) >= write_frequency && !points.is_empty() { if now.duration_since(last_write_time) >= write_frequency_secs && !points.is_empty() {
debug!("run: writing {} points", points.len()); let num_points = points.len();
let points_written;
debug!("run: attempting to write {} points", points.len());
if points.len() > max_points {
warn!(
"max submission rate of {} datapoints per second exceeded. only the
first {} of {} points will be submitted",
max_points_per_sec,
max_points,
points.len()
);
points.truncate(max_points - 1);
}
points_written = points.len();
points.push(
influxdb::Point::new("metrics")
.add_timestamp(timing::timestamp() as i64)
.add_field("host_id", influxdb::Value::String(HOST_INFO.to_string()))
.add_field(
"points_written",
influxdb::Value::Integer(points_written as i64),
)
.add_field("num_points", influxdb::Value::Integer(num_points as i64))
.add_field(
"secs_since_last_write",
influxdb::Value::Integer(
now.duration_since(last_write_time).as_secs() as i64
),
)
.add_field(
"points_rate_exceeded",
influxdb::Value::Boolean(num_points > max_points),
)
.to_owned(),
);
writer.write(points); writer.write(points);
points = Vec::new(); points = Vec::new();
last_write_time = now; last_write_time = now;
@ -347,7 +393,7 @@ mod test {
#[test] #[test]
fn test_submit() { fn test_submit() {
let writer = Arc::new(MockMetricsWriter::new()); let writer = Arc::new(MockMetricsWriter::new());
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10)); let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);
for i in 0..42 { for i in 0..42 {
agent.submit(influxdb::Point::new(&format!("measurement {}", i))); agent.submit(influxdb::Point::new(&format!("measurement {}", i)));
@ -360,11 +406,26 @@ mod test {
#[test] #[test]
fn test_submit_with_delay() { fn test_submit_with_delay() {
let writer = Arc::new(MockMetricsWriter::new()); let writer = Arc::new(MockMetricsWriter::new());
let agent = MetricsAgent::new(writer.clone(), Duration::from_millis(100)); let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000);
agent.submit(influxdb::Point::new("point 1")); agent.submit(influxdb::Point::new("point 1"));
thread::sleep(Duration::from_secs(2)); thread::sleep(Duration::from_secs(2));
assert_eq!(writer.points_written(), 1); assert_eq!(writer.points_written(), 2);
}
#[test]
fn test_submit_exceed_max_rate() {
let writer = Arc::new(MockMetricsWriter::new());
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100);
for i in 0..102 {
agent.submit(influxdb::Point::new(&format!("measurement {}", i)));
}
thread::sleep(Duration::from_secs(2));
agent.flush();
assert_eq!(writer.points_written(), 100);
} }
#[test] #[test]
@ -373,6 +434,7 @@ mod test {
let agent = Arc::new(Mutex::new(MetricsAgent::new( let agent = Arc::new(Mutex::new(MetricsAgent::new(
writer.clone(), writer.clone(),
Duration::from_secs(10), Duration::from_secs(10),
1000,
))); )));
// //
@ -399,7 +461,7 @@ mod test {
fn test_flush_before_drop() { fn test_flush_before_drop() {
let writer = Arc::new(MockMetricsWriter::new()); let writer = Arc::new(MockMetricsWriter::new());
{ {
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999)); let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999), 1000);
agent.submit(influxdb::Point::new("point 1")); agent.submit(influxdb::Point::new("point 1"));
} }