* go-metrics: fork library and introduce ResettingTimer and InfluxDB reporter. * vendor: change nonsense/go-metrics to ethersphere/go-metrics * go-metrics: add tests. move ResettingTimer logic from reporter to type. * all, metrics: pull in metrics package in go-ethereum * metrics/test: make sure metrics are enabled for tests * metrics: apply gosimple rules * metrics/exp, internal/debug: init expvar endpoint when starting pprof server * internal/debug: tiny comment formatting fix
		
			
				
	
	
		
			636 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			636 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package client (v2) is the current official Go client for InfluxDB.
 | |
| package client // import "github.com/influxdata/influxdb/client/v2"
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"crypto/tls"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"io/ioutil"
 | |
| 	"mime"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/influxdata/influxdb/models"
 | |
| )
 | |
| 
 | |
| // HTTPConfig is the config data needed to create an HTTP Client.
 | |
| type HTTPConfig struct {
 | |
| 	// Addr should be of the form "http://host:port"
 | |
| 	// or "http://[ipv6-host%zone]:port".
 | |
| 	Addr string
 | |
| 
 | |
| 	// Username is the influxdb username, optional.
 | |
| 	Username string
 | |
| 
 | |
| 	// Password is the influxdb password, optional.
 | |
| 	Password string
 | |
| 
 | |
| 	// UserAgent is the http User Agent, defaults to "InfluxDBClient".
 | |
| 	UserAgent string
 | |
| 
 | |
| 	// Timeout for influxdb writes, defaults to no timeout.
 | |
| 	Timeout time.Duration
 | |
| 
 | |
| 	// InsecureSkipVerify gets passed to the http client, if true, it will
 | |
| 	// skip https certificate verification. Defaults to false.
 | |
| 	InsecureSkipVerify bool
 | |
| 
 | |
| 	// TLSConfig allows the user to set their own TLS config for the HTTP
 | |
| 	// Client. If set, this option overrides InsecureSkipVerify.
 | |
| 	TLSConfig *tls.Config
 | |
| }
 | |
| 
 | |
| // BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
 | |
| type BatchPointsConfig struct {
 | |
| 	// Precision is the write precision of the points, defaults to "ns".
 | |
| 	Precision string
 | |
| 
 | |
| 	// Database is the database to write points to.
 | |
| 	Database string
 | |
| 
 | |
| 	// RetentionPolicy is the retention policy of the points.
 | |
| 	RetentionPolicy string
 | |
| 
 | |
| 	// Write consistency is the number of servers required to confirm write.
 | |
| 	WriteConsistency string
 | |
| }
 | |
| 
 | |
| // Client is a client interface for writing & querying the database.
 | |
| type Client interface {
 | |
| 	// Ping checks that status of cluster, and will always return 0 time and no
 | |
| 	// error for UDP clients.
 | |
| 	Ping(timeout time.Duration) (time.Duration, string, error)
 | |
| 
 | |
| 	// Write takes a BatchPoints object and writes all Points to InfluxDB.
 | |
| 	Write(bp BatchPoints) error
 | |
| 
 | |
| 	// Query makes an InfluxDB Query on the database. This will fail if using
 | |
| 	// the UDP client.
 | |
| 	Query(q Query) (*Response, error)
 | |
| 
 | |
| 	// Close releases any resources a Client may be using.
 | |
| 	Close() error
 | |
| }
 | |
| 
 | |
| // NewHTTPClient returns a new Client from the provided config.
 | |
| // Client is safe for concurrent use by multiple goroutines.
 | |
| func NewHTTPClient(conf HTTPConfig) (Client, error) {
 | |
| 	if conf.UserAgent == "" {
 | |
| 		conf.UserAgent = "InfluxDBClient"
 | |
| 	}
 | |
| 
 | |
| 	u, err := url.Parse(conf.Addr)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	} else if u.Scheme != "http" && u.Scheme != "https" {
 | |
| 		m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
 | |
| 			" must start with http:// or https://", u.Scheme)
 | |
| 		return nil, errors.New(m)
 | |
| 	}
 | |
| 
 | |
| 	tr := &http.Transport{
 | |
| 		TLSClientConfig: &tls.Config{
 | |
| 			InsecureSkipVerify: conf.InsecureSkipVerify,
 | |
| 		},
 | |
| 	}
 | |
