cmd, common, node, rpc: move HTTP RPC into node, drop singletone aspect
This commit is contained in:
		
							
								
								
									
										194
									
								
								node/node.go
									
									
									
									
									
								
							
							
						
						
									
										194
									
								
								node/node.go
									
									
									
									
									
								
							| @@ -55,10 +55,17 @@ type Node struct { | ||||
| 	serviceFuncs []ServiceConstructor     // Service constructors (in dependency order) | ||||
| 	services     map[reflect.Type]Service // Currently running services | ||||
|  | ||||
| 	rpcAPIs     []rpc.API    // List of APIs currently provided by the node | ||||
| 	ipcEndpoint string       // IPC endpoint to listen at (empty = IPC disabled) | ||||
| 	ipcListener net.Listener // IPC RPC listener socket to serve API requests | ||||
| 	ipcHandler  *rpc.Server  // IPC RPC request handler to process the API requests | ||||
|  | ||||
| 	httpEndpoint  string       // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled) | ||||
| 	httpWhitelist []string     // HTTP RPC modules to allow through this endpoint | ||||
| 	httpCors      string       // HTTP RPC Cross-Origin Resource Sharing header | ||||
| 	httpListener  net.Listener // HTTP RPC listener socket to server API requests | ||||
| 	httpHandler   *rpc.Server  // HTTP RPC request handler to process the API requests | ||||
|  | ||||
| 	stop chan struct{} // Channel to wait for termination notifications | ||||
| 	lock sync.RWMutex | ||||
| } | ||||
| @@ -93,9 +100,12 @@ func New(conf *Config) (*Node, error) { | ||||
| 			MaxPeers:        conf.MaxPeers, | ||||
| 			MaxPendingPeers: conf.MaxPendingPeers, | ||||
| 		}, | ||||
| 		serviceFuncs: []ServiceConstructor{}, | ||||
| 		ipcEndpoint:  conf.IpcEndpoint(), | ||||
| 		eventmux:     new(event.TypeMux), | ||||
| 		serviceFuncs:  []ServiceConstructor{}, | ||||
| 		ipcEndpoint:   conf.IpcEndpoint(), | ||||
| 		httpEndpoint:  conf.HttpEndpoint(), | ||||
| 		httpWhitelist: conf.HttpModules, | ||||
| 		httpCors:      conf.HttpCors, | ||||
| 		eventmux:      new(event.TypeMux), | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| @@ -188,58 +198,146 @@ func (n *Node) Start() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // startRPC initializes and starts the IPC RPC endpoints. | ||||
| // startRPC is a helper method to start all the various RPC endpoint during node | ||||
| // startup. It's not meant to be called at any time afterwards as it makes certain | ||||
| // assumptions about the state of the node. | ||||
| func (n *Node) startRPC(services map[reflect.Type]Service) error { | ||||
| 	// Gather and register all the APIs exposed by the services | ||||
| 	// Gather all the possible APIs to surface | ||||
| 	apis := n.apis() | ||||
| 	for _, service := range services { | ||||
| 		apis = append(apis, service.APIs()...) | ||||
| 	} | ||||
| 	ipcHandler := rpc.NewServer() | ||||
| 	for _, api := range apis { | ||||
| 		if err := ipcHandler.RegisterName(api.Namespace, api.Service); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		glog.V(logger.Debug).Infof("Register %T under namespace '%s'", api.Service, api.Namespace) | ||||
| 	// Start the various API endpoints, terminating all in case of errors | ||||
| 	if err := n.startIPC(apis); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	// All APIs registered, start the IPC and HTTP listeners | ||||
| 	var ( | ||||
| 		ipcListener net.Listener | ||||
| 		err         error | ||||
| 	) | ||||
| 	if n.ipcEndpoint != "" { | ||||
| 		if ipcListener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		go func() { | ||||
| 			glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint) | ||||
| 			defer glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint) | ||||
| 	if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil { | ||||
| 		n.stopIPC() | ||||
| 		return err | ||||
| 	} | ||||
| 	// All API endpoints started successfully | ||||
| 	n.rpcAPIs = apis | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| 			for { | ||||
| 				conn, err := ipcListener.Accept() | ||||
| 				if err != nil { | ||||
| 					// Terminate if the listener was closed | ||||
| 					n.lock.RLock() | ||||
| 					closed := n.ipcListener == nil | ||||
| 					n.lock.RUnlock() | ||||
| 					if closed { | ||||
| 						return | ||||
| 					} | ||||
| 					// Not closed, just some error; report and continue | ||||
| 					glog.V(logger.Error).Infof("IPC accept failed: %v", err) | ||||
| 					continue | ||||
| 				} | ||||
| 				go ipcHandler.ServeCodec(rpc.NewJSONCodec(conn)) | ||||
| 			} | ||||
| 		}() | ||||
| // startIPC initializes and starts the IPC RPC endpoint. | ||||
| func (n *Node) startIPC(apis []rpc.API) error { | ||||
| 	// Short circuit if the IPC endpoint isn't being exposed | ||||
| 	if n.ipcEndpoint == "" { | ||||
| 		return nil | ||||
| 	} | ||||
| 	// Register all the APIs exposed by the services | ||||
| 	handler := rpc.NewServer() | ||||
| 	for _, api := range apis { | ||||
| 		if err := handler.RegisterName(api.Namespace, api.Service); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		glog.V(logger.Debug).Infof("IPC registered %T under '%s'", api.Service, api.Namespace) | ||||
| 	} | ||||
| 	// All APIs registered, start the IPC listener | ||||
| 	var ( | ||||
| 		listener net.Listener | ||||
| 		err      error | ||||
| 	) | ||||
| 	if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	go func() { | ||||
| 		glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint) | ||||
|  | ||||
| 		for { | ||||
| 			conn, err := listener.Accept() | ||||
| 			if err != nil { | ||||
| 				// Terminate if the listener was closed | ||||
| 				n.lock.RLock() | ||||
| 				closed := n.ipcListener == nil | ||||
| 				n.lock.RUnlock() | ||||
| 				if closed { | ||||
| 					return | ||||
| 				} | ||||
| 				// Not closed, just some error; report and continue | ||||
| 				glog.V(logger.Error).Infof("IPC accept failed: %v", err) | ||||
| 				continue | ||||
| 			} | ||||
| 			go handler.ServeCodec(rpc.NewJSONCodec(conn)) | ||||
| 		} | ||||
| 	}() | ||||
| 	// All listeners booted successfully | ||||
| 	n.ipcListener = ipcListener | ||||
| 	n.ipcHandler = ipcHandler | ||||
| 	n.ipcListener = listener | ||||
| 	n.ipcHandler = handler | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // stopIPC terminates the IPC RPC endpoint. | ||||
| func (n *Node) stopIPC() { | ||||
| 	if n.ipcListener != nil { | ||||
| 		n.ipcListener.Close() | ||||
| 		n.ipcListener = nil | ||||
|  | ||||
| 		glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint) | ||||
| 	} | ||||
| 	if n.ipcHandler != nil { | ||||
| 		n.ipcHandler.Stop() | ||||
| 		n.ipcHandler = nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // startHTTP initializes and starts the HTTP RPC endpoint. | ||||
| func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error { | ||||
| 	// Short circuit if the IPC endpoint isn't being exposed | ||||
| 	if endpoint == "" { | ||||
| 		return nil | ||||
| 	} | ||||
| 	// Generate the whitelist based on the allowed modules | ||||
| 	whitelist := make(map[string]bool) | ||||
| 	for _, module := range modules { | ||||
| 		whitelist[module] = true | ||||
| 	} | ||||
| 	// Register all the APIs exposed by the services | ||||
| 	handler := rpc.NewServer() | ||||
| 	for _, api := range apis { | ||||
| 		if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { | ||||
| 			if err := handler.RegisterName(api.Namespace, api.Service); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 			glog.V(logger.Debug).Infof("HTTP registered %T under '%s'", api.Service, api.Namespace) | ||||
| 		} | ||||
| 	} | ||||
| 	// All APIs registered, start the HTTP listener | ||||
| 	var ( | ||||
| 		listener net.Listener | ||||
| 		err      error | ||||
| 	) | ||||
| 	if listener, err = net.Listen("tcp", endpoint); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	go rpc.NewHTTPServer(cors, handler).Serve(listener) | ||||
| 	glog.V(logger.Info).Infof("HTTP endpoint opened: http://%s", endpoint) | ||||
|  | ||||
| 	// All listeners booted successfully | ||||
| 	n.httpEndpoint = endpoint | ||||
| 	n.httpListener = listener | ||||
| 	n.httpHandler = handler | ||||
| 	n.httpCors = cors | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // stopHTTP terminates the HTTP RPC endpoint. | ||||
| func (n *Node) stopHTTP() { | ||||
| 	if n.httpListener != nil { | ||||
| 		n.httpListener.Close() | ||||
| 		n.httpListener = nil | ||||
|  | ||||
| 		glog.V(logger.Info).Infof("HTTP endpoint closed: http://%s", n.httpEndpoint) | ||||
| 	} | ||||
| 	if n.httpHandler != nil { | ||||
| 		n.httpHandler.Stop() | ||||
| 		n.httpHandler = nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Stop terminates a running node along with all it's services. In the node was | ||||
| // not started, an error is returned. | ||||
| func (n *Node) Stop() error { | ||||
| @@ -251,14 +349,10 @@ func (n *Node) Stop() error { | ||||
| 		return ErrNodeStopped | ||||
| 	} | ||||
| 	// Otherwise terminate the API, all services and the P2P server too | ||||
| 	if n.ipcListener != nil { | ||||
| 		n.ipcListener.Close() | ||||
| 		n.ipcListener = nil | ||||
| 	} | ||||
| 	if n.ipcHandler != nil { | ||||
| 		n.ipcHandler.Stop() | ||||
| 		n.ipcHandler = nil | ||||
| 	} | ||||
| 	n.stopIPC() | ||||
| 	n.stopHTTP() | ||||
| 	n.rpcAPIs = nil | ||||
|  | ||||
| 	failure := &StopError{ | ||||
| 		Services: make(map[reflect.Type]error), | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user