From f4622d67e93cb3f730a618e9e0c708242be700c4 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Wed, 19 Feb 2020 21:48:52 -0700 Subject: [PATCH] Submit all metrics in one HTTP POST rather than a HTTP POST per level --- metrics/src/metrics.rs | 128 ++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 67 deletions(-) diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index f24ca5af2d..028f22c9ea 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -135,26 +135,45 @@ impl Default for MetricsAgent { impl MetricsAgent { fn new( writer: Arc, - write_frequency_secs: Duration, + write_frequency: Duration, max_points_per_sec: usize, ) -> Self { let (sender, receiver) = channel::(); - thread::spawn(move || { - Self::run(&receiver, &writer, write_frequency_secs, max_points_per_sec) - }); + thread::spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec)); Self { sender } } + fn collect_points( + points_map: &mut HashMap)>, + ) -> Vec { + let points: Vec = [ + Level::Error, + Level::Warn, + Level::Info, + Level::Debug, + Level::Trace, + ] + .iter() + .filter_map(|level| points_map.remove(level)) + .flat_map(|(counters, points)| { + let counter_points = counters.into_iter().map(|(_, v)| v.into()); + points.into_iter().chain(counter_points) + }) + .collect(); + points_map.clear(); + points + } + fn write( - mut points: Vec, - last_write_time: Instant, - max_points: usize, writer: &Arc, + mut points: Vec, + max_points: usize, max_points_per_sec: usize, - ) -> usize { + last_write_time: Instant, + ) { if points.is_empty() { - return 0; + return; } let now = Instant::now(); @@ -182,49 +201,45 @@ impl MetricsAgent { ); writer.write(points); - points_written } - fn run( receiver: &Receiver, writer: &Arc, - write_frequency_secs: Duration, + write_frequency: Duration, max_points_per_sec: usize, ) { trace!("run: enter"); let mut last_write_time = Instant::now(); - let mut points_map = HashMap::)>::new(); - let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec; + let mut points_map = HashMap::)>::new(); + let max_points = write_frequency.as_secs() as usize * max_points_per_sec; loop { - match receiver.recv_timeout(write_frequency_secs / 2) { + match receiver.recv_timeout(write_frequency / 2) { Ok(cmd) => match cmd { MetricsCommand::Flush(barrier) => { debug!("metrics_thread: flush"); - points_map.drain().for_each(|(_, (_, counters, points))| { - let counter_points = counters.into_iter().map(|(_, v)| v.into()); - let points: Vec<_> = points.into_iter().chain(counter_points).collect(); - writer.write(points); - last_write_time = Instant::now(); - }); + Self::write( + writer, + Self::collect_points(&mut points_map), + max_points, + max_points_per_sec, + last_write_time, + ); + last_write_time = Instant::now(); barrier.wait(); } MetricsCommand::Submit(point, level) => { log!(level, "{}", point); - let (_, _, points) = points_map.entry(level).or_insert(( - last_write_time, - HashMap::new(), - Vec::new(), - )); + let (_, points) = points_map + .entry(level) + .or_insert((HashMap::new(), Vec::new())); points.push(point); } MetricsCommand::SubmitCounter(counter, level, bucket) => { debug!("{:?}", counter); - let (_, counters, _) = points_map.entry(level).or_insert(( - last_write_time, - HashMap::new(), - Vec::new(), - )); + let (counters, _) = points_map + .entry(level) + .or_insert((HashMap::new(), Vec::new())); let key = (counter.name, bucket); if let Some(value) = counters.get_mut(&key) { @@ -243,37 +258,16 @@ impl MetricsAgent { } } - let mut num_max_writes = max_points; - let now = Instant::now(); - if now.duration_since(last_write_time) >= write_frequency_secs { - vec![ - Level::Error, - Level::Warn, - Level::Info, - Level::Debug, - Level::Trace, - ] - .iter() - .for_each(|x| { - if let Some((last_time, counters, points)) = points_map.remove(x) { - let counter_points = counters.into_iter().map(|(_, v)| v.into()); - let points: Vec<_> = points.into_iter().chain(counter_points).collect(); - let num_written = Self::write( - points, - last_time, - num_max_writes, - writer, - max_points_per_sec, - ); - - if num_written > 0 { - last_write_time = Instant::now(); - } - - num_max_writes = num_max_writes.saturating_sub(num_written); - } - }); + if now.duration_since(last_write_time) >= write_frequency { + Self::write( + writer, + Self::collect_points(&mut points_map), + max_points, + max_points_per_sec, + last_write_time, + ); + last_write_time = now; } } trace!("run: exit"); @@ -501,7 +495,7 @@ mod test { } agent.flush(); - assert_eq!(writer.points_written(), 42); + assert_eq!(writer.points_written(), 43); } #[test] @@ -515,7 +509,7 @@ mod test { } agent.flush(); - assert_eq!(writer.points_written(), 20); + assert_eq!(writer.points_written(), 21); } #[test] @@ -536,7 +530,7 @@ mod test { } agent.flush(); - assert_eq!(writer.points_written(), 1); + assert_eq!(writer.points_written(), 2); let submitted_point = writer.points_written.lock().unwrap()[0].clone(); assert_eq!(submitted_point.fields[0], ("count", "100i".to_string())); @@ -553,7 +547,7 @@ mod test { } agent.flush(); - assert_eq!(writer.points_written(), 10); + assert_eq!(writer.points_written(), 11); } #[test] @@ -613,7 +607,7 @@ mod test { } agent.lock().unwrap().flush(); - assert_eq!(writer.points_written(), 42); + assert_eq!(writer.points_written(), 43); } #[test] @@ -624,7 +618,7 @@ mod test { agent.submit(DataPoint::new("point 1"), Level::Info); } - assert_eq!(writer.points_written(), 1); + assert_eq!(writer.points_written(), 2); } #[test]