| 	if conf.TLSConfig != nil {
 | |
| 		tr.TLSClientConfig = conf.TLSConfig
 | |
| 	}
 | |
| 	return &client{
 | |
| 		url:       *u,
 | |
| 		username:  conf.Username,
 | |
| 		password:  conf.Password,
 | |
| 		useragent: conf.UserAgent,
 | |
| 		httpClient: &http.Client{
 | |
| 			Timeout:   conf.Timeout,
 | |
| 			Transport: tr,
 | |
| 		},
 | |
| 		transport: tr,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // Ping will check to see if the server is up with an optional timeout on waiting for leader.
 | |
| // Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
 | |
| func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
 | |
| 	now := time.Now()
 | |
| 	u := c.url
 | |
| 	u.Path = "ping"
 | |
| 
 | |
| 	req, err := http.NewRequest("GET", u.String(), nil)
 | |
| 	if err != nil {
 | |
| 		return 0, "", err
 | |
| 	}
 | |
| 
 | |
| 	req.Header.Set("User-Agent", c.useragent)
 | |
| 
 | |
| 	if c.username != "" {
 | |
| 		req.SetBasicAuth(c.username, c.password)
 | |
| 	}
 | |
| 
 | |
| 	if timeout > 0 {
 | |
| 		params := req.URL.Query()
 | |
| 		params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
 | |
| 		req.URL.RawQuery = params.Encode()
 | |
| 	}
 | |
| 
 | |
| 	resp, err := c.httpClient.Do(req)
 | |
| 	if err != nil {
 | |
| 		return 0, "", err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	body, err := ioutil.ReadAll(resp.Body)
 | |
| 	if err != nil {
 | |
| 		return 0, "", err
 | |
| 	}
 | |
| 
 | |
| 	if resp.StatusCode != http.StatusNoContent {
 | |
| 		var err = fmt.Errorf(string(body))
 | |
| 		return 0, "", err
 | |
| 	}
 | |
| 
 | |
| 	version := resp.Header.Get("X-Influxdb-Version")
 | |
| 	return time.Since(now), version, nil
 | |
| }
 | |
| 
 | |
| // Close releases the client's resources.
 | |
| func (c *client) Close() error {
 | |
| 	c.transport.CloseIdleConnections()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // client is safe for concurrent use as the fields are all read-only
 | |
| // once the client is instantiated.
 | |
| type client struct {
 | |
| 	// N.B - if url.UserInfo is accessed in future modifications to the
 | |
| 	// methods on client, you will need to syncronise access to url.
 | |
| 	url        url.URL
 | |
| 	username   string
 | |
| 	password   string
 | |
| 	useragent  string
 | |
| 	httpClient *http.Client
 | |
| 	transport  *http.Transport
 | |
| }
 | |
| 
 | |
| // BatchPoints is an interface into a batched grouping of points to write into
 | |
| // InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
 | |
| // batch for each goroutine.
 | |
| type BatchPoints interface {
 | |
| 	// AddPoint adds the given point to the Batch of points.
 | |
| 	AddPoint(p *Point)
 | |
| 	// AddPoints adds the given points to the Batch of points.
 | |
| 	AddPoints(ps []*Point)
 | |
| 	// Points lists the points in the Batch.
 | |
| 	Points() []*Point
 | |
| 
 | |
| 	// Precision returns the currently set precision of this Batch.
 | |
| 	Precision() string
 | |
| 	// SetPrecision sets the precision of this batch.
 | |
| 	SetPrecision(s string) error
 | |
| 
 | |
| 	// Database returns the currently set database of this Batch.
 | |
| 	Database() string
 | |
| 	// SetDatabase sets the database of this Batch.
 | |
| 	SetDatabase(s string)
 | |
| 
 | |
| 	// WriteConsistency returns the currently set write consistency of this Batch.
 | |
| 	WriteConsistency() string
 | |
| 	// SetWriteConsistency sets the write consistency of this Batch.
 | |
| 	SetWriteConsistency(s string)
 | |
| 
 | |
| 	// RetentionPolicy returns the currently set retention policy of this Batch.
 | |
| 	RetentionPolicy() string
 | |
| 	// SetRetentionPolicy sets the retention policy of this Batch.
 | |
| 	SetRetentionPolicy(s string)
 | |
| }
 | |
| 
 | |
| // NewBatchPoints returns a BatchPoints interface based on the given config.
 | |
| func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
 | |
| 	if conf.Precision == "" {
 | |
| 		conf.Precision = "ns"
 | |
| 	}
 | |
| 	if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	bp := &batchpoints{
 | |
| 		database:         conf.Database,
 | |
| 		precision:        conf.Precision,
 | |
| 		retentionPolicy:  conf.RetentionPolicy,
 | |
| 		writeConsistency: conf.WriteConsistency,
 | |
| 	}
 | |
| 	return bp, nil
 | |
| }
 | |
| 
 | |
| type batchpoints struct {
 | |
| 	points           []*Point
 | |
| 	database         string
 | |
| 	precision        string
 | |
| 	retentionPolicy  string
 | |
| 	writeConsistency string
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) AddPoint(p *Point) {
 | |
| 	bp.points = append(bp.points, p)
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) AddPoints(ps []*Point) {
 | |
| 	bp.points = append(bp.points, ps...)
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) Points() []*Point {
 | |
| 	return bp.points
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) Precision() string {
 | |
| 	return bp.precision
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) Database() string {
 | |
| 	return bp.database
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) WriteConsistency() string {
 | |
| 	return bp.writeConsistency
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) RetentionPolicy() string {
 | |
| 	return bp.retentionPolicy
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) SetPrecision(p string) error {
 | |
| 	if _, err := time.ParseDuration("1" + p); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	bp.precision = p
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) SetDatabase(db string) {
 | |
| 	bp.database = db
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) SetWriteConsistency(wc string) {
 | |
| 	bp.writeConsistency = wc
 | |
| }
 | |
| 
 | |
| func (bp *batchpoints) SetRetentionPolicy(rp string) {
 | |
| 	bp.retentionPolicy = rp
 | |
| }
 | |
| 
 | |
| // Point represents a single data point.
 | |
| type Point struct {
 | |
| 	pt models.Point
 | |
| }
 | |
| 
 | |
| // NewPoint returns a point with the given timestamp. If a timestamp is not
 | |
| // given, then data is sent to the database without a timestamp, in which case
 | |
| // the server will assign local time upon reception. NOTE: it is recommended to
 | |
| // send data with a timestamp.
 | |
| func NewPoint(
 | |
| 	name string,
 | |
| 	tags map[string]string,
 | |
| 	fields map[string]interface{},
 | |
| 	t ...time.Time,
 | |
| ) (*Point, error) {
 | |
| 	var T time.Time
 | |
| 	if len(t) > 0 {
 | |
| 		T = t[0]
 | |
| 	}
 | |
| 
 | |
| 	pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &Point{
 | |
| 		pt: pt,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // String returns a line-protocol string of the Point.
 | |
| func (p *Point) String() string {
 | |
| 	return p.pt.String()
 | |
| }
 | |
| 
 | |
| // PrecisionString returns a line-protocol string of the Point,
 | |
| // with the timestamp formatted for the given precision.
 | |
| func (p *Point) PrecisionString(precison string) string {
 | |
| 	return p.pt.PrecisionString(precison)
 | |
| }
 | |
| 
 | |
| // Name returns the measurement name of the point.
 | |
| func (p *Point) Name() string {
 | |
| 	return string(p.pt.Name())
 | |
| }
 | |
| 
 | |
| // Tags returns the tags associated with the point.
 | |
| func (p *Point) Tags() map[string]string {
 | |
| 	return p.pt.Tags().Map()
 | |
| }
 | |
| 
 | |
| // Time return the timestamp for the point.
 | |
| func (p *Point) Time() time.Time {
 | |
| 	return p.pt.Time()
 | |
| }
 | |
| 
 | |
| // UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
 | |
| func (p *Point) UnixNano() int64 {
 | |
| 	return p.pt.UnixNano()
 | |
| }
 | |
| 
 | |
| // Fields returns the fields for the point.
 | |
| func (p *Point) Fields() (map[string]interface{}, error) {
 | |
| 	return p.pt.Fields()
 | |
| }
 | |
| 
 | |
| // NewPointFrom returns a point from the provided models.Point.
 | |
| func NewPointFrom(pt models.Point) *Point {
 | |
| 	return &Point{pt: pt}
 | |
| }
 | |
| 
 | |
| func (c *client) Write(bp BatchPoints) error {
 | |
| 	var b bytes.Buffer
 | |
| 
 | |
| 	for _, p := range bp.Points() {
 | |
| 		if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if err := b.WriteByte('\n'); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	u := c.url
 | |
| 	u.Path = "write"
 | |
| 	req, err := http.NewRequest("POST", u.String(), &b)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Set("Content-Type", "")
 | |
| 	req.Header.Set("User-Agent", c.useragent)
 | |
| 	if c.username != "" {
 | |
| 		req.SetBasicAuth(c.username, c.password)
 | |
| 	}
 | |
| 
 | |
| 	params := req.URL.Query()
 | |
| 	params.Set("db", bp.Database())
 | |
| 	params.Set("rp", bp.RetentionPolicy())
 | |
| 	params.Set("precision", bp.Precision())
 | |
| 	params.Set("consistency", bp.WriteConsistency())
 | |
| 	req.URL.RawQuery = params.Encode()
 | |
| 
 | |
| 	resp, err := c.httpClient.Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	body, err := ioutil.ReadAll(resp.Body)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
 | |
| 		var err = fmt.Errorf(string(body))
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Query defines a query to send to the server.
 | |
| type Query struct {
 | |
| 	Command    string
 | |
| 	Database   string
 | |
| 	Precision  string
 | |
| 	Chunked    bool
 | |
| 	ChunkSize  int
 | |
| 	Parameters map[string]interface{}
 | |
| }
 | |
| 
 | |
| // NewQuery returns a query object.
 | |
| // The database and precision arguments can be empty strings if they are not needed for the query.
 | |
| func NewQuery(command, database, precision string) Query {
 | |
| 	return Query{
 | |
| 		Command:    command,
 | |
| 		Database:   database,
 | |
| 		Precision:  precision,
 | |
| 		Parameters: make(map[string]interface{}),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewQueryWithParameters returns a query object.
 | |
| // The database and precision arguments can be empty strings if they are not needed for the query.
 | |
| // parameters is a map of the parameter names used in the command to their values.
 | |
| func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
 | |
| 	return Query{
 | |
| 		Command:    command,
 | |
| 		Database:   database,
 | |
| 		Precision:  precision,
 | |
| 		Parameters: parameters,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Response represents a list of statement results.
 | |
| type Response struct {
 | |
| 	Results []Result
 | |
| 	Err     string `json:"error,omitempty"`
 | |
| }
 | |
| 
 | |
| // Error returns the first error from any statement.
 | |
| // It returns nil if no errors occurred on any statements.
 | |
| func (r *Response) Error() error {
 | |
| 	if r.Err != "" {
 | |
| 		return fmt.Errorf(r.Err)
 | |
| 	}
 | |
| 	for _, result := range r.Results {
 | |
| 		if result.Err != "" {
 | |
| 			return fmt.Errorf(result.Err)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Message represents a user message.
 | |
| type Message struct {
 | |
| 	Level string
 | |
| 	Text  string
 | |
| }
 | |
| 
 | |
| // Result represents a resultset returned from a single statement.
 | |
| type Result struct {
 | |
| 	Series   []models.Row
 | |
| 	Messages []*Message
 | |
| 	Err      string `json:"error,omitempty"`
 | |
| }
 | |
| 
 | |
| // Query sends a command to the server and returns the Response.
 | |
| func (c *client) Query(q Query) (*Response, error) {
 | |
| 	u := c.url
 | |
| 	u.Path = "query"
 | |
| 
 | |
| 	jsonParameters, err := json.Marshal(q.Parameters)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	req, err := http.NewRequest("POST", u.String(), nil)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	req.Header.Set("Content-Type", "")
 | |
| 	req.Header.Set("User-Agent", c.useragent)
 | |
| 
 | |
| 	if c.username != "" {
 | |
| 		req.SetBasicAuth(c.username, c.password)
 | |
| 	}
 | |
| 
 | |
| 	params := req.URL.Query()
 | |
| 	params.Set("q", q.Command)
 | |
| 	params.Set("db", q.Database)
 | |
| 	params.Set("params", string(jsonParameters))
 | |
| 	if q.Chunked {
 | |
| 		params.Set("chunked", "true")
 | |
| 		if q.ChunkSize > 0 {
 | |
| 			params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if q.Precision != "" {
 | |
| 		params.Set("epoch", q.Precision)
 | |
| 	}
 | |
| 	req.URL.RawQuery = params.Encode()
 | |
| 
 | |
| 	resp, err := c.httpClient.Do(req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
 | |
| 	// but instead some other service. If the error code is also a 500+ code, then some
 | |
| 	// downstream loadbalancer/proxy/etc had an issue and we should report that.
 | |
| 	if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
 | |
| 		body, err := ioutil.ReadAll(resp.Body)
 | |
| 		if err != nil || len(body) == 0 {
 | |
| 			return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
 | |
| 		}
 | |
| 
 | |
| 		return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
 | |
| 	}
 | |
| 
 | |
| 	// If we get an unexpected content type, then it is also not from influx direct and therefore
 | |
| 	// we want to know what we received and what status code was returned for debugging purposes.
 | |
| 	if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
 | |
| 		// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
 | |
| 		// like downstream serving a large file
 | |
| 		body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
 | |
| 		if err != nil || len(body) == 0 {
 | |
| 			return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
 | |
| 		}
 | |
| 
 | |
| 		return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
 | |
| 	}
 | |
| 
 | |
| 	var response Response
 | |
| 	if q.Chunked {
 | |
| 		cr := NewChunkedResponse(resp.Body)
 | |
| 		for {
 | |
| 			r, err := cr.NextResponse()
 | |
| 			if err != nil {
 | |
| 				// If we got an error while decoding the response, send that back.
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			if r == nil {
 | |
| 				break
 | |
| 			}
 | |
| 
 | |
| 			response.Results = append(response.Results, r.Results...)
 | |
| 			if r.Err != "" {
 | |
| 				response.Err = r.Err
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		dec := json.NewDecoder(resp.Body)
 | |
| 		dec.UseNumber()
 | |
| 		decErr := dec.Decode(&response)
 | |
| 
 | |
| 		// ignore this error if we got an invalid status code
 | |
| 		if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
 | |
| 			decErr = nil
 | |
| 		}
 | |
| 		// If we got a valid decode error, send that back
 | |
| 		if decErr != nil {
 | |
| 			return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If we don't have an error in our json response, and didn't get statusOK
 | |
| 	// then send back an error
 | |
| 	if resp.StatusCode != http.StatusOK && response.Error() == nil {
 | |
| 		return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
 | |
| 	}
 | |
| 	return &response, nil
 | |
| }
 | |
| 
 | |
| // duplexReader reads responses and writes it to another writer while
 | |
| // satisfying the reader interface.
 | |
| type duplexReader struct {
 | |
| 	r io.Reader
 | |
| 	w io.Writer
 | |
| }
 | |
| 
 | |
| func (r *duplexReader) Read(p []byte) (n int, err error) {
 | |
| 	n, err = r.r.Read(p)
 | |
| 	if err == nil {
 | |
| 		r.w.Write(p[:n])
 | |
| 	}
 | |
| 	return n, err
 | |
| }
 | |
| 
 | |
| // ChunkedResponse represents a response from the server that
 | |
| // uses chunking to stream the output.
 | |
| type ChunkedResponse struct {
 | |
| 	dec    *json.Decoder
 | |
| 	duplex *duplexReader
 | |
| 	buf    bytes.Buffer
 | |
| }
 | |
| 
 | |
| // NewChunkedResponse reads a stream and produces responses from the stream.
 | |
| func NewChunkedResponse(r io.Reader) *ChunkedResponse {
 | |
| 	resp := &ChunkedResponse{}
 | |
| 	resp.duplex = &duplexReader{r: r, w: &resp.buf}
 | |
| 	resp.dec = json.NewDecoder(resp.duplex)
 | |
| 	resp.dec.UseNumber()
 | |
| 	return resp
 | |
| }
 | |
| 
 | |
| // NextResponse reads the next line of the stream and returns a response.
 | |
| func (r *ChunkedResponse) NextResponse() (*Response, error) {
 | |
| 	var response Response
 | |
| 
 | |
| 	if err := r.dec.Decode(&response); err != nil {
 | |
| 		if err == io.EOF {
 | |
| 			return nil, nil
 | |
| 		}
 | |
| 		// A decoding error happened. This probably means the server crashed
 | |
| 		// and sent a last-ditch error message to us. Ensure we have read the
 | |
| 		// entirety of the connection to get any remaining error text.
 | |
| 		io.Copy(ioutil.Discard, r.duplex)
 | |
| 		return nil, errors.New(strings.TrimSpace(r.buf.String()))
 | |
| 	}
 | |
| 
 | |
| 	r.buf.Reset()
 | |
| 	return &response, nil
 | |
| }
 |