| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | package influxdb | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	uurl "net/url" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-02 14:51:02 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/log" | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 	"github.com/ethereum/go-ethereum/metrics" | 
					
						
							|  |  |  | 	"github.com/influxdata/influxdb/client" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type reporter struct { | 
					
						
							|  |  |  | 	reg      metrics.Registry | 
					
						
							|  |  |  | 	interval time.Duration | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	url       uurl.URL | 
					
						
							|  |  |  | 	database  string | 
					
						
							|  |  |  | 	username  string | 
					
						
							|  |  |  | 	password  string | 
					
						
							|  |  |  | 	namespace string | 
					
						
							|  |  |  | 	tags      map[string]string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	client *client.Client | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	cache map[string]int64 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval. | 
					
						
							|  |  |  | func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) { | 
					
						
							|  |  |  | 	InfluxDBWithTags(r, d, url, database, username, password, namespace, nil) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags | 
					
						
							|  |  |  | func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) { | 
					
						
							|  |  |  | 	u, err := uurl.Parse(url) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2018-07-02 14:51:02 +02:00
										 |  |  | 		log.Warn("Unable to parse InfluxDB", "url", url, "err", err) | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rep := &reporter{ | 
					
						
							|  |  |  | 		reg:       r, | 
					
						
							|  |  |  | 		interval:  d, | 
					
						
							|  |  |  | 		url:       *u, | 
					
						
							|  |  |  | 		database:  database, | 
					
						
							|  |  |  | 		username:  username, | 
					
						
							|  |  |  | 		password:  password, | 
					
						
							|  |  |  | 		namespace: namespace, | 
					
						
							|  |  |  | 		tags:      tags, | 
					
						
							|  |  |  | 		cache:     make(map[string]int64), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if err := rep.makeClient(); err != nil { | 
					
						
							| 
									
										
										
										
											2018-07-02 14:51:02 +02:00
										 |  |  | 		log.Warn("Unable to make InfluxDB client", "err", err) | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rep.run() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-12-11 13:51:58 +05:30
										 |  |  | // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags | 
					
						
							|  |  |  | func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { | 
					
						
							|  |  |  | 	u, err := uurl.Parse(url) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2019-11-22 16:04:35 +01:00
										 |  |  | 		return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err) | 
					
						
							| 
									
										
										
										
											2018-12-11 13:51:58 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rep := &reporter{ | 
					
						
							|  |  |  | 		reg:       r, | 
					
						
							|  |  |  | 		url:       *u, | 
					
						
							|  |  |  | 		database:  database, | 
					
						
							|  |  |  | 		username:  username, | 
					
						
							|  |  |  | 		password:  password, | 
					
						
							|  |  |  | 		namespace: namespace, | 
					
						
							|  |  |  | 		tags:      tags, | 
					
						
							|  |  |  | 		cache:     make(map[string]int64), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if err := rep.makeClient(); err != nil { | 
					
						
							| 
									
										
										
										
											2019-11-22 16:04:35 +01:00
										 |  |  | 		return fmt.Errorf("unable to make InfluxDB client. err: %v", err) | 
					
						
							| 
									
										
										
										
											2018-12-11 13:51:58 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := rep.send(); err != nil { | 
					
						
							| 
									
										
										
										
											2019-11-22 16:04:35 +01:00
										 |  |  | 		return fmt.Errorf("unable to send to InfluxDB. err: %v", err) | 
					
						
							| 
									
										
										
										
											2018-12-11 13:51:58 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | func (r *reporter) makeClient() (err error) { | 
					
						
							|  |  |  | 	r.client, err = client.NewClient(client.Config{ | 
					
						
							|  |  |  | 		URL:      r.url, | 
					
						
							|  |  |  | 		Username: r.username, | 
					
						
							|  |  |  | 		Password: r.password, | 
					
						
							| 
									
										
										
										
											2019-03-20 21:30:34 +01:00
										 |  |  | 		Timeout:  10 * time.Second, | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *reporter) run() { | 
					
						
							|  |  |  | 	intervalTicker := time.Tick(r.interval) | 
					
						
							|  |  |  | 	pingTicker := time.Tick(time.Second * 5) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-intervalTicker: | 
					
						
							|  |  |  | 			if err := r.send(); err != nil { | 
					
						
							| 
									
										
										
										
											2018-07-02 14:51:02 +02:00
										 |  |  | 				log.Warn("Unable to send to InfluxDB", "err", err) | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		case <-pingTicker: | 
					
						
							|  |  |  | 			_, _, err := r.client.Ping() | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							| 
									
										
										
										
											2018-07-02 14:51:02 +02:00
										 |  |  | 				log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err) | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 				if err = r.makeClient(); err != nil { | 
					
						
							| 
									
										
										
										
											2018-07-02 14:51:02 +02:00
										 |  |  | 					log.Warn("Unable to make InfluxDB client", "err", err) | 
					
						
							| 
									
										
										
										
											2018-02-23 10:56:08 +01:00
										 |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *reporter) send() error { | 
					
						
							|  |  |  | 	var pts []client.Point | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	r.reg.Each(func(name string, i interface{}) { | 
					
						
							|  |  |  | 		now := time.Now() | 
					
						
							|  |  |  | 		namespace := r.namespace | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		switch metric := i.(type) { | 
					
						
							|  |  |  | 		case metrics.Counter: | 
					
						
							|  |  |  | 			v := metric.Count() | 
					
						
							|  |  |  | 			l := r.cache[name] | 
					
						
							|  |  |  | 			pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 				Measurement: fmt.Sprintf("%s%s.count", namespace, name), | 
					
						
							|  |  |  | 				Tags:        r.tags, | 
					
						
							|  |  |  | 				Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 					"value": v - l, | 
					
						
							|  |  |  | 				}, | 
					
						
							|  |  |  | 				Time: now, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 			r.cache[name] = v | 
					
						
							|  |  |  | 		case metrics.Gauge: | 
					
						
							|  |  |  | 			ms := metric.Snapshot() | 
					
						
							|  |  |  | 			pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name), | 
					
						
							|  |  |  | 				Tags:        r.tags, | 
					
						
							|  |  |  | 				Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 					"value": ms.Value(), | 
					
						
							|  |  |  | 				}, | 
					
						
							|  |  |  | 				Time: now, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 		case metrics.GaugeFloat64: | 
					
						
							|  |  |  | 			ms := metric.Snapshot() | 
					
						
							|  |  |  | 			pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name), | 
					
						
							|  |  |  | 				Tags:        r.tags, | 
					
						
							|  |  |  | 				Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 					"value": ms.Value(), | 
					
						
							|  |  |  | 				}, | 
					
						
							|  |  |  | 				Time: now, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 		case metrics.Histogram: | 
					
						
							|  |  |  | 			ms := metric.Snapshot() | 
					
						
							|  |  |  | 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) | 
					
						
							|  |  |  | 			pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 				Measurement: fmt.Sprintf("%s%s.histogram", namespace, name), | 
					
						
							|  |  |  | 				Tags:        r.tags, | 
					
						
							|  |  |  | 				Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 					"count":    ms.Count(), | 
					
						
							|  |  |  | 					"max":      ms.Max(), | 
					
						
							|  |  |  | 					"mean":     ms.Mean(), | 
					
						
							|  |  |  | 					"min":      ms.Min(), | 
					
						
							|  |  |  | 					"stddev":   ms.StdDev(), | 
					
						
							|  |  |  | 					"variance": ms.Variance(), | 
					
						
							|  |  |  | 					"p50":      ps[0], | 
					
						
							|  |  |  | 					"p75":      ps[1], | 
					
						
							|  |  |  | 					"p95":      ps[2], | 
					
						
							|  |  |  | 					"p99":      ps[3], | 
					
						
							|  |  |  | 					"p999":     ps[4], | 
					
						
							|  |  |  | 					"p9999":    ps[5], | 
					
						
							|  |  |  | 				}, | 
					
						
							|  |  |  | 				Time: now, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 		case metrics.Meter: | 
					
						
							|  |  |  | 			ms := metric.Snapshot() | 
					
						
							|  |  |  | 			pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 				Measurement: fmt.Sprintf("%s%s.meter", namespace, name), | 
					
						
							|  |  |  | 				Tags:        r.tags, | 
					
						
							|  |  |  | 				Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 					"count": ms.Count(), | 
					
						
							|  |  |  | 					"m1":    ms.Rate1(), | 
					
						
							|  |  |  | 					"m5":    ms.Rate5(), | 
					
						
							|  |  |  | 					"m15":   ms.Rate15(), | 
					
						
							|  |  |  | 					"mean":  ms.RateMean(), | 
					
						
							|  |  |  | 				}, | 
					
						
							|  |  |  | 				Time: now, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 		case metrics.Timer: | 
					
						
							|  |  |  | 			ms := metric.Snapshot() | 
					
						
							|  |  |  | 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) | 
					
						
							|  |  |  | 			pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 				Measurement: fmt.Sprintf("%s%s.timer", namespace, name), | 
					
						
							|  |  |  | 				Tags:        r.tags, | 
					
						
							|  |  |  | 				Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 					"count":    ms.Count(), | 
					
						
							|  |  |  | 					"max":      ms.Max(), | 
					
						
							|  |  |  | 					"mean":     ms.Mean(), | 
					
						
							|  |  |  | 					"min":      ms.Min(), | 
					
						
							|  |  |  | 					"stddev":   ms.StdDev(), | 
					
						
							|  |  |  | 					"variance": ms.Variance(), | 
					
						
							|  |  |  | 					"p50":      ps[0], | 
					
						
							|  |  |  | 					"p75":      ps[1], | 
					
						
							|  |  |  | 					"p95":      ps[2], | 
					
						
							|  |  |  | 					"p99":      ps[3], | 
					
						
							|  |  |  | 					"p999":     ps[4], | 
					
						
							|  |  |  | 					"p9999":    ps[5], | 
					
						
							|  |  |  | 					"m1":       ms.Rate1(), | 
					
						
							|  |  |  | 					"m5":       ms.Rate5(), | 
					
						
							|  |  |  | 					"m15":      ms.Rate15(), | 
					
						
							|  |  |  | 					"meanrate": ms.RateMean(), | 
					
						
							|  |  |  | 				}, | 
					
						
							|  |  |  | 				Time: now, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 		case metrics.ResettingTimer: | 
					
						
							|  |  |  | 			t := metric.Snapshot() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			if len(t.Values()) > 0 { | 
					
						
							|  |  |  | 				ps := t.Percentiles([]float64{50, 95, 99}) | 
					
						
							|  |  |  | 				val := t.Values() | 
					
						
							|  |  |  | 				pts = append(pts, client.Point{ | 
					
						
							|  |  |  | 					Measurement: fmt.Sprintf("%s%s.span", namespace, name), | 
					
						
							|  |  |  | 					Tags:        r.tags, | 
					
						
							|  |  |  | 					Fields: map[string]interface{}{ | 
					
						
							|  |  |  | 						"count": len(val), | 
					
						
							|  |  |  | 						"max":   val[len(val)-1], | 
					
						
							|  |  |  | 						"mean":  t.Mean(), | 
					
						
							|  |  |  | 						"min":   val[0], | 
					
						
							|  |  |  | 						"p50":   ps[0], | 
					
						
							|  |  |  | 						"p95":   ps[1], | 
					
						
							|  |  |  | 						"p99":   ps[2], | 
					
						
							|  |  |  | 					}, | 
					
						
							|  |  |  | 					Time: now, | 
					
						
							|  |  |  | 				}) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	bps := client.BatchPoints{ | 
					
						
							|  |  |  | 		Points:   pts, | 
					
						
							|  |  |  | 		Database: r.database, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	_, err := r.client.Write(bps) | 
					
						
							|  |  |  | 	return err | 
					
						
							|  |  |  | } |