cmd, metrics: add support for InfluxDB v2 (cherry-picked from italoacasas' changes), retaining the existing v1 support for backwards compatibility (#23194)
This PR adds a flag to enable InfluxDB v2 (--metrics.influxdbv2), flags for the v2-specific settings (--metrics.influxdb.token, --metrics.influxdb.bucket), and carries over support for specifying an organization (--metrics.influxdb.organization), while remaining backwards compatible with InfluxDB v1.
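As a rough orientation for how these flags feed into the reporter added in this file, the sketch below hands placeholder values to InfluxDBV2WithTags. The endpoint URL, token, bucket, organization, namespace, tag values and the use of metrics.DefaultRegistry are illustrative assumptions, not geth's actual flag-handling code:

package main

import (
    "time"

    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/metrics/influxdb"
)

func main() {
    // Placeholder values standing in for what --metrics.influxdbv2 and the
    // --metrics.influxdb.* flags would supply.
    go influxdb.InfluxDBV2WithTags(
        metrics.DefaultRegistry,             // registry whose metrics are reported
        10*time.Second,                      // reporting interval
        "http://localhost:8086",             // InfluxDB v2 endpoint (placeholder)
        "example-token",                     // --metrics.influxdb.token
        "geth",                              // --metrics.influxdb.bucket
        "example-org",                       // --metrics.influxdb.organization
        "geth.",                             // prefix added to every metric name
        map[string]string{"host": "node-1"}, // static tags attached to each point
    )
    select {} // block; InfluxDBV2WithTags reports until the process exits
}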
223 metrics/influxdb/influxdbv2.go Normal file
@@ -0,0 +1,223 @@
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package influxdb

import (
    "context"
    "fmt"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/metrics"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/api"
)

type v2Reporter struct {
    reg      metrics.Registry
    interval time.Duration

    endpoint     string
    token        string
    bucket       string
    organization string
    namespace    string
    tags         map[string]string

    client influxdb2.Client
    write  api.WriteAPI

    // cache holds the last reported value for each counter so that only the
    // delta since the previous report is written.
    cache map[string]int64
}

// InfluxDBV2WithTags starts an InfluxDB v2 reporter which will post the metrics
// from the given metrics.Registry at each d interval with the specified tags.
func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) {
    rep := &v2Reporter{
        reg:          r,
        interval:     d,
        endpoint:     endpoint,
        token:        token,
        bucket:       bucket,
        organization: organization,
        namespace:    namespace,
        tags:         tags,
        cache:        make(map[string]int64),
    }

    rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
    defer rep.client.Close()

    // async write client
    rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
    errorsCh := rep.write.Errors()

    // have to handle write errors in a separate goroutine like this b/c the
    // channel is unbuffered and will block writes if not read
    go func() {
        for err := range errorsCh {
            log.Warn("write error", "err", err.Error())
        }
    }()
    rep.run()
}

func (r *v2Reporter) run() {
    intervalTicker := time.Tick(r.interval)
    pingTicker := time.Tick(time.Second * 5)

    for {
        select {
        case <-intervalTicker:
            r.send()
        case <-pingTicker:
            _, err := r.client.Health(context.Background())
            if err != nil {
                log.Warn("Got error from influxdb client health check", "err", err.Error())
            }
        }
    }
}

func (r *v2Reporter) send() {
    r.reg.Each(func(name string, i interface{}) {
        now := time.Now()
        namespace := r.namespace

        switch metric := i.(type) {
        case metrics.Counter:
            v := metric.Count()
            l := r.cache[name]

            measurement := fmt.Sprintf("%s%s.count", namespace, name)
            fields := map[string]interface{}{
                "value": v - l,
            }

            pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
            r.write.WritePoint(pt)

            r.cache[name] = v

        case metrics.Gauge:
            ms := metric.Snapshot()

            measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
            fields := map[string]interface{}{
                "value": ms.Value(),
            }

            pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
            r.write.WritePoint(pt)

        case metrics.GaugeFloat64:
            ms := metric.Snapshot()

            measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
            fields := map[string]interface{}{
                "value": ms.Value(),
            }

            pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
            r.write.WritePoint(pt)

        case metrics.Histogram:
            ms := metric.Snapshot()

            if ms.Count() > 0 {
                ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
                measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
                fields := map[string]interface{}{
                    "count":    ms.Count(),
                    "max":      ms.Max(),
                    "mean":     ms.Mean(),
                    "min":      ms.Min(),
                    "stddev":   ms.StdDev(),
                    "variance": ms.Variance(),
                    "p50":      ps[0],
                    "p75":      ps[1],
                    "p95":      ps[2],
                    "p99":      ps[3],
                    "p999":     ps[4],
                    "p9999":    ps[5],
                }

                pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
                r.write.WritePoint(pt)
            }

        case metrics.Meter:
            ms := metric.Snapshot()

            measurement := fmt.Sprintf("%s%s.meter", namespace, name)
            fields := map[string]interface{}{
                "count": ms.Count(),
                "m1":    ms.Rate1(),
                "m5":    ms.Rate5(),
                "m15":   ms.Rate15(),
                "mean":  ms.RateMean(),
            }

            pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
            r.write.WritePoint(pt)

        case metrics.Timer:
            ms := metric.Snapshot()
            ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})

            measurement := fmt.Sprintf("%s%s.timer", namespace, name)
            fields := map[string]interface{}{
                "count":    ms.Count(),
                "max":      ms.Max(),
                "mean":     ms.Mean(),
                "min":      ms.Min(),
                "stddev":   ms.StdDev(),
                "variance": ms.Variance(),
                "p50":      ps[0],
                "p75":      ps[1],
                "p95":      ps[2],
                "p99":      ps[3],
                "p999":     ps[4],
                "p9999":    ps[5],
                "m1":       ms.Rate1(),
                "m5":       ms.Rate5(),
                "m15":      ms.Rate15(),
                "meanrate": ms.RateMean(),
            }

            pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
            r.write.WritePoint(pt)

        case metrics.ResettingTimer:
            t := metric.Snapshot()

            if len(t.Values()) > 0 {
                ps := t.Percentiles([]float64{50, 95, 99})
                val := t.Values()

                measurement := fmt.Sprintf("%s%s.span", namespace, name)
                fields := map[string]interface{}{
                    "count": len(val),
                    "max":   val[len(val)-1],
                    "mean":  t.Mean(),
                    "min":   val[0],
                    "p50":   ps[0],
                    "p95":   ps[1],
                    "p99":   ps[2],
                }

                pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
                r.write.WritePoint(pt)
            }
        }
    })

    // Force all unwritten data to be sent
    r.write.Flush()
}
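For what the reporting path above actually emits: a counter registered on the registry handed to InfluxDBV2WithTags is written by send() as a delta-valued point. A small, self-contained sketch under the assumption that go-ethereum's metrics collection is switched on via metrics.Enabled (the metric name and values are invented for the example):

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/metrics"
)

func main() {
    metrics.Enabled = true // go-ethereum metrics are no-ops unless enabled (in geth this is flag-driven)

    // A registry like this is what the first argument of InfluxDBV2WithTags walks in send().
    reg := metrics.NewRegistry()
    c := metrics.NewCounter()
    reg.Register("example/requests", c)
    c.Inc(42)

    // The Counter case in send() would report this as the measurement
    // "<namespace>example/requests.count" with field "value" = 42 (the current
    // count minus the cached previous count, initially 0), then cache 42 so
    // only later increments are written on the next tick.
    fmt.Println(c.Count()) // 42
}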