diff --git a/Cargo.lock b/Cargo.lock index 9ec8e1a745..4aaad9fb10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3685,8 +3685,10 @@ name = "solana-replicator" version = "0.19.0-pre0" dependencies = [ "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "console 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "solana-core 0.19.0-pre0", "solana-logger 0.19.0-pre0", + "solana-metrics 0.19.0-pre0", "solana-netutil 0.19.0-pre0", "solana-sdk 0.19.0-pre0", ] diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 81f11034fa..a46e3124d7 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -3,8 +3,5 @@ pub mod counter; mod metrics; -pub use crate::metrics::flush; -pub use crate::metrics::query; -pub use crate::metrics::set_panic_hook; -pub use crate::metrics::submit; +pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit}; pub use influx_db_client as influxdb; diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index a523eb7112..5e929f2e30 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -9,7 +9,7 @@ use solana_sdk::timing; use std::collections::HashMap; use std::convert::Into; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; -use std::sync::{Arc, Barrier, Mutex, Once}; +use std::sync::{Arc, Barrier, Mutex, Once, RwLock}; use std::thread; use std::time::{Duration, Instant}; use std::{cmp, env}; @@ -120,15 +120,6 @@ macro_rules! datapoint_debug { }; } -lazy_static! { - static ref HOST_ID: String = { - env::var("SOLANA_METRICS_HOST_ID").unwrap_or_else(|_| { - let hostname: String = hostname().unwrap_or_else(|_| "".to_string()); - format!("host-{}", hash(hostname.as_bytes())).to_string() - }) - }; -} - type CounterMap = HashMap<(&'static str, u64), CounterPoint>; #[derive(Clone, Debug)] @@ -152,7 +143,6 @@ impl CounterPoint { impl Into for CounterPoint { fn into(self) -> influxdb::Point { let mut point = influxdb::Point::new(self.name); - point.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string())); point.add_field("count", influxdb::Value::Integer(self.count)); point.add_timestamp(self.timestamp as i64); point @@ -250,11 +240,13 @@ impl MetricsAgent { thread::spawn(move || { Self::run(&receiver, &writer, write_frequency_secs, max_points_per_sec) }); + Self { sender } } fn write( - points: &[Point], + host_id: &influxdb::Value, + mut points: Vec, last_write_time: Instant, max_points: usize, writer: &Arc, @@ -275,10 +267,11 @@ impl MetricsAgent { ); } let points_written = cmp::min(num_points, max_points - 1); + points.truncate(points_written); let extra = influxdb::Point::new("metrics") .add_timestamp(timing::timestamp() as i64) - .add_tag("host_id", influxdb::Value::String(HOST_ID.to_string())) + .add_tag("host_id", host_id.clone()) .add_field( "points_written", influxdb::Value::Integer(points_written as i64), @@ -294,7 +287,12 @@ impl MetricsAgent { ) .to_owned(); - writer.write(points[0..points_written].to_vec()); + for point in &mut points { + // TODO: rework influx_db_client crate API to avoid this unnecessary cloning + point.add_tag("host_id", host_id.clone()); + } + + writer.write(points); writer.write([extra].to_vec()); points_written @@ -361,6 +359,7 @@ impl MetricsAgent { let mut num_max_writes = max_points; let now = Instant::now(); + let host_id = HOST_ID.read().unwrap(); if now.duration_since(last_write_time) >= write_frequency_secs { vec![ Level::Error, @@ -375,7 +374,8 @@ impl MetricsAgent { let counter_points = counters.into_iter().map(|(_, v)| v.into()); let points: Vec<_> = points.into_iter().chain(counter_points).collect(); let num_written = Self::write( - &points, + &host_id, + points, last_time, num_max_writes, writer, @@ -440,10 +440,24 @@ fn get_singleton_agent() -> Arc> { } } +lazy_static! { + static ref HOST_ID: Arc> = { + Arc::new(RwLock::new(influx_db_client::Value::String({ + let hostname: String = hostname().unwrap_or_else(|_| "".to_string()); + format!("{}", hash(hostname.as_bytes())).to_string() + }))) + }; +} + +pub fn set_host_id(host_id: String) { + let mut rw = HOST_ID.write().unwrap(); + info!("host id: {}", host_id); + std::mem::replace(&mut *rw, influx_db_client::Value::String(host_id)); +} + /// Submits a new point from any thread. Note that points are internally queued /// and transmitted periodically in batches. -pub fn submit(mut point: influxdb::Point, level: log::Level) { - point.add_tag("host_id", influxdb::Value::String(HOST_ID.to_string())); +pub fn submit(point: influxdb::Point, level: log::Level) { let agent_mutex = get_singleton_agent(); let agent = agent_mutex.lock().unwrap(); agent.submit(point, level); @@ -542,7 +556,6 @@ pub fn set_panic_hook(program: &'static str) { thread::current().name().unwrap_or("?").to_string(), ), ) - .add_tag("host_id", influxdb::Value::String(HOST_ID.to_string())) // The 'one' field exists to give Kapacitor Alerts a numerical value // to filter on .add_field("one", influxdb::Value::Integer(1)) diff --git a/multinode-demo/bootstrap-leader.sh b/multinode-demo/bootstrap-leader.sh index 51e00b2fe6..2891e23b58 100755 --- a/multinode-demo/bootstrap-leader.sh +++ b/multinode-demo/bootstrap-leader.sh @@ -71,9 +71,6 @@ args+=( ) default_arg --gossip-port 8001 -identity_pubkey=$($solana_keygen pubkey "$identity_keypair") -export SOLANA_METRICS_HOST_ID="$identity_pubkey" - set -x # shellcheck disable=SC2086 # Don't want to double quote $program exec $program "${args[@]}" diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 25de11c3f3..f57fc3e12a 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -277,21 +277,6 @@ while true; do [[ -r "$storage_keypair_path" ]] || $solana_keygen new -o "$storage_keypair_path" setup_validator_accounts "$node_lamports" - - vote_pubkey=$($solana_keygen pubkey "$voting_keypair_path") - storage_pubkey=$($solana_keygen pubkey "$storage_keypair_path") - identity_pubkey=$($solana_keygen pubkey "$identity_keypair_path") - export SOLANA_METRICS_HOST_ID="$identity_pubkey" - - cat <