* cmd/geth: add flags for metrics export * cmd/geth: update usage fields for metrics flags * metrics/influxdb: update reporter logger to adhere to geth logging convention
		
			
				
	
	
		
			228 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			228 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package influxdb
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	uurl "net/url"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/log"
 | 
						|
	"github.com/ethereum/go-ethereum/metrics"
 | 
						|
	"github.com/influxdata/influxdb/client"
 | 
						|
)
 | 
						|
 | 
						|
type reporter struct {
 | 
						|
	reg      metrics.Registry
 | 
						|
	interval time.Duration
 | 
						|
 | 
						|
	url       uurl.URL
 | 
						|
	database  string
 | 
						|
	username  string
 | 
						|
	password  string
 | 
						|
	namespace string
 | 
						|
	tags      map[string]string
 | 
						|
 | 
						|
	client *client.Client
 | 
						|
 | 
						|
	cache map[string]int64
 | 
						|
}
 | 
						|
 | 
						|
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
 | 
						|
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
 | 
						|
	InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
 | 
						|
}
 | 
						|
 | 
						|
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
 | 
						|
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
 | 
						|
	u, err := uurl.Parse(url)
 | 
						|
	if err != nil {
 | 
						|
		log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	rep := &reporter{
 | 
						|
		reg:       r,
 | 
						|
		interval:  d,
 | 
						|
		url:       *u,
 | 
						|
		database:  database,
 | 
						|
		username:  username,
 | 
						|
		password:  password,
 | 
						|
		namespace: namespace,
 | 
						|
		tags:      tags,
 | 
						|
		cache:     make(map[string]int64),
 | 
						|
	}
 | 
						|
	if err := rep.makeClient(); err != nil {
 | 
						|
		log.Warn("Unable to make InfluxDB client", "err", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	rep.run()
 | 
						|
}
 | 
						|
 | 
						|
func (r *reporter) makeClient() (err error) {
 | 
						|
	r.client, err = client.NewClient(client.Config{
 | 
						|
		URL:      r.url,
 | 
						|
		Username: r.username,
 | 
						|
		Password: r.password,
 | 
						|
	})
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (r *reporter) run() {
 | 
						|
	intervalTicker := time.Tick(r.interval)
 | 
						|
	pingTicker := time.Tick(time.Second * 5)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-intervalTicker:
 | 
						|
			if err := r.send(); err != nil {
 | 
						|
				log.Warn("Unable to send to InfluxDB", "err", err)
 | 
						|
			}
 | 
						|
		case <-pingTicker:
 | 
						|
			_, _, err := r.client.Ping()
 | 
						|
			if err != nil {
 | 
						|
				log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
 | 
						|
 | 
						|
				if err = r.makeClient(); err != nil {
 | 
						|
					log.Warn("Unable to make InfluxDB client", "err", err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *reporter) send() error {
 | 
						|
	var pts []client.Point
 | 
						|
 | 
						|
	r.reg.Each(func(name string, i interface{}) {
 | 
						|
		now := time.Now()
 | 
						|
		namespace := r.namespace
 | 
						|
 | 
						|
		switch metric := i.(type) {
 | 
						|
		case metrics.Counter:
 | 
						|
			v := metric.Count()
 | 
						|
			l := r.cache[name]
 | 
						|
			pts = append(pts, client.Point{
 | 
						|
				Measurement: fmt.Sprintf("%s%s.count", namespace, name),
 | 
						|
				Tags:        r.tags,
 | 
						|
				Fields: map[string]interface{}{
 | 
						|
					"value": v - l,
 | 
						|
				},
 | 
						|
				Time: now,
 | 
						|
			})
 | 
						|
			r.cache[name] = v
 | 
						|
		case metrics.Gauge:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
			pts = append(pts, client.Point{
 | 
						|
				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
 | 
						|
				Tags:        r.tags,
 | 
						|
				Fields: map[string]interface{}{
 | 
						|
					"value": ms.Value(),
 | 
						|
				},
 | 
						|
				Time: now,
 | 
						|
			})
 | 
						|
		case metrics.GaugeFloat64:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
			pts = append(pts, client.Point{
 | 
						|
				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
 | 
						|
				Tags:        r.tags,
 | 
						|
				Fields: map[string]interface{}{
 | 
						|
					"value": ms.Value(),
 | 
						|
				},
 | 
						|
				Time: now,
 | 
						|
			})
 | 
						|
		case metrics.Histogram:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
 | 
						|
			pts = append(pts, client.Point{
 | 
						|
				Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
 | 
						|
				Tags:        r.tags,
 | 
						|
				Fields: map[string]interface{}{
 | 
						|
					"count":    ms.Count(),
 | 
						|
					"max":      ms.Max(),
 | 
						|
					"mean":     ms.Mean(),
 | 
						|
					"min":      ms.Min(),
 | 
						|
					"stddev":   ms.StdDev(),
 | 
						|
					"variance": ms.Variance(),
 | 
						|
					"p50":      ps[0],
 | 
						|
					"p75":      ps[1],
 | 
						|
					"p95":      ps[2],
 | 
						|
					"p99":      ps[3],
 | 
						|
					"p999":     ps[4],
 | 
						|
					"p9999":    ps[5],
 | 
						|
				},
 | 
						|
				Time: now,
 | 
						|
			})
 | 
						|
		case metrics.Meter:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
			pts = append(pts, client.Point{
 | 
						|
				Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
 | 
						|
				Tags:        r.tags,
 | 
						|
				Fields: map[string]interface{}{
 | 
						|
					"count": ms.Count(),
 | 
						|
					"m1":    ms.Rate1(),
 | 
						|
					"m5":    ms.Rate5(),
 | 
						|
					"m15":   ms.Rate15(),
 | 
						|
					"mean":  ms.RateMean(),
 | 
						|
				},
 | 
						|
				Time: now,
 | 
						|
			})
 | 
						|
		case metrics.Timer:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
 | 
						|
			pts = append(pts, client.Point{
 | 
						|
				Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
 | 
						|
				Tags:        r.tags,
 | 
						|
				Fields: map[string]interface{}{
 | 
						|
					"count":    ms.Count(),
 | 
						|
					"max":      ms.Max(),
 | 
						|
					"mean":     ms.Mean(),
 | 
						|
					"min":      ms.Min(),
 | 
						|
					"stddev":   ms.StdDev(),
 | 
						|
					"variance": ms.Variance(),
 | 
						|
					"p50":      ps[0],
 | 
						|
					"p75":      ps[1],
 | 
						|
					"p95":      ps[2],
 | 
						|
					"p99":      ps[3],
 | 
						|
					"p999":     ps[4],
 | 
						|
					"p9999":    ps[5],
 | 
						|
					"m1":       ms.Rate1(),
 | 
						|
					"m5":       ms.Rate5(),
 | 
						|
					"m15":      ms.Rate15(),
 | 
						|
					"meanrate": ms.RateMean(),
 | 
						|
				},
 | 
						|
				Time: now,
 | 
						|
			})
 | 
						|
		case metrics.ResettingTimer:
 | 
						|
			t := metric.Snapshot()
 | 
						|
 | 
						|
			if len(t.Values()) > 0 {
 | 
						|
				ps := t.Percentiles([]float64{50, 95, 99})
 | 
						|
				val := t.Values()
 | 
						|
				pts = append(pts, client.Point{
 | 
						|
					Measurement: fmt.Sprintf("%s%s.span", namespace, name),
 | 
						|
					Tags:        r.tags,
 | 
						|
					Fields: map[string]interface{}{
 | 
						|
						"count": len(val),
 | 
						|
						"max":   val[len(val)-1],
 | 
						|
						"mean":  t.Mean(),
 | 
						|
						"min":   val[0],
 | 
						|
						"p50":   ps[0],
 | 
						|
						"p95":   ps[1],
 | 
						|
						"p99":   ps[2],
 | 
						|
					},
 | 
						|
					Time: now,
 | 
						|
				})
 | 
						|
			}
 | 
						|
		}
 | 
						|
	})
 | 
						|
 | 
						|
	bps := client.BatchPoints{
 | 
						|
		Points:   pts,
 | 
						|
		Database: r.database,
 | 
						|
	}
 | 
						|
 | 
						|
	_, err := r.client.Write(bps)
 | 
						|
	return err
 | 
						|
}
 |