rpc, p2p/simulations: use github.com/gorilla/websocket (#20289)
* rpc: improve codec abstraction rpc.ServerCodec is an opaque interface. There was only one way to get a codec using existing APIs: rpc.NewJSONCodec. This change exports newCodec (as NewFuncCodec) and NewJSONCodec (as NewCodec). It also makes all codec methods non-public to avoid showing internals in godoc. While here, remove codec options in tests because they are not supported anymore. * p2p/simulations: use github.com/gorilla/websocket This package was the last remaining user of golang.org/x/net/websocket. Migrating to the new library wasn't straightforward because it is no longer possible to treat WebSocket connections as a net.Conn. * vendor: delete golang.org/x/net/websocket * rpc: fix godoc comments and run gofmt
This commit is contained in:
committed by
Péter Szilágyi
parent
9e71f55bfa
commit
7c4a4eb58a
@ -117,7 +117,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
|
||||
|
||||
func (cc *clientConn) close(err error, inflightReq *requestOp) {
|
||||
cc.handler.close(err, inflightReq)
|
||||
cc.codec.Close()
|
||||
cc.codec.close()
|
||||
}
|
||||
|
||||
type readOp struct {
|
||||
@ -484,7 +484,7 @@ func (c *Client) write(ctx context.Context, msg interface{}) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := c.writeConn.Write(ctx, msg)
|
||||
err := c.writeConn.writeJSON(ctx, msg)
|
||||
if err != nil {
|
||||
c.writeConn = nil
|
||||
}
|
||||
@ -511,7 +511,7 @@ func (c *Client) reconnect(ctx context.Context) error {
|
||||
c.writeConn = newconn
|
||||
return nil
|
||||
case <-c.didClose:
|
||||
newconn.Close()
|
||||
newconn.close()
|
||||
return ErrClientQuit
|
||||
}
|
||||
}
|
||||
@ -558,7 +558,7 @@ func (c *Client) dispatch(codec ServerCodec) {
|
||||
|
||||
// Reconnect:
|
||||
case newcodec := <-c.reconnected:
|
||||
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr())
|
||||
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.remoteAddr())
|
||||
if reading {
|
||||
// Wait for the previous read loop to exit. This is a rare case which
|
||||
// happens if this loop isn't notified in time after the connection breaks.
|
||||
@ -612,9 +612,9 @@ func (c *Client) drainRead() {
|
||||
// read decodes RPC messages from a codec, feeding them into dispatch.
|
||||
func (c *Client) read(codec ServerCodec) {
|
||||
for {
|
||||
msgs, batch, err := codec.Read()
|
||||
msgs, batch, err := codec.readBatch()
|
||||
if _, ok := err.(*json.SyntaxError); ok {
|
||||
codec.Write(context.Background(), errorMessage(&parseError{err.Error()}))
|
||||
codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
|
||||
}
|
||||
if err != nil {
|
||||
c.readErr <- err
|
||||
|
@ -85,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
|
||||
serverSubs: make(map[ID]*Subscription),
|
||||
log: log.Root(),
|
||||
}
|
||||
if conn.RemoteAddr() != "" {
|
||||
h.log = h.log.New("conn", conn.RemoteAddr())
|
||||
if conn.remoteAddr() != "" {
|
||||
h.log = h.log.New("conn", conn.remoteAddr())
|
||||
}
|
||||
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
|
||||
return h
|
||||
@ -97,7 +97,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
|
||||
// Emit error response for empty batches:
|
||||
if len(msgs) == 0 {
|
||||
h.startCallProc(func(cp *callProc) {
|
||||
h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
|
||||
h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
|
||||
})
|
||||
return
|
||||
}
|
||||
@ -122,7 +122,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
|
||||
}
|
||||
h.addSubscriptions(cp.notifiers)
|
||||
if len(answers) > 0 {
|
||||
h.conn.Write(cp.ctx, answers)
|
||||
h.conn.writeJSON(cp.ctx, answers)
|
||||
}
|
||||
for _, n := range cp.notifiers {
|
||||
n.activate()
|
||||
@ -139,7 +139,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) {
|
||||
answer := h.handleCallMsg(cp, msg)
|
||||
h.addSubscriptions(cp.notifiers)
|
||||
if answer != nil {
|
||||
h.conn.Write(cp.ctx, answer)
|
||||
h.conn.writeJSON(cp.ctx, answer)
|
||||
}
|
||||
for _, n := range cp.notifiers {
|
||||
n.activate()
|
||||
|
26
rpc/http.go
26
rpc/http.go
@ -47,29 +47,29 @@ type httpConn struct {
|
||||
client *http.Client
|
||||
req *http.Request
|
||||
closeOnce sync.Once
|
||||
closed chan interface{}
|
||||
closeCh chan interface{}
|
||||
}
|
||||
|
||||
// httpConn is treated specially by Client.
|
||||
func (hc *httpConn) Write(context.Context, interface{}) error {
|
||||
panic("Write called on httpConn")
|
||||
func (hc *httpConn) writeJSON(context.Context, interface{}) error {
|
||||
panic("writeJSON called on httpConn")
|
||||
}
|
||||
|
||||
func (hc *httpConn) RemoteAddr() string {
|
||||
func (hc *httpConn) remoteAddr() string {
|
||||
return hc.req.URL.String()
|
||||
}
|
||||
|
||||
func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) {
|
||||
<-hc.closed
|
||||
func (hc *httpConn) readBatch() ([]*jsonrpcMessage, bool, error) {
|
||||
<-hc.closeCh
|
||||
return nil, false, io.EOF
|
||||
}
|
||||
|
||||
func (hc *httpConn) Close() {
|
||||
hc.closeOnce.Do(func() { close(hc.closed) })
|
||||
func (hc *httpConn) close() {
|
||||
hc.closeOnce.Do(func() { close(hc.closeCh) })
|
||||
}
|
||||
|
||||
func (hc *httpConn) Closed() <-chan interface{} {
|
||||
return hc.closed
|
||||
func (hc *httpConn) closed() <-chan interface{} {
|
||||
return hc.closeCh
|
||||
}
|
||||
|
||||
// HTTPTimeouts represents the configuration params for the HTTP RPC server.
|
||||
@ -116,7 +116,7 @@ func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) {
|
||||
|
||||
initctx := context.Background()
|
||||
return newClient(initctx, func(context.Context) (ServerCodec, error) {
|
||||
return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil
|
||||
return &httpConn{client: client, req: req, closeCh: make(chan interface{})}, nil
|
||||
})
|
||||
}
|
||||
|
||||
@ -195,7 +195,7 @@ type httpServerConn struct {
|
||||
func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec {
|
||||
body := io.LimitReader(r.Body, maxRequestContentLength)
|
||||
conn := &httpServerConn{Reader: body, Writer: w, r: r}
|
||||
return NewJSONCodec(conn)
|
||||
return NewCodec(conn)
|
||||
}
|
||||
|
||||
// Close does nothing and always returns nil.
|
||||
@ -266,7 +266,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
w.Header().Set("content-type", contentType)
|
||||
codec := newHTTPServerConn(r, w)
|
||||
defer codec.Close()
|
||||
defer codec.close()
|
||||
s.serveSingleRequest(ctx, codec)
|
||||
}
|
||||
|
||||
|
@ -26,8 +26,8 @@ func DialInProc(handler *Server) *Client {
|
||||
initctx := context.Background()
|
||||
c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
|
||||
p1, p2 := net.Pipe()
|
||||
go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
|
||||
return NewJSONCodec(p2), nil
|
||||
go handler.ServeCodec(NewCodec(p1), 0)
|
||||
return NewCodec(p2), nil
|
||||
})
|
||||
return c
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ func (s *Server) ServeListener(l net.Listener) error {
|
||||
return err
|
||||
}
|
||||
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
|
||||
go s.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
|
||||
go s.ServeCodec(NewCodec(conn), 0)
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,6 +51,6 @@ func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewJSONCodec(conn), err
|
||||
return NewCodec(conn), err
|
||||
})
|
||||
}
|
||||
|
56
rpc/json.go
56
rpc/json.go
@ -164,43 +164,45 @@ func (c connWithRemoteAddr) RemoteAddr() string { return c.addr }
|
||||
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
|
||||
// support for parsing arguments and serializing (result) objects.
|
||||
type jsonCodec struct {
|
||||
remoteAddr string
|
||||
closer sync.Once // close closed channel once
|
||||
closed chan interface{} // closed on Close
|
||||
decode func(v interface{}) error // decoder to allow multiple transports
|
||||
encMu sync.Mutex // guards the encoder
|
||||
encode func(v interface{}) error // encoder to allow multiple transports
|
||||
conn deadlineCloser
|
||||
remote string
|
||||
closer sync.Once // close closed channel once
|
||||
closeCh chan interface{} // closed on Close
|
||||
decode func(v interface{}) error // decoder to allow multiple transports
|
||||
encMu sync.Mutex // guards the encoder
|
||||
encode func(v interface{}) error // encoder to allow multiple transports
|
||||
conn deadlineCloser
|
||||
}
|
||||
|
||||
func newCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
|
||||
// NewFuncCodec creates a codec which uses the given functions to read and write. If conn
|
||||
// implements ConnRemoteAddr, log messages will use it to include the remote address of
|
||||
// the connection.
|
||||
func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
|
||||
codec := &jsonCodec{
|
||||
closed: make(chan interface{}),
|
||||
encode: encode,
|
||||
decode: decode,
|
||||
conn: conn,
|
||||
closeCh: make(chan interface{}),
|
||||
encode: encode,
|
||||
decode: decode,
|
||||
conn: conn,
|
||||
}
|
||||
if ra, ok := conn.(ConnRemoteAddr); ok {
|
||||
codec.remoteAddr = ra.RemoteAddr()
|
||||
codec.remote = ra.RemoteAddr()
|
||||
}
|
||||
return codec
|
||||
}
|
||||
|
||||
// NewJSONCodec creates a codec that reads from the given connection. If conn implements
|
||||
// ConnRemoteAddr, log messages will use it to include the remote address of the
|
||||
// connection.
|
||||
func NewJSONCodec(conn Conn) ServerCodec {
|
||||
// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
|
||||
// messages will use it to include the remote address of the connection.
|
||||
func NewCodec(conn Conn) ServerCodec {
|
||||
enc := json.NewEncoder(conn)
|
||||
dec := json.NewDecoder(conn)
|
||||
dec.UseNumber()
|
||||
return newCodec(conn, enc.Encode, dec.Decode)
|
||||
return NewFuncCodec(conn, enc.Encode, dec.Decode)
|
||||
}
|
||||
|
||||
func (c *jsonCodec) RemoteAddr() string {
|
||||
return c.remoteAddr
|
||||
func (c *jsonCodec) remoteAddr() string {
|
||||
return c.remote
|
||||
}
|
||||
|
||||
func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) {
|
||||
func (c *jsonCodec) readBatch() (msg []*jsonrpcMessage, batch bool, err error) {
|
||||
// Decode the next JSON object in the input stream.
|
||||
// This verifies basic syntax, etc.
|
||||
var rawmsg json.RawMessage
|
||||
@ -211,8 +213,7 @@ func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) {
|
||||
return msg, batch, nil
|
||||
}
|
||||
|
||||
// Write sends a message to client.
|
||||
func (c *jsonCodec) Write(ctx context.Context, v interface{}) error {
|
||||
func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error {
|
||||
c.encMu.Lock()
|
||||
defer c.encMu.Unlock()
|
||||
|
||||
@ -224,17 +225,16 @@ func (c *jsonCodec) Write(ctx context.Context, v interface{}) error {
|
||||
return c.encode(v)
|
||||
}
|
||||
|
||||
// Close the underlying connection
|
||||
func (c *jsonCodec) Close() {
|
||||
func (c *jsonCodec) close() {
|
||||
c.closer.Do(func() {
|
||||
close(c.closed)
|
||||
close(c.closeCh)
|
||||
c.conn.Close()
|
||||
})
|
||||
}
|
||||
|
||||
// Closed returns a channel which will be closed when Close is called
|
||||
func (c *jsonCodec) Closed() <-chan interface{} {
|
||||
return c.closed
|
||||
func (c *jsonCodec) closed() <-chan interface{} {
|
||||
return c.closeCh
|
||||
}
|
||||
|
||||
// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error
|
||||
|
@ -72,7 +72,7 @@ func (s *Server) RegisterName(name string, receiver interface{}) error {
|
||||
//
|
||||
// Note that codec options are no longer supported.
|
||||
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
|
||||
defer codec.Close()
|
||||
defer codec.close()
|
||||
|
||||
// Don't serve if server is stopped.
|
||||
if atomic.LoadInt32(&s.run) == 0 {
|
||||
@ -84,7 +84,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
|
||||
defer s.codecs.Remove(codec)
|
||||
|
||||
c := initClient(codec, s.idgen, &s.services)
|
||||
<-codec.Closed()
|
||||
<-codec.closed()
|
||||
c.Close()
|
||||
}
|
||||
|
||||
@ -101,10 +101,10 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
|
||||
h.allowSubscribe = false
|
||||
defer h.close(io.EOF, nil)
|
||||
|
||||
reqs, batch, err := codec.Read()
|
||||
reqs, batch, err := codec.readBatch()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"}))
|
||||
codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -122,7 +122,7 @@ func (s *Server) Stop() {
|
||||
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
|
||||
log.Debug("RPC server shutting down")
|
||||
s.codecs.Each(func(c interface{}) bool {
|
||||
c.(ServerCodec).Close()
|
||||
c.(ServerCodec).close()
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func runTestScript(t *testing.T, file string) {
|
||||
|
||||
clientConn, serverConn := net.Pipe()
|
||||
defer clientConn.Close()
|
||||
go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
|
||||
go server.ServeCodec(NewCodec(serverConn), 0)
|
||||
readbuf := bufio.NewReader(clientConn)
|
||||
for _, line := range strings.Split(string(content), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
|
@ -33,7 +33,7 @@ func DialStdIO(ctx context.Context) (*Client, error) {
|
||||
// DialIO creates a client which uses the given IO channels
|
||||
func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) {
|
||||
return newClient(ctx, func(_ context.Context) (ServerCodec, error) {
|
||||
return NewJSONCodec(stdioConn{
|
||||
return NewCodec(stdioConn{
|
||||
in: in,
|
||||
out: out,
|
||||
}), nil
|
||||
|
@ -141,7 +141,7 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
|
||||
// Closed returns a channel that is closed when the RPC connection is closed.
|
||||
// Deprecated: use subscription error channel
|
||||
func (n *Notifier) Closed() <-chan interface{} {
|
||||
return n.h.conn.Closed()
|
||||
return n.h.conn.closed()
|
||||
}
|
||||
|
||||
// takeSubscription returns the subscription (if one has been created). No subscription can
|
||||
@ -172,7 +172,7 @@ func (n *Notifier) activate() error {
|
||||
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
|
||||
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
|
||||
ctx := context.Background()
|
||||
return n.h.conn.Write(ctx, &jsonrpcMessage{
|
||||
return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
|
||||
Version: vsn,
|
||||
Method: n.namespace + notificationMethodSuffix,
|
||||
Params: params,
|
||||
|
@ -68,7 +68,7 @@ func TestSubscriptions(t *testing.T) {
|
||||
t.Fatalf("unable to register test service %v", err)
|
||||
}
|
||||
}
|
||||
go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
|
||||
go server.ServeCodec(NewCodec(serverConn), 0)
|
||||
defer server.Stop()
|
||||
|
||||
// wait for message and write them to the given channels
|
||||
@ -130,7 +130,7 @@ func TestServerUnsubscribe(t *testing.T) {
|
||||
service := ¬ificationTestService{unsubscribed: make(chan string)}
|
||||
server.RegisterName("nftest2", service)
|
||||
p1, p2 := net.Pipe()
|
||||
go server.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
|
||||
go server.ServeCodec(NewCodec(p1), 0)
|
||||
|
||||
p2.SetDeadline(time.Now().Add(10 * time.Second))
|
||||
|
||||
|
10
rpc/types.go
10
rpc/types.go
@ -45,19 +45,19 @@ type Error interface {
|
||||
// a RPC session. Implementations must be go-routine safe since the codec can be called in
|
||||
// multiple go-routines concurrently.
|
||||
type ServerCodec interface {
|
||||
Read() (msgs []*jsonrpcMessage, isBatch bool, err error)
|
||||
Close()
|
||||
readBatch() (msgs []*jsonrpcMessage, isBatch bool, err error)
|
||||
close()
|
||||
jsonWriter
|
||||
}
|
||||
|
||||
// jsonWriter can write JSON messages to its underlying connection.
|
||||
// Implementations must be safe for concurrent use.
|
||||
type jsonWriter interface {
|
||||
Write(context.Context, interface{}) error
|
||||
writeJSON(context.Context, interface{}) error
|
||||
// Closed returns a channel which is closed when the connection is closed.
|
||||
Closed() <-chan interface{}
|
||||
closed() <-chan interface{}
|
||||
// RemoteAddr returns the peer address of the connection.
|
||||
RemoteAddr() string
|
||||
remoteAddr() string
|
||||
}
|
||||
|
||||
type BlockNumber int64
|
||||
|
@ -63,7 +63,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
|
||||
return
|
||||
}
|
||||
codec := newWebsocketCodec(conn)
|
||||
s.ServeCodec(codec, OptionMethodInvocation|OptionSubscriptions)
|
||||
s.ServeCodec(codec, 0)
|
||||
})
|
||||
}
|
||||
|
||||
@ -171,5 +171,5 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
|
||||
|
||||
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
|
||||
conn.SetReadLimit(maxRequestContentLength)
|
||||
return newCodec(conn, conn.WriteJSON, conn.ReadJSON)
|
||||
return NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
|
||||
}
|
||||
|
Reference in New Issue
Block a user