Merge pull request #2278 from fjl/rpc-inproc-pipe
rpc: simplify inproc client
This commit is contained in:
		@@ -16,96 +16,46 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
package rpc
 | 
					package rpc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import "encoding/json"
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
// NewInProcRPCClient creates an in-process buffer stream attachment to a given
 | 
						"io"
 | 
				
			||||||
// RPC server.
 | 
						"net"
 | 
				
			||||||
func NewInProcRPCClient(handler *Server) Client {
 | 
					)
 | 
				
			||||||
	buffer := &inprocBuffer{
 | 
					 | 
				
			||||||
		requests:  make(chan []byte, 16),
 | 
					 | 
				
			||||||
		responses: make(chan []byte, 16),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	client := &inProcClient{
 | 
					 | 
				
			||||||
		server: handler,
 | 
					 | 
				
			||||||
		buffer: buffer,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	go handler.ServeCodec(NewJSONCodec(client.buffer))
 | 
					 | 
				
			||||||
	return client
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
// inProcClient is an in-process buffer stream attached to an RPC server.
 | 
					// inProcClient is an in-process buffer stream attached to an RPC server.
 | 
				
			||||||
type inProcClient struct {
 | 
					type inProcClient struct {
 | 
				
			||||||
	server *Server
 | 
						server *Server
 | 
				
			||||||
	buffer *inprocBuffer
 | 
						cl     io.Closer
 | 
				
			||||||
 | 
						enc    *json.Encoder
 | 
				
			||||||
 | 
						dec    *json.Decoder
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Close tears down the request channel of the in-proc client.
 | 
					// Close tears down the request channel of the in-proc client.
 | 
				
			||||||
func (c *inProcClient) Close() {
 | 
					func (c *inProcClient) Close() {
 | 
				
			||||||
	c.buffer.Close()
 | 
						c.cl.Close()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewInProcRPCClient creates an in-process buffer stream attachment to a given
 | 
				
			||||||
 | 
					// RPC server.
 | 
				
			||||||
 | 
					func NewInProcRPCClient(handler *Server) Client {
 | 
				
			||||||
 | 
						p1, p2 := net.Pipe()
 | 
				
			||||||
 | 
						go handler.ServeCodec(NewJSONCodec(p1))
 | 
				
			||||||
 | 
						return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Send marshals a message into a json format and injects in into the client
 | 
					// Send marshals a message into a json format and injects in into the client
 | 
				
			||||||
// request channel.
 | 
					// request channel.
 | 
				
			||||||
func (c *inProcClient) Send(msg interface{}) error {
 | 
					func (c *inProcClient) Send(msg interface{}) error {
 | 
				
			||||||
	d, err := json.Marshal(msg)
 | 
						return c.enc.Encode(msg)
 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	c.buffer.requests <- d
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Recv reads a message from the response channel and tries to parse it into the
 | 
					// Recv reads a message from the response channel and tries to parse it into the
 | 
				
			||||||
// given msg interface.
 | 
					// given msg interface.
 | 
				
			||||||
func (c *inProcClient) Recv(msg interface{}) error {
 | 
					func (c *inProcClient) Recv(msg interface{}) error {
 | 
				
			||||||
	data := <-c.buffer.responses
 | 
						return c.dec.Decode(msg)
 | 
				
			||||||
	return json.Unmarshal(data, &msg)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Returns the collection of modules the RPC server offers.
 | 
					// Returns the collection of modules the RPC server offers.
 | 
				
			||||||
func (c *inProcClient) SupportedModules() (map[string]string, error) {
 | 
					func (c *inProcClient) SupportedModules() (map[string]string, error) {
 | 
				
			||||||
	return SupportedModules(c)
 | 
						return SupportedModules(c)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// inprocBuffer represents the connection between the RPC server and console
 | 
					 | 
				
			||||||
type inprocBuffer struct {
 | 
					 | 
				
			||||||
	readBuf   []byte      // store remaining request bytes after a partial read
 | 
					 | 
				
			||||||
	requests  chan []byte // list with raw serialized requests
 | 
					 | 
				
			||||||
	responses chan []byte // list with raw serialized responses
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Read will read the next request in json format.
 | 
					 | 
				
			||||||
func (b *inprocBuffer) Read(p []byte) (int, error) {
 | 
					 | 
				
			||||||
	// last read didn't read entire request, return remaining bytes
 | 
					 | 
				
			||||||
	if len(b.readBuf) > 0 {
 | 
					 | 
				
			||||||
		n := copy(p, b.readBuf)
 | 
					 | 
				
			||||||
		if n < len(b.readBuf) {
 | 
					 | 
				
			||||||
			b.readBuf = b.readBuf[:n]
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			b.readBuf = b.readBuf[:0]
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return n, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// read next request
 | 
					 | 
				
			||||||
	req := <-b.requests
 | 
					 | 
				
			||||||
	n := copy(p, req)
 | 
					 | 
				
			||||||
	if n < len(req) {
 | 
					 | 
				
			||||||
		// inprocBuffer too small, store remaining chunk for next read
 | 
					 | 
				
			||||||
		b.readBuf = req[n:]
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return n, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Write sends the given buffer to the backend.
 | 
					 | 
				
			||||||
func (b *inprocBuffer) Write(p []byte) (n int, err error) {
 | 
					 | 
				
			||||||
	b.responses <- p
 | 
					 | 
				
			||||||
	return len(p), nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Close cleans up obtained resources.
 | 
					 | 
				
			||||||
func (b *inprocBuffer) Close() error {
 | 
					 | 
				
			||||||
	close(b.requests)
 | 
					 | 
				
			||||||
	close(b.responses)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user