rpc/http: improve request handling

This commit is contained in:
Bas van Kervel
2016-02-24 11:19:00 +01:00
parent 6d3cd03a03
commit a7bae3b2a6
15 changed files with 903 additions and 236 deletions

View File

@ -117,14 +117,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil
}
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
// response back using the given codec. It will block until the codec is closed.
//
// This server will:
// 1. allow for asynchronous and parallel request execution
// 2. supports notifications (pub/sub)
// 3. supports request batches
func (s *Server) ServeCodec(codec ServerCodec) {
// serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec.
// If singleShot is true it will process a single request, otherwise it will handle
// requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
@ -132,7 +130,12 @@ func (s *Server) ServeCodec(codec ServerCodec) {
buf = buf[:runtime.Stack(buf, false)]
glog.Errorln(string(buf))
}
codec.Close()
s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
return
}()
ctx, cancel := context.WithCancel(context.Background())
@ -141,20 +144,22 @@ func (s *Server) ServeCodec(codec ServerCodec) {
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock()
return
return &shutdownError{}
}
s.codecs.Add(codec)
s.codecsMu.Unlock()
// test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
break
return nil
}
// check if server is ordered to shutdown and return an error
// telling the client that his request failed.
if atomic.LoadInt32(&s.run) != 1 {
err = &shutdownError{}
if batch {
@ -166,15 +171,42 @@ func (s *Server) ServeCodec(codec ServerCodec) {
} else {
codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
}
break
return nil
}
if batch {
if singleShot && batch {
s.execBatch(ctx, codec, reqs)
return nil
} else if singleShot && !batch {
s.exec(ctx, codec, reqs[0])
return nil
} else if !singleShot && batch {
go s.execBatch(ctx, codec, reqs)
} else {
go s.exec(ctx, codec, reqs[0])
}
}
return nil
}
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
// response back using the given codec. It will block until the codec is closed or the server is
// stopped. In either case the codec is closed.
//
// This server will:
// 1. allow for asynchronous and parallel request execution
// 2. supports notifications (pub/sub)
// 3. supports request batches
func (s *Server) ServeCodec(codec ServerCodec) {
defer codec.Close()
s.serveRequest(codec, false)
}
// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not
// close the codec unless a non-recoverable error has occurred.
func (s *Server) ServeSingleRequest(codec ServerCodec) {
s.serveRequest(codec, true)
}
// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish,