This PR adds flag to enable InfluxDB v2 (--metrics.influxdbv2), flags for v2-specific features (--metrics.influxdb.token, --metrics.influxdb.bucket), also carries over addition of support for specifying organization (--metrics.influxdb.organization), but still retains backwards compatibility with InfluxDB v1.
		
			
				
	
	
		
			224 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			224 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
//
 | 
						|
// The go-ethereum library is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
// GNU Lesser General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Lesser General Public License
 | 
						|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | 
						|
package influxdb
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/log"
 | 
						|
	"github.com/ethereum/go-ethereum/metrics"
 | 
						|
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 | 
						|
	"github.com/influxdata/influxdb-client-go/v2/api"
 | 
						|
)
 | 
						|
 | 
						|
type v2Reporter struct {
 | 
						|
	reg      metrics.Registry
 | 
						|
	interval time.Duration
 | 
						|
 | 
						|
	endpoint     string
 | 
						|
	token        string
 | 
						|
	bucket       string
 | 
						|
	organization string
 | 
						|
	namespace    string
 | 
						|
	tags         map[string]string
 | 
						|
 | 
						|
	client influxdb2.Client
 | 
						|
	write  api.WriteAPI
 | 
						|
 | 
						|
	cache map[string]int64
 | 
						|
}
 | 
						|
 | 
						|
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
 | 
						|
