257 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			257 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package influxdb
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	uurl "net/url"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/log"
 | |
| 	"github.com/ethereum/go-ethereum/metrics"
 | |
| 	"github.com/influxdata/influxdb/client"
 | |
| )
 | |
| 
 | |
| type reporter struct {
 | |
| 	reg      metrics.Registry
 | |
| 	interval time.Duration
 | |
| 
 | |
| 	url       uurl.URL
 | |
| 	database  string
 | |
| 	username  string
 | |
| 	password  string
 | |
| 	namespace string
 | |
| 	tags      map[string]string
 | |
| 
 | |
| 	client *client.Client
 | |
| 
 | |
| 	cache map[string]int64
 | |
| }
 | |
| 
 | |
| // InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
 | |
| func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
 | |
| 	InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
 | |
| }
 | |
| 
 | |
| // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
 | |
| func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
 | |
| 	u, err := uurl.Parse(url)
 | |
| 	if err != nil {
 | |
| 		log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rep := &reporter{
 | |
| 		reg:       r,
 | |
| 		interval:  d,
 | |
| 		url:       *u,
 | |
| 		database:  database,
 | |
| 		username:  username,
 | |
| 		password:  password,
 | |
| 		namespace: namespace,
 | |
| 		tags:      tags,
 | |
| 		cache:     make(map[string]int64),
 | |
| 	}
 | |
| 	if err := rep.makeClient(); err != nil {
 | |
| 		log.Warn("Unable to make InfluxDB client", "err", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rep.run()
 | |
| }
 | |
| 
 | |
| // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
 | |
| func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
 | |
| 	u, err := uurl.Parse(url)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("Unable to parse InfluxDB. url: %s, err: %v", url, err)
 | |
| 	}
 | |
| 
 | |
| 	rep := &reporter{
 | |
| 		reg:       r,
 | |
| 		url:       *u,
 | |
| 		database:  database,
 | |
| 		username:  username,
 | |
| 		password:  password,
 | |
| 		namespace: namespace,
 | |
| 		tags:      tags,
 | |
| 		cache:     make(map[string]int64),
 | |
| 	}
 | |
| 	if err := rep.makeClient(); err != nil {
 | |
| 		return fmt.Errorf("Unable to make InfluxDB client. err: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := rep.send(); err != nil {
 | |
| 		return fmt.Errorf("Unable to send to InfluxDB. err: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *reporter) makeClient() (err error) {
 | |
| 	r.client, err = client.NewClient(client.Config{
 | |
| 		URL:      r.url,
 | |
| 		Username: r.username,
 | |
| 		Password: r.password,
 | |
| 		Timeout:  10 * time.Second,
 | |
| 	})
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (r *reporter) run() {
 | |
| 	intervalTicker := time.Tick(r.interval)
 | |
| 	pingTicker := time.Tick(time.Second * 5)
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-intervalTicker:
 | |
| 			if err := r.send(); err != nil {
 | |
| 				log.Warn("Unable to send to InfluxDB", "err", err)
 | |
| 			}
 | |
| 		case <-pingTicker:
 | |
| 			_, _, err := r.client.Ping()
 | |
| 			if err != nil {
 | |
| 				log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
 | |
| 
 | |
| 				if err = r.makeClient(); err != nil {
 | |
| 					log.Warn("Unable to make InfluxDB client", "err", err)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *reporter) send() error {
 | |
| 	var pts []client.Point
 | |
| 
 | |
| 	r.reg.Each(func(name string, i interface{}) {
 | |
| 		now := time.Now()
 | |
| 		namespace := r.namespace
 | |
| 
 | |
| 		switch metric := i.(type) {
 | |
| 		case metrics.Counter:
 | |
| 			v := metric.Count()
 | |
| 			l := r.cache[name]
 | |
| 			pts = append(pts, client.Point{
 | |
| 				Measurement: fmt.Sprintf("%s%s.count", namespace, name),
 | |
| 				Tags:        r.tags,
 | |
| 				Fields: map[string]interface{}{
 | |
| 					"value": v - l,
 | |
| 				},
 | |
| 				Time: now,
 | |
| 			})
 | |
| 			r.cache[name] = v
 | |
| 		case metrics.Gauge:
 | |
| 			ms := metric.Snapshot()
 | |
| 			pts = append(pts, client.Point{
 | |
| 				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
 | |
| 				Tags:        r.tags,
 | |
| 				Fields: map[string]interface{}{
 | |
| 					"value": ms.Value(),
 | |
| 				},
 | |
| 				Time: now,
 | |
| 			})
 | |
| 		case metrics.GaugeFloat64:
 | |
| 			ms := metric.Snapshot()
 | |
| 			pts = append(pts, client.Point{
 | |
| 				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
 | |
| 				Tags:        r.tags,
 | |
| 				Fields: map[string]interface{}{
 | |
| 					"value": ms.Value(),
 | |
| 				},
 | |
| 				Time: now,
 | |
| 			})
 | |
| 		case metrics.Histogram:
 | |
| 			ms := metric.Snapshot()
 | |
| 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
 | |
| 			pts = append(pts, client.Point{
 | |
| 				Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
 | |
| 				Tags:        r.tags,
 | |
| 				Fields: map[string]interface{}{
 | |
| 					"count":    ms.Count(),
 | |
| 					"max":      ms.Max(),
 | |
| 					"mean":     ms.Mean(),
 | |
| 					"min":      ms.Min(),
 | |
| 					"stddev":   ms.StdDev(),
 | |
| 					"variance": ms.Variance(),
 | |
| 					"p50":      ps[0],
 | |
| 					"p75":      ps[1],
 | |
| 					"p95":      ps[2],
 | |
| 					"p99":      ps[3],
 | |
| 					"p999":     ps[4],
 | |
| 					"p9999":    ps[5],
 | |
| 				},
 | |
| 				Time: now,
 | |
| 			})
 | |
| 		case metrics.Meter:
 | |
| 			ms := metric.Snapshot()
 | |
| 			pts = append(pts, client.Point{
 | |
| 				Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
 | |
| 				Tags:        r.tags,
 | |
| 				Fields: map[string]interface{}{
 | |
| 					"count": ms.Count(),
 | |
| 					"m1":    ms.Rate1(),
 | |
| 					"m5":    ms.Rate5(),
 | |
| 					"m15":   ms.Rate15(),
 | |
| 					"mean":  ms.RateMean(),
 | |
| 				},
 | |
| 				Time: now,
 | |
| 			})
 | |
| 		case metrics.Timer:
 | |
| 			ms := metric.Snapshot()
 | |
| 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
 | |
| 			pts = append(pts, client.Point{
 | |
| 				Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
 | |
| 				Tags:        r.tags,
 | |
| 				Fields: map[string]interface{}{
 | |
| 					"count":    ms.Count(),
 | |
| 					"max":      ms.Max(),
 | |
| 					"mean":     ms.Mean(),
 | |
| 					"min":      ms.Min(),
 | |
| 					"stddev":   ms.StdDev(),
 | |
| 					"variance": ms.Variance(),
 | |
| 					"p50":      ps[0],
 | |
| 					"p75":      ps[1],
 | |
| 					"p95":      ps[2],
 | |
| 					"p99":      ps[3],
 | |
| 					"p999":     ps[4],
 | |
| 					"p9999":    ps[5],
 | |
| 					"m1":       ms.Rate1(),
 | |
| 					"m5":       ms.Rate5(),
 | |
| 					"m15":      ms.Rate15(),
 | |
| 					"meanrate": ms.RateMean(),
 | |
| 				},
 | |
| 				Time: now,
 | |
| 			})
 | |
| 		case metrics.ResettingTimer:
 | |
| 			t := metric.Snapshot()
 | |
| 
 | |
| 			if len(t.Values()) > 0 {
 | |
| 				ps := t.Percentiles([]float64{50, 95, 99})
 | |
| 				val := t.Values()
 | |
| 				pts = append(pts, client.Point{
 | |
| 					Measurement: fmt.Sprintf("%s%s.span", namespace, name),
 | |
| 					Tags:        r.tags,
 | |
| 					Fields: map[string]interface{}{
 | |
| 						"count": len(val),
 | |
| 						"max":   val[len(val)-1],
 | |
| 						"mean":  t.Mean(),
 | |
| 						"min":   val[0],
 | |
| 						"p50":   ps[0],
 | |
| 						"p95":   ps[1],
 | |
| 						"p99":   ps[2],
 | |
| 					},
 | |
| 					Time: now,
 | |
| 				})
 | |
| 			}
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	bps := client.BatchPoints{
 | |
| 		Points:   pts,
 | |
| 		Database: r.database,
 | |
| 	}
 | |
| 
 | |
| 	_, err := r.client.Write(bps)
 | |
| 	return err
 | |
| }
 |