| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | // Copyright 2016 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package rpc | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"golang.org/x/net/context" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							|  |  |  | 	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications | 
					
						
							|  |  |  | 	ErrNotificationsUnsupported = errors.New("notifications not supported") | 
					
						
							|  |  |  | 	// ErrNotificationNotFound is returned when the notification for the given id is not found | 
					
						
							|  |  |  | 	ErrSubscriptionNotFound = errors.New("subscription not found") | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-01-06 19:44:35 +02:00
										 |  |  | // ID defines a pseudo random number that is used to identify RPC subscriptions. | 
					
						
							| 
									
										
										
										
											2016-07-27 17:47:46 +02:00
										 |  |  | type ID string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // a Subscription is created by a notifier and tight to that notifier. The client can use | 
					
						
							|  |  |  | // this subscription to wait for an unsubscribe request for the client, see Err(). | 
					
						
							|  |  |  | type Subscription struct { | 
					
						
							|  |  |  | 	ID  ID | 
					
						
							|  |  |  | 	err chan error // closed on unsubscribe | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Err returns a channel that is closed when the client send an unsubscribe request. | 
					
						
							|  |  |  | func (s *Subscription) Err() <-chan error { | 
					
						
							|  |  |  | 	return s.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // notifierKey is used to store a notifier within the connection context. | 
					
						
							|  |  |  | type notifierKey struct{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Notifier is tight to a RPC connection that supports subscriptions. | 
					
						
							|  |  |  | // Server callbacks use the notifier to send notifications. | 
					
						
							|  |  |  | type Notifier struct { | 
					
						
							|  |  |  | 	codec    ServerCodec | 
					
						
							|  |  |  | 	subMu    sync.RWMutex // guards active and inactive maps | 
					
						
							|  |  |  | 	stopped  bool | 
					
						
							|  |  |  | 	active   map[ID]*Subscription | 
					
						
							|  |  |  | 	inactive map[ID]*Subscription | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // newNotifier creates a new notifier that can be used to send subscription | 
					
						
							|  |  |  | // notifications to the client. | 
					
						
							|  |  |  | func newNotifier(codec ServerCodec) *Notifier { | 
					
						
							|  |  |  | 	return &Notifier{ | 
					
						
							|  |  |  | 		codec:    codec, | 
					
						
							|  |  |  | 		active:   make(map[ID]*Subscription), | 
					
						
							|  |  |  | 		inactive: make(map[ID]*Subscription), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NotifierFromContext returns the Notifier value stored in ctx, if any. | 
					
						
							|  |  |  | func NotifierFromContext(ctx context.Context) (*Notifier, bool) { | 
					
						
							|  |  |  | 	n, ok := ctx.Value(notifierKey{}).(*Notifier) | 
					
						
							|  |  |  | 	return n, ok | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // CreateSubscription returns a new subscription that is coupled to the | 
					
						
							|  |  |  | // RPC connection. By default subscriptions are inactive and notifications | 
					
						
							|  |  |  | // are dropped until the subscription is marked as active. This is done | 
					
						
							|  |  |  | // by the RPC server after the subscription ID is send to the client. | 
					
						
							|  |  |  | func (n *Notifier) CreateSubscription() *Subscription { | 
					
						
							|  |  |  | 	s := &Subscription{NewID(), make(chan error)} | 
					
						
							|  |  |  | 	n.subMu.Lock() | 
					
						
							|  |  |  | 	n.inactive[s.ID] = s | 
					
						
							|  |  |  | 	n.subMu.Unlock() | 
					
						
							|  |  |  | 	return s | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Notify sends a notification to the client with the given data as payload. | 
					
						
							|  |  |  | // If an error occurs the RPC connection is closed and the error is returned. | 
					
						
							|  |  |  | func (n *Notifier) Notify(id ID, data interface{}) error { | 
					
						
							|  |  |  | 	n.subMu.RLock() | 
					
						
							|  |  |  | 	defer n.subMu.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	_, active := n.active[id] | 
					
						
							|  |  |  | 	if active { | 
					
						
							|  |  |  | 		notification := n.codec.CreateNotification(string(id), data) | 
					
						
							|  |  |  | 		if err := n.codec.Write(notification); err != nil { | 
					
						
							|  |  |  | 			n.codec.Close() | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Closed returns a channel that is closed when the RPC connection is closed. | 
					
						
							|  |  |  | func (n *Notifier) Closed() <-chan interface{} { | 
					
						
							|  |  |  | 	return n.codec.Closed() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // unsubscribe a subscription. | 
					
						
							|  |  |  | // If the subscription could not be found ErrSubscriptionNotFound is returned. | 
					
						
							|  |  |  | func (n *Notifier) unsubscribe(id ID) error { | 
					
						
							|  |  |  | 	n.subMu.Lock() | 
					
						
							|  |  |  | 	defer n.subMu.Unlock() | 
					
						
							|  |  |  | 	if s, found := n.active[id]; found { | 
					
						
							|  |  |  | 		close(s.err) | 
					
						
							|  |  |  | 		delete(n.active, id) | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ErrSubscriptionNotFound | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // activate enables a subscription. Until a subscription is enabled all | 
					
						
							|  |  |  | // notifications are dropped. This method is called by the RPC server after | 
					
						
							|  |  |  | // the subscription ID was sent to client. This prevents notifications being | 
					
						
							|  |  |  | // send to the client before the subscription ID is send to the client. | 
					
						
							|  |  |  | func (n *Notifier) activate(id ID) { | 
					
						
							|  |  |  | 	n.subMu.Lock() | 
					
						
							|  |  |  | 	defer n.subMu.Unlock() | 
					
						
							|  |  |  | 	if sub, found := n.inactive[id]; found { | 
					
						
							|  |  |  | 		n.active[id] = sub | 
					
						
							|  |  |  | 		delete(n.inactive, id) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |