cmd, common, core, eth, node, rpc, tests, whisper, xeth: use protocol stacks
This commit is contained in:
124
node/node.go
124
node/node.go
@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
@ -40,14 +41,17 @@ var (
|
||||
|
||||
// Node represents a P2P node into which arbitrary services might be registered.
|
||||
type Node struct {
|
||||
datadir string // Path to the currently used data directory
|
||||
config *p2p.Server // Configuration of the underlying P2P networking layer
|
||||
stack map[string]ServiceConstructor // Protocol stack registered into this node
|
||||
emux *event.TypeMux // Event multiplexer used between the services of a stack
|
||||
datadir string // Path to the currently used data directory
|
||||
eventmux *event.TypeMux // Event multiplexer used between the services of a stack
|
||||
|
||||
running *p2p.Server // Currently running P2P networking layer
|
||||
services map[string]Service // Currently running services
|
||||
serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
|
||||
server *p2p.Server // Currently running P2P networking layer
|
||||
|
||||
serviceIndex map[string]ServiceConstructor // Set of services currently registered in the node
|
||||
serviceOrder []string // Service construction order to handle dependencies
|
||||
services map[string]Service // Currently running services
|
||||
|
||||
stop chan struct{} // Channel to wait for termination notifications
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
@ -66,7 +70,7 @@ func New(conf *Config) (*Node, error) {
|
||||
}
|
||||
return &Node{
|
||||
datadir: conf.DataDir,
|
||||
config: &p2p.Server{
|
||||
serverConfig: &p2p.Server{
|
||||
PrivateKey: conf.NodeKey(),
|
||||
Name: conf.Name,
|
||||
Discovery: !conf.NoDiscovery,
|
||||
@ -81,8 +85,9 @@ func New(conf *Config) (*Node, error) {
|
||||
MaxPeers: conf.MaxPeers,
|
||||
MaxPendingPeers: conf.MaxPendingPeers,
|
||||
},
|
||||
stack: make(map[string]ServiceConstructor),
|
||||
emux: new(event.TypeMux),
|
||||
serviceIndex: make(map[string]ServiceConstructor),
|
||||
serviceOrder: []string{},
|
||||
eventmux: new(event.TypeMux),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -92,14 +97,15 @@ func (n *Node) Register(id string, constructor ServiceConstructor) error {
|
||||
defer n.lock.Unlock()
|
||||
|
||||
// Short circuit if the node is running or if the id is taken
|
||||
if n.running != nil {
|
||||
if n.server != nil {
|
||||
return ErrNodeRunning
|
||||
}
|
||||
if _, ok := n.stack[id]; ok {
|
||||
if _, ok := n.serviceIndex[id]; ok {
|
||||
return ErrServiceRegistered
|
||||
}
|
||||
// Otherwise register the service and return
|
||||
n.stack[id] = constructor
|
||||
n.serviceOrder = append(n.serviceOrder, id)
|
||||
n.serviceIndex[id] = constructor
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -111,14 +117,20 @@ func (n *Node) Unregister(id string) error {
|
||||
defer n.lock.Unlock()
|
||||
|
||||
// Short circuit if the node is running, or if the service is unknown
|
||||
if n.running != nil {
|
||||
if n.server != nil {
|
||||
return ErrNodeRunning
|
||||
}
|
||||
if _, ok := n.stack[id]; !ok {
|
||||
if _, ok := n.serviceIndex[id]; !ok {
|
||||
return ErrServiceUnknown
|
||||
}
|
||||
// Otherwise drop the service and return
|
||||
delete(n.stack, id)
|
||||
delete(n.serviceIndex, id)
|
||||
for i, service := range n.serviceOrder {
|
||||
if service == id {
|
||||
n.serviceOrder = append(n.serviceOrder[:i], n.serviceOrder[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -128,19 +140,27 @@ func (n *Node) Start() error {
|
||||
defer n.lock.Unlock()
|
||||
|
||||
// Short circuit if the node's already running
|
||||
if n.running != nil {
|
||||
if n.server != nil {
|
||||
return ErrNodeRunning
|
||||
}
|
||||
// Otherwise copy and specialize the P2P configuration
|
||||
running := new(p2p.Server)
|
||||
*running = *n.config
|
||||
*running = *n.serverConfig
|
||||
|
||||
ctx := &ServiceContext{
|
||||
dataDir: n.datadir,
|
||||
EventMux: n.emux,
|
||||
}
|
||||
services := make(map[string]Service)
|
||||
for id, constructor := range n.stack {
|
||||
for _, id := range n.serviceOrder {
|
||||
constructor := n.serviceIndex[id]
|
||||
|
||||
// Create a new context for the particular service
|
||||
ctx := &ServiceContext{
|
||||
datadir: n.datadir,
|
||||
services: make(map[string]Service),
|
||||
EventMux: n.eventmux,
|
||||
}
|
||||
for id, s := range services { // copy needed for threaded access
|
||||
ctx.services[id] = s
|
||||
}
|
||||
// Construct and save the service
|
||||
service, err := constructor(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -161,10 +181,12 @@ func (n *Node) Start() error {
|
||||
started := []string{}
|
||||
for id, service := range services {
|
||||
// Start the next service, stopping all previous upon failure
|
||||
if err := service.Start(); err != nil {
|
||||
if err := service.Start(running); err != nil {
|
||||
for _, id := range started {
|
||||
services[id].Stop()
|
||||
}
|
||||
running.Stop()
|
||||
|
||||
return err
|
||||
}
|
||||
// Mark the service started for potential cleanup
|
||||
@ -172,7 +194,8 @@ func (n *Node) Start() error {
|
||||
}
|
||||
// Finish initializing the startup
|
||||
n.services = services
|
||||
n.running = running
|
||||
n.server = running
|
||||
n.stop = make(chan struct{})
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -184,7 +207,7 @@ func (n *Node) Stop() error {
|
||||
defer n.lock.Unlock()
|
||||
|
||||
// Short circuit if the node's not running
|
||||
if n.running == nil {
|
||||
if n.server == nil {
|
||||
return ErrNodeStopped
|
||||
}
|
||||
// Otherwise terminate all the services and the P2P server too
|
||||
@ -196,10 +219,11 @@ func (n *Node) Stop() error {
|
||||
failure.Services[id] = err
|
||||
}
|
||||
}
|
||||
n.running.Stop()
|
||||
n.server.Stop()
|
||||
|
||||
n.services = nil
|
||||
n.running = nil
|
||||
n.server = nil
|
||||
close(n.stop)
|
||||
|
||||
if len(failure.Services) > 0 {
|
||||
return failure
|
||||
@ -207,6 +231,19 @@ func (n *Node) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait blocks the thread until the node is stopped. If the node is not running
|
||||
// at the time of invocation, the method immediately returns.
|
||||
func (n *Node) Wait() {
|
||||
n.lock.RLock()
|
||||
if n.server == nil {
|
||||
return
|
||||
}
|
||||
stop := n.stop
|
||||
n.lock.RUnlock()
|
||||
|
||||
<-stop
|
||||
}
|
||||
|
||||
// Restart terminates a running node and boots up a new one in its place. If the
|
||||
// node isn't running, an error is returned.
|
||||
func (n *Node) Restart() error {
|
||||
@ -226,20 +263,45 @@ func (n *Node) Server() *p2p.Server {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
return n.running
|
||||
return n.server
|
||||
}
|
||||
|
||||
// Service retrieves a currently running services registered under a given id.
|
||||
// Service retrieves a currently running service registered under a given id.
|
||||
func (n *Node) Service(id string) Service {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
if n.services == nil {
|
||||
// Short circuit if the node's not running
|
||||
if n.server == nil {
|
||||
return nil
|
||||
}
|
||||
return n.services[id]
|
||||
}
|
||||
|
||||
// SingletonService retrieves a currently running service using a specific type
|
||||
// implementing the Service interface. This is a utility function for scenarios
|
||||
// where it is known that only one instance of a given service type is running,
|
||||
// allowing to access services without needing to know their specific id with
|
||||
// which they were registered. Note, this method uses reflection, so do not run
|
||||
// in a tight loop.
|
||||
func (n *Node) SingletonService(service interface{}) (string, error) {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
// Short circuit if the node's not running
|
||||
if n.server == nil {
|
||||
return "", ErrServiceUnknown
|
||||
}
|
||||
// Otherwise try to find the service to return
|
||||
for id, running := range n.services {
|
||||
if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() {
|
||||
reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running))
|
||||
return id, nil
|
||||
}
|
||||
}
|
||||
return "", ErrServiceUnknown
|
||||
}
|
||||
|
||||
// DataDir retrieves the current datadir used by the protocol stack.
|
||||
func (n *Node) DataDir() string {
|
||||
return n.datadir
|
||||
@ -248,5 +310,5 @@ func (n *Node) DataDir() string {
|
||||
// EventMux retrieves the event multiplexer used by all the network services in
|
||||
// the current protocol stack.
|
||||
func (n *Node) EventMux() *event.TypeMux {
|
||||
return n.emux
|
||||
return n.eventmux
|
||||
}
|
||||
|
@ -35,8 +35,8 @@ import (
|
||||
type SampleService struct{}
|
||||
|
||||
func (s *SampleService) Protocols() []p2p.Protocol { return nil }
|
||||
func (s *SampleService) Start() error { fmt.Println("Sample service starting..."); return nil }
|
||||
func (s *SampleService) Stop() error { fmt.Println("Sample service stopping..."); return nil }
|
||||
func (s *SampleService) Start(*p2p.Server) error { fmt.Println("Service starting..."); return nil }
|
||||
func (s *SampleService) Stop() error { fmt.Println("Service stopping..."); return nil }
|
||||
|
||||
func ExampleUsage() {
|
||||
// Create a network node to run protocols with the default values. The below list
|
||||
@ -80,8 +80,8 @@ func ExampleUsage() {
|
||||
log.Fatalf("Failed to stop the protocol stack: %v", err)
|
||||
}
|
||||
// Output:
|
||||
// Sample service starting...
|
||||
// Sample service stopping...
|
||||
// Sample service starting...
|
||||
// Sample service stopping...
|
||||
// Service starting...
|
||||
// Service stopping...
|
||||
// Service starting...
|
||||
// Service stopping...
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func TestNodeUsedDataDir(t *testing.T) {
|
||||
type NoopService struct{}
|
||||
|
||||
func (s *NoopService) Protocols() []p2p.Protocol { return nil }
|
||||
func (s *NoopService) Start() error { return nil }
|
||||
func (s *NoopService) Start(*p2p.Server) error { return nil }
|
||||
func (s *NoopService) Stop() error { return nil }
|
||||
|
||||
func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService), nil }
|
||||
@ -148,7 +148,7 @@ type InstrumentedService struct {
|
||||
stop error
|
||||
|
||||
protocolsHook func()
|
||||
startHook func()
|
||||
startHook func(*p2p.Server)
|
||||
stopHook func()
|
||||
}
|
||||
|
||||
@ -159,9 +159,9 @@ func (s *InstrumentedService) Protocols() []p2p.Protocol {
|
||||
return s.protocols
|
||||
}
|
||||
|
||||
func (s *InstrumentedService) Start() error {
|
||||
func (s *InstrumentedService) Start(server *p2p.Server) error {
|
||||
if s.startHook != nil {
|
||||
s.startHook()
|
||||
s.startHook(server)
|
||||
}
|
||||
return s.start
|
||||
}
|
||||
@ -189,7 +189,7 @@ func TestServiceLifeCycle(t *testing.T) {
|
||||
id := id // Closure for the constructor
|
||||
constructor := func(*ServiceContext) (Service, error) {
|
||||
return &InstrumentedService{
|
||||
startHook: func() { started[id] = true },
|
||||
startHook: func(*p2p.Server) { started[id] = true },
|
||||
stopHook: func() { stopped[id] = true },
|
||||
}, nil
|
||||
}
|
||||
@ -241,7 +241,7 @@ func TestServiceRestarts(t *testing.T) {
|
||||
running = false
|
||||
|
||||
return &InstrumentedService{
|
||||
startHook: func() {
|
||||
startHook: func(*p2p.Server) {
|
||||
if running {
|
||||
panic("already running")
|
||||
}
|
||||
@ -288,7 +288,7 @@ func TestServiceConstructionAbortion(t *testing.T) {
|
||||
id := id // Closure for the constructor
|
||||
constructor := func(*ServiceContext) (Service, error) {
|
||||
return &InstrumentedService{
|
||||
startHook: func() { started[id] = true },
|
||||
startHook: func(*p2p.Server) { started[id] = true },
|
||||
}, nil
|
||||
}
|
||||
if err := stack.Register(id, constructor); err != nil {
|
||||
@ -334,7 +334,7 @@ func TestServiceStartupAbortion(t *testing.T) {
|
||||
id := id // Closure for the constructor
|
||||
constructor := func(*ServiceContext) (Service, error) {
|
||||
return &InstrumentedService{
|
||||
startHook: func() { started[id] = true },
|
||||
startHook: func(*p2p.Server) { started[id] = true },
|
||||
stopHook: func() { stopped[id] = true },
|
||||
}, nil
|
||||
}
|
||||
@ -384,7 +384,7 @@ func TestServiceTerminationGuarantee(t *testing.T) {
|
||||
id := id // Closure for the constructor
|
||||
constructor := func(*ServiceContext) (Service, error) {
|
||||
return &InstrumentedService{
|
||||
startHook: func() { started[id] = true },
|
||||
startHook: func(*p2p.Server) { started[id] = true },
|
||||
stopHook: func() { stopped[id] = true },
|
||||
}, nil
|
||||
}
|
||||
@ -438,6 +438,42 @@ func TestServiceTerminationGuarantee(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSingletonServiceRetrieval tests that singleton services can be retrieved.
|
||||
func TestSingletonServiceRetrieval(t *testing.T) {
|
||||
// Create a simple stack and register two service types
|
||||
stack, err := New(testNodeConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create protocol stack: %v", err)
|
||||
}
|
||||
if err := stack.Register("noop", func(*ServiceContext) (Service, error) { return new(NoopService), nil }); err != nil {
|
||||
t.Fatalf("noop service registration failed: %v", err)
|
||||
}
|
||||
if err := stack.Register("instrumented", func(*ServiceContext) (Service, error) { return new(InstrumentedService), nil }); err != nil {
|
||||
t.Fatalf("instrumented service registration failed: %v", err)
|
||||
}
|
||||
// Make sure none of the services can be retrieved until started
|
||||
var noopServ *NoopService
|
||||
if id, err := stack.SingletonService(&noopServ); id != "" || err != ErrServiceUnknown {
|
||||
t.Fatalf("noop service retrieval mismatch: have %v/%v, want %v/%v", id, err, "", ErrServiceUnknown)
|
||||
}
|
||||
var instServ *InstrumentedService
|
||||
if id, err := stack.SingletonService(&instServ); id != "" || err != ErrServiceUnknown {
|
||||
t.Fatalf("instrumented service retrieval mismatch: have %v/%v, want %v/%v", id, err, "", ErrServiceUnknown)
|
||||
}
|
||||
// Start the stack and ensure everything is retrievable now
|
||||
if err := stack.Start(); err != nil {
|
||||
t.Fatalf("failed to start stack: %v", err)
|
||||
}
|
||||
defer stack.Stop()
|
||||
|
||||
if id, err := stack.SingletonService(&noopServ); id != "noop" || err != nil {
|
||||
t.Fatalf("noop service retrieval mismatch: have %v/%v, want %v/%v", id, err, "noop", nil)
|
||||
}
|
||||
if id, err := stack.SingletonService(&instServ); id != "instrumented" || err != nil {
|
||||
t.Fatalf("instrumented service retrieval mismatch: have %v/%v, want %v/%v", id, err, "instrumented", nil)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that all protocols defined by individual services get launched.
|
||||
func TestProtocolGather(t *testing.T) {
|
||||
stack, err := New(testNodeConfig)
|
||||
|
@ -18,6 +18,7 @@ package node
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
@ -28,18 +29,39 @@ import (
|
||||
// the protocol stack, that is passed to all constructors to be optionally used;
|
||||
// as well as utility methods to operate on the service environment.
|
||||
type ServiceContext struct {
|
||||
dataDir string // Data directory for protocol persistence
|
||||
EventMux *event.TypeMux // Event multiplexer used for decoupled notifications
|
||||
datadir string // Data directory for protocol persistence
|
||||
services map[string]Service // Index of the already constructed services
|
||||
EventMux *event.TypeMux // Event multiplexer used for decoupled notifications
|
||||
}
|
||||
|
||||
// Database opens an existing database with the given name (or creates one if no
|
||||
// previous can be found) from within the node's data directory. If the node is
|
||||
// an ephemeral one, a memory database is returned.
|
||||
func (ctx *ServiceContext) Database(name string, cache int) (ethdb.Database, error) {
|
||||
if ctx.dataDir == "" {
|
||||
if ctx.datadir == "" {
|
||||
return ethdb.NewMemDatabase()
|
||||
}
|
||||
return ethdb.NewLDBDatabase(filepath.Join(ctx.dataDir, name), cache)
|
||||
return ethdb.NewLDBDatabase(filepath.Join(ctx.datadir, name), cache)
|
||||
}
|
||||
|
||||
// Service retrieves an already constructed service registered under a given id.
|
||||
func (ctx *ServiceContext) Service(id string) Service {
|
||||
return ctx.services[id]
|
||||
}
|
||||
|
||||
// SingletonService retrieves an already constructed service using a specific type
|
||||
// implementing the Service interface. This is a utility function for scenarios
|
||||
// where it is known that only one instance of a given service type is running,
|
||||
// allowing to access services without needing to know their specific id with
|
||||
// which they were registered.
|
||||
func (ctx *ServiceContext) SingletonService(service interface{}) (string, error) {
|
||||
for id, running := range ctx.services {
|
||||
if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() {
|
||||
reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running))
|
||||
return id, nil
|
||||
}
|
||||
}
|
||||
return "", ErrServiceUnknown
|
||||
}
|
||||
|
||||
// ServiceConstructor is the function signature of the constructors needed to be
|
||||
@ -58,8 +80,9 @@ type Service interface {
|
||||
// Protocol retrieves the P2P protocols the service wishes to start.
|
||||
Protocols() []p2p.Protocol
|
||||
|
||||
// Start spawns any goroutines required by the service.
|
||||
Start() error
|
||||
// Start is called after all services have been constructed and the networking
|
||||
// layer was also initialized to spawn any goroutines required by the service.
|
||||
Start(server *p2p.Server) error
|
||||
|
||||
// Stop terminates all goroutines belonging to the service, blocking until they
|
||||
// are all terminated.
|
||||
|
@ -17,14 +17,16 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Tests that service context methods work properly.
|
||||
func TestServiceContext(t *testing.T) {
|
||||
// Tests that databases are correctly created persistent or ephemeral based on
|
||||
// the configured service context.
|
||||
func TestContextDatabases(t *testing.T) {
|
||||
// Create a temporary folder and ensure no database is contained within
|
||||
dir, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
@ -36,7 +38,7 @@ func TestServiceContext(t *testing.T) {
|
||||
t.Fatalf("non-created database already exists")
|
||||
}
|
||||
// Request the opening/creation of a database and ensure it persists to disk
|
||||
ctx := &ServiceContext{dataDir: dir}
|
||||
ctx := &ServiceContext{datadir: dir}
|
||||
db, err := ctx.Database("persistent", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open persistent database: %v", err)
|
||||
@ -47,7 +49,7 @@ func TestServiceContext(t *testing.T) {
|
||||
t.Fatalf("persistent database doesn't exists: %v", err)
|
||||
}
|
||||
// Request th opening/creation of an ephemeral database and ensure it's not persisted
|
||||
ctx = &ServiceContext{dataDir: ""}
|
||||
ctx = &ServiceContext{datadir: ""}
|
||||
db, err = ctx.Database("ephemeral", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open ephemeral database: %v", err)
|
||||
@ -58,3 +60,47 @@ func TestServiceContext(t *testing.T) {
|
||||
t.Fatalf("ephemeral database exists")
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that already constructed services can be retrieves by later ones.
|
||||
func TestContextServices(t *testing.T) {
|
||||
stack, err := New(testNodeConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create protocol stack: %v", err)
|
||||
}
|
||||
// Define a set of services, constructed before/after a verifier
|
||||
formers := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}
|
||||
latters := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}
|
||||
|
||||
verifier := func(ctx *ServiceContext) (Service, error) {
|
||||
for i, id := range formers {
|
||||
if ctx.Service(id) == nil {
|
||||
return nil, fmt.Errorf("former %d: service not found", i)
|
||||
}
|
||||
}
|
||||
for i, id := range latters {
|
||||
if ctx.Service(id) != nil {
|
||||
return nil, fmt.Errorf("latters %d: service found", i)
|
||||
}
|
||||
}
|
||||
return new(NoopService), nil
|
||||
}
|
||||
// Register the collection of services
|
||||
for i, id := range formers {
|
||||
if err := stack.Register(id, NewNoopService); err != nil {
|
||||
t.Fatalf("former #%d: failed to register service: %v", i, err)
|
||||
}
|
||||
}
|
||||
if err := stack.Register("verifier", verifier); err != nil {
|
||||
t.Fatalf("failed to register service verifier: %v", err)
|
||||
}
|
||||
for i, id := range latters {
|
||||
if err := stack.Register(id, NewNoopService); err != nil {
|
||||
t.Fatalf("latter #%d: failed to register service: %v", i, err)
|
||||
}
|
||||
}
|
||||
// Start the protocol stack and ensure services are constructed in order
|
||||
if err := stack.Start(); err != nil {
|
||||
t.Fatalf("failed to start stack: %v", err)
|
||||
}
|
||||
defer stack.Stop()
|
||||
}
|
||||
|
Reference in New Issue
Block a user