Submit all metrics in one HTTP POST rather than a HTTP POST per level

This commit is contained in:
Michael Vines
2020-02-19 21:48:52 -07:00
parent b65c9ea544
commit f4622d67e9

View File

@ -135,26 +135,45 @@ impl Default for MetricsAgent {
impl MetricsAgent { impl MetricsAgent {
fn new( fn new(
writer: Arc<dyn MetricsWriter + Send + Sync>, writer: Arc<dyn MetricsWriter + Send + Sync>,
write_frequency_secs: Duration, write_frequency: Duration,
max_points_per_sec: usize, max_points_per_sec: usize,
) -> Self { ) -> Self {
let (sender, receiver) = channel::<MetricsCommand>(); let (sender, receiver) = channel::<MetricsCommand>();
thread::spawn(move || { thread::spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec));
Self::run(&receiver, &writer, write_frequency_secs, max_points_per_sec)
});
Self { sender } Self { sender }
} }
fn collect_points(
points_map: &mut HashMap<log::Level, (CounterMap, Vec<DataPoint>)>,
) -> Vec<DataPoint> {
let points: Vec<DataPoint> = [
Level::Error,
Level::Warn,
Level::Info,
Level::Debug,
Level::Trace,
]
.iter()
.filter_map(|level| points_map.remove(level))
.flat_map(|(counters, points)| {
let counter_points = counters.into_iter().map(|(_, v)| v.into());
points.into_iter().chain(counter_points)
})
.collect();
points_map.clear();
points
}
fn write( fn write(
mut points: Vec<DataPoint>,
last_write_time: Instant,
max_points: usize,
writer: &Arc<dyn MetricsWriter + Send + Sync>, writer: &Arc<dyn MetricsWriter + Send + Sync>,
mut points: Vec<DataPoint>,
max_points: usize,
max_points_per_sec: usize, max_points_per_sec: usize,
) -> usize { last_write_time: Instant,
) {
if points.is_empty() { if points.is_empty() {
return 0; return;
} }
let now = Instant::now(); let now = Instant::now();
@ -182,49 +201,45 @@ impl MetricsAgent {
); );
writer.write(points); writer.write(points);
points_written
} }
fn run( fn run(
receiver: &Receiver<MetricsCommand>, receiver: &Receiver<MetricsCommand>,
writer: &Arc<dyn MetricsWriter + Send + Sync>, writer: &Arc<dyn MetricsWriter + Send + Sync>,
write_frequency_secs: Duration, write_frequency: Duration,
max_points_per_sec: usize, max_points_per_sec: usize,
) { ) {
trace!("run: enter"); trace!("run: enter");
let mut last_write_time = Instant::now(); let mut last_write_time = Instant::now();
let mut points_map = HashMap::<log::Level, (Instant, CounterMap, Vec<DataPoint>)>::new(); let mut points_map = HashMap::<log::Level, (CounterMap, Vec<DataPoint>)>::new();
let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec; let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
loop { loop {
match receiver.recv_timeout(write_frequency_secs / 2) { match receiver.recv_timeout(write_frequency / 2) {
Ok(cmd) => match cmd { Ok(cmd) => match cmd {
MetricsCommand::Flush(barrier) => { MetricsCommand::Flush(barrier) => {
debug!("metrics_thread: flush"); debug!("metrics_thread: flush");
points_map.drain().for_each(|(_, (_, counters, points))| { Self::write(
let counter_points = counters.into_iter().map(|(_, v)| v.into()); writer,
let points: Vec<_> = points.into_iter().chain(counter_points).collect(); Self::collect_points(&mut points_map),
writer.write(points); max_points,
last_write_time = Instant::now(); max_points_per_sec,
}); last_write_time,
);
last_write_time = Instant::now();
barrier.wait(); barrier.wait();
} }
MetricsCommand::Submit(point, level) => { MetricsCommand::Submit(point, level) => {
log!(level, "{}", point); log!(level, "{}", point);
let (_, _, points) = points_map.entry(level).or_insert(( let (_, points) = points_map
last_write_time, .entry(level)
HashMap::new(), .or_insert((HashMap::new(), Vec::new()));
Vec::new(),
));
points.push(point); points.push(point);
} }
MetricsCommand::SubmitCounter(counter, level, bucket) => { MetricsCommand::SubmitCounter(counter, level, bucket) => {
debug!("{:?}", counter); debug!("{:?}", counter);
let (_, counters, _) = points_map.entry(level).or_insert(( let (counters, _) = points_map
last_write_time, .entry(level)
HashMap::new(), .or_insert((HashMap::new(), Vec::new()));
Vec::new(),
));
let key = (counter.name, bucket); let key = (counter.name, bucket);
if let Some(value) = counters.get_mut(&key) { if let Some(value) = counters.get_mut(&key) {
@ -243,37 +258,16 @@ impl MetricsAgent {
} }
} }
let mut num_max_writes = max_points;
let now = Instant::now(); let now = Instant::now();
if now.duration_since(last_write_time) >= write_frequency_secs { if now.duration_since(last_write_time) >= write_frequency {
vec![ Self::write(
Level::Error, writer,
Level::Warn, Self::collect_points(&mut points_map),
Level::Info, max_points,
Level::Debug, max_points_per_sec,
Level::Trace, last_write_time,
] );
.iter() last_write_time = now;
.for_each(|x| {
if let Some((last_time, counters, points)) = points_map.remove(x) {
let counter_points = counters.into_iter().map(|(_, v)| v.into());
let points: Vec<_> = points.into_iter().chain(counter_points).collect();
let num_written = Self::write(
points,
last_time,
num_max_writes,
writer,
max_points_per_sec,
);
if num_written > 0 {
last_write_time = Instant::now();
}
num_max_writes = num_max_writes.saturating_sub(num_written);
}
});
} }
} }
trace!("run: exit"); trace!("run: exit");
@ -501,7 +495,7 @@ mod test {
} }
agent.flush(); agent.flush();
assert_eq!(writer.points_written(), 42); assert_eq!(writer.points_written(), 43);
} }
#[test] #[test]
@ -515,7 +509,7 @@ mod test {
} }
agent.flush(); agent.flush();
assert_eq!(writer.points_written(), 20); assert_eq!(writer.points_written(), 21);
} }
#[test] #[test]
@ -536,7 +530,7 @@ mod test {
} }
agent.flush(); agent.flush();
assert_eq!(writer.points_written(), 1); assert_eq!(writer.points_written(), 2);
let submitted_point = writer.points_written.lock().unwrap()[0].clone(); let submitted_point = writer.points_written.lock().unwrap()[0].clone();
assert_eq!(submitted_point.fields[0], ("count", "100i".to_string())); assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
@ -553,7 +547,7 @@ mod test {
} }
agent.flush(); agent.flush();
assert_eq!(writer.points_written(), 10); assert_eq!(writer.points_written(), 11);
} }
#[test] #[test]
@ -613,7 +607,7 @@ mod test {
} }
agent.lock().unwrap().flush(); agent.lock().unwrap().flush();
assert_eq!(writer.points_written(), 42); assert_eq!(writer.points_written(), 43);
} }
#[test] #[test]
@ -624,7 +618,7 @@ mod test {
agent.submit(DataPoint::new("point 1"), Level::Info); agent.submit(DataPoint::new("point 1"), Level::Info);
} }
assert_eq!(writer.points_written(), 1); assert_eq!(writer.points_written(), 2);
} }
#[test] #[test]