metrics: pull library and introduce ResettingTimer and InfluxDB reporter (#15910)
* go-metrics: fork library and introduce ResettingTimer and InfluxDB reporter. * vendor: change nonsense/go-metrics to ethersphere/go-metrics * go-metrics: add tests. move ResettingTimer logic from reporter to type. * all, metrics: pull in metrics package in go-ethereum * metrics/test: make sure metrics are enabled for tests * metrics: apply gosimple rules * metrics/exp, internal/debug: init expvar endpoint when starting pprof server * internal/debug: tiny comment formatting fix
This commit is contained in:
committed by
Péter Szilágyi
parent
7f74bdf8dd
commit
ae9f97221a
19
metrics/influxdb/LICENSE
Normal file
19
metrics/influxdb/LICENSE
Normal file
@ -0,0 +1,19 @@
|
||||
Copyright (c) 2015 Vincent Rischmann
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
30
metrics/influxdb/README.md
Normal file
30
metrics/influxdb/README.md
Normal file
@ -0,0 +1,30 @@
|
||||
go-metrics-influxdb
|
||||
===================
|
||||
|
||||
This is a reporter for the [go-metrics](https://github.com/rcrowley/go-metrics) library which will post the metrics to [InfluxDB](https://influxdb.com/).
|
||||
|
||||
Note
|
||||
----
|
||||
|
||||
This is only compatible with InfluxDB 0.9+.
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
```go
|
||||
import "github.com/vrischmann/go-metrics-influxdb"
|
||||
|
||||
go influxdb.InfluxDB(
|
||||
metrics.DefaultRegistry, // metrics registry
|
||||
time.Second * 10, // interval
|
||||
"http://localhost:8086", // the InfluxDB url
|
||||
"mydb", // your InfluxDB database
|
||||
"myuser", // your InfluxDB user
|
||||
"mypassword", // your InfluxDB password
|
||||
)
|
||||
```
|
||||
|
||||
License
|
||||
-------
|
||||
|
||||
go-metrics-influxdb is licensed under the MIT license. See the LICENSE file for details.
|
227
metrics/influxdb/influxdb.go
Normal file
227
metrics/influxdb/influxdb.go
Normal file
@ -0,0 +1,227 @@
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
uurl "net/url"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/influxdata/influxdb/client"
|
||||
)
|
||||
|
||||
type reporter struct {
|
||||
reg metrics.Registry
|
||||
interval time.Duration
|
||||
|
||||
url uurl.URL
|
||||
database string
|
||||
username string
|
||||
password string
|
||||
namespace string
|
||||
tags map[string]string
|
||||
|
||||
client *client.Client
|
||||
|
||||
cache map[string]int64
|
||||
}
|
||||
|
||||
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
|
||||
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
|
||||
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
|
||||
}
|
||||
|
||||
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
|
||||
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
|
||||
u, err := uurl.Parse(url)
|
||||
if err != nil {
|
||||
log.Printf("unable to parse InfluxDB url %s. err=%v", url, err)
|
||||
return
|
||||
}
|
||||
|
||||
rep := &reporter{
|
||||
reg: r,
|
||||
interval: d,
|
||||
url: *u,
|
||||
database: database,
|
||||
username: username,
|
||||
password: password,
|
||||
namespace: namespace,
|
||||
tags: tags,
|
||||
cache: make(map[string]int64),
|
||||
}
|
||||
if err := rep.makeClient(); err != nil {
|
||||
log.Printf("unable to make InfluxDB client. err=%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
rep.run()
|
||||
}
|
||||
|
||||
func (r *reporter) makeClient() (err error) {
|
||||
r.client, err = client.NewClient(client.Config{
|
||||
URL: r.url,
|
||||
Username: r.username,
|
||||
Password: r.password,
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *reporter) run() {
|
||||
intervalTicker := time.Tick(r.interval)
|
||||
pingTicker := time.Tick(time.Second * 5)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-intervalTicker:
|
||||
if err := r.send(); err != nil {
|
||||
log.Printf("unable to send to InfluxDB. err=%v", err)
|
||||
}
|
||||
case <-pingTicker:
|
||||
_, _, err := r.client.Ping()
|
||||
if err != nil {
|
||||
log.Printf("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err)
|
||||
|
||||
if err = r.makeClient(); err != nil {
|
||||
log.Printf("unable to make InfluxDB client. err=%v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *reporter) send() error {
|
||||
var pts []client.Point
|
||||
|
||||
r.reg.Each(func(name string, i interface{}) {
|
||||
now := time.Now()
|
||||
namespace := r.namespace
|
||||
|
||||
switch metric := i.(type) {
|
||||
case metrics.Counter:
|
||||
v := metric.Count()
|
||||
l := r.cache[name]
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": v - l,
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
r.cache[name] = v
|
||||
case metrics.Gauge:
|
||||
ms := metric.Snapshot()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": ms.Value(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.GaugeFloat64:
|
||||
ms := metric.Snapshot()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": ms.Value(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.Histogram:
|
||||
ms := metric.Snapshot()
|
||||
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p50": ps[0],
|
||||
"p75": ps[1],
|
||||
"p95": ps[2],
|
||||
"p99": ps[3],
|
||||
"p999": ps[4],
|
||||
"p9999": ps[5],
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.Meter:
|
||||
ms := metric.Snapshot()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"mean": ms.RateMean(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.Timer:
|
||||
ms := metric.Snapshot()
|
||||
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p50": ps[0],
|
||||
"p75": ps[1],
|
||||
"p95": ps[2],
|
||||
"p99": ps[3],
|
||||
"p999": ps[4],
|
||||
"p9999": ps[5],
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"meanrate": ms.RateMean(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.ResettingTimer:
|
||||
t := metric.Snapshot()
|
||||
|
||||
if len(t.Values()) > 0 {
|
||||
ps := t.Percentiles([]float64{50, 95, 99})
|
||||
val := t.Values()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.span", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": len(val),
|
||||
"max": val[len(val)-1],
|
||||
"mean": t.Mean(),
|
||||
"min": val[0],
|
||||
"p50": ps[0],
|
||||
"p95": ps[1],
|
||||
"p99": ps[2],
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
bps := client.BatchPoints{
|
||||
Points: pts,
|
||||
Database: r.database,
|
||||
}
|
||||
|
||||
_, err := r.client.Write(bps)
|
||||
return err
|
||||
}
|
Reference in New Issue
Block a user