cmd, node, rpc: move websockets into node, break singleton

This commit is contained in:
Péter Szilágyi
2016-02-05 15:08:48 +02:00
parent a13bc9d7a1
commit 7486904b92
11 changed files with 194 additions and 292 deletions

View File

@ -17,13 +17,11 @@
package rpc
import (
"errors"
"fmt"
"net"
"net/http"
"sync"
"os"
"strings"
"sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
@ -31,12 +29,6 @@ import (
"gopkg.in/fatih/set.v0"
)
var (
wsServerMu sync.Mutex
wsRPCServer *Server
wsListener net.Listener
)
// wsReaderWriterCloser reads and write payloads from and to a websocket connection.
type wsReaderWriterCloser struct {
c *websocket.Conn
@ -57,14 +49,6 @@ func (rw *wsReaderWriterCloser) Close() error {
return rw.c.Close()
}
// wsHandler accepts a websocket connection and handles incoming RPC requests.
// Will return when the websocket connection is closed, either by the client or
// server.
func wsHandler(conn *websocket.Conn) {
rwc := &wsReaderWriterCloser{conn}
wsRPCServer.ServeCodec(NewJSONCodec(rwc))
}
// wsHandshakeValidator returns a handler that verifies the origin during the
// websocket upgrade process. When a '*' is specified as an allowed origins all
// connections are accepted.
@ -103,54 +87,16 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
return f
}
// StartWS will start a websocket RPC server on the given address and port.
func StartWS(address string, port int, corsdomains []string, apis []API) error {
wsServerMu.Lock()
defer wsServerMu.Unlock()
if wsRPCServer != nil {
return fmt.Errorf("WS RPC interface already started on %s", wsListener.Addr())
// NewWSServer creates a new websocket RPC server around an API provider.
func NewWSServer(cors string, handler *Server) *http.Server {
return &http.Server{
Handler: websocket.Server{
Handshake: wsHandshakeValidator(strings.Split(cors, ",")),
Handler: func(conn *websocket.Conn) {
handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}))
},
},
}
rpcServer := NewServer()
for _, api := range apis {
if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
}
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
if err != nil {
return err
}
wsServer := websocket.Server{Handshake: wsHandshakeValidator(corsdomains), Handler: wsHandler}
wsHTTPServer := http.Server{Handler: wsServer}
go wsHTTPServer.Serve(listener)
wsListener = listener
wsRPCServer = rpcServer
return nil
}
// StopWS stops the running websocket RPC server.
func StopWS() error {
wsServerMu.Lock()
defer wsServerMu.Unlock()
if wsRPCServer == nil {
return errors.New("HTTP RPC interface not started")
}
wsListener.Close()
wsRPCServer.Stop()
wsRPCServer = nil
wsListener = nil
return nil
}
// wsClient represents a RPC client that communicates over websockets with a