| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | // Copyright 2016 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package rpc | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"bytes" | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 	"container/list" | 
					
						
							| 
									
										
										
										
											2017-03-22 18:20:33 +01:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"net" | 
					
						
							|  |  |  | 	"net/url" | 
					
						
							|  |  |  | 	"reflect" | 
					
						
							|  |  |  | 	"strconv" | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 	"strings" | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 	"sync/atomic" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/log" | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 	ErrClientQuit                = errors.New("client is closed") | 
					
						
							|  |  |  | 	ErrNoResult                  = errors.New("no result in JSON-RPC response") | 
					
						
							|  |  |  | 	ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 	// Timeouts | 
					
						
							|  |  |  | 	tcpKeepAliveInterval = 30 * time.Second | 
					
						
							|  |  |  | 	defaultDialTimeout   = 10 * time.Second // used when dialing if the context has no deadline | 
					
						
							|  |  |  | 	defaultWriteTimeout  = 10 * time.Second // used for calls if the context has no deadline | 
					
						
							|  |  |  | 	subscribeTimeout     = 5 * time.Second  // overall timeout eth_subscribe, rpc_modules calls | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	// Subscriptions are removed when the subscriber cannot keep up. | 
					
						
							|  |  |  | 	// | 
					
						
							|  |  |  | 	// This can be worked around by supplying a channel with sufficiently sized buffer, | 
					
						
							|  |  |  | 	// but this can be inconvenient and hard to explain in the docs. Another issue with | 
					
						
							|  |  |  | 	// buffered channels is that the buffer is static even though it might not be needed | 
					
						
							|  |  |  | 	// most of the time. | 
					
						
							|  |  |  | 	// | 
					
						
							|  |  |  | 	// The approach taken here is to maintain a per-subscription linked list buffer | 
					
						
							|  |  |  | 	// shrinks on demand. If the buffer reaches the size below, the subscription is | 
					
						
							|  |  |  | 	// dropped. | 
					
						
							|  |  |  | 	maxClientSubscriptionBuffer = 8000 | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // BatchElem is an element in a batch request. | 
					
						
							|  |  |  | type BatchElem struct { | 
					
						
							|  |  |  | 	Method string | 
					
						
							|  |  |  | 	Args   []interface{} | 
					
						
							|  |  |  | 	// The result is unmarshaled into this field. Result must be set to a | 
					
						
							|  |  |  | 	// non-nil pointer value of the desired type, otherwise the response will be | 
					
						
							|  |  |  | 	// discarded. | 
					
						
							|  |  |  | 	Result interface{} | 
					
						
							|  |  |  | 	// Error is set if the server returns an error for this request, or if | 
					
						
							|  |  |  | 	// unmarshaling into Result fails. It is not set for I/O errors. | 
					
						
							|  |  |  | 	Error error | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // A value of this type can a JSON-RPC request, notification, successful response or | 
					
						
							|  |  |  | // error response. Which one it is depends on the fields. | 
					
						
							|  |  |  | type jsonrpcMessage struct { | 
					
						
							|  |  |  | 	Version string          `json:"jsonrpc"` | 
					
						
							|  |  |  | 	ID      json.RawMessage `json:"id,omitempty"` | 
					
						
							|  |  |  | 	Method  string          `json:"method,omitempty"` | 
					
						
							|  |  |  | 	Params  json.RawMessage `json:"params,omitempty"` | 
					
						
							|  |  |  | 	Error   *jsonError      `json:"error,omitempty"` | 
					
						
							|  |  |  | 	Result  json.RawMessage `json:"result,omitempty"` | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (msg *jsonrpcMessage) isNotification() bool { | 
					
						
							|  |  |  | 	return msg.ID == nil && msg.Method != "" | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (msg *jsonrpcMessage) isResponse() bool { | 
					
						
							|  |  |  | 	return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (msg *jsonrpcMessage) hasValidID() bool { | 
					
						
							|  |  |  | 	return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (msg *jsonrpcMessage) String() string { | 
					
						
							|  |  |  | 	b, _ := json.Marshal(msg) | 
					
						
							|  |  |  | 	return string(b) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Client represents a connection to an RPC server. | 
					
						
							|  |  |  | type Client struct { | 
					
						
							|  |  |  | 	idCounter   uint32 | 
					
						
							|  |  |  | 	connectFunc func(ctx context.Context) (net.Conn, error) | 
					
						
							|  |  |  | 	isHTTP      bool | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// writeConn is only safe to access outside dispatch, with the | 
					
						
							|  |  |  | 	// write lock held. The write lock is taken by sending on | 
					
						
							|  |  |  | 	// requestOp and released by sending on sendDone. | 
					
						
							|  |  |  | 	writeConn net.Conn | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// for dispatch | 
					
						
							|  |  |  | 	close       chan struct{} | 
					
						
							|  |  |  | 	didQuit     chan struct{}                  // closed when client quits | 
					
						
							|  |  |  | 	reconnected chan net.Conn                  // where write/reconnect sends the new connection | 
					
						
							|  |  |  | 	readErr     chan error                     // errors from read | 
					
						
							|  |  |  | 	readResp    chan []*jsonrpcMessage         // valid messages from read | 
					
						
							|  |  |  | 	requestOp   chan *requestOp                // for registering response IDs | 
					
						
							|  |  |  | 	sendDone    chan error                     // signals write completion, releases write lock | 
					
						
							|  |  |  | 	respWait    map[string]*requestOp          // active requests | 
					
						
							|  |  |  | 	subs        map[string]*ClientSubscription // active subscriptions | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type requestOp struct { | 
					
						
							|  |  |  | 	ids  []json.RawMessage | 
					
						
							|  |  |  | 	err  error | 
					
						
							|  |  |  | 	resp chan *jsonrpcMessage // receives up to len(ids) responses | 
					
						
							|  |  |  | 	sub  *ClientSubscription  // only set for EthSubscribe requests | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		return nil, ctx.Err() | 
					
						
							|  |  |  | 	case resp := <-op.resp: | 
					
						
							|  |  |  | 		return resp, op.err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Dial creates a new client for the given URL. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a | 
					
						
							|  |  |  | // file name with no URL scheme, a local socket connection is established using UNIX | 
					
						
							|  |  |  | // domain sockets on supported platforms and named pipes on Windows. If you want to | 
					
						
							|  |  |  | // configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // For websocket connections, the origin is set to the local host name. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The client reconnects automatically if the connection is lost. | 
					
						
							|  |  |  | func Dial(rawurl string) (*Client, error) { | 
					
						
							|  |  |  | 	return DialContext(context.Background(), rawurl) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // DialContext creates a new RPC client, just like Dial. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The context is used to cancel or time out the initial connection establishment. It does | 
					
						
							|  |  |  | // not affect subsequent interactions with the client. | 
					
						
							|  |  |  | func DialContext(ctx context.Context, rawurl string) (*Client, error) { | 
					
						
							|  |  |  | 	u, err := url.Parse(rawurl) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	switch u.Scheme { | 
					
						
							|  |  |  | 	case "http", "https": | 
					
						
							|  |  |  | 		return DialHTTP(rawurl) | 
					
						
							|  |  |  | 	case "ws", "wss": | 
					
						
							|  |  |  | 		return DialWebsocket(ctx, rawurl, "") | 
					
						
							|  |  |  | 	case "": | 
					
						
							|  |  |  | 		return DialIPC(ctx, rawurl) | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) { | 
					
						
							|  |  |  | 	conn, err := connectFunc(initctx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	_, isHTTP := conn.(*httpConn) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	c := &Client{ | 
					
						
							|  |  |  | 		writeConn:   conn, | 
					
						
							|  |  |  | 		isHTTP:      isHTTP, | 
					
						
							|  |  |  | 		connectFunc: connectFunc, | 
					
						
							|  |  |  | 		close:       make(chan struct{}), | 
					
						
							|  |  |  | 		didQuit:     make(chan struct{}), | 
					
						
							|  |  |  | 		reconnected: make(chan net.Conn), | 
					
						
							|  |  |  | 		readErr:     make(chan error), | 
					
						
							|  |  |  | 		readResp:    make(chan []*jsonrpcMessage), | 
					
						
							|  |  |  | 		requestOp:   make(chan *requestOp), | 
					
						
							|  |  |  | 		sendDone:    make(chan error, 1), | 
					
						
							|  |  |  | 		respWait:    make(map[string]*requestOp), | 
					
						
							|  |  |  | 		subs:        make(map[string]*ClientSubscription), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if !isHTTP { | 
					
						
							|  |  |  | 		go c.dispatch(conn) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return c, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) nextID() json.RawMessage { | 
					
						
							|  |  |  | 	id := atomic.AddUint32(&c.idCounter, 1) | 
					
						
							|  |  |  | 	return []byte(strconv.FormatUint(uint64(id), 10)) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SupportedModules calls the rpc_modules method, retrieving the list of | 
					
						
							|  |  |  | // APIs that are available on the server. | 
					
						
							|  |  |  | func (c *Client) SupportedModules() (map[string]string, error) { | 
					
						
							|  |  |  | 	var result map[string]string | 
					
						
							|  |  |  | 	ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) | 
					
						
							|  |  |  | 	defer cancel() | 
					
						
							|  |  |  | 	err := c.CallContext(ctx, &result, "rpc_modules") | 
					
						
							|  |  |  | 	return result, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Close closes the client, aborting any in-flight requests. | 
					
						
							|  |  |  | func (c *Client) Close() { | 
					
						
							|  |  |  | 	if c.isHTTP { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case c.close <- struct{}{}: | 
					
						
							|  |  |  | 		<-c.didQuit | 
					
						
							|  |  |  | 	case <-c.didQuit: | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Call performs a JSON-RPC call with the given arguments and unmarshals into | 
					
						
							|  |  |  | // result if no error occurred. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The result must be a pointer so that package json can unmarshal into it. You | 
					
						
							|  |  |  | // can also pass nil, in which case the result is ignored. | 
					
						
							|  |  |  | func (c *Client) Call(result interface{}, method string, args ...interface{}) error { | 
					
						
							|  |  |  | 	ctx := context.Background() | 
					
						
							|  |  |  | 	return c.CallContext(ctx, result, method, args...) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // CallContext performs a JSON-RPC call with the given arguments. If the context is | 
					
						
							|  |  |  | // canceled before the call has successfully returned, CallContext returns immediately. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The result must be a pointer so that package json can unmarshal into it. You | 
					
						
							|  |  |  | // can also pass nil, in which case the result is ignored. | 
					
						
							|  |  |  | func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { | 
					
						
							|  |  |  | 	msg, err := c.newMessage(method, args...) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if c.isHTTP { | 
					
						
							|  |  |  | 		err = c.sendHTTP(ctx, op, msg) | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		err = c.send(ctx, op, msg) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// dispatch has accepted the request and will close the channel it when it quits. | 
					
						
							|  |  |  | 	switch resp, err := op.wait(ctx); { | 
					
						
							|  |  |  | 	case err != nil: | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	case resp.Error != nil: | 
					
						
							|  |  |  | 		return resp.Error | 
					
						
							|  |  |  | 	case len(resp.Result) == 0: | 
					
						
							|  |  |  | 		return ErrNoResult | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return json.Unmarshal(resp.Result, &result) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // BatchCall sends all given requests as a single batch and waits for the server | 
					
						
							|  |  |  | // to return a response for all of them. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // In contrast to Call, BatchCall only returns I/O errors. Any error specific to | 
					
						
							|  |  |  | // a request is reported through the Error field of the corresponding BatchElem. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Note that batch calls may not be executed atomically on the server side. | 
					
						
							|  |  |  | func (c *Client) BatchCall(b []BatchElem) error { | 
					
						
							|  |  |  | 	ctx := context.Background() | 
					
						
							|  |  |  | 	return c.BatchCallContext(ctx, b) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // BatchCall sends all given requests as a single batch and waits for the server | 
					
						
							|  |  |  | // to return a response for all of them. The wait duration is bounded by the | 
					
						
							|  |  |  | // context's deadline. | 
					
						
							|  |  |  | // | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | // In contrast to CallContext, BatchCallContext only returns errors that have occurred | 
					
						
							|  |  |  | // while sending the request. Any error specific to a request is reported through the | 
					
						
							|  |  |  | // Error field of the corresponding BatchElem. | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | // | 
					
						
							|  |  |  | // Note that batch calls may not be executed atomically on the server side. | 
					
						
							|  |  |  | func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { | 
					
						
							|  |  |  | 	msgs := make([]*jsonrpcMessage, len(b)) | 
					
						
							|  |  |  | 	op := &requestOp{ | 
					
						
							|  |  |  | 		ids:  make([]json.RawMessage, len(b)), | 
					
						
							|  |  |  | 		resp: make(chan *jsonrpcMessage, len(b)), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for i, elem := range b { | 
					
						
							|  |  |  | 		msg, err := c.newMessage(elem.Method, elem.Args...) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		msgs[i] = msg | 
					
						
							|  |  |  | 		op.ids[i] = msg.ID | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var err error | 
					
						
							|  |  |  | 	if c.isHTTP { | 
					
						
							|  |  |  | 		err = c.sendBatchHTTP(ctx, op, msgs) | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		err = c.send(ctx, op, msgs) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Wait for all responses to come back. | 
					
						
							|  |  |  | 	for n := 0; n < len(b) && err == nil; n++ { | 
					
						
							|  |  |  | 		var resp *jsonrpcMessage | 
					
						
							|  |  |  | 		resp, err = op.wait(ctx) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		// Find the element corresponding to this response. | 
					
						
							|  |  |  | 		// The element is guaranteed to be present because dispatch | 
					
						
							|  |  |  | 		// only sends valid IDs to our channel. | 
					
						
							|  |  |  | 		var elem *BatchElem | 
					
						
							|  |  |  | 		for i := range msgs { | 
					
						
							|  |  |  | 			if bytes.Equal(msgs[i].ID, resp.ID) { | 
					
						
							|  |  |  | 				elem = &b[i] | 
					
						
							|  |  |  | 				break | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if resp.Error != nil { | 
					
						
							|  |  |  | 			elem.Error = resp.Error | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if len(resp.Result) == 0 { | 
					
						
							|  |  |  | 			elem.Error = ErrNoResult | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		elem.Error = json.Unmarshal(resp.Result, elem.Result) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // EthSubscribe calls the "eth_subscribe" method with the given arguments, | 
					
						
							|  |  |  | // registering a subscription. Server notifications for the subscription are | 
					
						
							|  |  |  | // sent to the given channel. The element type of the channel must match the | 
					
						
							|  |  |  | // expected type of content returned by the subscription. | 
					
						
							|  |  |  | // | 
					
						
							| 
									
										
										
										
											2016-08-05 13:24:48 +02:00
										 |  |  | // The context argument cancels the RPC request that sets up the subscription but has no | 
					
						
							|  |  |  | // effect on the subscription after EthSubscribe has returned. | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | // | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | // Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications | 
					
						
							|  |  |  | // before considering the subscriber dead. The subscription Err channel will receive | 
					
						
							|  |  |  | // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure | 
					
						
							|  |  |  | // that the channel usually has at least one reader to prevent this issue. | 
					
						
							| 
									
										
										
										
											2016-08-05 13:24:48 +02:00
										 |  |  | func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	// Check type of channel first. | 
					
						
							|  |  |  | 	chanVal := reflect.ValueOf(channel) | 
					
						
							|  |  |  | 	if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { | 
					
						
							|  |  |  | 		panic("first argument to EthSubscribe must be a writable channel") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if chanVal.IsNil() { | 
					
						
							|  |  |  | 		panic("channel given to EthSubscribe must not be nil") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if c.isHTTP { | 
					
						
							|  |  |  | 		return nil, ErrNotificationsUnsupported | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 	msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	op := &requestOp{ | 
					
						
							|  |  |  | 		ids:  []json.RawMessage{msg.ID}, | 
					
						
							|  |  |  | 		resp: make(chan *jsonrpcMessage), | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 		sub:  newClientSubscription(c, "eth", chanVal), | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Send the subscription request. | 
					
						
							|  |  |  | 	// The arrival and validity of the response is signaled on sub.quit. | 
					
						
							|  |  |  | 	if err := c.send(ctx, op, msg); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if _, err := op.wait(ctx); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return op.sub, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { | 
					
						
							|  |  |  | 	params, err := json.Marshal(paramsIn) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // send registers op with the dispatch loop, then sends msg on the connection. | 
					
						
							|  |  |  | // if sending fails, op is deregistered. | 
					
						
							|  |  |  | func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case c.requestOp <- op: | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 		log.Trace("", "msg", log.Lazy{Fn: func() string { | 
					
						
							|  |  |  | 			return fmt.Sprint("sending ", msg) | 
					
						
							|  |  |  | 		}}) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		err := c.write(ctx, msg) | 
					
						
							|  |  |  | 		c.sendDone <- err | 
					
						
							|  |  |  | 		return err | 
					
						
							| 
									
										
										
										
											2016-08-04 02:10:44 +02:00
										 |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		// This can happen if the client is overloaded or unable to keep up with | 
					
						
							|  |  |  | 		// subscription notifications. | 
					
						
							|  |  |  | 		return ctx.Err() | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	case <-c.didQuit: | 
					
						
							|  |  |  | 		return ErrClientQuit | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) write(ctx context.Context, msg interface{}) error { | 
					
						
							|  |  |  | 	deadline, ok := ctx.Deadline() | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		deadline = time.Now().Add(defaultWriteTimeout) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// The previous write failed. Try to establish a new connection. | 
					
						
							|  |  |  | 	if c.writeConn == nil { | 
					
						
							|  |  |  | 		if err := c.reconnect(ctx); err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	c.writeConn.SetWriteDeadline(deadline) | 
					
						
							|  |  |  | 	err := json.NewEncoder(c.writeConn).Encode(msg) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		c.writeConn = nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) reconnect(ctx context.Context) error { | 
					
						
							|  |  |  | 	newconn, err := c.connectFunc(ctx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 		log.Trace(fmt.Sprintf("reconnect failed: %v", err)) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case c.reconnected <- newconn: | 
					
						
							|  |  |  | 		c.writeConn = newconn | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	case <-c.didQuit: | 
					
						
							|  |  |  | 		newconn.Close() | 
					
						
							|  |  |  | 		return ErrClientQuit | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // dispatch is the main loop of the client. | 
					
						
							|  |  |  | // It sends read messages to waiting calls to Call and BatchCall | 
					
						
							|  |  |  | // and subscription notifications to registered subscriptions. | 
					
						
							|  |  |  | func (c *Client) dispatch(conn net.Conn) { | 
					
						
							|  |  |  | 	// Spawn the initial read loop. | 
					
						
							|  |  |  | 	go c.read(conn) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var ( | 
					
						
							|  |  |  | 		lastOp        *requestOp    // tracks last send operation | 
					
						
							|  |  |  | 		requestOpLock = c.requestOp // nil while the send lock is held | 
					
						
							|  |  |  | 		reading       = true        // if true, a read loop is running | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 	defer close(c.didQuit) | 
					
						
							|  |  |  | 	defer func() { | 
					
						
							|  |  |  | 		c.closeRequestOps(ErrClientQuit) | 
					
						
							|  |  |  | 		conn.Close() | 
					
						
							|  |  |  | 		if reading { | 
					
						
							|  |  |  | 			// Empty read channels until read is dead. | 
					
						
							|  |  |  | 			for { | 
					
						
							|  |  |  | 				select { | 
					
						
							|  |  |  | 				case <-c.readResp: | 
					
						
							|  |  |  | 				case <-c.readErr: | 
					
						
							|  |  |  | 					return | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-c.close: | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Read path. | 
					
						
							|  |  |  | 		case batch := <-c.readResp: | 
					
						
							|  |  |  | 			for _, msg := range batch { | 
					
						
							|  |  |  | 				switch { | 
					
						
							|  |  |  | 				case msg.isNotification(): | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 					log.Trace("", "msg", log.Lazy{Fn: func() string { | 
					
						
							|  |  |  | 						return fmt.Sprint("<-readResp: notification ", msg) | 
					
						
							|  |  |  | 					}}) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 					c.handleNotification(msg) | 
					
						
							|  |  |  | 				case msg.isResponse(): | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 					log.Trace("", "msg", log.Lazy{Fn: func() string { | 
					
						
							|  |  |  | 						return fmt.Sprint("<-readResp: response ", msg) | 
					
						
							|  |  |  | 					}}) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 					c.handleResponse(msg) | 
					
						
							|  |  |  | 				default: | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 					log.Debug("", "msg", log.Lazy{Fn: func() string { | 
					
						
							|  |  |  | 						return fmt.Sprint("<-readResp: dropping weird message", msg) | 
					
						
							|  |  |  | 					}}) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 					// TODO: maybe close | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		case err := <-c.readErr: | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 			log.Debug(fmt.Sprintf("<-readErr: %v", err)) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 			c.closeRequestOps(err) | 
					
						
							|  |  |  | 			conn.Close() | 
					
						
							|  |  |  | 			reading = false | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		case newconn := <-c.reconnected: | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 			log.Debug(fmt.Sprintf("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr())) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 			if reading { | 
					
						
							|  |  |  | 				// Wait for the previous read loop to exit. This is a rare case. | 
					
						
							|  |  |  | 				conn.Close() | 
					
						
							|  |  |  | 				<-c.readErr | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			go c.read(newconn) | 
					
						
							|  |  |  | 			reading = true | 
					
						
							|  |  |  | 			conn = newconn | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Send path. | 
					
						
							|  |  |  | 		case op := <-requestOpLock: | 
					
						
							|  |  |  | 			// Stop listening for further send ops until the current one is done. | 
					
						
							|  |  |  | 			requestOpLock = nil | 
					
						
							|  |  |  | 			lastOp = op | 
					
						
							|  |  |  | 			for _, id := range op.ids { | 
					
						
							|  |  |  | 				c.respWait[string(id)] = op | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		case err := <-c.sendDone: | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				// Remove response handlers for the last send. We remove those here | 
					
						
							|  |  |  | 				// because the error is already handled in Call or BatchCall. When the | 
					
						
							|  |  |  | 				// read loop goes down, it will signal all other current operations. | 
					
						
							|  |  |  | 				for _, id := range lastOp.ids { | 
					
						
							|  |  |  | 					delete(c.respWait, string(id)) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			// Listen for send ops again. | 
					
						
							|  |  |  | 			requestOpLock = c.requestOp | 
					
						
							|  |  |  | 			lastOp = nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // closeRequestOps unblocks pending send ops and active subscriptions. | 
					
						
							|  |  |  | func (c *Client) closeRequestOps(err error) { | 
					
						
							|  |  |  | 	didClose := make(map[*requestOp]bool) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for id, op := range c.respWait { | 
					
						
							|  |  |  | 		// Remove the op so that later calls will not close op.resp again. | 
					
						
							|  |  |  | 		delete(c.respWait, id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if !didClose[op] { | 
					
						
							|  |  |  | 			op.err = err | 
					
						
							|  |  |  | 			close(op.resp) | 
					
						
							|  |  |  | 			didClose[op] = true | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for id, sub := range c.subs { | 
					
						
							|  |  |  | 		delete(c.subs, id) | 
					
						
							|  |  |  | 		sub.quitWithError(err, false) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) handleNotification(msg *jsonrpcMessage) { | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 	if !strings.HasSuffix(msg.Method, notificationMethodSuffix) { | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 		log.Debug(fmt.Sprint("dropping non-subscription message: ", msg)) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	var subResult struct { | 
					
						
							|  |  |  | 		ID     string          `json:"subscription"` | 
					
						
							|  |  |  | 		Result json.RawMessage `json:"result"` | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if err := json.Unmarshal(msg.Params, &subResult); err != nil { | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 		log.Debug(fmt.Sprint("dropping invalid subscription message: ", msg)) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if c.subs[subResult.ID] != nil { | 
					
						
							|  |  |  | 		c.subs[subResult.ID].deliver(subResult.Result) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) handleResponse(msg *jsonrpcMessage) { | 
					
						
							|  |  |  | 	op := c.respWait[string(msg.ID)] | 
					
						
							|  |  |  | 	if op == nil { | 
					
						
							| 
									
										
										
										
											2017-02-22 14:10:07 +02:00
										 |  |  | 		log.Debug(fmt.Sprintf("unsolicited response %v", msg)) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	delete(c.respWait, string(msg.ID)) | 
					
						
							|  |  |  | 	// For normal responses, just forward the reply to Call/BatchCall. | 
					
						
							|  |  |  | 	if op.sub == nil { | 
					
						
							|  |  |  | 		op.resp <- msg | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// For subscription responses, start the subscription if the server | 
					
						
							|  |  |  | 	// indicates success. EthSubscribe gets unblocked in either case through | 
					
						
							|  |  |  | 	// the op.resp channel. | 
					
						
							|  |  |  | 	defer close(op.resp) | 
					
						
							|  |  |  | 	if msg.Error != nil { | 
					
						
							|  |  |  | 		op.err = msg.Error | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { | 
					
						
							|  |  |  | 		go op.sub.start() | 
					
						
							|  |  |  | 		c.subs[op.sub.subid] = op.sub | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Reading happens on a dedicated goroutine. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (c *Client) read(conn net.Conn) error { | 
					
						
							|  |  |  | 	var ( | 
					
						
							|  |  |  | 		buf json.RawMessage | 
					
						
							|  |  |  | 		dec = json.NewDecoder(conn) | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 	readMessage := func() (rs []*jsonrpcMessage, err error) { | 
					
						
							|  |  |  | 		buf = buf[:0] | 
					
						
							|  |  |  | 		if err = dec.Decode(&buf); err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if isBatch(buf) { | 
					
						
							|  |  |  | 			err = json.Unmarshal(buf, &rs) | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			rs = make([]*jsonrpcMessage, 1) | 
					
						
							|  |  |  | 			err = json.Unmarshal(buf, &rs[0]) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return rs, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		resp, err := readMessage() | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			c.readErr <- err | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		c.readResp <- resp | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Subscriptions. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // A ClientSubscription represents a subscription established through EthSubscribe. | 
					
						
							|  |  |  | type ClientSubscription struct { | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 	client    *Client | 
					
						
							|  |  |  | 	etype     reflect.Type | 
					
						
							|  |  |  | 	channel   reflect.Value | 
					
						
							|  |  |  | 	namespace string | 
					
						
							|  |  |  | 	subid     string | 
					
						
							|  |  |  | 	in        chan json.RawMessage | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	quitOnce sync.Once     // ensures quit is closed once | 
					
						
							|  |  |  | 	quit     chan struct{} // quit is closed when the subscription exits | 
					
						
							|  |  |  | 	errOnce  sync.Once     // ensures err is closed once | 
					
						
							|  |  |  | 	err      chan error | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	sub := &ClientSubscription{ | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 		client:    c, | 
					
						
							|  |  |  | 		namespace: namespace, | 
					
						
							|  |  |  | 		etype:     channel.Type().Elem(), | 
					
						
							|  |  |  | 		channel:   channel, | 
					
						
							|  |  |  | 		quit:      make(chan struct{}), | 
					
						
							|  |  |  | 		err:       make(chan error, 1), | 
					
						
							|  |  |  | 		in:        make(chan json.RawMessage), | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return sub | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Err returns the subscription error channel. The intended use of Err is to schedule | 
					
						
							|  |  |  | // resubscription when the client connection is closed unexpectedly. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The error channel receives a value when the subscription has ended due | 
					
						
							| 
									
										
										
										
											2016-12-09 19:51:21 +01:00
										 |  |  | // to an error. The received error is nil if Close has been called | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | // on the underlying client and no other error has occurred. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The error channel is closed when Unsubscribe is called on the subscription. | 
					
						
							|  |  |  | func (sub *ClientSubscription) Err() <-chan error { | 
					
						
							|  |  |  | 	return sub.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Unsubscribe unsubscribes the notification and closes the error channel. | 
					
						
							|  |  |  | // It can safely be called more than once. | 
					
						
							|  |  |  | func (sub *ClientSubscription) Unsubscribe() { | 
					
						
							|  |  |  | 	sub.quitWithError(nil, true) | 
					
						
							|  |  |  | 	sub.errOnce.Do(func() { close(sub.err) }) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { | 
					
						
							|  |  |  | 	sub.quitOnce.Do(func() { | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 		// The dispatch loop won't be able to execute the unsubscribe call | 
					
						
							|  |  |  | 		// if it is blocked on deliver. Close sub.quit first because it | 
					
						
							|  |  |  | 		// unblocks deliver. | 
					
						
							|  |  |  | 		close(sub.quit) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		if unsubscribeServer { | 
					
						
							|  |  |  | 			sub.requestUnsubscribe() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2016-12-09 19:51:21 +01:00
										 |  |  | 			if err == ErrClientQuit { | 
					
						
							|  |  |  | 				err = nil // Adhere to subscription semantics. | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 			sub.err <- err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case sub.in <- result: | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 	case <-sub.quit: | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (sub *ClientSubscription) start() { | 
					
						
							|  |  |  | 	sub.quitWithError(sub.forward()) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { | 
					
						
							|  |  |  | 	cases := []reflect.SelectCase{ | 
					
						
							|  |  |  | 		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		{Dir: reflect.SelectSend, Chan: sub.channel}, | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 	buffer := list.New() | 
					
						
							|  |  |  | 	defer buffer.Init() | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	for { | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 		var chosen int | 
					
						
							|  |  |  | 		var recv reflect.Value | 
					
						
							|  |  |  | 		if buffer.Len() == 0 { | 
					
						
							|  |  |  | 			// Idle, omit send case. | 
					
						
							|  |  |  | 			chosen, recv, _ = reflect.Select(cases[:2]) | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			// Non-empty buffer, send the first queued item. | 
					
						
							|  |  |  | 			cases[2].Send = reflect.ValueOf(buffer.Front().Value) | 
					
						
							|  |  |  | 			chosen, recv, _ = reflect.Select(cases) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		switch chosen { | 
					
						
							|  |  |  | 		case 0: // <-sub.quit | 
					
						
							|  |  |  | 			return nil, false | 
					
						
							|  |  |  | 		case 1: // <-sub.in | 
					
						
							|  |  |  | 			val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err, true | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 			if buffer.Len() == maxClientSubscriptionBuffer { | 
					
						
							|  |  |  | 				return ErrSubscriptionQueueOverflow, true | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 			buffer.PushBack(val) | 
					
						
							|  |  |  | 		case 2: // sub.channel<- | 
					
						
							|  |  |  | 			cases[2].Send = reflect.Value{} // Don't hold onto the value. | 
					
						
							|  |  |  | 			buffer.Remove(buffer.Front()) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | 	val := reflect.New(sub.etype) | 
					
						
							|  |  |  | 	err := json.Unmarshal(result, val.Interface()) | 
					
						
							| 
									
										
										
										
											2016-08-04 21:18:13 +02:00
										 |  |  | 	return val.Elem().Interface(), err | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (sub *ClientSubscription) requestUnsubscribe() error { | 
					
						
							|  |  |  | 	var result interface{} | 
					
						
							| 
									
										
										
										
											2017-04-06 08:56:41 +02:00
										 |  |  | 	return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) | 
					
						
							| 
									
										
										
										
											2016-07-12 17:47:15 +02:00
										 |  |  | } |