func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) {
 | 
						|
	rep := &v2Reporter{
 | 
						|
		reg:          r,
 | 
						|
		interval:     d,
 | 
						|
		endpoint:     endpoint,
 | 
						|
		token:        token,
 | 
						|
		bucket:       bucket,
 | 
						|
		organization: organization,
 | 
						|
		namespace:    namespace,
 | 
						|
		tags:         tags,
 | 
						|
		cache:        make(map[string]int64),
 | 
						|
	}
 | 
						|
 | 
						|
	rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
 | 
						|
	defer rep.client.Close()
 | 
						|
 | 
						|
	// async write client
 | 
						|
	rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
 | 
						|
	errorsCh := rep.write.Errors()
 | 
						|
 | 
						|
	// have to handle write errors in a separate goroutine like this b/c the channel is unbuffered and will block writes if not read
 | 
						|
	go func() {
 | 
						|
		for err := range errorsCh {
 | 
						|
			log.Warn("write error", "err", err.Error())
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	rep.run()
 | 
						|
}
 | 
						|
 | 
						|
func (r *v2Reporter) run() {
 | 
						|
	intervalTicker := time.Tick(r.interval)
 | 
						|
	pingTicker := time.Tick(time.Second * 5)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-intervalTicker:
 | 
						|
			r.send()
 | 
						|
		case <-pingTicker:
 | 
						|
			_, err := r.client.Health(context.Background())
 | 
						|
			if err != nil {
 | 
						|
				log.Warn("Got error from influxdb client health check", "err", err.Error())
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (r *v2Reporter) send() {
 | 
						|
	r.reg.Each(func(name string, i interface{}) {
 | 
						|
		now := time.Now()
 | 
						|
		namespace := r.namespace
 | 
						|
 | 
						|
		switch metric := i.(type) {
 | 
						|
 | 
						|
		case metrics.Counter:
 | 
						|
			v := metric.Count()
 | 
						|
			l := r.cache[name]
 | 
						|
 | 
						|
			measurement := fmt.Sprintf("%s%s.count", namespace, name)
 | 
						|
			fields := map[string]interface{}{
 | 
						|
				"value": v - l,
 | 
						|
			}
 | 
						|
 | 
						|
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
			r.write.WritePoint(pt)
 | 
						|
 | 
						|
			r.cache[name] = v
 | 
						|
 | 
						|
		case metrics.Gauge:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
 | 
						|
			measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
 | 
						|
			fields := map[string]interface{}{
 | 
						|
				"value": ms.Value(),
 | 
						|
			}
 | 
						|
 | 
						|
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
			r.write.WritePoint(pt)
 | 
						|
 | 
						|
		case metrics.GaugeFloat64:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
 | 
						|
			measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
 | 
						|
			fields := map[string]interface{}{
 | 
						|
				"value": ms.Value(),
 | 
						|
			}
 | 
						|
 | 
						|
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
			r.write.WritePoint(pt)
 | 
						|
 | 
						|
		case metrics.Histogram:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
 | 
						|
			if ms.Count() > 0 {
 | 
						|
				ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
 | 
						|
				measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
 | 
						|
				fields := map[string]interface{}{
 | 
						|
					"count":    ms.Count(),
 | 
						|
					"max":      ms.Max(),
 | 
						|
					"mean":     ms.Mean(),
 | 
						|
					"min":      ms.Min(),
 | 
						|
					"stddev":   ms.StdDev(),
 | 
						|
					"variance": ms.Variance(),
 | 
						|
					"p50":      ps[0],
 | 
						|
					"p75":      ps[1],
 | 
						|
					"p95":      ps[2],
 | 
						|
					"p99":      ps[3],
 | 
						|
					"p999":     ps[4],
 | 
						|
					"p9999":    ps[5],
 | 
						|
				}
 | 
						|
 | 
						|
				pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
				r.write.WritePoint(pt)
 | 
						|
			}
 | 
						|
 | 
						|
		case metrics.Meter:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
 | 
						|
			measurement := fmt.Sprintf("%s%s.meter", namespace, name)
 | 
						|
			fields := map[string]interface{}{
 | 
						|
				"count": ms.Count(),
 | 
						|
				"m1":    ms.Rate1(),
 | 
						|
				"m5":    ms.Rate5(),
 | 
						|
				"m15":   ms.Rate15(),
 | 
						|
				"mean":  ms.RateMean(),
 | 
						|
			}
 | 
						|
 | 
						|
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
			r.write.WritePoint(pt)
 | 
						|
 | 
						|
		case metrics.Timer:
 | 
						|
			ms := metric.Snapshot()
 | 
						|
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
 | 
						|
 | 
						|
			measurement := fmt.Sprintf("%s%s.timer", namespace, name)
 | 
						|
			fields := map[string]interface{}{
 | 
						|
				"count":    ms.Count(),
 | 
						|
				"max":      ms.Max(),
 | 
						|
				"mean":     ms.Mean(),
 | 
						|
				"min":      ms.Min(),
 | 
						|
				"stddev":   ms.StdDev(),
 | 
						|
				"variance": ms.Variance(),
 | 
						|
				"p50":      ps[0],
 | 
						|
				"p75":      ps[1],
 | 
						|
				"p95":      ps[2],
 | 
						|
				"p99":      ps[3],
 | 
						|
				"p999":     ps[4],
 | 
						|
				"p9999":    ps[5],
 | 
						|
				"m1":       ms.Rate1(),
 | 
						|
				"m5":       ms.Rate5(),
 | 
						|
				"m15":      ms.Rate15(),
 | 
						|
				"meanrate": ms.RateMean(),
 | 
						|
			}
 | 
						|
 | 
						|
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
			r.write.WritePoint(pt)
 | 
						|
 | 
						|
		case metrics.ResettingTimer:
 | 
						|
			t := metric.Snapshot()
 | 
						|
 | 
						|
			if len(t.Values()) > 0 {
 | 
						|
				ps := t.Percentiles([]float64{50, 95, 99})
 | 
						|
				val := t.Values()
 | 
						|
 | 
						|
				measurement := fmt.Sprintf("%s%s.span", namespace, name)
 | 
						|
				fields := map[string]interface{}{
 | 
						|
					"count": len(val),
 | 
						|
					"max":   val[len(val)-1],
 | 
						|
					"mean":  t.Mean(),
 | 
						|
					"min":   val[0],
 | 
						|
					"p50":   ps[0],
 | 
						|
					"p95":   ps[1],
 | 
						|
					"p99":   ps[2],
 | 
						|
				}
 | 
						|
 | 
						|
				pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
 | 
						|
				r.write.WritePoint(pt)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	})
 | 
						|
 | 
						|
	// Force all unwritten data to be sent
 | 
						|
	r.write.Flush()
 | 
						|
}
 |