| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | // Copyright 2016 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package rpc | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	"container/list" | 
					
						
							| 
									
										
										
										
											2017-03-22 18:20:33 +01:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	crand "crypto/rand" | 
					
						
							|  |  |  | 	"encoding/binary" | 
					
						
							|  |  |  | 	"encoding/hex" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	"math/rand" | 
					
						
							|  |  |  | 	"reflect" | 
					
						
							|  |  |  | 	"strings" | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
var (
	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications.
	ErrNotificationsUnsupported = errors.New("notifications not supported")
	// ErrSubscriptionNotFound is returned when the subscription for the given id is not found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
// globalGen is the package-level ID generator that backs NewID.
var globalGen = randomIDGenerator()
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-01-06 19:44:35 +02:00
// ID defines a pseudo random number that is used to identify RPC subscriptions.
// Generated IDs are hex strings with a "0x" prefix and no leading zero digits
// (see encodeID).
type ID string
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
// NewID returns a new, random ID produced by the package-level generator.
func NewID() ID {
	return globalGen()
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | // randomIDGenerator returns a function generates a random IDs. | 
					
						
							|  |  |  | func randomIDGenerator() func() ID { | 
					
						
							| 
									
										
										
										
											2020-07-31 16:20:31 +02:00
										 |  |  | 	var buf = make([]byte, 8) | 
					
						
							|  |  |  | 	var seed int64 | 
					
						
							|  |  |  | 	if _, err := crand.Read(buf); err == nil { | 
					
						
							|  |  |  | 		seed = int64(binary.BigEndian.Uint64(buf)) | 
					
						
							|  |  |  | 	} else { | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		seed = int64(time.Now().Nanosecond()) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-07-31 16:20:31 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	var ( | 
					
						
							|  |  |  | 		mu  sync.Mutex | 
					
						
							|  |  |  | 		rng = rand.New(rand.NewSource(seed)) | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 	return func() ID { | 
					
						
							|  |  |  | 		mu.Lock() | 
					
						
							|  |  |  | 		defer mu.Unlock() | 
					
						
							|  |  |  | 		id := make([]byte, 16) | 
					
						
							|  |  |  | 		rng.Read(id) | 
					
						
							|  |  |  | 		return encodeID(id) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | func encodeID(b []byte) ID { | 
					
						
							|  |  |  | 	id := hex.EncodeToString(b) | 
					
						
							|  |  |  | 	id = strings.TrimLeft(id, "0") | 
					
						
							|  |  |  | 	if id == "" { | 
					
						
							|  |  |  | 		id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0. | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	return ID("0x" + id) | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
// notifierKey is the context key type under which NotifierFromContext looks up
// the *Notifier.
type notifierKey struct{}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | // NotifierFromContext returns the Notifier value stored in ctx, if any. | 
					
						
							|  |  |  | func NotifierFromContext(ctx context.Context) (*Notifier, bool) { | 
					
						
							|  |  |  | 	n, ok := ctx.Value(notifierKey{}).(*Notifier) | 
					
						
							|  |  |  | 	return n, ok | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
// Notifier is tied to a RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
type Notifier struct {
	h         *handler // supplies subscription IDs (idgen) and the connection used for writes
	namespace string   // prefix of the notification method name

	mu           sync.Mutex        // held by the methods that access the fields below
	sub          *Subscription     // the single subscription created through this notifier, if any
	buffer       []json.RawMessage // encoded notifications queued by Notify until activate runs
	callReturned bool              // set by takeSubscription; CreateSubscription panics afterwards
	activated    bool              // set by activate; Notify sends directly once true
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | // CreateSubscription returns a new subscription that is coupled to the | 
					
						
							|  |  |  | // RPC connection. By default subscriptions are inactive and notifications | 
					
						
							|  |  |  | // are dropped until the subscription is marked as active. This is done | 
					
						
							|  |  |  | // by the RPC server after the subscription ID is send to the client. | 
					
						
							|  |  |  | func (n *Notifier) CreateSubscription() *Subscription { | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	n.mu.Lock() | 
					
						
							|  |  |  | 	defer n.mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if n.sub != nil { | 
					
						
							|  |  |  | 		panic("can't create multiple subscriptions with Notifier") | 
					
						
							|  |  |  | 	} else if n.callReturned { | 
					
						
							|  |  |  | 		panic("can't create subscription after subscribe call has returned") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)} | 
					
						
							|  |  |  | 	return n.sub | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Notify sends a notification to the client with the given data as payload. | 
					
						
							|  |  |  | // If an error occurs the RPC connection is closed and the error is returned. | 
					
						
							|  |  |  | func (n *Notifier) Notify(id ID, data interface{}) error { | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	enc, err := json.Marshal(data) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-10-09 16:34:24 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	n.mu.Lock() | 
					
						
							|  |  |  | 	defer n.mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if n.sub == nil { | 
					
						
							|  |  |  | 		panic("can't Notify before subscription is created") | 
					
						
							|  |  |  | 	} else if n.sub.ID != id { | 
					
						
							|  |  |  | 		panic("Notify with wrong ID") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if n.activated { | 
					
						
							|  |  |  | 		return n.send(n.sub, enc) | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	n.buffer = append(n.buffer, enc) | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
// Closed returns a channel that is closed when the RPC connection is closed.
//
// Deprecated: use subscription error channel.
func (n *Notifier) Closed() <-chan interface{} {
	return n.h.conn.closed()
}
					
						
							|  |  |  | 
 | 
					
						
// takeSubscription returns the subscription (if one has been created). No subscription can
// be created after this call: it sets callReturned, which makes CreateSubscription panic.
func (n *Notifier) takeSubscription() *Subscription {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.callReturned = true
	return n.sub
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-27 13:52:53 +01:00
										 |  |  | // activate is called after the subscription ID was sent to client. Notifications are | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | // buffered before activation. This prevents notifications being sent to the client before | 
					
						
							|  |  |  | // the subscription ID is sent to the client. | 
					
						
							|  |  |  | func (n *Notifier) activate() error { | 
					
						
							|  |  |  | 	n.mu.Lock() | 
					
						
							|  |  |  | 	defer n.mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, data := range n.buffer { | 
					
						
							|  |  |  | 		if err := n.send(n.sub, data); err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-10-09 16:34:24 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	n.activated = true | 
					
						
							|  |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2018-10-09 16:34:24 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { | 
					
						
							|  |  |  | 	params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) | 
					
						
							|  |  |  | 	ctx := context.Background() | 
					
						
							| 
									
										
										
										
											2019-11-18 09:40:59 +01:00
										 |  |  | 	return n.h.conn.writeJSON(ctx, &jsonrpcMessage{ | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 		Version: vsn, | 
					
						
							|  |  |  | 		Method:  n.namespace + notificationMethodSuffix, | 
					
						
							|  |  |  | 		Params:  params, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-27 13:52:53 +01:00
// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request from the client, see Err().
type Subscription struct {
	ID        ID         // identifier of the subscription; also its JSON representation
	namespace string     // copied from the notifier that created the subscription
	err       chan error // closed on unsubscribe
}
					
						
							|  |  |  | 
 | 
					
						
// Err returns a channel that is closed when the client sends an unsubscribe request.
func (s *Subscription) Err() <-chan error {
	return s.err
}
					
						
							|  |  |  | 
 | 
					
						
// MarshalJSON marshals a subscription as its ID, i.e. as a JSON string.
func (s *Subscription) MarshalJSON() ([]byte, error) {
	return json.Marshal(s.ID)
}
					
						
							|  |  |  | 
 | 
					
						
// ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
	client    *Client       // used to issue the unsubscribe call
	etype     reflect.Type  // element type of channel; notifications are decoded into it
	channel   reflect.Value // channel on which decoded notifications are sent
	namespace string        // prefix of the unsubscribe method name
	subid     string        // argument of the unsubscribe call

	// The in channel receives notification values from client dispatcher.
	in chan json.RawMessage

	// The error channel receives the error from the forwarding loop.
	// It is closed by Unsubscribe.
	err     chan error
	errOnce sync.Once // makes Unsubscribe run its body only once

	// Closing of the subscription is requested by sending on 'quit'. This is handled by
	// the forwarding loop, which closes 'forwardDone' when it has stopped sending to
	// sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side.
	quit        chan error
	forwardDone chan struct{}
	unsubDone   chan struct{}
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
// errUnsubscribed is the sentinel value sent on sub.quit when Unsubscribe is called.
var errUnsubscribed = errors.New("unsubscribed")
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { | 
					
						
							|  |  |  | 	sub := &ClientSubscription{ | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
										 |  |  | 		client:      c, | 
					
						
							|  |  |  | 		namespace:   namespace, | 
					
						
							|  |  |  | 		etype:       channel.Type().Elem(), | 
					
						
							|  |  |  | 		channel:     channel, | 
					
						
							|  |  |  | 		in:          make(chan json.RawMessage), | 
					
						
							|  |  |  | 		quit:        make(chan error), | 
					
						
							|  |  |  | 		forwardDone: make(chan struct{}), | 
					
						
							|  |  |  | 		unsubDone:   make(chan struct{}), | 
					
						
							|  |  |  | 		err:         make(chan error, 1), | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 	return sub | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
// Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due to an error. The
// received error is nil if Close has been called on the underlying client and no other
// error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
func (sub *ClientSubscription) Err() <-chan error {
	return sub.err
}
					
						
							|  |  |  | 
 | 
					
						
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once: the work is guarded by errOnce.
// The call blocks until unsubDone is closed, which run does when it returns.
func (sub *ClientSubscription) Unsubscribe() {
	sub.errOnce.Do(func() {
		select {
		case sub.quit <- errUnsubscribed:
			// The forwarding loop accepted the request; wait for run to finish.
			<-sub.unsubDone
		case <-sub.unsubDone:
			// run has already returned, there is nothing left to request.
		}
		close(sub.err)
	})
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
// deliver is called by the client's message dispatcher to send a notification value.
// It blocks until the value is received from sub.in (returning true) or until
// forwardDone is closed (returning false).
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
	select {
	case sub.in <- result:
		return true
	case <-sub.forwardDone:
		return false
	}
}
					
						
							| 
									
										
										
										
											2019-02-04 13:47:34 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
// close is called by the client's message dispatcher when the connection is closed.
// It hands err to the forwarding loop via sub.quit, or returns without doing so if
// forwardDone is already closed.
func (sub *ClientSubscription) close(err error) {
	select {
	case sub.quit <- err:
	case <-sub.forwardDone:
	}
}
					
						
							|  |  |  | 
 | 
					
						
// run is the forwarding loop of the subscription. It runs in its own goroutine and
// is launched by the client's handler after the subscription has been created.
//
// It runs forward until that returns, closes forwardDone, optionally unsubscribes on
// the server, reports a non-nil error on sub.err, and finally closes unsubDone.
func (sub *ClientSubscription) run() {
	defer close(sub.unsubDone)

	unsubscribe, err := sub.forward()

	// The client's dispatch loop won't be able to execute the unsubscribe call if it is
	// blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them.
	close(sub.forwardDone)

	// Call the unsubscribe method on the server.
	// The returned error is not checked here.
	if unsubscribe {
		sub.requestUnsubscribe()
	}

	// Send the error.
	if err != nil {
		if err == ErrClientQuit {
			// ErrClientQuit gets here when Client.Close is called. This is reported as a
			// nil error because it's not an error, but we can't close sub.err here.
			err = nil
		}
		sub.err <- err
	}
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-30 20:09:30 +02:00
// forward is the forwarding loop. It takes in RPC notifications and sends them
// on the subscription channel.
//
// Decoded notifications are queued in a list so that a slow receiver on sub.channel
// does not block receiving from sub.in. The loop returns when a value arrives on
// sub.quit, when a notification cannot be decoded, or when the queue already holds
// maxClientSubscriptionBuffer items. unsubscribeServer reports whether the caller
// should unsubscribe on the server; err is the reason for stopping, which is nil when
// the loop was stopped by Unsubscribe.
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
	// The case indices below are matched by the switch on 'chosen'.
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
		{Dir: reflect.SelectSend, Chan: sub.channel},
	}
	buffer := list.New()

	for {
		var chosen int
		var recv reflect.Value
		if buffer.Len() == 0 {
			// Idle, omit send case.
			chosen, recv, _ = reflect.Select(cases[:2])
		} else {
			// Non-empty buffer, send the first queued item.
			cases[2].Send = reflect.ValueOf(buffer.Front().Value)
			chosen, recv, _ = reflect.Select(cases)
		}

		switch chosen {
		case 0: // <-sub.quit
			// A nil error may be sent on quit; only convert non-nil values.
			if !recv.IsNil() {
				err = recv.Interface().(error)
			}
			if err == errUnsubscribed {
				// Exiting because Unsubscribe was called, unsubscribe on server.
				return true, nil
			}
			return false, err

		case 1: // <-sub.in
			// Note: this err shadows the named result; both returns below are explicit.
			val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
			if err != nil {
				return true, err
			}
			if buffer.Len() == maxClientSubscriptionBuffer {
				return true, ErrSubscriptionQueueOverflow
			}
			buffer.PushBack(val)

		case 2: // sub.channel<-
			cases[2].Send = reflect.Value{} // Don't hold onto the value.
			buffer.Remove(buffer.Front())
		}
	}
}
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { | 
					
						
							|  |  |  | 	val := reflect.New(sub.etype) | 
					
						
							|  |  |  | 	err := json.Unmarshal(result, val.Interface()) | 
					
						
							|  |  |  | 	return val.Elem().Interface(), err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
// requestUnsubscribe calls the namespace's unsubscribe method through the client,
// passing the subscription ID, and returns the error of that call. The call's result
// value is discarded.
func (sub *ClientSubscription) requestUnsubscribe() error {
	var result interface{}
	return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}