rpc: add PeerInfo (#24255)

This replaces the sketchy and undocumented string context keys for HTTP requests
with a defined interface. Using string keys with context is discouraged because
they may clash with keys created by other packages.

We added these keys to make connection metadata available in the signer, so this
change also updates signer/core to use the new PeerInfo API.
This commit is contained in:
Felix Lange
2022-01-20 12:45:07 +01:00
committed by GitHub
parent 514ae7cfa3
commit 5bcbb2980b
11 changed files with 183 additions and 64 deletions

View File

@ -58,12 +58,6 @@ const (
maxClientSubscriptionBuffer = 20000
)
const (
httpScheme = "http"
wsScheme = "ws"
ipcScheme = "ipc"
)
// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
@ -80,7 +74,7 @@ type BatchElem struct {
// Client represents a connection to an RPC server.
type Client struct {
idgen func() ID // for subscriptions
scheme string // connection type: http, ws or ipc
isHTTP bool // connection type: http, ws or ipc
services *serviceRegistry
idCounter uint32
@ -115,11 +109,9 @@ type clientConn struct {
}
func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.WithValue(context.Background(), clientContextKey{}, c)
// Http connections have already set the scheme
if !c.isHTTP() && c.scheme != "" {
ctx = context.WithValue(ctx, "scheme", c.scheme)
}
ctx := context.Background()
ctx = context.WithValue(ctx, clientContextKey{}, c)
ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo())
handler := newHandler(ctx, conn, c.idgen, c.services)
return &clientConn{conn, handler}
}
@ -145,7 +137,7 @@ func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, erro
select {
case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs.
if !c.isHTTP() {
if !c.isHTTP {
select {
case c.reqTimeout <- op:
case <-c.closing:
@ -212,18 +204,10 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error)
}
func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
scheme := ""
switch conn.(type) {
case *httpConn:
scheme = httpScheme
case *websocketCodec:
scheme = wsScheme
case *jsonCodec:
scheme = ipcScheme
}
_, isHTTP := conn.(*httpConn)
c := &Client{
isHTTP: isHTTP,
idgen: idgen,
scheme: scheme,
services: services,
writeConn: conn,
close: make(chan struct{}),
@ -236,7 +220,7 @@ func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *C
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
}
if !c.isHTTP() {
if !isHTTP {
go c.dispatch(conn)
}
return c
@ -267,7 +251,7 @@ func (c *Client) SupportedModules() (map[string]string, error) {
// Close closes the client, aborting any in-flight requests.
func (c *Client) Close() {
if c.isHTTP() {
if c.isHTTP {
return
}
select {
@ -281,7 +265,7 @@ func (c *Client) Close() {
// This method only works for clients using HTTP, it doesn't have
// any effect for clients using another transport.
func (c *Client) SetHeader(key, value string) {
if !c.isHTTP() {
if !c.isHTTP {
return
}
conn := c.writeConn.(*httpConn)
@ -315,7 +299,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP() {
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
@ -378,7 +362,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
}
var err error
if c.isHTTP() {
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
@ -417,7 +401,7 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{})
}
msg.ID = nil
if c.isHTTP() {
if c.isHTTP {
return c.sendHTTP(ctx, op, msg)
}
return c.send(ctx, op, msg)
@ -450,12 +434,12 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to Subscribe must be a writable channel")
panic(fmt.Sprintf("channel argument of Subscribe has type %T, need writable channel", channel))
}
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
}
if c.isHTTP() {
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
@ -509,8 +493,8 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
}
func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
// The previous write failed. Try to establish a new connection.
if c.writeConn == nil {
// The previous write failed. Try to establish a new connection.
if err := c.reconnect(ctx); err != nil {
return err
}
@ -657,7 +641,3 @@ func (c *Client) read(codec ServerCodec) {
c.readOp <- readOp{msgs, batch}
}
}
func (c *Client) isHTTP() bool {
return c.scheme == httpScheme
}