224 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			224 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | // | ||
|  | // The go-ethereum library is distributed in the hope that it will be useful, | ||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
|  | // GNU Lesser General Public License for more details. | ||
|  | // | ||
|  | // You should have received a copy of the GNU Lesser General Public License | ||
|  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|  | package influxdb | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"context" | ||
|  | 	"fmt" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/log" | ||
|  | 	"github.com/ethereum/go-ethereum/metrics" | ||
|  | 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||
|  | 	"github.com/influxdata/influxdb-client-go/v2/api" | ||
|  | ) | ||
|  | 
 | ||
|  | type v2Reporter struct { | ||
|  | 	reg      metrics.Registry | ||
|  | 	interval time.Duration | ||
|  | 
 | ||
|  | 	endpoint     string | ||
|  | 	token        string | ||
|  | 	bucket       string | ||
|  | 	organization string | ||
|  | 	namespace    string | ||
|  | 	tags         map[string]string | ||
|  | 
 | ||
|  | 	client influxdb2.Client | ||
|  | 	write  api.WriteAPI | ||
|  | 
 | ||
|  | 	cache map[string]int64 | ||
|  | } | ||
|  | 
 | ||
|  | // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags | ||
|  | func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) { | ||
|  | 	rep := &v2Reporter{ | ||
|  | 		reg:          r, | ||
|  | 		interval:     d, | ||
|  | 		endpoint:     endpoint, | ||
|  | 		token:        token, | ||
|  | 		bucket:       bucket, | ||
|  | 		organization: organization, | ||
|  | 		namespace:    namespace, | ||
|  | 		tags:         tags, | ||
|  | 		cache:        make(map[string]int64), | ||
|  | 	} | ||
|  | 
 | ||
|  | 	rep.client = influxdb2.NewClient(rep.endpoint, rep.token) | ||
|  | 	defer rep.client.Close() | ||
|  | 
 | ||
|  | 	// async write client | ||
|  | 	rep.write = rep.client.WriteAPI(rep.organization, rep.bucket) | ||
|  | 	errorsCh := rep.write.Errors() | ||
|  | 
 | ||
|  | 	// have to handle write errors in a separate goroutine like this b/c the channel is unbuffered and will block writes if not read | ||
|  | 	go func() { | ||
|  | 		for err := range errorsCh { | ||
|  | 			log.Warn("write error", "err", err.Error()) | ||
|  | 		} | ||
|  | 	}() | ||
|  | 	rep.run() | ||
|  | } | ||
|  | 
 | ||
