node: refactor package node (#21105)

This PR significantly changes the APIs for instantiating Ethereum nodes in
a Go program. The new APIs are not backwards-compatible, but we feel that
this is made up for by the much simpler way of registering services on
node.Node. You can find more information and rationale in the design
document: https://gist.github.com/renaynay/5bec2de19fde66f4d04c535fd24f0775.

There is also a new feature in Node's Go API: it is now possible to
register arbitrary handlers on the user-facing HTTP server. In geth, this
facility is used to enable GraphQL.

There is a single minor change relevant for geth users in this PR: The
GraphQL API is no longer available separately from the JSON-RPC HTTP
server. If you want GraphQL, you need to enable it using the
./geth --http --graphql flag combination.

The --graphql.port and --graphql.addr flags are no longer available.
This commit is contained in:
rene
2020-08-03 19:40:46 +02:00
committed by GitHub
parent b2b14e6ce3
commit c0c01612e9
63 changed files with 2606 additions and 2887 deletions

View File

@ -75,11 +75,11 @@ func (e *ExecAdapter) Name() string {
// NewNode returns a new ExecNode using the given config
func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
if len(config.Services) == 0 {
return nil, errors.New("node must have at least one service")
if len(config.Lifecycles) == 0 {
return nil, errors.New("node must have at least one service lifecycle")
}
for _, service := range config.Services {
if _, exists := serviceFuncs[service]; !exists {
for _, service := range config.Lifecycles {
if _, exists := lifecycleConstructorFuncs[service]; !exists {
return nil, fmt.Errorf("unknown node service %q", service)
}
}
@ -263,7 +263,7 @@ func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeSta
func (n *ExecNode) execCommand() *exec.Cmd {
return &exec.Cmd{
Path: reexec.Self(),
Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
Args: []string{"p2p-node", strings.Join(n.Config.Node.Lifecycles, ","), n.ID.String()},
}
}
@ -400,7 +400,7 @@ func execP2PNode() {
defer signal.Stop(sigc)
<-sigc
log.Info("Received SIGTERM, shutting down...")
stack.Stop()
stack.Close()
}()
stack.Wait() // Wait for the stack to exit.
}
@ -434,44 +434,36 @@ func startExecNodeStack() (*node.Node, error) {
return nil, fmt.Errorf("error creating node stack: %v", err)
}
// register the services, collecting them into a map so we can wrap
// them in a snapshot service
services := make(map[string]node.Service, len(serviceNames))
// Register the services, collecting them into a map so they can
// be accessed by the snapshot API.
services := make(map[string]node.Lifecycle, len(serviceNames))
for _, name := range serviceNames {
serviceFunc, exists := serviceFuncs[name]
lifecycleFunc, exists := lifecycleConstructorFuncs[name]
if !exists {
return nil, fmt.Errorf("unknown node service %q", err)
}
constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
ctx := &ServiceContext{
RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
NodeContext: nodeCtx,
Config: conf.Node,
}
if conf.Snapshots != nil {
ctx.Snapshot = conf.Snapshots[name]
}
service, err := serviceFunc(ctx)
if err != nil {
return nil, err
}
services[name] = service
return service, nil
ctx := &ServiceContext{
RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
Config: conf.Node,
}
if err := stack.Register(constructor); err != nil {
return stack, fmt.Errorf("error registering service %q: %v", name, err)
if conf.Snapshots != nil {
ctx.Snapshot = conf.Snapshots[name]
}
service, err := lifecycleFunc(ctx, stack)
if err != nil {
return nil, err
}
services[name] = service
stack.RegisterLifecycle(service)
}
// register the snapshot service
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return &snapshotService{services}, nil
})
if err != nil {
return stack, fmt.Errorf("error starting snapshot service: %v", err)
}
// Add the snapshot API.
stack.RegisterAPIs([]rpc.API{{
Namespace: "simulation",
Version: "1.0",
Service: SnapshotAPI{services},
}})
// start the stack
if err = stack.Start(); err != nil {
err = fmt.Errorf("error starting stack: %v", err)
}
@ -490,35 +482,9 @@ type nodeStartupJSON struct {
NodeInfo *p2p.NodeInfo
}
// snapshotService is a node.Service which wraps a list of services and
// exposes an API to generate a snapshot of those services
type snapshotService struct {
services map[string]node.Service
}
func (s *snapshotService) APIs() []rpc.API {
return []rpc.API{{
Namespace: "simulation",
Version: "1.0",
Service: SnapshotAPI{s.services},
}}
}
func (s *snapshotService) Protocols() []p2p.Protocol {
return nil
}
func (s *snapshotService) Start(*p2p.Server) error {
return nil
}
func (s *snapshotService) Stop() error {
return nil
}
// SnapshotAPI provides an RPC method to create snapshots of services
type SnapshotAPI struct {
services map[string]node.Service
services map[string]node.Lifecycle
}
func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {

View File

@ -37,29 +37,21 @@ import (
// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and
// connects them using net.Pipe
type SimAdapter struct {
pipe func() (net.Conn, net.Conn, error)
mtx sync.RWMutex
nodes map[enode.ID]*SimNode
services map[string]ServiceFunc
pipe func() (net.Conn, net.Conn, error)
mtx sync.RWMutex
nodes map[enode.ID]*SimNode
lifecycles LifecycleConstructors
}
// NewSimAdapter creates a SimAdapter which is capable of running in-memory
// simulation nodes running any of the given services (the services to run on a
// particular node are passed to the NewNode function in the NodeConfig)
// the adapter uses a net.Pipe for in-memory simulated network connections
func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter {
func NewSimAdapter(services LifecycleConstructors) *SimAdapter {
return &SimAdapter{
pipe: pipes.NetPipe,
nodes: make(map[enode.ID]*SimNode),
services: services,
}
}
func NewTCPAdapter(services map[string]ServiceFunc) *SimAdapter {
return &SimAdapter{
pipe: pipes.TCPPipe,
nodes: make(map[enode.ID]*SimNode),
services: services,
pipe: pipes.NetPipe,
nodes: make(map[enode.ID]*SimNode),
lifecycles: services,
}
}
@ -85,11 +77,11 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
}
// check the services are valid
if len(config.Services) == 0 {
if len(config.Lifecycles) == 0 {
return nil, errors.New("node must have at least one service")
}
for _, service := range config.Services {
if _, exists := s.services[service]; !exists {
for _, service := range config.Lifecycles {
if _, exists := s.lifecycles[service]; !exists {
return nil, fmt.Errorf("unknown node service %q", service)
}
}
@ -119,7 +111,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
config: config,
node: n,
adapter: s,
running: make(map[string]node.Service),
running: make(map[string]node.Lifecycle),
}
s.nodes[id] = simNode
return simNode, nil
@ -155,11 +147,7 @@ func (s *SimAdapter) DialRPC(id enode.ID) (*rpc.Client, error) {
if !ok {
return nil, fmt.Errorf("unknown node: %s", id)
}
handler, err := node.node.RPCHandler()
if err != nil {
return nil, err
}
return rpc.DialInProc(handler), nil
return node.node.Attach()
}
// GetNode returns the node with the given ID if it exists
@ -179,7 +167,7 @@ type SimNode struct {
config *NodeConfig
adapter *SimAdapter
node *node.Node
running map[string]node.Service
running map[string]node.Lifecycle
client *rpc.Client
registerOnce sync.Once
}
@ -227,7 +215,7 @@ func (sn *SimNode) ServeRPC(conn *websocket.Conn) error {
// simulation_snapshot RPC method
func (sn *SimNode) Snapshots() (map[string][]byte, error) {
sn.lock.RLock()
services := make(map[string]node.Service, len(sn.running))
services := make(map[string]node.Lifecycle, len(sn.running))
for name, service := range sn.running {
services[name] = service
}
@ -252,35 +240,30 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) {
// Start registers the services and starts the underlying devp2p node
func (sn *SimNode) Start(snapshots map[string][]byte) error {
newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) {
return func(nodeCtx *node.ServiceContext) (node.Service, error) {
ctx := &ServiceContext{
RPCDialer: sn.adapter,
NodeContext: nodeCtx,
Config: sn.config,
}
if snapshots != nil {
ctx.Snapshot = snapshots[name]
}
serviceFunc := sn.adapter.services[name]
service, err := serviceFunc(ctx)
if err != nil {
return nil, err
}
sn.running[name] = service
return service, nil
}
}
// ensure we only register the services once in the case of the node
// being stopped and then started again
var regErr error
sn.registerOnce.Do(func() {
for _, name := range sn.config.Services {
if err := sn.node.Register(newService(name)); err != nil {
for _, name := range sn.config.Lifecycles {
ctx := &ServiceContext{
RPCDialer: sn.adapter,
Config: sn.config,
}
if snapshots != nil {
ctx.Snapshot = snapshots[name]
}
serviceFunc := sn.adapter.lifecycles[name]
service, err := serviceFunc(ctx, sn.node)
if err != nil {
regErr = err
break
}
// if the service has already been registered, don't register it again.
if _, ok := sn.running[name]; ok {
continue
}
sn.running[name] = service
sn.node.RegisterLifecycle(service)
}
})
if regErr != nil {
@ -292,13 +275,12 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error {
}
// create an in-process RPC client
handler, err := sn.node.RPCHandler()
client, err := sn.node.Attach()
if err != nil {
return err
}
sn.lock.Lock()
sn.client = rpc.DialInProc(handler)
sn.client = client
sn.lock.Unlock()
return nil
@ -312,21 +294,21 @@ func (sn *SimNode) Stop() error {
sn.client = nil
}
sn.lock.Unlock()
return sn.node.Stop()
return sn.node.Close()
}
// Service returns a running service by name
func (sn *SimNode) Service(name string) node.Service {
func (sn *SimNode) Service(name string) node.Lifecycle {
sn.lock.RLock()
defer sn.lock.RUnlock()
return sn.running[name]
}
// Services returns a copy of the underlying services
func (sn *SimNode) Services() []node.Service {
func (sn *SimNode) Services() []node.Lifecycle {
sn.lock.RLock()
defer sn.lock.RUnlock()
services := make([]node.Service, 0, len(sn.running))
services := make([]node.Lifecycle, 0, len(sn.running))
for _, service := range sn.running {
services = append(services, service)
}
@ -334,10 +316,10 @@ func (sn *SimNode) Services() []node.Service {
}
// ServiceMap returns a map by names of the underlying services
func (sn *SimNode) ServiceMap() map[string]node.Service {
func (sn *SimNode) ServiceMap() map[string]node.Lifecycle {
sn.lock.RLock()
defer sn.lock.RUnlock()
services := make(map[string]node.Service, len(sn.running))
services := make(map[string]node.Lifecycle, len(sn.running))
for name, service := range sn.running {
services[name] = service
}

View File

@ -96,11 +96,11 @@ type NodeConfig struct {
// Use an existing database instead of a temporary one if non-empty
DataDir string
// Services are the names of the services which should be run when
// starting the node (for SimNodes it should be the names of services
// contained in SimAdapter.services, for other nodes it should be
// services registered by calling the RegisterService function)
Services []string
// Lifecycles are the names of the service lifecycles which should be run when
// starting the node (for SimNodes it should be the names of service lifecycles
// contained in SimAdapter.lifecycles, for other nodes it should be
// service lifecycles registered by calling the RegisterLifecycle function)
Lifecycles []string
// Properties are the names of the properties this node should hold
// within running services (e.g. "bootnode", "lightnode" or any custom values)
@ -137,7 +137,7 @@ func (n *NodeConfig) MarshalJSON() ([]byte, error) {
confJSON := nodeConfigJSON{
ID: n.ID.String(),
Name: n.Name,
Services: n.Services,
Services: n.Lifecycles,
Properties: n.Properties,
Port: n.Port,
EnableMsgEvents: n.EnableMsgEvents,
@ -175,7 +175,7 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error {
}
n.Name = confJSON.Name
n.Services = confJSON.Services
n.Lifecycles = confJSON.Services
n.Properties = confJSON.Properties
n.Port = confJSON.Port
n.EnableMsgEvents = confJSON.EnableMsgEvents
@ -233,9 +233,8 @@ func assignTCPPort() (uint16, error) {
type ServiceContext struct {
RPCDialer
NodeContext *node.ServiceContext
Config *NodeConfig
Snapshot []byte
Config *NodeConfig
Snapshot []byte
}
// RPCDialer is used when initialising services which need to connect to
@ -245,27 +244,29 @@ type RPCDialer interface {
DialRPC(id enode.ID) (*rpc.Client, error)
}
// Services is a collection of services which can be run in a simulation
type Services map[string]ServiceFunc
// LifecycleConstructor allows a Lifecycle to be constructed during node start-up.
// While the service-specific package usually takes care of Lifecycle creation and registration,
// for testing purposes, it is useful to be able to construct a Lifecycle on spot.
type LifecycleConstructor func(ctx *ServiceContext, stack *node.Node) (node.Lifecycle, error)
// ServiceFunc returns a node.Service which can be used to boot a devp2p node
type ServiceFunc func(ctx *ServiceContext) (node.Service, error)
// LifecycleConstructors stores LifecycleConstructor functions to call during node start-up.
type LifecycleConstructors map[string]LifecycleConstructor
// serviceFuncs is a map of registered services which are used to boot devp2p
// lifecycleConstructorFuncs is a map of registered services which are used to boot devp2p
// nodes
var serviceFuncs = make(Services)
var lifecycleConstructorFuncs = make(LifecycleConstructors)
// RegisterServices registers the given Services which can then be used to
// RegisterLifecycles registers the given Services which can then be used to
// start devp2p nodes using either the Exec or Docker adapters.
//
// It should be called in an init function so that it has the opportunity to
// execute the services before main() is called.
func RegisterServices(services Services) {
for name, f := range services {
if _, exists := serviceFuncs[name]; exists {
func RegisterLifecycles(lifecycles LifecycleConstructors) {
for name, f := range lifecycles {
if _, exists := lifecycleConstructorFuncs[name]; exists {
panic(fmt.Sprintf("node service already exists: %q", name))
}
serviceFuncs[name] = f
lifecycleConstructorFuncs[name] = f
}
// now we have registered the services, run reexec.Init() which will

View File

@ -26,8 +26,8 @@ import (
func newTestNetwork(t *testing.T, nodeCount int) (*Network, []enode.ID) {
t.Helper()
adapter := adapters.NewSimAdapter(adapters.Services{
"noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"noopwoop": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
return NewNoopService(nil), nil
},
})

View File

@ -31,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
)
var adapterType = flag.String("adapter", "sim", `node adapter to use (one of "sim", "exec" or "docker")`)
@ -45,12 +44,14 @@ func main() {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
// register a single ping-pong service
services := map[string]adapters.ServiceFunc{
"ping-pong": func(ctx *adapters.ServiceContext) (node.Service, error) {
return newPingPongService(ctx.Config.ID), nil
services := map[string]adapters.LifecycleConstructor{
"ping-pong": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
pps := newPingPongService(ctx.Config.ID)
stack.RegisterProtocols(pps.Protocols())
return pps, nil
},
}
adapters.RegisterServices(services)
adapters.RegisterLifecycles(services)
// create the NodeAdapter
var adapter adapters.NodeAdapter
@ -110,11 +111,7 @@ func (p *pingPongService) Protocols() []p2p.Protocol {
}}
}
func (p *pingPongService) APIs() []rpc.API {
return nil
}
func (p *pingPongService) Start(server *p2p.Server) error {
func (p *pingPongService) Start() error {
p.log.Info("ping-pong service starting")
return nil
}

View File

@ -64,12 +64,15 @@ type testService struct {
state atomic.Value
}
func newTestService(ctx *adapters.ServiceContext) (node.Service, error) {
func newTestService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
svc := &testService{
id: ctx.Config.ID,
peers: make(map[enode.ID]*testPeer),
}
svc.state.Store(ctx.Snapshot)
stack.RegisterProtocols(svc.Protocols())
stack.RegisterAPIs(svc.APIs())
return svc, nil
}
@ -126,7 +129,7 @@ func (t *testService) APIs() []rpc.API {
}}
}
func (t *testService) Start(server *p2p.Server) error {
func (t *testService) Start() error {
return nil
}
@ -288,7 +291,7 @@ func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
return rpcSub, nil
}
var testServices = adapters.Services{
var testServices = adapters.LifecycleConstructors{
"test": newTestService,
}

View File

@ -110,8 +110,8 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
}
// if no services are configured, use the default service
if len(conf.Services) == 0 {
conf.Services = []string{net.DefaultService}
if len(conf.Lifecycles) == 0 {
conf.Lifecycles = []string{net.DefaultService}
}
// use the NodeAdapter to create the node
@ -913,19 +913,19 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
snap.Nodes[i].Snapshots = snapshots
for _, addSvc := range addServices {
haveSvc := false
for _, svc := range snap.Nodes[i].Node.Config.Services {
for _, svc := range snap.Nodes[i].Node.Config.Lifecycles {
if svc == addSvc {
haveSvc = true
break
}
}
if !haveSvc {
snap.Nodes[i].Node.Config.Services = append(snap.Nodes[i].Node.Config.Services, addSvc)
snap.Nodes[i].Node.Config.Lifecycles = append(snap.Nodes[i].Node.Config.Lifecycles, addSvc)
}
}
if len(removeServices) > 0 {
var cleanedServices []string
for _, svc := range snap.Nodes[i].Node.Config.Services {
for _, svc := range snap.Nodes[i].Node.Config.Lifecycles {
haveSvc := false
for _, rmSvc := range removeServices {
if rmSvc == svc {
@ -938,7 +938,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
}
}
snap.Nodes[i].Node.Config.Services = cleanedServices
snap.Nodes[i].Node.Config.Lifecycles = cleanedServices
}
}
for _, conn := range net.Conns {

View File

@ -41,8 +41,8 @@ func TestSnapshot(t *testing.T) {
// create snapshot from ring network
// this is a minimal service, whose protocol will take exactly one message OR close of connection before quitting
adapter := adapters.NewSimAdapter(adapters.Services{
"noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"noopwoop": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
return NewNoopService(nil), nil
},
})
@ -165,8 +165,8 @@ OUTER:
// PART II
// load snapshot and verify that exactly same connections are formed
adapter = adapters.NewSimAdapter(adapters.Services{
"noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
adapter = adapters.NewSimAdapter(adapters.LifecycleConstructors{
"noopwoop": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
return NewNoopService(nil), nil
},
})
@ -256,8 +256,8 @@ OuterTwo:
t.Run("conns after load", func(t *testing.T) {
// Create new network.
n := NewNetwork(
adapters.NewSimAdapter(adapters.Services{
"noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
adapters.NewSimAdapter(adapters.LifecycleConstructors{
"noopwoop": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
return NewNoopService(nil), nil
},
}),
@ -288,7 +288,7 @@ OuterTwo:
// with each other and that a snapshot fully represents the desired topology
func TestNetworkSimulation(t *testing.T) {
// create simulation network with 20 testService nodes
adapter := adapters.NewSimAdapter(adapters.Services{
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"test": newTestService,
})
network := NewNetwork(adapter, &NetworkConfig{
@ -437,7 +437,7 @@ func createTestNodesWithProperty(property string, count int, network *Network) (
// It then tests again whilst excluding a node ID from being returned.
// If a node ID is not returned, or more node IDs than expected are returned, the test fails.
func TestGetNodeIDs(t *testing.T) {
adapter := adapters.NewSimAdapter(adapters.Services{
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"test": newTestService,
})
network := NewNetwork(adapter, &NetworkConfig{
@ -486,7 +486,7 @@ func TestGetNodeIDs(t *testing.T) {
// It then tests again whilst excluding a node from being returned.
// If a node is not returned, or more nodes than expected are returned, the test fails.
func TestGetNodes(t *testing.T) {
adapter := adapters.NewSimAdapter(adapters.Services{
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"test": newTestService,
})
network := NewNetwork(adapter, &NetworkConfig{
@ -534,7 +534,7 @@ func TestGetNodes(t *testing.T) {
// TestGetNodesByID creates a set of nodes and attempts to retrieve a subset of them by ID
// If a node is not returned, or more nodes than expected are returned, the test fails.
func TestGetNodesByID(t *testing.T) {
adapter := adapters.NewSimAdapter(adapters.Services{
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"test": newTestService,
})
network := NewNetwork(adapter, &NetworkConfig{
@ -579,7 +579,7 @@ func TestGetNodesByID(t *testing.T) {
// GetNodesByProperty is then checked for correctness by comparing the nodes returned to those initially created.
// If a node with a property is not found, or more nodes than expected are returned, the test fails.
func TestGetNodesByProperty(t *testing.T) {
adapter := adapters.NewSimAdapter(adapters.Services{
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"test": newTestService,
})
network := NewNetwork(adapter, &NetworkConfig{
@ -624,7 +624,7 @@ func TestGetNodesByProperty(t *testing.T) {
// GetNodeIDsByProperty is then checked for correctness by comparing the node IDs returned to those initially created.
// If a node ID with a property is not found, or more nodes IDs than expected are returned, the test fails.
func TestGetNodeIDsByProperty(t *testing.T) {
adapter := adapters.NewSimAdapter(adapters.Services{
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"test": newTestService,
})
network := NewNetwork(adapter, &NetworkConfig{
@ -705,8 +705,8 @@ func benchmarkMinimalServiceTmp(b *testing.B) {
// this is a minimal service, whose protocol will close a channel upon run of protocol
// making it possible to bench the time it takes for the service to start and protocol actually to be run
protoCMap := make(map[enode.ID]map[enode.ID]chan struct{})
adapter := adapters.NewSimAdapter(adapters.Services{
"noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
adapter := adapters.NewSimAdapter(adapters.LifecycleConstructors{
"noopwoop": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
protoCMap[ctx.Config.ID] = make(map[enode.ID]chan struct{})
svc := NewNoopService(protoCMap[ctx.Config.ID])
return svc, nil

View File

@ -66,7 +66,7 @@ func (t *NoopService) APIs() []rpc.API {
return []rpc.API{}
}
func (t *NoopService) Start(server *p2p.Server) error {
func (t *NoopService) Start() error {
return nil
}

View File

@ -1,67 +0,0 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)
type TestPeer interface {
ID() enode.ID
Drop()
}
// TestPeerPool is an example peerPool to demonstrate registration of peer connections
type TestPeerPool struct {
lock sync.Mutex
peers map[enode.ID]TestPeer
}
func NewTestPeerPool() *TestPeerPool {
return &TestPeerPool{peers: make(map[enode.ID]TestPeer)}
}
func (p *TestPeerPool) Add(peer TestPeer) {
p.lock.Lock()
defer p.lock.Unlock()
log.Trace(fmt.Sprintf("pp add peer %v", peer.ID()))
p.peers[peer.ID()] = peer
}
func (p *TestPeerPool) Remove(peer TestPeer) {
p.lock.Lock()
defer p.lock.Unlock()
delete(p.peers, peer.ID())
}
func (p *TestPeerPool) Has(id enode.ID) bool {
p.lock.Lock()
defer p.lock.Unlock()
_, ok := p.peers[id]
return ok
}
func (p *TestPeerPool) Get(id enode.ID) TestPeer {
p.lock.Lock()
defer p.lock.Unlock()
return p.peers[id]
}

View File

@ -1,283 +0,0 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
var errTimedOut = errors.New("timed out")
// ProtocolSession is a quasi simulation of a pivot node running
// a service and a number of dummy peers that can send (trigger) or
// receive (expect) messages
type ProtocolSession struct {
Server *p2p.Server
Nodes []*enode.Node
adapter *adapters.SimAdapter
events chan *p2p.PeerEvent
}
// Exchange is the basic units of protocol tests
// the triggers and expects in the arrays are run immediately and asynchronously
// thus one cannot have multiple expects for the SAME peer with DIFFERENT message types
// because it's unpredictable which expect will receive which message
// (with expect #1 and #2, messages might be sent #2 and #1, and both expects will complain about wrong message code)
// an exchange is defined on a session
type Exchange struct {
Label string
Triggers []Trigger
Expects []Expect
Timeout time.Duration
}
// Trigger is part of the exchange, incoming message for the pivot node
// sent by a peer
type Trigger struct {
Msg interface{} // type of message to be sent
Code uint64 // code of message is given
Peer enode.ID // the peer to send the message to
Timeout time.Duration // timeout duration for the sending
}
// Expect is part of an exchange, outgoing message from the pivot node
// received by a peer
type Expect struct {
Msg interface{} // type of message to expect
Code uint64 // code of message is now given
Peer enode.ID // the peer that expects the message
Timeout time.Duration // timeout duration for receiving
}
// Disconnect represents a disconnect event, used and checked by TestDisconnected
type Disconnect struct {
Peer enode.ID // discconnected peer
Error error // disconnect reason
}
// trigger sends messages from peers
func (s *ProtocolSession) trigger(trig Trigger) error {
simNode, ok := s.adapter.GetNode(trig.Peer)
if !ok {
return fmt.Errorf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(s.Nodes))
}
mockNode, ok := simNode.Services()[0].(*mockNode)
if !ok {
return fmt.Errorf("trigger: peer %v is not a mock", trig.Peer)
}
errc := make(chan error)
go func() {
log.Trace(fmt.Sprintf("trigger %v (%v)....", trig.Msg, trig.Code))
errc <- mockNode.Trigger(&trig)
log.Trace(fmt.Sprintf("triggered %v (%v)", trig.Msg, trig.Code))
}()
t := trig.Timeout
if t == time.Duration(0) {
t = 1000 * time.Millisecond
}
select {
case err := <-errc:
return err
case <-time.After(t):
return fmt.Errorf("timout expecting %v to send to peer %v", trig.Msg, trig.Peer)
}
}
// expect checks an expectation of a message sent out by the pivot node
func (s *ProtocolSession) expect(exps []Expect) error {
// construct a map of expectations for each node
peerExpects := make(map[enode.ID][]Expect)
for _, exp := range exps {
if exp.Msg == nil {
return errors.New("no message to expect")
}
peerExpects[exp.Peer] = append(peerExpects[exp.Peer], exp)
}
// construct a map of mockNodes for each node
mockNodes := make(map[enode.ID]*mockNode)
for nodeID := range peerExpects {
simNode, ok := s.adapter.GetNode(nodeID)
if !ok {
return fmt.Errorf("trigger: peer %v does not exist (1- %v)", nodeID, len(s.Nodes))
}
mockNode, ok := simNode.Services()[0].(*mockNode)
if !ok {
return fmt.Errorf("trigger: peer %v is not a mock", nodeID)
}
mockNodes[nodeID] = mockNode
}
// done chanell cancels all created goroutines when function returns
done := make(chan struct{})
defer close(done)
// errc catches the first error from
errc := make(chan error)
wg := &sync.WaitGroup{}
wg.Add(len(mockNodes))
for nodeID, mockNode := range mockNodes {
nodeID := nodeID
mockNode := mockNode
go func() {
defer wg.Done()
// Sum all Expect timeouts to give the maximum
// time for all expectations to finish.
// mockNode.Expect checks all received messages against
// a list of expected messages and timeout for each
// of them can not be checked separately.
var t time.Duration
for _, exp := range peerExpects[nodeID] {
if exp.Timeout == time.Duration(0) {
t += 2000 * time.Millisecond
} else {
t += exp.Timeout
}
}
alarm := time.NewTimer(t)
defer alarm.Stop()
// expectErrc is used to check if error returned
// from mockNode.Expect is not nil and to send it to
// errc only in that case.
// done channel will be closed when function
expectErrc := make(chan error)
go func() {
select {
case expectErrc <- mockNode.Expect(peerExpects[nodeID]...):
case <-done:
case <-alarm.C:
}
}()
select {
case err := <-expectErrc:
if err != nil {
select {
case errc <- err:
case <-done:
case <-alarm.C:
errc <- errTimedOut
}
}
case <-done:
case <-alarm.C:
errc <- errTimedOut
}
}()
}
go func() {
wg.Wait()
// close errc when all goroutines finish to return nill err from errc
close(errc)
}()
return <-errc
}
// TestExchanges tests a series of exchanges against the session
func (s *ProtocolSession) TestExchanges(exchanges ...Exchange) error {
for i, e := range exchanges {
if err := s.testExchange(e); err != nil {
return fmt.Errorf("exchange #%d %q: %v", i, e.Label, err)
}
log.Trace(fmt.Sprintf("exchange #%d %q: run successfully", i, e.Label))
}
return nil
}
// testExchange tests a single Exchange.
// Default timeout value is 2 seconds.
func (s *ProtocolSession) testExchange(e Exchange) error {
errc := make(chan error)
done := make(chan struct{})
defer close(done)
go func() {
for _, trig := range e.Triggers {
err := s.trigger(trig)
if err != nil {
errc <- err
return
}
}
select {
case errc <- s.expect(e.Expects):
case <-done:
}
}()
// time out globally or finish when all expectations satisfied
t := e.Timeout
if t == 0 {
t = 2000 * time.Millisecond
}
alarm := time.NewTimer(t)
defer alarm.Stop()
select {
case err := <-errc:
return err
case <-alarm.C:
return errTimedOut
}
}
// TestDisconnected tests the disconnections given as arguments
// the disconnect structs describe what disconnect error is expected on which peer
func (s *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error {
expects := make(map[enode.ID]error)
for _, disconnect := range disconnects {
expects[disconnect.Peer] = disconnect.Error
}
timeout := time.After(time.Second)
for len(expects) > 0 {
select {
case event := <-s.events:
if event.Type != p2p.PeerEventTypeDrop {
continue
}
expectErr, ok := expects[event.Peer]
if !ok {
continue
}
if !(expectErr == nil && event.Error == "" || expectErr != nil && expectErr.Error() == event.Error) {
return fmt.Errorf("unexpected error on peer %v. expected '%v', got '%v'", event.Peer, expectErr, event.Error)
}
delete(expects, event.Peer)
case <-timeout:
return fmt.Errorf("timed out waiting for peers to disconnect")
}
}
return nil
}

View File

@ -1,284 +0,0 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
the p2p/testing package provides a unit test scheme to check simple
protocol message exchanges with one pivot node and a number of dummy peers
The pivot test node runs a node.Service, the dummy peers run a mock node
that can be used to send and receive messages
*/
package testing
import (
"bytes"
"crypto/ecdsa"
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
)
// ProtocolTester is the tester environment used for unit testing protocol
// message exchanges. It uses p2p/simulations framework
type ProtocolTester struct {
*ProtocolSession
network *simulations.Network
}
// NewProtocolTester constructs a new ProtocolTester
// it takes as argument the pivot node id, the number of dummy peers and the
// protocol run function called on a peer connection by the p2p server
func NewProtocolTester(prvkey *ecdsa.PrivateKey, nodeCount int, run func(*p2p.Peer, p2p.MsgReadWriter) error) *ProtocolTester {
services := adapters.Services{
"test": func(ctx *adapters.ServiceContext) (node.Service, error) {
return &testNode{run}, nil
},
"mock": func(ctx *adapters.ServiceContext) (node.Service, error) {
return newMockNode(), nil
},
}
adapter := adapters.NewSimAdapter(services)
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{})
nodeConfig := &adapters.NodeConfig{
PrivateKey: prvkey,
EnableMsgEvents: true,
Services: []string{"test"},
}
if _, err := net.NewNodeWithConfig(nodeConfig); err != nil {
panic(err.Error())
}
if err := net.Start(nodeConfig.ID); err != nil {
panic(err.Error())
}
node := net.GetNode(nodeConfig.ID).Node.(*adapters.SimNode)
peers := make([]*adapters.NodeConfig, nodeCount)
nodes := make([]*enode.Node, nodeCount)
for i := 0; i < nodeCount; i++ {
peers[i] = adapters.RandomNodeConfig()
peers[i].Services = []string{"mock"}
if _, err := net.NewNodeWithConfig(peers[i]); err != nil {
panic(fmt.Sprintf("error initializing peer %v: %v", peers[i].ID, err))
}
if err := net.Start(peers[i].ID); err != nil {
panic(fmt.Sprintf("error starting peer %v: %v", peers[i].ID, err))
}
nodes[i] = peers[i].Node()
}
events := make(chan *p2p.PeerEvent, 1000)
node.SubscribeEvents(events)
ps := &ProtocolSession{
Server: node.Server(),
Nodes: nodes,
adapter: adapter,
events: events,
}
self := &ProtocolTester{
ProtocolSession: ps,
network: net,
}
self.Connect(nodeConfig.ID, peers...)
return self
}
// Stop stops the p2p server
func (t *ProtocolTester) Stop() {
t.Server.Stop()
t.network.Shutdown()
}
// Connect brings up the remote peer node and connects it using the
// p2p/simulations network connection with the in memory network adapter
func (t *ProtocolTester) Connect(selfID enode.ID, peers ...*adapters.NodeConfig) {
for _, peer := range peers {
log.Trace(fmt.Sprintf("connect to %v", peer.ID))
if err := t.network.Connect(selfID, peer.ID); err != nil {
panic(fmt.Sprintf("error connecting to peer %v: %v", peer.ID, err))
}
}
}
// testNode wraps a protocol run function and implements the node.Service
// interface
type testNode struct {
run func(*p2p.Peer, p2p.MsgReadWriter) error
}
func (t *testNode) Protocols() []p2p.Protocol {
return []p2p.Protocol{{
Length: 100,
Run: t.run,
}}
}
func (t *testNode) APIs() []rpc.API {
return nil
}
func (t *testNode) Start(server *p2p.Server) error {
return nil
}
func (t *testNode) Stop() error {
return nil
}
// mockNode is a testNode which doesn't actually run a protocol, instead
// exposing channels so that tests can manually trigger and expect certain
// messages
type mockNode struct {
testNode
trigger chan *Trigger
expect chan []Expect
err chan error
stop chan struct{}
stopOnce sync.Once
}
func newMockNode() *mockNode {
mock := &mockNode{
trigger: make(chan *Trigger),
expect: make(chan []Expect),
err: make(chan error),
stop: make(chan struct{}),
}
mock.testNode.run = mock.Run
return mock
}
// Run is a protocol run function which just loops waiting for tests to
// instruct it to either trigger or expect a message from the peer
func (m *mockNode) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
for {
select {
case trig := <-m.trigger:
wmsg := Wrap(trig.Msg)
m.err <- p2p.Send(rw, trig.Code, wmsg)
case exps := <-m.expect:
m.err <- expectMsgs(rw, exps)
case <-m.stop:
return nil
}
}
}
func (m *mockNode) Trigger(trig *Trigger) error {
m.trigger <- trig
return <-m.err
}
func (m *mockNode) Expect(exp ...Expect) error {
m.expect <- exp
return <-m.err
}
func (m *mockNode) Stop() error {
m.stopOnce.Do(func() { close(m.stop) })
return nil
}
func expectMsgs(rw p2p.MsgReadWriter, exps []Expect) error {
matched := make([]bool, len(exps))
for {
msg, err := rw.ReadMsg()
if err != nil {
if err == io.EOF {
break
}
return err
}
actualContent, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return err
}
var found bool
for i, exp := range exps {
if exp.Code == msg.Code && bytes.Equal(actualContent, mustEncodeMsg(Wrap(exp.Msg))) {
if matched[i] {
return fmt.Errorf("message #%d received two times", i)
}
matched[i] = true
found = true
break
}
}
if !found {
expected := make([]string, 0)
for i, exp := range exps {
if matched[i] {
continue
}
expected = append(expected, fmt.Sprintf("code %d payload %x", exp.Code, mustEncodeMsg(Wrap(exp.Msg))))
}
return fmt.Errorf("unexpected message code %d payload %x, expected %s", msg.Code, actualContent, strings.Join(expected, " or "))
}
done := true
for _, m := range matched {
if !m {
done = false
break
}
}
if done {
return nil
}
}
for i, m := range matched {
if !m {
return fmt.Errorf("expected message #%d not received", i)
}
}
return nil
}
// mustEncodeMsg uses rlp to encode a message.
// In case of error it panics.
func mustEncodeMsg(msg interface{}) []byte {
contentEnc, err := rlp.EncodeToBytes(msg)
if err != nil {
panic("content encode error: " + err.Error())
}
return contentEnc
}
type WrappedMsg struct {
Context []byte
Size uint32
Payload []byte
}
func Wrap(msg interface{}) interface{} {
data, _ := rlp.EncodeToBytes(msg)
return &WrappedMsg{
Size: uint32(len(data)),
Payload: data,
}
}