Use ureq instead of influx_db_client (#5839)
This commit is contained in:
@ -10,7 +10,6 @@ edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.6.2"
|
||||
influx_db_client = "0.3.6"
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4.8"
|
||||
solana-sdk = { path = "../sdk", version = "0.19.0-pre0" }
|
||||
@ -19,7 +18,6 @@ ureq = "0.11.0"
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.6.5"
|
||||
serde_json = "1.0"
|
||||
serial_test = "0.2.0"
|
||||
serial_test_derive = "0.2.0"
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::metrics::{submit_counter, CounterPoint};
|
||||
use crate::metrics::submit_counter;
|
||||
use log::*;
|
||||
use solana_sdk::timing;
|
||||
use std::env;
|
||||
@ -19,6 +19,24 @@ pub struct Counter {
|
||||
pub metricsrate: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CounterPoint {
|
||||
pub name: &'static str,
|
||||
pub count: i64,
|
||||
pub timestamp: u64,
|
||||
}
|
||||
|
||||
impl CounterPoint {
|
||||
#[cfg(test)]
|
||||
pub fn new(name: &'static str) -> Self {
|
||||
CounterPoint {
|
||||
name,
|
||||
count: 0,
|
||||
timestamp: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! create_counter {
|
||||
($name:expr, $lograte:expr, $metricsrate:expr) => {
|
||||
|
176
metrics/src/datapoint.rs
Normal file
176
metrics/src/datapoint.rs
Normal file
@ -0,0 +1,176 @@
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DataPoint {
|
||||
pub name: &'static str,
|
||||
pub timestamp: u64,
|
||||
pub fields: Vec<(&'static str, String)>,
|
||||
}
|
||||
|
||||
impl DataPoint {
|
||||
pub fn new(name: &'static str) -> Self {
|
||||
DataPoint {
|
||||
name,
|
||||
timestamp: solana_sdk::timing::timestamp(),
|
||||
fields: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_field_str(&mut self, name: &'static str, value: &str) -> &mut Self {
|
||||
self.fields
|
||||
.push((name, format!("\"{}\"", value.replace("\"", "\\\""))));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_field_bool(&mut self, name: &'static str, value: bool) -> &mut Self {
|
||||
self.fields.push((name, value.to_string()));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_field_i64(&mut self, name: &'static str, value: i64) -> &mut Self {
|
||||
self.fields.push((name, value.to_string() + "i"));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_field_f64(&mut self, name: &'static str, value: f64) -> &mut Self {
|
||||
self.fields.push((name, value.to_string()));
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint {
|
||||
(@field $point:ident $name:expr, $string:expr, String) => {
|
||||
$point.add_field_str($name, &$string);
|
||||
};
|
||||
(@field $point:ident $name:expr, $value:expr, i64) => {
|
||||
$point.add_field_i64($name, $value as i64);
|
||||
};
|
||||
(@field $point:ident $name:expr, $value:expr, f64) => {
|
||||
$point.add_field_f64($name, $value as f64);
|
||||
};
|
||||
(@field $point:ident $name:expr, $value:expr, bool) => {
|
||||
$point.add_field_bool($name, $value as bool);
|
||||
};
|
||||
|
||||
(@fields $point:ident) => {};
|
||||
(@fields $point:ident ($name:expr, $value:expr, $type:ident) , $($rest:tt)*) => {
|
||||
$crate::datapoint!(@field $point $name, $value, $type);
|
||||
$crate::datapoint!(@fields $point $($rest)*);
|
||||
};
|
||||
(@fields $point:ident ($name:expr, $value:expr, $type:ident)) => {
|
||||
$crate::datapoint!(@field $point $name, $value, $type);
|
||||
};
|
||||
|
||||
(@point $name:expr, $($fields:tt)+) => {
|
||||
{
|
||||
let mut point = $crate::datapoint::DataPoint::new(&$name);
|
||||
$crate::datapoint!(@fields point $($fields)+);
|
||||
point
|
||||
}
|
||||
};
|
||||
(@point $name:expr) => {
|
||||
$crate::datapoint::DataPoint::new(&$name)
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log::log_enabled!(log::Level::Debug) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Debug);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_error {
|
||||
($name:expr) => {
|
||||
if log::log_enabled!(log::Level::Error) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Error);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log::log_enabled!(log::Level::Error) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_warn {
|
||||
($name:expr) => {
|
||||
if log::log_enabled!(log::Level::Warn) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Warn);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log::log_enabled!(log::Level::Warn) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Warn);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_info {
|
||||
($name:expr) => {
|
||||
if log::log_enabled!(log::Level::Info) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Info);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log::log_enabled!(log::Level::Info) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Info);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_debug {
|
||||
($name:expr) => {
|
||||
if log::log_enabled!(log::Level::Debug) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Debug);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log::log_enabled!(log::Level::Debug) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Debug);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
#[test]
|
||||
fn test_datapoint() {
|
||||
datapoint!("name", ("field name", "test".to_string(), String));
|
||||
datapoint!("name", ("field name", 12.34_f64, f64));
|
||||
datapoint!("name", ("field name", true, bool));
|
||||
datapoint!("name", ("field name", 1, i64));
|
||||
datapoint!("name", ("field name", 1, i64),);
|
||||
datapoint!("name", ("field1 name", 2, i64), ("field2 name", 2, i64));
|
||||
datapoint!("name", ("field1 name", 2, i64), ("field2 name", 2, i64),);
|
||||
datapoint!(
|
||||
"name",
|
||||
("field1 name", 2, i64),
|
||||
("field2 name", 2, i64),
|
||||
("field3 name", 3, i64)
|
||||
);
|
||||
datapoint!(
|
||||
"name",
|
||||
("field1 name", 2, i64),
|
||||
("field2 name", 2, i64),
|
||||
("field3 name", 3, i64),
|
||||
);
|
||||
|
||||
let point = datapoint!(
|
||||
@point "name",
|
||||
("i64", 1, i64),
|
||||
("String", "string space string".to_string(), String),
|
||||
("f64", 12.34_f64, f64),
|
||||
("bool", true, bool)
|
||||
);
|
||||
assert_eq!(point.name, "name");
|
||||
assert_eq!(point.fields[0], ("i64", "1i".to_string()));;
|
||||
assert_eq!(
|
||||
point.fields[1],
|
||||
("String", "\"string space string\"".to_string())
|
||||
);
|
||||
assert_eq!(point.fields[2], ("f64", "12.34".to_string()));
|
||||
assert_eq!(point.fields[3], ("bool", "true".to_string()));
|
||||
}
|
||||
}
|
@ -1,7 +1,4 @@
|
||||
#[macro_use]
|
||||
pub mod counter;
|
||||
|
||||
pub mod datapoint;
|
||||
mod metrics;
|
||||
|
||||
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
||||
pub use influx_db_client as influxdb;
|
||||
|
@ -1,11 +1,9 @@
|
||||
//! The `metrics` module enables sending measurements to an `InfluxDB` instance
|
||||
|
||||
use influx_db_client as influxdb;
|
||||
use influx_db_client::Point;
|
||||
use crate::{counter::CounterPoint, datapoint::DataPoint};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use solana_sdk::hash::hash;
|
||||
use solana_sdk::timing;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Into;
|
||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
||||
@ -15,136 +13,13 @@ use std::time::{Duration, Instant};
|
||||
use std::{cmp, env};
|
||||
use sys_info::hostname;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint {
|
||||
(@field $point:ident $name:expr, $string:expr, String) => {
|
||||
$point.add_field(
|
||||
$name,
|
||||
$crate::influxdb::Value::String($string));
|
||||
};
|
||||
(@field $point:ident $name:expr, $value:expr, i64) => {
|
||||
$point.add_field(
|
||||
$name,
|
||||
$crate::influxdb::Value::Integer($value as i64));
|
||||
};
|
||||
(@field $point:ident $name:expr, $value:expr, f64) => {
|
||||
$point.add_field(
|
||||
$name,
|
||||
$crate::influxdb::Value::Float($value as f64));
|
||||
};
|
||||
(@field $point:ident $name:expr, $value:expr, bool) => {
|
||||
$point.add_field(
|
||||
$name,
|
||||
$crate::influxdb::Value::Boolean($value as bool));
|
||||
};
|
||||
|
||||
(@fields $point:ident) => {};
|
||||
(@fields $point:ident ($name:expr, $value:expr, $type:ident) , $($rest:tt)*) => {
|
||||
$crate::datapoint!(@field $point $name, $value, $type);
|
||||
$crate::datapoint!(@fields $point $($rest)*);
|
||||
};
|
||||
(@fields $point:ident ($name:expr, $value:expr, $type:ident)) => {
|
||||
$crate::datapoint!(@field $point $name, $value, $type);
|
||||
};
|
||||
|
||||
(@point $name:expr, $($fields:tt)+) => {
|
||||
{
|
||||
let mut point = $crate::influxdb::Point::new(&$name);
|
||||
$crate::datapoint!(@fields point $($fields)+);
|
||||
point
|
||||
}
|
||||
};
|
||||
(@point $name:expr) => {
|
||||
$crate::influxdb::Point::new(&$name)
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log_enabled!(log::Level::Debug) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Debug);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_error {
|
||||
($name:expr) => {
|
||||
if log_enabled!(log::Level::Error) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Error);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log_enabled!(log::Level::Error) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_warn {
|
||||
($name:expr) => {
|
||||
if log_enabled!(log::Level::Warn) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Warn);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log_enabled!(log::Level::Warn) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Warn);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_info {
|
||||
($name:expr) => {
|
||||
if log_enabled!(log::Level::Info) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Info);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log_enabled!(log::Level::Info) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Info);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! datapoint_debug {
|
||||
($name:expr) => {
|
||||
if log_enabled!(log::Level::Debug) {
|
||||
$crate::submit($crate::datapoint!(@point $name), log::Level::Debug);
|
||||
}
|
||||
};
|
||||
($name:expr, $($fields:tt)+) => {
|
||||
if log_enabled!(log::Level::Debug) {
|
||||
$crate::submit($crate::datapoint!(@point $name, $($fields)+), log::Level::Debug);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CounterPoint {
|
||||
pub name: &'static str,
|
||||
pub count: i64,
|
||||
pub timestamp: u64,
|
||||
}
|
||||
|
||||
impl CounterPoint {
|
||||
#[cfg(test)]
|
||||
fn new(name: &'static str) -> Self {
|
||||
CounterPoint {
|
||||
name,
|
||||
count: 0,
|
||||
timestamp: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<influxdb::Point> for CounterPoint {
|
||||
fn into(self) -> influxdb::Point {
|
||||
let mut point = influxdb::Point::new(self.name);
|
||||
point.add_field("count", influxdb::Value::Integer(self.count));
|
||||
point.add_timestamp(self.timestamp as i64);
|
||||
impl Into<DataPoint> for CounterPoint {
|
||||
fn into(self) -> DataPoint {
|
||||
let mut point = DataPoint::new(self.name);
|
||||
point.timestamp = self.timestamp;
|
||||
point.add_field_i64("count", self.count);
|
||||
point
|
||||
}
|
||||
}
|
||||
@ -152,7 +27,7 @@ impl Into<influxdb::Point> for CounterPoint {
|
||||
#[derive(Debug)]
|
||||
enum MetricsCommand {
|
||||
Flush(Arc<Barrier>),
|
||||
Submit(influxdb::Point, log::Level),
|
||||
Submit(DataPoint, log::Level),
|
||||
SubmitCounter(CounterPoint, log::Level, u64),
|
||||
}
|
||||
|
||||
@ -163,21 +38,21 @@ struct MetricsAgent {
|
||||
trait MetricsWriter {
|
||||
// Write the points and empty the vector. Called on the internal
|
||||
// MetricsAgent worker thread.
|
||||
fn write(&self, points: Vec<influxdb::Point>);
|
||||
fn write(&self, points: Vec<DataPoint>);
|
||||
}
|
||||
|
||||
struct InfluxDbMetricsWriter {
|
||||
client: Option<influxdb::Client>,
|
||||
write_url: Option<String>,
|
||||
}
|
||||
|
||||
impl InfluxDbMetricsWriter {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
client: Self::build_client().ok(),
|
||||
write_url: Self::build_write_url().ok(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_client() -> Result<influxdb::Client, String> {
|
||||
fn build_write_url() -> Result<String, String> {
|
||||
let config = get_metrics_config().map_err(|err| {
|
||||
info!("metrics disabled: {}", err);
|
||||
err
|
||||
@ -187,28 +62,51 @@ impl InfluxDbMetricsWriter {
|
||||
"metrics configuration: host={} db={} username={}",
|
||||
config.host, config.db, config.username
|
||||
);
|
||||
let mut client = influxdb::Client::new_with_option(config.host, config.db, None)
|
||||
.set_authentication(config.username, config.password);
|
||||
|
||||
client.set_read_timeout(1 /*second*/);
|
||||
client.set_write_timeout(1 /*second*/);
|
||||
let write_url = format!(
|
||||
"{}/write?db={}&u={}&p={}&precision=ms",
|
||||
&config.host, &config.db, &config.username, &config.password
|
||||
);
|
||||
|
||||
debug!("InfluxDB version: {:?}", client.get_version());
|
||||
Ok(client)
|
||||
Ok(write_url)
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsWriter for InfluxDbMetricsWriter {
|
||||
fn write(&self, points: Vec<influxdb::Point>) {
|
||||
if let Some(ref client) = self.client {
|
||||
debug!("submitting {} points", points.len());
|
||||
if let Err(err) = client.write_points(
|
||||
influxdb::Points { point: points },
|
||||
Some(influxdb::Precision::Milliseconds),
|
||||
None,
|
||||
) {
|
||||
debug!("InfluxDbMetricsWriter write error: {:?}", err);
|
||||
fn write(&self, points: Vec<DataPoint>) {
|
||||
if let Some(ref write_url) = self.write_url {
|
||||
info!("submitting {} points", points.len());
|
||||
|
||||
let host_id = HOST_ID.read().unwrap();
|
||||
|
||||
let mut line = String::new();
|
||||
for point in points {
|
||||
line.push_str(&format!("{},host_id={}", &point.name, &host_id));
|
||||
|
||||
let mut first = true;
|
||||
for (name, value) in point.fields {
|
||||
line.push_str(&format!(
|
||||
"{}{}={}",
|
||||
if first { ' ' } else { ',' },
|
||||
name,
|
||||
value
|
||||
));
|
||||
first = false;
|
||||
}
|
||||
|
||||
line.push_str(&format!(" {}\n", &point.timestamp));
|
||||
}
|
||||
|
||||
let response = ureq::post(write_url.as_str())
|
||||
.timeout_connect(2_000)
|
||||
.timeout_read(2_000)
|
||||
.timeout_write(4_000)
|
||||
.send_string(&line);
|
||||
info!(
|
||||
"submit response: {} {}",
|
||||
response.status(),
|
||||
response.status_text()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -245,8 +143,7 @@ impl MetricsAgent {
|
||||
}
|
||||
|
||||
fn write(
|
||||
host_id: &influxdb::Value,
|
||||
mut points: Vec<Point>,
|
||||
mut points: Vec<DataPoint>,
|
||||
last_write_time: Instant,
|
||||
max_points: usize,
|
||||
writer: &Arc<dyn MetricsWriter + Send + Sync>,
|
||||
@ -268,33 +165,19 @@ impl MetricsAgent {
|
||||
}
|
||||
let points_written = cmp::min(num_points, max_points - 1);
|
||||
points.truncate(points_written);
|
||||
|
||||
let extra = influxdb::Point::new("metrics")
|
||||
.add_timestamp(timing::timestamp() as i64)
|
||||
.add_tag("host_id", host_id.clone())
|
||||
.add_field(
|
||||
"points_written",
|
||||
influxdb::Value::Integer(points_written as i64),
|
||||
)
|
||||
.add_field("num_points", influxdb::Value::Integer(num_points as i64))
|
||||
.add_field(
|
||||
"points_lost",
|
||||
influxdb::Value::Integer((num_points - points_written) as i64),
|
||||
)
|
||||
.add_field(
|
||||
"secs_since_last_write",
|
||||
influxdb::Value::Integer(now.duration_since(last_write_time).as_secs() as i64),
|
||||
)
|
||||
.to_owned();
|
||||
|
||||
for point in &mut points {
|
||||
// TODO: rework influx_db_client crate API to avoid this unnecessary cloning
|
||||
point.add_tag("host_id", host_id.clone());
|
||||
}
|
||||
points.push(
|
||||
DataPoint::new("metrics")
|
||||
.add_field_i64("points_written", points_written as i64)
|
||||
.add_field_i64("num_points", num_points as i64)
|
||||
.add_field_i64("points_lost", (num_points - points_written) as i64)
|
||||
.add_field_i64(
|
||||
"secs_since_last_write",
|
||||
now.duration_since(last_write_time).as_secs() as i64,
|
||||
)
|
||||
.to_owned(),
|
||||
);
|
||||
|
||||
writer.write(points);
|
||||
writer.write([extra].to_vec());
|
||||
|
||||
points_written
|
||||
}
|
||||
|
||||
@ -306,7 +189,7 @@ impl MetricsAgent {
|
||||
) {
|
||||
trace!("run: enter");
|
||||
let mut last_write_time = Instant::now();
|
||||
let mut points_map = HashMap::<log::Level, (Instant, CounterMap, Vec<Point>)>::new();
|
||||
let mut points_map = HashMap::<log::Level, (Instant, CounterMap, Vec<DataPoint>)>::new();
|
||||
let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec;
|
||||
|
||||
loop {
|
||||
@ -359,7 +242,6 @@ impl MetricsAgent {
|
||||
let mut num_max_writes = max_points;
|
||||
|
||||
let now = Instant::now();
|
||||
let host_id = HOST_ID.read().unwrap();
|
||||
if now.duration_since(last_write_time) >= write_frequency_secs {
|
||||
vec![
|
||||
Level::Error,
|
||||
@ -374,7 +256,6 @@ impl MetricsAgent {
|
||||
let counter_points = counters.into_iter().map(|(_, v)| v.into());
|
||||
let points: Vec<_> = points.into_iter().chain(counter_points).collect();
|
||||
let num_written = Self::write(
|
||||
&host_id,
|
||||
points,
|
||||
last_time,
|
||||
num_max_writes,
|
||||
@ -394,11 +275,8 @@ impl MetricsAgent {
|
||||
trace!("run: exit");
|
||||
}
|
||||
|
||||
pub fn submit(&self, mut point: influxdb::Point, level: log::Level) {
|
||||
if point.timestamp.is_none() {
|
||||
point.add_timestamp(timing::timestamp() as i64);
|
||||
}
|
||||
debug!("Submitting point: {:?}", point);
|
||||
pub fn submit(&self, point: DataPoint, level: log::Level) {
|
||||
debug!("Submitting data point: {:?}", point);
|
||||
self.sender
|
||||
.send(MetricsCommand::Submit(point, level))
|
||||
.unwrap();
|
||||
@ -441,23 +319,23 @@ fn get_singleton_agent() -> Arc<Mutex<MetricsAgent>> {
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref HOST_ID: Arc<RwLock<influx_db_client::Value>> = {
|
||||
Arc::new(RwLock::new(influx_db_client::Value::String({
|
||||
static ref HOST_ID: Arc<RwLock<String>> = {
|
||||
Arc::new(RwLock::new({
|
||||
let hostname: String = hostname().unwrap_or_else(|_| "".to_string());
|
||||
format!("{}", hash(hostname.as_bytes())).to_string()
|
||||
})))
|
||||
}))
|
||||
};
|
||||
}
|
||||
|
||||
pub fn set_host_id(host_id: String) {
|
||||
let mut rw = HOST_ID.write().unwrap();
|
||||
info!("host id: {}", host_id);
|
||||
std::mem::replace(&mut *rw, influx_db_client::Value::String(host_id));
|
||||
std::mem::replace(&mut *rw, host_id);
|
||||
}
|
||||
|
||||
/// Submits a new point from any thread. Note that points are internally queued
|
||||
/// and transmitted periodically in batches.
|
||||
pub fn submit(point: influxdb::Point, level: log::Level) {
|
||||
pub fn submit(point: DataPoint, level: log::Level) {
|
||||
let agent_mutex = get_singleton_agent();
|
||||
let agent = agent_mutex.lock().unwrap();
|
||||
agent.submit(point, level);
|
||||
@ -465,7 +343,7 @@ pub fn submit(point: influxdb::Point, level: log::Level) {
|
||||
|
||||
/// Submits a new counter or updates an existing counter from any thread. Note that points are
|
||||
/// internally queued and transmitted periodically in batches.
|
||||
pub fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
|
||||
pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
|
||||
let agent_mutex = get_singleton_agent();
|
||||
let agent = agent_mutex.lock().unwrap();
|
||||
agent.submit_counter(point, level, bucket);
|
||||
@ -518,12 +396,12 @@ fn get_metrics_config() -> Result<MetricsConfig, String> {
|
||||
|
||||
pub fn query(q: &str) -> Result<String, String> {
|
||||
let config = get_metrics_config().map_err(|err| err.to_string())?;
|
||||
let query = format!(
|
||||
let query_url = format!(
|
||||
"{}/query?u={}&p={}&q={}",
|
||||
&config.host, &config.username, &config.password, &q
|
||||
);
|
||||
|
||||
let response = ureq::get(query.as_str())
|
||||
let response = ureq::get(query_url.as_str())
|
||||
.call()
|
||||
.into_string()
|
||||
.map_err(|err| err.to_string())?;
|
||||
@ -547,32 +425,23 @@ pub fn set_panic_hook(program: &'static str) {
|
||||
let default_hook = panic::take_hook();
|
||||
panic::set_hook(Box::new(move |ono| {
|
||||
default_hook(ono);
|
||||
let location = match ono.location() {
|
||||
Some(location) => location.to_string(),
|
||||
None => "?".to_string(),
|
||||
};
|
||||
submit(
|
||||
influxdb::Point::new("panic")
|
||||
.add_tag("program", influxdb::Value::String(program.to_string()))
|
||||
.add_tag(
|
||||
"thread",
|
||||
influxdb::Value::String(
|
||||
thread::current().name().unwrap_or("?").to_string(),
|
||||
),
|
||||
)
|
||||
DataPoint::new("panic")
|
||||
.add_field_str("program", program)
|
||||
.add_field_str("thread", thread::current().name().unwrap_or("?"))
|
||||
// The 'one' field exists to give Kapacitor Alerts a numerical value
|
||||
// to filter on
|
||||
.add_field("one", influxdb::Value::Integer(1))
|
||||
.add_field(
|
||||
.add_field_i64("one", 1)
|
||||
.add_field_str(
|
||||
"message",
|
||||
influxdb::Value::String(
|
||||
// TODO: use ono.message() when it becomes stable
|
||||
ono.to_string(),
|
||||
),
|
||||
)
|
||||
.add_field(
|
||||
"location",
|
||||
influxdb::Value::String(match ono.location() {
|
||||
Some(location) => location.to_string(),
|
||||
None => "?".to_string(),
|
||||
}),
|
||||
// TODO: use ono.message() when it becomes stable
|
||||
&ono.to_string(),
|
||||
)
|
||||
.add_field_str("location", &location)
|
||||
.to_owned(),
|
||||
Level::Error,
|
||||
);
|
||||
@ -586,10 +455,9 @@ pub fn set_panic_hook(program: &'static str) {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use serde_json;
|
||||
|
||||
struct MockMetricsWriter {
|
||||
points_written: Arc<Mutex<Vec<influxdb::Point>>>,
|
||||
points_written: Arc<Mutex<Vec<DataPoint>>>,
|
||||
}
|
||||
impl MockMetricsWriter {
|
||||
fn new() -> Self {
|
||||
@ -604,7 +472,7 @@ mod test {
|
||||
}
|
||||
|
||||
impl MetricsWriter for MockMetricsWriter {
|
||||
fn write(&self, points: Vec<influxdb::Point>) {
|
||||
fn write(&self, points: Vec<DataPoint>) {
|
||||
assert!(!points.is_empty());
|
||||
|
||||
let new_points = points.len();
|
||||
@ -616,7 +484,7 @@ mod test {
|
||||
info!(
|
||||
"Writing {} points ({} total)",
|
||||
new_points,
|
||||
self.points_written()
|
||||
self.points_written(),
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -628,7 +496,9 @@ mod test {
|
||||
|
||||
for i in 0..42 {
|
||||
agent.submit(
|
||||
influxdb::Point::new(&format!("measurement {}", i)),
|
||||
DataPoint::new("measurement")
|
||||
.add_field_i64("i", i)
|
||||
.to_owned(),
|
||||
Level::Info,
|
||||
);
|
||||
}
|
||||
@ -643,8 +513,8 @@ mod test {
|
||||
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);
|
||||
|
||||
for i in 0..10 {
|
||||
agent.submit_counter(CounterPoint::new("counter - 1"), Level::Info, i);
|
||||
agent.submit_counter(CounterPoint::new("counter - 2"), Level::Info, i);
|
||||
agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i);
|
||||
agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i);
|
||||
}
|
||||
|
||||
agent.flush();
|
||||
@ -672,12 +542,7 @@ mod test {
|
||||
assert_eq!(writer.points_written(), 1);
|
||||
|
||||
let submitted_point = writer.points_written.lock().unwrap()[0].clone();
|
||||
let submitted_count = submitted_point.fields.get("count").unwrap();
|
||||
let expected_count = &influxdb::Value::Integer(100);
|
||||
assert_eq!(
|
||||
serde_json::to_string(submitted_count).unwrap(),
|
||||
serde_json::to_string(expected_count).unwrap()
|
||||
);
|
||||
assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -686,8 +551,8 @@ mod test {
|
||||
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);
|
||||
|
||||
for i in 0..50 {
|
||||
agent.submit_counter(CounterPoint::new("counter - 1"), Level::Info, i / 10);
|
||||
agent.submit_counter(CounterPoint::new("counter - 2"), Level::Info, i / 10);
|
||||
agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i / 10);
|
||||
agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i / 10);
|
||||
}
|
||||
|
||||
agent.flush();
|
||||
@ -699,7 +564,7 @@ mod test {
|
||||
let writer = Arc::new(MockMetricsWriter::new());
|
||||
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000);
|
||||
|
||||
agent.submit(influxdb::Point::new("point 1"), Level::Info);
|
||||
agent.submit(DataPoint::new("point 1"), Level::Info);
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
assert_eq!(writer.points_written(), 2);
|
||||
}
|
||||
@ -711,7 +576,9 @@ mod test {
|
||||
|
||||
for i in 0..102 {
|
||||
agent.submit(
|
||||
influxdb::Point::new(&format!("measurement {}", i)),
|
||||
DataPoint::new("measurement")
|
||||
.add_field_i64("i", i)
|
||||
.to_owned(),
|
||||
Level::Info,
|
||||
);
|
||||
}
|
||||
@ -736,7 +603,8 @@ mod test {
|
||||
//
|
||||
let mut threads = Vec::new();
|
||||
for i in 0..42 {
|
||||
let point = influxdb::Point::new(&format!("measurement {}", i));
|
||||
let mut point = DataPoint::new("measurement");
|
||||
point.add_field_i64("i", i);
|
||||
let agent = Arc::clone(&agent);
|
||||
threads.push(thread::spawn(move || {
|
||||
agent.lock().unwrap().submit(point, Level::Info);
|
||||
@ -756,7 +624,7 @@ mod test {
|
||||
let writer = Arc::new(MockMetricsWriter::new());
|
||||
{
|
||||
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999), 1000);
|
||||
agent.submit(influxdb::Point::new("point 1"), Level::Info);
|
||||
agent.submit(DataPoint::new("point 1"), Level::Info);
|
||||
}
|
||||
|
||||
assert_eq!(writer.points_written(), 1);
|
||||
@ -766,80 +634,11 @@ mod test {
|
||||
fn test_live_submit() {
|
||||
let agent = MetricsAgent::default();
|
||||
|
||||
let point = influxdb::Point::new("live_submit_test")
|
||||
.add_tag("test", influxdb::Value::Boolean(true))
|
||||
.add_field(
|
||||
"random_bool",
|
||||
influxdb::Value::Boolean(rand::random::<u8>() < 128),
|
||||
)
|
||||
.add_field(
|
||||
"random_int",
|
||||
influxdb::Value::Integer(rand::random::<u8>() as i64),
|
||||
)
|
||||
let point = DataPoint::new("live_submit_test")
|
||||
.add_field_bool("true", true)
|
||||
.add_field_bool("random_bool", rand::random::<u8>() < 128)
|
||||
.add_field_i64("random_int", rand::random::<u8>() as i64)
|
||||
.to_owned();
|
||||
agent.submit(point, Level::Info);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_datapoint() {
|
||||
macro_rules! matches {
|
||||
($e:expr, $p:pat) => {
|
||||
match $e {
|
||||
$p => true,
|
||||
_ => false,
|
||||
}
|
||||
};
|
||||
}
|
||||
datapoint!("name", ("field name", "test".to_string(), String));
|
||||
datapoint!("name", ("field name", 12.34_f64, f64));
|
||||
datapoint!("name", ("field name", true, bool));
|
||||
datapoint!("name", ("field name", 1, i64));
|
||||
datapoint!("name", ("field name", 1, i64),);
|
||||
datapoint!("name", ("field1 name", 2, i64), ("field2 name", 2, i64));
|
||||
datapoint!("name", ("field1 name", 2, i64), ("field2 name", 2, i64),);
|
||||
datapoint!(
|
||||
"name",
|
||||
("field1 name", 2, i64),
|
||||
("field2 name", 2, i64),
|
||||
("field3 name", 3, i64)
|
||||
);
|
||||
datapoint!(
|
||||
"name",
|
||||
("field1 name", 2, i64),
|
||||
("field2 name", 2, i64),
|
||||
("field3 name", 3, i64),
|
||||
);
|
||||
|
||||
let point = datapoint!(@point "name", ("i64", 1, i64), ("String", "string".to_string(), String), ("f64", 12.34_f64, f64), ("bool", true, bool));
|
||||
assert_eq!(point.measurement, "name");
|
||||
assert!(matches!(
|
||||
point.fields.get("i64").unwrap(),
|
||||
influxdb::Value::Integer(1)
|
||||
));
|
||||
assert!(match point.fields.get("String").unwrap() {
|
||||
influxdb::Value::String(ref s) => {
|
||||
if s == "string" {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
_ => false,
|
||||
});
|
||||
assert!(match point.fields.get("f64").unwrap() {
|
||||
influxdb::Value::Float(f) => {
|
||||
if *f == 12.34_f64 {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
_ => false,
|
||||
});
|
||||
assert!(matches!(
|
||||
point.fields.get("bool").unwrap(),
|
||||
influxdb::Value::Boolean(true)
|
||||
));
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user