|  | func (r *v2Reporter) run() { | ||
|  | 	intervalTicker := time.Tick(r.interval) | ||
|  | 	pingTicker := time.Tick(time.Second * 5) | ||
|  | 
 | ||
|  | 	for { | ||
|  | 		select { | ||
|  | 		case <-intervalTicker: | ||
|  | 			r.send() | ||
|  | 		case <-pingTicker: | ||
|  | 			_, err := r.client.Health(context.Background()) | ||
|  | 			if err != nil { | ||
|  | 				log.Warn("Got error from influxdb client health check", "err", err.Error()) | ||
|  | 			} | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | func (r *v2Reporter) send() { | ||
|  | 	r.reg.Each(func(name string, i interface{}) { | ||
|  | 		now := time.Now() | ||
|  | 		namespace := r.namespace | ||
|  | 
 | ||
|  | 		switch metric := i.(type) { | ||
|  | 
 | ||
|  | 		case metrics.Counter: | ||
|  | 			v := metric.Count() | ||
|  | 			l := r.cache[name] | ||
|  | 
 | ||
|  | 			measurement := fmt.Sprintf("%s%s.count", namespace, name) | ||
|  | 			fields := map[string]interface{}{ | ||
|  | 				"value": v - l, | ||
|  | 			} | ||
|  | 
 | ||
|  | 			pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 			r.write.WritePoint(pt) | ||
|  | 
 | ||
|  | 			r.cache[name] = v | ||
|  | 
 | ||
|  | 		case metrics.Gauge: | ||
|  | 			ms := metric.Snapshot() | ||
|  | 
 | ||
|  | 			measurement := fmt.Sprintf("%s%s.gauge", namespace, name) | ||
|  | 			fields := map[string]interface{}{ | ||
|  | 				"value": ms.Value(), | ||
|  | 			} | ||
|  | 
 | ||
|  | 			pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 			r.write.WritePoint(pt) | ||
|  | 
 | ||
|  | 		case metrics.GaugeFloat64: | ||
|  | 			ms := metric.Snapshot() | ||
|  | 
 | ||
|  | 			measurement := fmt.Sprintf("%s%s.gauge", namespace, name) | ||
|  | 			fields := map[string]interface{}{ | ||
|  | 				"value": ms.Value(), | ||
|  | 			} | ||
|  | 
 | ||
|  | 			pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 			r.write.WritePoint(pt) | ||
|  | 
 | ||
|  | 		case metrics.Histogram: | ||
|  | 			ms := metric.Snapshot() | ||
|  | 
 | ||
|  | 			if ms.Count() > 0 { | ||
|  | 				ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) | ||
|  | 				measurement := fmt.Sprintf("%s%s.histogram", namespace, name) | ||
|  | 				fields := map[string]interface{}{ | ||
|  | 					"count":    ms.Count(), | ||
|  | 					"max":      ms.Max(), | ||
|  | 					"mean":     ms.Mean(), | ||
|  | 					"min":      ms.Min(), | ||
|  | 					"stddev":   ms.StdDev(), | ||
|  | 					"variance": ms.Variance(), | ||
|  | 					"p50":      ps[0], | ||
|  | 					"p75":      ps[1], | ||
|  | 					"p95":      ps[2], | ||
|  | 					"p99":      ps[3], | ||
|  | 					"p999":     ps[4], | ||
|  | 					"p9999":    ps[5], | ||
|  | 				} | ||
|  | 
 | ||
|  | 				pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 				r.write.WritePoint(pt) | ||
|  | 			} | ||
|  | 
 | ||
|  | 		case metrics.Meter: | ||
|  | 			ms := metric.Snapshot() | ||
|  | 
 | ||
|  | 			measurement := fmt.Sprintf("%s%s.meter", namespace, name) | ||
|  | 			fields := map[string]interface{}{ | ||
|  | 				"count": ms.Count(), | ||
|  | 				"m1":    ms.Rate1(), | ||
|  | 				"m5":    ms.Rate5(), | ||
|  | 				"m15":   ms.Rate15(), | ||
|  | 				"mean":  ms.RateMean(), | ||
|  | 			} | ||
|  | 
 | ||
|  | 			pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 			r.write.WritePoint(pt) | ||
|  | 
 | ||
|  | 		case metrics.Timer: | ||
|  | 			ms := metric.Snapshot() | ||
|  | 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) | ||
|  | 
 | ||
|  | 			measurement := fmt.Sprintf("%s%s.timer", namespace, name) | ||
|  | 			fields := map[string]interface{}{ | ||
|  | 				"count":    ms.Count(), | ||
|  | 				"max":      ms.Max(), | ||
|  | 				"mean":     ms.Mean(), | ||
|  | 				"min":      ms.Min(), | ||
|  | 				"stddev":   ms.StdDev(), | ||
|  | 				"variance": ms.Variance(), | ||
|  | 				"p50":      ps[0], | ||
|  | 				"p75":      ps[1], | ||
|  | 				"p95":      ps[2], | ||
|  | 				"p99":      ps[3], | ||
|  | 				"p999":     ps[4], | ||
|  | 				"p9999":    ps[5], | ||
|  | 				"m1":       ms.Rate1(), | ||
|  | 				"m5":       ms.Rate5(), | ||
|  | 				"m15":      ms.Rate15(), | ||
|  | 				"meanrate": ms.RateMean(), | ||
|  | 			} | ||
|  | 
 | ||
|  | 			pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 			r.write.WritePoint(pt) | ||
|  | 
 | ||
|  | 		case metrics.ResettingTimer: | ||
|  | 			t := metric.Snapshot() | ||
|  | 
 | ||
|  | 			if len(t.Values()) > 0 { | ||
|  | 				ps := t.Percentiles([]float64{50, 95, 99}) | ||
|  | 				val := t.Values() | ||
|  | 
 | ||
|  | 				measurement := fmt.Sprintf("%s%s.span", namespace, name) | ||
|  | 				fields := map[string]interface{}{ | ||
|  | 					"count": len(val), | ||
|  | 					"max":   val[len(val)-1], | ||
|  | 					"mean":  t.Mean(), | ||
|  | 					"min":   val[0], | ||
|  | 					"p50":   ps[0], | ||
|  | 					"p95":   ps[1], | ||
|  | 					"p99":   ps[2], | ||
|  | 				} | ||
|  | 
 | ||
|  | 				pt := influxdb2.NewPoint(measurement, r.tags, fields, now) | ||
|  | 				r.write.WritePoint(pt) | ||
|  | 			} | ||
|  | 		} | ||
|  | 	}) | ||
|  | 
 | ||
|  | 	// Force all unwritten data to be sent | ||
|  | 	r.write.Flush() | ||
|  | } |