rpc: implement full bi-directional communication (#18471)
New APIs added: client.RegisterName(namespace, service) // makes service available to server client.Notify(ctx, method, args...) // sends a notification ClientFromContext(ctx) // to get a client in handler method This is essentially a rewrite of the server-side code. JSON-RPC processing code is now the same on both server and client side. Many minor issues were fixed in the process and there is a new test suite for JSON-RPC spec compliance (and non-compliance in some cases). List of behavior changes: - Method handlers are now called with a per-request context instead of a per-connection context. The context is canceled right after the method returns. - Subscription error channels are always closed when the connection ends. There is no need to also wait on the Notifier's Closed channel to detect whether the subscription has ended. - Client now omits "params" instead of sending "params": null when there are no arguments to a call. The previous behavior was not compliant with the spec. The server still accepts "params": null. - Floating point numbers are allowed as "id". The spec doesn't allow them, but we handle request "id" as json.RawMessage and guarantee that the same number will be sent back. - Logging is improved significantly. There is now a message at DEBUG level for each RPC call served.
This commit is contained in:
@ -17,9 +17,19 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"container/list"
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -29,10 +39,147 @@ var (
|
||||
ErrSubscriptionNotFound = errors.New("subscription not found")
|
||||
)
|
||||
|
||||
var globalGen = randomIDGenerator()
|
||||
|
||||
// ID defines a pseudo random number that is used to identify RPC subscriptions.
|
||||
type ID string
|
||||
|
||||
// a Subscription is created by a notifier and tight to that notifier. The client can use
|
||||
// NewID returns a new, random ID.
|
||||
func NewID() ID {
|
||||
return globalGen()
|
||||
}
|
||||
|
||||
// randomIDGenerator returns a function generates a random IDs.
|
||||
func randomIDGenerator() func() ID {
|
||||
seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
|
||||
if err != nil {
|
||||
seed = int64(time.Now().Nanosecond())
|
||||
}
|
||||
var (
|
||||
mu sync.Mutex
|
||||
rng = rand.New(rand.NewSource(seed))
|
||||
)
|
||||
return func() ID {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
id := make([]byte, 16)
|
||||
rng.Read(id)
|
||||
return encodeID(id)
|
||||
}
|
||||
}
|
||||
|
||||
func encodeID(b []byte) ID {
|
||||
id := hex.EncodeToString(b)
|
||||
id = strings.TrimLeft(id, "0")
|
||||
if id == "" {
|
||||
id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
|
||||
}
|
||||
return ID("0x" + id)
|
||||
}
|
||||
|
||||
type notifierKey struct{}
|
||||
|
||||
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
||||
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
|
||||
n, ok := ctx.Value(notifierKey{}).(*Notifier)
|
||||
return n, ok
|
||||
}
|
||||
|
||||
// Notifier is tied to a RPC connection that supports subscriptions.
|
||||
// Server callbacks use the notifier to send notifications.
|
||||
type Notifier struct {
|
||||
h *handler
|
||||
namespace string
|
||||
|
||||
mu sync.Mutex
|
||||
sub *Subscription
|
||||
buffer []json.RawMessage
|
||||
callReturned bool
|
||||
activated bool
|
||||
}
|
||||
|
||||
// CreateSubscription returns a new subscription that is coupled to the
|
||||
// RPC connection. By default subscriptions are inactive and notifications
|
||||
// are dropped until the subscription is marked as active. This is done
|
||||
// by the RPC server after the subscription ID is send to the client.
|
||||
func (n *Notifier) CreateSubscription() *Subscription {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
if n.sub != nil {
|
||||
panic("can't create multiple subscriptions with Notifier")
|
||||
} else if n.callReturned {
|
||||
panic("can't create subscription after subscribe call has returned")
|
||||
}
|
||||
n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
|
||||
return n.sub
|
||||
}
|
||||
|
||||
// Notify sends a notification to the client with the given data as payload.
|
||||
// If an error occurs the RPC connection is closed and the error is returned.
|
||||
func (n *Notifier) Notify(id ID, data interface{}) error {
|
||||
enc, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
if n.sub == nil {
|
||||
panic("can't Notify before subscription is created")
|
||||
} else if n.sub.ID != id {
|
||||
panic("Notify with wrong ID")
|
||||
}
|
||||
if n.activated {
|
||||
return n.send(n.sub, enc)
|
||||
}
|
||||
n.buffer = append(n.buffer, enc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Closed returns a channel that is closed when the RPC connection is closed.
|
||||
// Deprecated: use subscription error channel
|
||||
func (n *Notifier) Closed() <-chan interface{} {
|
||||
return n.h.conn.Closed()
|
||||
}
|
||||
|
||||
// takeSubscription returns the subscription (if one has been created). No subscription can
|
||||
// be created after this call.
|
||||
func (n *Notifier) takeSubscription() *Subscription {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
n.callReturned = true
|
||||
return n.sub
|
||||
}
|
||||
|
||||
// acticate is called after the subscription ID was sent to client. Notifications are
|
||||
// buffered before activation. This prevents notifications being sent to the client before
|
||||
// the subscription ID is sent to the client.
|
||||
func (n *Notifier) activate() error {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
for _, data := range n.buffer {
|
||||
if err := n.send(n.sub, data); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
n.activated = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
|
||||
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
|
||||
ctx := context.Background()
|
||||
return n.h.conn.Write(ctx, &jsonrpcMessage{
|
||||
Version: vsn,
|
||||
Method: n.namespace + notificationMethodSuffix,
|
||||
Params: params,
|
||||
})
|
||||
}
|
||||
|
||||
// A Subscription is created by a notifier and tight to that notifier. The client can use
|
||||
// this subscription to wait for an unsubscribe request for the client, see Err().
|
||||
type Subscription struct {
|
||||
ID ID
|
||||
@ -45,105 +192,136 @@ func (s *Subscription) Err() <-chan error {
|
||||
return s.err
|
||||
}
|
||||
|
||||
// notifierKey is used to store a notifier within the connection context.
|
||||
type notifierKey struct{}
|
||||
|
||||
// Notifier is tight to a RPC connection that supports subscriptions.
|
||||
// Server callbacks use the notifier to send notifications.
|
||||
type Notifier struct {
|
||||
codec ServerCodec
|
||||
subMu sync.Mutex
|
||||
active map[ID]*Subscription
|
||||
inactive map[ID]*Subscription
|
||||
buffer map[ID][]interface{} // unsent notifications of inactive subscriptions
|
||||
// MarshalJSON marshals a subscription as its ID.
|
||||
func (s *Subscription) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(s.ID)
|
||||
}
|
||||
|
||||
// newNotifier creates a new notifier that can be used to send subscription
|
||||
// notifications to the client.
|
||||
func newNotifier(codec ServerCodec) *Notifier {
|
||||
return &Notifier{
|
||||
codec: codec,
|
||||
active: make(map[ID]*Subscription),
|
||||
inactive: make(map[ID]*Subscription),
|
||||
buffer: make(map[ID][]interface{}),
|
||||
// ClientSubscription is a subscription established through the Client's Subscribe or
|
||||
// EthSubscribe methods.
|
||||
type ClientSubscription struct {
|
||||
client *Client
|
||||
etype reflect.Type
|
||||
channel reflect.Value
|
||||
namespace string
|
||||
subid string
|
||||
in chan json.RawMessage
|
||||
|
||||
quitOnce sync.Once // ensures quit is closed once
|
||||
quit chan struct{} // quit is closed when the subscription exits
|
||||
errOnce sync.Once // ensures err is closed once
|
||||
err chan error
|
||||
}
|
||||
|
||||
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
|
||||
sub := &ClientSubscription{
|
||||
client: c,
|
||||
namespace: namespace,
|
||||
etype: channel.Type().Elem(),
|
||||
channel: channel,
|
||||
quit: make(chan struct{}),
|
||||
err: make(chan error, 1),
|
||||
in: make(chan json.RawMessage),
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
||||
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
|
||||
n, ok := ctx.Value(notifierKey{}).(*Notifier)
|
||||
return n, ok
|
||||
// Err returns the subscription error channel. The intended use of Err is to schedule
|
||||
// resubscription when the client connection is closed unexpectedly.
|
||||
//
|
||||
// The error channel receives a value when the subscription has ended due
|
||||
// to an error. The received error is nil if Close has been called
|
||||
// on the underlying client and no other error has occurred.
|
||||
//
|
||||
// The error channel is closed when Unsubscribe is called on the subscription.
|
||||
func (sub *ClientSubscription) Err() <-chan error {
|
||||
return sub.err
|
||||
}
|
||||
|
||||
// CreateSubscription returns a new subscription that is coupled to the
|
||||
// RPC connection. By default subscriptions are inactive and notifications
|
||||
// are dropped until the subscription is marked as active. This is done
|
||||
// by the RPC server after the subscription ID is send to the client.
|
||||
func (n *Notifier) CreateSubscription() *Subscription {
|
||||
s := &Subscription{ID: NewID(), err: make(chan error)}
|
||||
n.subMu.Lock()
|
||||
n.inactive[s.ID] = s
|
||||
n.subMu.Unlock()
|
||||
return s
|
||||
// Unsubscribe unsubscribes the notification and closes the error channel.
|
||||
// It can safely be called more than once.
|
||||
func (sub *ClientSubscription) Unsubscribe() {
|
||||
sub.quitWithError(nil, true)
|
||||
sub.errOnce.Do(func() { close(sub.err) })
|
||||
}
|
||||
|
||||
// Notify sends a notification to the client with the given data as payload.
|
||||
// If an error occurs the RPC connection is closed and the error is returned.
|
||||
func (n *Notifier) Notify(id ID, data interface{}) error {
|
||||
n.subMu.Lock()
|
||||
defer n.subMu.Unlock()
|
||||
|
||||
if sub, active := n.active[id]; active {
|
||||
n.send(sub, data)
|
||||
} else {
|
||||
n.buffer[id] = append(n.buffer[id], data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Notifier) send(sub *Subscription, data interface{}) error {
|
||||
notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data)
|
||||
err := n.codec.Write(notification)
|
||||
if err != nil {
|
||||
n.codec.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Closed returns a channel that is closed when the RPC connection is closed.
|
||||
func (n *Notifier) Closed() <-chan interface{} {
|
||||
return n.codec.Closed()
|
||||
}
|
||||
|
||||
// unsubscribe a subscription.
|
||||
// If the subscription could not be found ErrSubscriptionNotFound is returned.
|
||||
func (n *Notifier) unsubscribe(id ID) error {
|
||||
n.subMu.Lock()
|
||||
defer n.subMu.Unlock()
|
||||
if s, found := n.active[id]; found {
|
||||
close(s.err)
|
||||
delete(n.active, id)
|
||||
return nil
|
||||
}
|
||||
return ErrSubscriptionNotFound
|
||||
}
|
||||
|
||||
// activate enables a subscription. Until a subscription is enabled all
|
||||
// notifications are dropped. This method is called by the RPC server after
|
||||
// the subscription ID was sent to client. This prevents notifications being
|
||||
// send to the client before the subscription ID is send to the client.
|
||||
func (n *Notifier) activate(id ID, namespace string) {
|
||||
n.subMu.Lock()
|
||||
defer n.subMu.Unlock()
|
||||
|
||||
if sub, found := n.inactive[id]; found {
|
||||
sub.namespace = namespace
|
||||
n.active[id] = sub
|
||||
delete(n.inactive, id)
|
||||
// Send buffered notifications.
|
||||
for _, data := range n.buffer[id] {
|
||||
n.send(sub, data)
|
||||
func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
|
||||
sub.quitOnce.Do(func() {
|
||||
// The dispatch loop won't be able to execute the unsubscribe call
|
||||
// if it is blocked on deliver. Close sub.quit first because it
|
||||
// unblocks deliver.
|
||||
close(sub.quit)
|
||||
if unsubscribeServer {
|
||||
sub.requestUnsubscribe()
|
||||
}
|
||||
delete(n.buffer, id)
|
||||
if err != nil {
|
||||
if err == ErrClientQuit {
|
||||
err = nil // Adhere to subscription semantics.
|
||||
}
|
||||
sub.err <- err
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
|
||||
select {
|
||||
case sub.in <- result:
|
||||
return true
|
||||
case <-sub.quit:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *ClientSubscription) start() {
|
||||
sub.quitWithError(sub.forward())
|
||||
}
|
||||
|
||||
func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
|
||||
cases := []reflect.SelectCase{
|
||||
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
|
||||
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
|
||||
{Dir: reflect.SelectSend, Chan: sub.channel},
|
||||
}
|
||||
buffer := list.New()
|
||||
defer buffer.Init()
|
||||
for {
|
||||
var chosen int
|
||||
var recv reflect.Value
|
||||
if buffer.Len() == 0 {
|
||||
// Idle, omit send case.
|
||||
chosen, recv, _ = reflect.Select(cases[:2])
|
||||
} else {
|
||||
// Non-empty buffer, send the first queued item.
|
||||
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
|
||||
chosen, recv, _ = reflect.Select(cases)
|
||||
}
|
||||
|
||||
switch chosen {
|
||||
case 0: // <-sub.quit
|
||||
return nil, false
|
||||
case 1: // <-sub.in
|
||||
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
|
||||
if err != nil {
|
||||
return err, true
|
||||
}
|
||||
if buffer.Len() == maxClientSubscriptionBuffer {
|
||||
return ErrSubscriptionQueueOverflow, true
|
||||
}
|
||||
buffer.PushBack(val)
|
||||
case 2: // sub.channel<-
|
||||
cases[2].Send = reflect.Value{} // Don't hold onto the value.
|
||||
buffer.Remove(buffer.Front())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
|
||||
val := reflect.New(sub.etype)
|
||||
err := json.Unmarshal(result, val.Interface())
|
||||
return val.Elem().Interface(), err
|
||||
}
|
||||
|
||||
func (sub *ClientSubscription) requestUnsubscribe() error {
|
||||
var result interface{}
|
||||
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
|
||||
}
|
||||
|
Reference in New Issue
Block a user