Validator/replicator metrics host id is no longer set by bash (#5755)
automerge
This commit is contained in:
@ -3,8 +3,5 @@ pub mod counter;
|
||||
|
||||
mod metrics;
|
||||
|
||||
pub use crate::metrics::flush;
|
||||
pub use crate::metrics::query;
|
||||
pub use crate::metrics::set_panic_hook;
|
||||
pub use crate::metrics::submit;
|
||||
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
||||
pub use influx_db_client as influxdb;
|
||||
|
@ -9,7 +9,7 @@ use solana_sdk::timing;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Into;
|
||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
||||
use std::sync::{Arc, Barrier, Mutex, Once};
|
||||
use std::sync::{Arc, Barrier, Mutex, Once, RwLock};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{cmp, env};
|
||||
@ -120,15 +120,6 @@ macro_rules! datapoint_debug {
|
||||
};
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref HOST_ID: String = {
|
||||
env::var("SOLANA_METRICS_HOST_ID").unwrap_or_else(|_| {
|
||||
let hostname: String = hostname().unwrap_or_else(|_| "".to_string());
|
||||
format!("host-{}", hash(hostname.as_bytes())).to_string()
|
||||
})
|
||||
};
|
||||
}
|
||||
|
||||
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@ -152,7 +143,6 @@ impl CounterPoint {
|
||||
impl Into<influxdb::Point> for CounterPoint {
|
||||
fn into(self) -> influxdb::Point {
|
||||
let mut point = influxdb::Point::new(self.name);
|
||||
point.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string()));
|
||||
point.add_field("count", influxdb::Value::Integer(self.count));
|
||||
point.add_timestamp(self.timestamp as i64);
|
||||
point
|
||||
@ -250,11 +240,13 @@ impl MetricsAgent {
|
||||
thread::spawn(move || {
|
||||
Self::run(&receiver, &writer, write_frequency_secs, max_points_per_sec)
|
||||
});
|
||||
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
fn write(
|
||||
points: &[Point],
|
||||
host_id: &influxdb::Value,
|
||||
mut points: Vec<Point>,
|
||||
last_write_time: Instant,
|
||||
max_points: usize,
|
||||
writer: &Arc<dyn MetricsWriter + Send + Sync>,
|
||||
@ -275,10 +267,11 @@ impl MetricsAgent {
|
||||
);
|
||||
}
|
||||
let points_written = cmp::min(num_points, max_points - 1);
|
||||
points.truncate(points_written);
|
||||
|
||||
let extra = influxdb::Point::new("metrics")
|
||||
.add_timestamp(timing::timestamp() as i64)
|
||||
.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string()))
|
||||
.add_tag("host_id", host_id.clone())
|
||||
.add_field(
|
||||
"points_written",
|
||||
influxdb::Value::Integer(points_written as i64),
|
||||
@ -294,7 +287,12 @@ impl MetricsAgent {
|
||||
)
|
||||
.to_owned();
|
||||
|
||||
writer.write(points[0..points_written].to_vec());
|
||||
for point in &mut points {
|
||||
// TODO: rework influx_db_client crate API to avoid this unnecessary cloning
|
||||
point.add_tag("host_id", host_id.clone());
|
||||
}
|
||||
|
||||
writer.write(points);
|
||||
writer.write([extra].to_vec());
|
||||
|
||||
points_written
|
||||
@ -361,6 +359,7 @@ impl MetricsAgent {
|
||||
let mut num_max_writes = max_points;
|
||||
|
||||
let now = Instant::now();
|
||||
let host_id = HOST_ID.read().unwrap();
|
||||
if now.duration_since(last_write_time) >= write_frequency_secs {
|
||||
vec![
|
||||
Level::Error,
|
||||
@ -375,7 +374,8 @@ impl MetricsAgent {
|
||||
let counter_points = counters.into_iter().map(|(_, v)| v.into());
|
||||
let points: Vec<_> = points.into_iter().chain(counter_points).collect();
|
||||
let num_written = Self::write(
|
||||
&points,
|
||||
&host_id,
|
||||
points,
|
||||
last_time,
|
||||
num_max_writes,
|
||||
writer,
|
||||
@ -440,10 +440,24 @@ fn get_singleton_agent() -> Arc<Mutex<MetricsAgent>> {
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref HOST_ID: Arc<RwLock<influx_db_client::Value>> = {
|
||||
Arc::new(RwLock::new(influx_db_client::Value::String({
|
||||
let hostname: String = hostname().unwrap_or_else(|_| "".to_string());
|
||||
format!("{}", hash(hostname.as_bytes())).to_string()
|
||||
})))
|
||||
};
|
||||
}
|
||||
|
||||
pub fn set_host_id(host_id: String) {
|
||||
let mut rw = HOST_ID.write().unwrap();
|
||||
info!("host id: {}", host_id);
|
||||
std::mem::replace(&mut *rw, influx_db_client::Value::String(host_id));
|
||||
}
|
||||
|
||||
/// Submits a new point from any thread. Note that points are internally queued
|
||||
/// and transmitted periodically in batches.
|
||||
pub fn submit(mut point: influxdb::Point, level: log::Level) {
|
||||
point.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string()));
|
||||
pub fn submit(point: influxdb::Point, level: log::Level) {
|
||||
let agent_mutex = get_singleton_agent();
|
||||
let agent = agent_mutex.lock().unwrap();
|
||||
agent.submit(point, level);
|
||||
@ -542,7 +556,6 @@ pub fn set_panic_hook(program: &'static str) {
|
||||
thread::current().name().unwrap_or("?").to_string(),
|
||||
),
|
||||
)
|
||||
.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string()))
|
||||
// The 'one' field exists to give Kapacitor Alerts a numerical value
|
||||
// to filter on
|
||||
.add_field("one", influxdb::Value::Integer(1))
|
||||
|
Reference in New Issue
Block a user