| 
									
										
										
										
											2019-07-22 12:17:27 +03:00
										 |  |  | // Copyright 2019 The go-ethereum Authors | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package rpc | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"reflect" | 
					
						
							|  |  |  | 	"strconv" | 
					
						
							|  |  |  | 	"strings" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/log" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handler handles JSON-RPC messages. There is one handler per connection. Note that | 
					
						
							|  |  |  | // handler is not safe for concurrent use. Message handling never blocks indefinitely | 
					
						
							|  |  |  | // because RPCs are processed on background goroutines launched by handler. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The entry points for incoming messages are: | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | //    h.handleMsg(message) | 
					
						
							|  |  |  | //    h.handleBatch(message) | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Outgoing calls use the requestOp struct. Register the request before sending it | 
					
						
							|  |  |  | // on the connection: | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | //    op := &requestOp{ids: ...} | 
					
						
							|  |  |  | //    h.addRequestOp(op) | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Now send the request, then wait for the reply to be delivered through handleMsg: | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | //    if err := op.wait(...); err != nil { | 
					
						
							|  |  |  | //        h.removeRequestOp(op) // timeout, etc. | 
					
						
							|  |  |  | //    } | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | type handler struct { | 
					
						
							|  |  |  | 	reg            *serviceRegistry | 
					
						
							|  |  |  | 	unsubscribeCb  *callback | 
					
						
							|  |  |  | 	idgen          func() ID                      // subscription ID generator | 
					
						
							|  |  |  | 	respWait       map[string]*requestOp          // active client requests | 
					
						
							|  |  |  | 	clientSubs     map[string]*ClientSubscription // active client subscriptions | 
					
						
							|  |  |  | 	callWG         sync.WaitGroup                 // pending call goroutines | 
					
						
							|  |  |  | 	rootCtx        context.Context                // canceled by close() | 
					
						
							|  |  |  | 	cancelRoot     func()                         // cancel function for rootCtx | 
					
						
							|  |  |  | 	conn           jsonWriter                     // where responses will be sent | 
					
						
							|  |  |  | 	log            log.Logger | 
					
						
							|  |  |  | 	allowSubscribe bool | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	subLock    sync.Mutex | 
					
						
							|  |  |  | 	serverSubs map[ID]*Subscription | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type callProc struct { | 
					
						
							|  |  |  | 	ctx       context.Context | 
					
						
							|  |  |  | 	notifiers []*Notifier | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { | 
					
						
							|  |  |  | 	rootCtx, cancelRoot := context.WithCancel(connCtx) | 
					
						
							|  |  |  | 	h := &handler{ | 
					
						
							|  |  |  | 		reg:            reg, | 
					
						
							|  |  |  | 		idgen:          idgen, | 
					
						
							|  |  |  | 		conn:           conn, | 
					
						
							|  |  |  | 		respWait:       make(map[string]*requestOp), | 
					
						
							|  |  |  | 		clientSubs:     make(map[string]*ClientSubscription), | 
					
						
							|  |  |  | 		rootCtx:        rootCtx, | 
					
						
							|  |  |  | 		cancelRoot:     cancelRoot, | 
					
						
							|  |  |  | 		allowSubscribe: true, | 
					
						
							|  |  |  | 		serverSubs:     make(map[ID]*Subscription), | 
					
						
							|  |  |  | 		log:            log.Root(), | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-11-18 09:40:59 +01:00
										 |  |  | 	if conn.remoteAddr() != "" { | 
					
						
							|  |  |  | 		h.log = h.log.New("conn", conn.remoteAddr()) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) | 
					
						
							|  |  |  | 	return h | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleBatch executes all messages in a batch and returns the responses. | 
					
						
							|  |  |  | func (h *handler) handleBatch(msgs []*jsonrpcMessage) { | 
					
						
							|  |  |  | 	// Emit error response for empty batches: | 
					
						
							|  |  |  | 	if len(msgs) == 0 { | 
					
						
							|  |  |  | 		h.startCallProc(func(cp *callProc) { | 
					
						
							| 
									
										
										
										
											2019-11-18 09:40:59 +01:00
										 |  |  | 			h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		}) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Handle non-call messages first: | 
					
						
							|  |  |  | 	calls := make([]*jsonrpcMessage, 0, len(msgs)) | 
					
						
							|  |  |  | 	for _, msg := range msgs { | 
					
						
							|  |  |  | 		if handled := h.handleImmediate(msg); !handled { | 
					
						
							|  |  |  | 			calls = append(calls, msg) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if len(calls) == 0 { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// Process calls on a goroutine because they may block indefinitely: | 
					
						
							|  |  |  | 	h.startCallProc(func(cp *callProc) { | 
					
						
							|  |  |  | 		answers := make([]*jsonrpcMessage, 0, len(msgs)) | 
					
						
							|  |  |  | 		for _, msg := range calls { | 
					
						
							|  |  |  | 			if answer := h.handleCallMsg(cp, msg); answer != nil { | 
					
						
							|  |  |  | 				answers = append(answers, answer) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		h.addSubscriptions(cp.notifiers) | 
					
						
							|  |  |  | 		if len(answers) > 0 { | 
					
						
							| 
									
										
										
										
											2019-11-18 09:40:59 +01:00
										 |  |  | 			h.conn.writeJSON(cp.ctx, answers) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		for _, n := range cp.notifiers { | 
					
						
							|  |  |  | 			n.activate() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleMsg handles a single message. | 
					
						
							|  |  |  | func (h *handler) handleMsg(msg *jsonrpcMessage) { | 
					
						
							|  |  |  | 	if ok := h.handleImmediate(msg); ok { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	h.startCallProc(func(cp *callProc) { | 
					
						
							|  |  |  | 		answer := h.handleCallMsg(cp, msg) | 
					
						
							|  |  |  | 		h.addSubscriptions(cp.notifiers) | 
					
						
							|  |  |  | 		if answer != nil { | 
					
						
							| 
									
										
										
										
											2019-11-18 09:40:59 +01:00
										 |  |  | 			h.conn.writeJSON(cp.ctx, answer) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		for _, n := range cp.notifiers { | 
					
						
							|  |  |  | 			n.activate() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // close cancels all requests except for inflightReq and waits for | 
					
						
							|  |  |  | // call goroutines to shut down. | 
					
						
							|  |  |  | func (h *handler) close(err error, inflightReq *requestOp) { | 
					
						
							|  |  |  | 	h.cancelAllRequests(err, inflightReq) | 
					
						
							|  |  |  | 	h.callWG.Wait() | 
					
						
							| 
									
										
										
										
											2019-04-10 16:47:09 +08:00
										 |  |  | 	h.cancelRoot() | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	h.cancelServerSubscriptions(err) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // addRequestOp registers a request operation. | 
					
						
							|  |  |  | func (h *handler) addRequestOp(op *requestOp) { | 
					
						
							|  |  |  | 	for _, id := range op.ids { | 
					
						
							|  |  |  | 		h.respWait[string(id)] = op | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // removeRequestOps stops waiting for the given request IDs. | 
					
						
							|  |  |  | func (h *handler) removeRequestOp(op *requestOp) { | 
					
						
							|  |  |  | 	for _, id := range op.ids { | 
					
						
							|  |  |  | 		delete(h.respWait, string(id)) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // cancelAllRequests unblocks and removes pending requests and active subscriptions. | 
					
						
							|  |  |  | func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { | 
					
						
							|  |  |  | 	didClose := make(map[*requestOp]bool) | 
					
						
							|  |  |  | 	if inflightReq != nil { | 
					
						
							|  |  |  | 		didClose[inflightReq] = true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for id, op := range h.respWait { | 
					
						
							|  |  |  | 		// Remove the op so that later calls will not close op.resp again. | 
					
						
							|  |  |  | 		delete(h.respWait, id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if !didClose[op] { | 
					
						
							|  |  |  | 			op.err = err | 
					
						
							|  |  |  | 			close(op.resp) | 
					
						
							|  |  |  | 			didClose[op] = true | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for id, sub := range h.clientSubs { | 
					
						
							|  |  |  | 		delete(h.clientSubs, id) | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
										 |  |  | 		sub.close(err) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (h *handler) addSubscriptions(nn []*Notifier) { | 
					
						
							|  |  |  | 	h.subLock.Lock() | 
					
						
							|  |  |  | 	defer h.subLock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, n := range nn { | 
					
						
							|  |  |  | 		if sub := n.takeSubscription(); sub != nil { | 
					
						
							|  |  |  | 			h.serverSubs[sub.ID] = sub | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // cancelServerSubscriptions removes all subscriptions and closes their error channels. | 
					
						
							|  |  |  | func (h *handler) cancelServerSubscriptions(err error) { | 
					
						
							|  |  |  | 	h.subLock.Lock() | 
					
						
							|  |  |  | 	defer h.subLock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for id, s := range h.serverSubs { | 
					
						
							|  |  |  | 		s.err <- err | 
					
						
							|  |  |  | 		close(s.err) | 
					
						
							|  |  |  | 		delete(h.serverSubs, id) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. | 
					
						
							|  |  |  | func (h *handler) startCallProc(fn func(*callProc)) { | 
					
						
							|  |  |  | 	h.callWG.Add(1) | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		ctx, cancel := context.WithCancel(h.rootCtx) | 
					
						
							|  |  |  | 		defer h.callWG.Done() | 
					
						
							|  |  |  | 		defer cancel() | 
					
						
							|  |  |  | 		fn(&callProc{ctx: ctx}) | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleImmediate executes non-call messages. It returns false if the message is a | 
					
						
							|  |  |  | // call or requires a reply. | 
					
						
							|  |  |  | func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { | 
					
						
							|  |  |  | 	start := time.Now() | 
					
						
							|  |  |  | 	switch { | 
					
						
							|  |  |  | 	case msg.isNotification(): | 
					
						
							|  |  |  | 		if strings.HasSuffix(msg.Method, notificationMethodSuffix) { | 
					
						
							|  |  |  | 			h.handleSubscriptionResult(msg) | 
					
						
							|  |  |  | 			return true | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	case msg.isResponse(): | 
					
						
							|  |  |  | 		h.handleResponse(msg) | 
					
						
							|  |  |  | 		h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start)) | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleSubscriptionResult processes subscription notifications. | 
					
						
							|  |  |  | func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { | 
					
						
							|  |  |  | 	var result subscriptionResult | 
					
						
							|  |  |  | 	if err := json.Unmarshal(msg.Params, &result); err != nil { | 
					
						
							|  |  |  | 		h.log.Debug("Dropping invalid subscription message") | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if h.clientSubs[result.ID] != nil { | 
					
						
							|  |  |  | 		h.clientSubs[result.ID].deliver(result.Result) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleResponse processes method call responses. | 
					
						
							|  |  |  | func (h *handler) handleResponse(msg *jsonrpcMessage) { | 
					
						
							|  |  |  | 	op := h.respWait[string(msg.ID)] | 
					
						
							|  |  |  | 	if op == nil { | 
					
						
							|  |  |  | 		h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	delete(h.respWait, string(msg.ID)) | 
					
						
							|  |  |  | 	// For normal responses, just forward the reply to Call/BatchCall. | 
					
						
							|  |  |  | 	if op.sub == nil { | 
					
						
							|  |  |  | 		op.resp <- msg | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// For subscription responses, start the subscription if the server | 
					
						
							|  |  |  | 	// indicates success. EthSubscribe gets unblocked in either case through | 
					
						
							|  |  |  | 	// the op.resp channel. | 
					
						
							|  |  |  | 	defer close(op.resp) | 
					
						
							|  |  |  | 	if msg.Error != nil { | 
					
						
							|  |  |  | 		op.err = msg.Error | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
										 |  |  | 		go op.sub.run() | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		h.clientSubs[op.sub.subid] = op.sub | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleCallMsg executes a call message and returns the answer. | 
					
						
							|  |  |  | func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { | 
					
						
							|  |  |  | 	start := time.Now() | 
					
						
							|  |  |  | 	switch { | 
					
						
							|  |  |  | 	case msg.isNotification(): | 
					
						
							|  |  |  | 		h.handleCall(ctx, msg) | 
					
						
							|  |  |  | 		h.log.Debug("Served "+msg.Method, "t", time.Since(start)) | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	case msg.isCall(): | 
					
						
							|  |  |  | 		resp := h.handleCall(ctx, msg) | 
					
						
							| 
									
										
										
										
											2020-06-08 10:09:49 +02:00
										 |  |  | 		var ctx []interface{} | 
					
						
							|  |  |  | 		ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start)) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		if resp.Error != nil { | 
					
						
							| 
									
										
										
										
											2020-06-08 10:09:49 +02:00
										 |  |  | 			ctx = append(ctx, "err", resp.Error.Message) | 
					
						
							|  |  |  | 			if resp.Error.Data != nil { | 
					
						
							|  |  |  | 				ctx = append(ctx, "errdata", resp.Error.Data) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			h.log.Warn("Served "+msg.Method, ctx...) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		} else { | 
					
						
							| 
									
										
										
										
											2020-06-08 10:09:49 +02:00
										 |  |  | 			h.log.Debug("Served "+msg.Method, ctx...) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		return resp | 
					
						
							|  |  |  | 	case msg.hasValidID(): | 
					
						
							|  |  |  | 		return msg.errorResponse(&invalidRequestError{"invalid request"}) | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return errorMessage(&invalidRequestError{"invalid request"}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleCall processes method calls. | 
					
						
							|  |  |  | func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { | 
					
						
							|  |  |  | 	if msg.isSubscribe() { | 
					
						
							|  |  |  | 		return h.handleSubscribe(cp, msg) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	var callb *callback | 
					
						
							|  |  |  | 	if msg.isUnsubscribe() { | 
					
						
							|  |  |  | 		callb = h.unsubscribeCb | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		callb = h.reg.callback(msg.Method) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if callb == nil { | 
					
						
							|  |  |  | 		return msg.errorResponse(&methodNotFoundError{method: msg.Method}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	args, err := parsePositionalArguments(msg.Params, callb.argTypes) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return msg.errorResponse(&invalidParamsError{err.Error()}) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-04-03 18:36:44 +08:00
										 |  |  | 	start := time.Now() | 
					
						
							|  |  |  | 	answer := h.runMethod(cp.ctx, msg, callb, args) | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-03 18:36:44 +08:00
										 |  |  | 	// Collect the statistics for RPC calls if metrics is enabled. | 
					
						
							|  |  |  | 	// We only care about pure rpc call. Filter out subscription. | 
					
						
							|  |  |  | 	if callb != h.unsubscribeCb { | 
					
						
							|  |  |  | 		rpcRequestGauge.Inc(1) | 
					
						
							|  |  |  | 		if answer.Error != nil { | 
					
						
							|  |  |  | 			failedReqeustGauge.Inc(1) | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			successfulRequestGauge.Inc(1) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		rpcServingTimer.UpdateSince(start) | 
					
						
							|  |  |  | 		newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return answer | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handleSubscribe processes *_subscribe method calls. | 
					
						
							|  |  |  | func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { | 
					
						
							|  |  |  | 	if !h.allowSubscribe { | 
					
						
							|  |  |  | 		return msg.errorResponse(ErrNotificationsUnsupported) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Subscription method name is first argument. | 
					
						
							|  |  |  | 	name, err := parseSubscriptionName(msg.Params) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return msg.errorResponse(&invalidParamsError{err.Error()}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	namespace := msg.namespace() | 
					
						
							|  |  |  | 	callb := h.reg.subscription(namespace, name) | 
					
						
							|  |  |  | 	if callb == nil { | 
					
						
							|  |  |  | 		return msg.errorResponse(&subscriptionNotFoundError{namespace, name}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Parse subscription name arg too, but remove it before calling the callback. | 
					
						
							|  |  |  | 	argTypes := append([]reflect.Type{stringType}, callb.argTypes...) | 
					
						
							|  |  |  | 	args, err := parsePositionalArguments(msg.Params, argTypes) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return msg.errorResponse(&invalidParamsError{err.Error()}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	args = args[1:] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Install notifier in context so the subscription handler can find it. | 
					
						
							|  |  |  | 	n := &Notifier{h: h, namespace: namespace} | 
					
						
							|  |  |  | 	cp.notifiers = append(cp.notifiers, n) | 
					
						
							|  |  |  | 	ctx := context.WithValue(cp.ctx, notifierKey{}, n) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return h.runMethod(ctx, msg, callb, args) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // runMethod runs the Go callback for an RPC method. | 
					
						
							|  |  |  | func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage { | 
					
						
							|  |  |  | 	result, err := callb.call(ctx, msg.Method, args) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return msg.errorResponse(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return msg.response(result) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // unsubscribe is the callback function for all *_unsubscribe calls. | 
					
						
							|  |  |  | func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) { | 
					
						
							|  |  |  | 	h.subLock.Lock() | 
					
						
							|  |  |  | 	defer h.subLock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	s := h.serverSubs[id] | 
					
						
							|  |  |  | 	if s == nil { | 
					
						
							|  |  |  | 		return false, ErrSubscriptionNotFound | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	close(s.err) | 
					
						
							|  |  |  | 	delete(h.serverSubs, id) | 
					
						
							|  |  |  | 	return true, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type idForLog struct{ json.RawMessage } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (id idForLog) String() string { | 
					
						
							|  |  |  | 	if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { | 
					
						
							|  |  |  | 		return s | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return string(id.RawMessage) | 
					
						
							|  |  |  | } |