p2p/protocols: import to codebase, remove from vendor (#1416)

This commit is contained in:
Rafael Matias
2019-06-04 14:32:44 +02:00
committed by Anton Evangelatov
parent 70320ceeae
commit 6d0902da3c
26 changed files with 1270 additions and 23 deletions

View File

@ -30,8 +30,8 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
)

View File

@ -6,9 +6,9 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
)
// ENRAddrEntry is the entry type to store the bzz key in the enode

View File

@ -26,7 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
)

View File

@ -26,9 +26,9 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
)

View File

@ -30,8 +30,8 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/protocols"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
)

View File

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethersphere/swarm/chunk"
@ -36,6 +35,7 @@ import (
"github.com/ethersphere/swarm/network"
pq "github.com/ethersphere/swarm/network/priorityqueue"
"github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"

View File

@ -27,11 +27,11 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/stream/intervals"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
)

View File

@ -0,0 +1,323 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package protocols
import (
"context"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
"github.com/mattn/go-colorable"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
const (
content = "123456789"
)
var (
nodes = flag.Int("nodes", 30, "number of nodes to create (default 30)")
msgs = flag.Int("msgs", 100, "number of messages sent by node (default 100)")
loglevel = flag.Int("loglevel", 0, "verbosity of logs")
rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
)
func init() {
flag.Parse()
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(!*rawlog))))
}
//TestAccountingSimulation runs a p2p/simulations simulation
//It creates a *nodes number of nodes, connects each one with each other,
//then sends out a random selection of messages up to *msgs amount of messages
//from the test protocol spec.
//The spec has some accounted messages defined through the Prices interface.
//The test does accounting for all the message exchanged, and then checks
//that every node has the same balance with a peer, but with opposite signs.
//Balance(AwithB) = 0 - Balance(BwithA) or Abs|Balance(AwithB)| == Abs|Balance(BwithA)|
func TestAccountingSimulation(t *testing.T) {
//setup the balances objects for every node
bal := newBalances(*nodes)
//setup the metrics system or tests will fail trying to write metrics
dir, err := ioutil.TempDir("", "account-sim")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db"))
//define the node.Service for this test
services := adapters.Services{
"accounting": func(ctx *adapters.ServiceContext) (node.Service, error) {
return bal.newNode(), nil
},
}
//setup the simulation
adapter := adapters.NewSimAdapter(services)
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{DefaultService: "accounting"})
defer net.Shutdown()
// we send msgs messages per node, wait for all messages to arrive
bal.wg.Add(*nodes * *msgs)
trigger := make(chan enode.ID)
go func() {
// wait for all of them to arrive
bal.wg.Wait()
// then trigger a check
// the selected node for the trigger is irrelevant,
// we just want to trigger the end of the simulation
trigger <- net.Nodes[0].ID()
}()
// create nodes and start them
for i := 0; i < *nodes; i++ {
conf := adapters.RandomNodeConfig()
bal.id2n[conf.ID] = i
if _, err := net.NewNodeWithConfig(conf); err != nil {
t.Fatal(err)
}
if err := net.Start(conf.ID); err != nil {
t.Fatal(err)
}
}
// fully connect nodes
for i, n := range net.Nodes {
for _, m := range net.Nodes[i+1:] {
if err := net.Connect(n.ID(), m.ID()); err != nil {
t.Fatal(err)
}
}
}
// empty action
action := func(ctx context.Context) error {
return nil
}
// check always checks out
check := func(ctx context.Context, id enode.ID) (bool, error) {
return true, nil
}
// run simulation
timeout := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
Action: action,
Trigger: trigger,
Expect: &simulations.Expectation{
Nodes: []enode.ID{net.Nodes[0].ID()},
Check: check,
},
})
if result.Error != nil {
t.Fatal(result.Error)
}
// check if balance matrix is symmetric
if err := bal.symmetric(); err != nil {
t.Fatal(err)
}
}
// matrix is a matrix of nodes and its balances
// matrix is in fact a linear array of size n*n,
// so the balance for any node A with B is at index
// A*n + B, while the balance of node B with A is at
// B*n + A
// (n entries in the array will not be filled -
// the balance of a node with itself)
type matrix struct {
n int //number of nodes
m []int64 //array of balances
lock sync.Mutex
}
// create a new matrix
func newMatrix(n int) *matrix {
return &matrix{
n: n,
m: make([]int64, n*n),
}
}
// called from the testBalance's Add accounting function: register balance change
func (m *matrix) add(i, j int, v int64) error {
// index for the balance of local node i with remote nodde j is
// i * number of nodes + remote node
mi := i*m.n + j
// register that balance
m.lock.Lock()
m.m[mi] += v
m.lock.Unlock()
return nil
}
// check that the balances are symmetric:
// balance of node i with node j is the same as j with i but with inverted signs
func (m *matrix) symmetric() error {
//iterate all nodes
for i := 0; i < m.n; i++ {
//iterate starting +1
for j := i + 1; j < m.n; j++ {
log.Debug("bal", "1", i, "2", j, "i,j", m.m[i*m.n+j], "j,i", m.m[j*m.n+i])
if m.m[i*m.n+j] != -m.m[j*m.n+i] {
return fmt.Errorf("value mismatch. m[%v, %v] = %v; m[%v, %v] = %v", i, j, m.m[i*m.n+j], j, i, m.m[j*m.n+i])
}
}
}
return nil
}
// all the balances
type balances struct {
i int
*matrix
id2n map[enode.ID]int
wg *sync.WaitGroup
}
func newBalances(n int) *balances {
return &balances{
matrix: newMatrix(n),
id2n: make(map[enode.ID]int),
wg: &sync.WaitGroup{},
}
}
// create a new testNode for every node created as part of the service
func (b *balances) newNode() *testNode {
defer func() { b.i++ }()
return &testNode{
bal: b,
i: b.i,
peers: make([]*testPeer, b.n), //a node will be connected to n-1 peers
}
}
type testNode struct {
bal *balances
i int
lock sync.Mutex
peers []*testPeer
peerCount int
}
// do the accounting for the peer's test protocol
// testNode implements protocols.Balance
func (t *testNode) Add(a int64, p *Peer) error {
//get the index for the remote peer
remote := t.bal.id2n[p.ID()]
log.Debug("add", "local", t.i, "remote", remote, "amount", a)
return t.bal.add(t.i, remote, a)
}
//run the p2p protocol
//for every node, represented by testNode, create a remote testPeer
func (t *testNode) run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
spec := createTestSpec()
//create accounting hook
spec.Hook = NewAccounting(t, &dummyPrices{})
//create a peer for this node
tp := &testPeer{NewPeer(p, rw, spec), t.i, t.bal.id2n[p.ID()], t.bal.wg}
t.lock.Lock()
t.peers[t.bal.id2n[p.ID()]] = tp
t.peerCount++
if t.peerCount == t.bal.n-1 {
//when all peer connections are established, start sending messages from this peer
go t.send()
}
t.lock.Unlock()
return tp.Run(tp.handle)
}
// p2p message receive handler function
func (tp *testPeer) handle(ctx context.Context, msg interface{}) error {
tp.wg.Done()
log.Debug("receive", "from", tp.remote, "to", tp.local, "type", reflect.TypeOf(msg), "msg", msg)
return nil
}
type testPeer struct {
*Peer
local, remote int
wg *sync.WaitGroup
}
func (t *testNode) send() {
log.Debug("start sending")
for i := 0; i < *msgs; i++ {
//determine randomly to which peer to send
whom := rand.Intn(t.bal.n - 1)
if whom >= t.i {
whom++
}
t.lock.Lock()
p := t.peers[whom]
t.lock.Unlock()
//determine a random message from the spec's messages to be sent
which := rand.Intn(len(p.spec.Messages))
msg := p.spec.Messages[which]
switch msg.(type) {
case *perBytesMsgReceiverPays:
msg = &perBytesMsgReceiverPays{Content: content[:rand.Intn(len(content))]}
case *perBytesMsgSenderPays:
msg = &perBytesMsgSenderPays{Content: content[:rand.Intn(len(content))]}
}
log.Debug("send", "from", t.i, "to", whom, "type", reflect.TypeOf(msg), "msg", msg)
p.Send(context.TODO(), msg)
}
}
// define the protocol
func (t *testNode) Protocols() []p2p.Protocol {
return []p2p.Protocol{{
Length: 100,
Run: t.run,
}}
}
func (t *testNode) APIs() []rpc.API {
return nil
}
func (t *testNode) Start(server *p2p.Server) error {
return nil
}
func (t *testNode) Stop() error {
return nil
}

View File

@ -0,0 +1,223 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package protocols
import (
"testing"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp"
)
//dummy Balance implementation
type dummyBalance struct {
amount int64
peer *Peer
}
//dummy Prices implementation
type dummyPrices struct{}
//a dummy message which needs size based accounting
//sender pays
type perBytesMsgSenderPays struct {
Content string
}
//a dummy message which needs size based accounting
//receiver pays
type perBytesMsgReceiverPays struct {
Content string
}
//a dummy message which is paid for per unit
//sender pays
type perUnitMsgSenderPays struct{}
//receiver pays
type perUnitMsgReceiverPays struct{}
//a dummy message which has zero as its price
type zeroPriceMsg struct{}
//a dummy message which has no accounting
type nilPriceMsg struct{}
//return the price for the defined messages
func (d *dummyPrices) Price(msg interface{}) *Price {
switch msg.(type) {
//size based message cost, receiver pays
case *perBytesMsgReceiverPays:
return &Price{
PerByte: true,
Value: uint64(100),
Payer: Receiver,
}
//size based message cost, sender pays
case *perBytesMsgSenderPays:
return &Price{
PerByte: true,
Value: uint64(100),
Payer: Sender,
}
//unitary cost, receiver pays
case *perUnitMsgReceiverPays:
return &Price{
PerByte: false,
Value: uint64(99),
Payer: Receiver,
}
//unitary cost, sender pays
case *perUnitMsgSenderPays:
return &Price{
PerByte: false,
Value: uint64(99),
Payer: Sender,
}
case *zeroPriceMsg:
return &Price{
PerByte: false,
Value: uint64(0),
Payer: Sender,
}
case *nilPriceMsg:
return nil
}
return nil
}
//dummy accounting implementation, only stores values for later check
func (d *dummyBalance) Add(amount int64, peer *Peer) error {
d.amount = amount
d.peer = peer
return nil
}
type testCase struct {
msg interface{}
size uint32
sendResult int64
recvResult int64
}
//lowest level unit test
func TestBalance(t *testing.T) {
//create instances
balance := &dummyBalance{}
prices := &dummyPrices{}
//create the spec
spec := createTestSpec()
//create the accounting hook for the spec
acc := NewAccounting(balance, prices)
//create a peer
id := adapters.RandomNodeConfig().ID
p := p2p.NewPeer(id, "testPeer", nil)
peer := NewPeer(p, &dummyRW{}, spec)
//price depends on size, receiver pays
msg := &perBytesMsgReceiverPays{Content: "testBalance"}
size, _ := rlp.EncodeToBytes(msg)
testCases := []testCase{
{
msg,
uint32(len(size)),
int64(len(size) * 100),
int64(len(size) * -100),
},
{
&perBytesMsgSenderPays{Content: "testBalance"},
uint32(len(size)),
int64(len(size) * -100),
int64(len(size) * 100),
},
{
&perUnitMsgSenderPays{},
0,
int64(-99),
int64(99),
},
{
&perUnitMsgReceiverPays{},
0,
int64(99),
int64(-99),
},
{
&zeroPriceMsg{},
0,
int64(0),
int64(0),
},
{
&nilPriceMsg{},
0,
int64(0),
int64(0),
},
}
checkAccountingTestCases(t, testCases, acc, peer, balance, true)
checkAccountingTestCases(t, testCases, acc, peer, balance, false)
}
func checkAccountingTestCases(t *testing.T, cases []testCase, acc *Accounting, peer *Peer, balance *dummyBalance, send bool) {
for _, c := range cases {
var err error
var expectedResult int64
//reset balance before every check
balance.amount = 0
if send {
err = acc.Send(peer, c.size, c.msg)
expectedResult = c.sendResult
} else {
err = acc.Receive(peer, c.size, c.msg)
expectedResult = c.recvResult
}
checkResults(t, err, balance, peer, expectedResult)
}
}
func checkResults(t *testing.T, err error, balance *dummyBalance, peer *Peer, result int64) {
if err != nil {
t.Fatal(err)
}
if balance.peer != peer {
t.Fatalf("expected Add to be called with peer %v, got %v", peer, balance.peer)
}
if balance.amount != result {
t.Fatalf("Expected balance to be %d but is %d", result, balance.amount)
}
}
//create a test spec
func createTestSpec() *Spec {
spec := &Spec{
Name: "test",
Version: 42,
MaxMsgSize: 10 * 1024,
Messages: []interface{}{
&perBytesMsgReceiverPays{},
&perBytesMsgSenderPays{},
&perUnitMsgReceiverPays{},
&perUnitMsgSenderPays{},
&zeroPriceMsg{},
&nilPriceMsg{},
},
}
return spec
}

View File

@ -0,0 +1,624 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package protocols
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
)
// handshake message type
type hs0 struct {
C uint
}
// message to kill/drop the peer with nodeID
type kill struct {
C enode.ID
}
// message to drop connection
type drop struct {
}
/// protoHandshake represents module-independent aspects of the protocol and is
// the first message peers send and receive as part the initial exchange
type protoHandshake struct {
Version uint // local and remote peer should have identical version
NetworkID string // local and remote peer should have identical network id
}
// checkProtoHandshake verifies local and remote protoHandshakes match
func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error {
return func(rhs interface{}) error {
remote := rhs.(*protoHandshake)
if remote.NetworkID != testNetworkID {
return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID)
}
if remote.Version != testVersion {
return fmt.Errorf("%d (!= %d)", remote.Version, testVersion)
}
return nil
}
}
// newProtocol sets up a protocol
// the run function here demonstrates a typical protocol using peerPool, handshake
// and messages registered to handlers
func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error {
spec := &Spec{
Name: "test",
Version: 42,
MaxMsgSize: 10 * 1024,
Messages: []interface{}{
protoHandshake{},
hs0{},
kill{},
drop{},
},
}
return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := NewPeer(p, rw, spec)
// initiate one-off protohandshake and check validity
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
phs := &protoHandshake{42, "420"}
hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID)
_, err := peer.Handshake(ctx, phs, hsCheck)
if err != nil {
return err
}
lhs := &hs0{42}
// module handshake demonstrating a simple repeatable exchange of same-type message
hs, err := peer.Handshake(ctx, lhs, nil)
if err != nil {
return err
}
if rmhs := hs.(*hs0); rmhs.C > lhs.C {
return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
}
handle := func(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case *protoHandshake:
return errors.New("duplicate handshake")
case *hs0:
rhs := msg
if rhs.C > lhs.C {
return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
}
lhs.C += rhs.C
return peer.Send(ctx, lhs)
case *kill:
// demonstrates use of peerPool, killing another peer connection as a response to a message
id := msg.C
pp.Get(id).Drop()
return nil
case *drop:
// for testing we can trigger self induced disconnect upon receiving drop message
return errors.New("dropped")
default:
return fmt.Errorf("unknown message type: %T", msg)
}
}
pp.Add(peer)
defer pp.Remove(peer)
return peer.Run(handle)
}
}
func protocolTester(pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
prvkey, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
return p2ptest.NewProtocolTester(prvkey, 2, newProtocol(pp))
}
func protoHandshakeExchange(id enode.ID, proto *protoHandshake) []p2ptest.Exchange {
return []p2ptest.Exchange{
{
Expects: []p2ptest.Expect{
{
Code: 0,
Msg: &protoHandshake{42, "420"},
Peer: id,
},
},
},
{
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: proto,
Peer: id,
},
},
},
}
}
func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
t.Helper()
pp := p2ptest.NewTestPeerPool()
s := protocolTester(pp)
defer s.Stop()
// TODO: make this more than one handshake
node := s.Nodes[0]
if err := s.TestExchanges(protoHandshakeExchange(node.ID(), proto)...); err != nil {
t.Fatal(err)
}
var disconnects []*p2ptest.Disconnect
for i, err := range errs {
disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
}
if err := s.TestDisconnected(disconnects...); err != nil {
t.Fatal(err)
}
}
type dummyHook struct {
peer *Peer
size uint32
msg interface{}
send bool
err error
waitC chan struct{}
mu sync.Mutex
}
type dummyMsg struct {
Content string
}
func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
d.mu.Lock()
defer d.mu.Unlock()
d.peer = peer
d.size = size
d.msg = msg
d.send = true
return d.err
}
func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
d.mu.Lock()
defer d.mu.Unlock()
d.peer = peer
d.size = size
d.msg = msg
d.send = false
d.waitC <- struct{}{}
return d.err
}
func TestProtocolHook(t *testing.T) {
testHook := &dummyHook{
waitC: make(chan struct{}, 1),
}
spec := &Spec{
Name: "test",
Version: 42,
MaxMsgSize: 10 * 1024,
Messages: []interface{}{
dummyMsg{},
},
Hook: testHook,
}
runFunc := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := NewPeer(p, rw, spec)
ctx := context.TODO()
err := peer.Send(ctx, &dummyMsg{
Content: "handshake"})
if err != nil {
t.Fatal(err)
}
handle := func(ctx context.Context, msg interface{}) error {
return nil
}
return peer.Run(handle)
}
prvkey, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
tester := p2ptest.NewProtocolTester(prvkey, 2, runFunc)
defer tester.Stop()
err = tester.TestExchanges(p2ptest.Exchange{
Expects: []p2ptest.Expect{
{
Code: 0,
Msg: &dummyMsg{Content: "handshake"},
Peer: tester.Nodes[0].ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
testHook.mu.Lock()
if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "handshake" {
t.Fatal("Expected msg to be set, but it is not")
}
if !testHook.send {
t.Fatal("Expected a send message, but it is not")
}
if testHook.peer == nil {
t.Fatal("Expected peer to be set, is nil")
}
if peerId := testHook.peer.ID(); peerId != tester.Nodes[0].ID() && peerId != tester.Nodes[1].ID() {
t.Fatalf("Expected peer ID to be set correctly, but it is not (got %v, exp %v or %v", peerId, tester.Nodes[0].ID(), tester.Nodes[1].ID())
}
if testHook.size != 11 { //11 is the length of the encoded message
t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
}
testHook.mu.Unlock()
err = tester.TestExchanges(p2ptest.Exchange{
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: &dummyMsg{Content: "response"},
Peer: tester.Nodes[1].ID(),
},
},
})
<-testHook.waitC
if err != nil {
t.Fatal(err)
}
testHook.mu.Lock()
if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "response" {
t.Fatal("Expected msg to be set, but it is not")
}
if testHook.send {
t.Fatal("Expected a send message, but it is not")
}
if testHook.peer == nil || testHook.peer.ID() != tester.Nodes[1].ID() {
t.Fatal("Expected peer ID to be set correctly, but it is not")
}
if testHook.size != 10 { //11 is the length of the encoded message
t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
}
testHook.mu.Unlock()
testHook.err = fmt.Errorf("dummy error")
err = tester.TestExchanges(p2ptest.Exchange{
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: &dummyMsg{Content: "response"},
Peer: tester.Nodes[1].ID(),
},
},
})
<-testHook.waitC
time.Sleep(100 * time.Millisecond)
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[1].ID(), Error: testHook.err})
if err != nil {
t.Fatalf("Expected a specific disconnect error, but got different one: %v", err)
}
}
//We need to test that if the hook is not defined, then message infrastructure
//(send,receive) still works
func TestNoHook(t *testing.T) {
//create a test spec
spec := createTestSpec()
//a random node
id := adapters.RandomNodeConfig().ID
//a peer
p := p2p.NewPeer(id, "testPeer", nil)
rw := &dummyRW{}
peer := NewPeer(p, rw, spec)
ctx := context.TODO()
msg := &perBytesMsgSenderPays{Content: "testBalance"}
//send a message
if err := peer.Send(ctx, msg); err != nil {
t.Fatal(err)
}
//simulate receiving a message
rw.msg = msg
handler := func(ctx context.Context, msg interface{}) error {
return nil
}
if err := peer.handleIncoming(handler); err != nil {
t.Fatal(err)
}
}
func TestProtoHandshakeVersionMismatch(t *testing.T) {
runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error()))
}
func TestProtoHandshakeNetworkIDMismatch(t *testing.T) {
runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error()))
}
func TestProtoHandshakeSuccess(t *testing.T) {
runProtoHandshake(t, &protoHandshake{42, "420"})
}
func moduleHandshakeExchange(id enode.ID, resp uint) []p2ptest.Exchange {
return []p2ptest.Exchange{
{
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &hs0{42},
Peer: id,
},
},
},
{
Triggers: []p2ptest.Trigger{
{
Code: 1,
Msg: &hs0{resp},
Peer: id,
},
},
},
}
}
func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
t.Helper()
pp := p2ptest.NewTestPeerPool()
s := protocolTester(pp)
defer s.Stop()
node := s.Nodes[0]
if err := s.TestExchanges(protoHandshakeExchange(node.ID(), &protoHandshake{42, "420"})...); err != nil {
t.Fatal(err)
}
if err := s.TestExchanges(moduleHandshakeExchange(node.ID(), resp)...); err != nil {
t.Fatal(err)
}
var disconnects []*p2ptest.Disconnect
for i, err := range errs {
disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
}
if err := s.TestDisconnected(disconnects...); err != nil {
t.Fatal(err)
}
}
func TestModuleHandshakeError(t *testing.T) {
runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42"))
}
func TestModuleHandshakeSuccess(t *testing.T) {
runModuleHandshake(t, 42)
}
// testing complex interactions over multiple peers, relaying, dropping
func testMultiPeerSetup(a, b enode.ID) []p2ptest.Exchange {
return []p2ptest.Exchange{
{
Label: "primary handshake",
Expects: []p2ptest.Expect{
{
Code: 0,
Msg: &protoHandshake{42, "420"},
Peer: a,
},
{
Code: 0,
Msg: &protoHandshake{42, "420"},
Peer: b,
},
},
},
{
Label: "module handshake",
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: &protoHandshake{42, "420"},
Peer: a,
},
{
Code: 0,
Msg: &protoHandshake{42, "420"},
Peer: b,
},
},
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &hs0{42},
Peer: a,
},
{
Code: 1,
Msg: &hs0{42},
Peer: b,
},
},
},
{Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a},
{Code: 1, Msg: &hs0{41}, Peer: b}}},
{Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}},
{Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}}
}
func runMultiplePeers(t *testing.T, peer int, errs ...error) {
t.Helper()
pp := p2ptest.NewTestPeerPool()
s := protocolTester(pp)
defer s.Stop()
if err := s.TestExchanges(testMultiPeerSetup(s.Nodes[0].ID(), s.Nodes[1].ID())...); err != nil {
t.Fatal(err)
}
// after some exchanges of messages, we can test state changes
// here this is simply demonstrated by the peerPool
// after the handshake negotiations peers must be added to the pool
// time.Sleep(1)
tick := time.NewTicker(10 * time.Millisecond)
timeout := time.NewTimer(1 * time.Second)
WAIT:
for {
select {
case <-tick.C:
if pp.Has(s.Nodes[0].ID()) {
break WAIT
}
case <-timeout.C:
t.Fatal("timeout")
}
}
if !pp.Has(s.Nodes[1].ID()) {
t.Fatalf("missing peer test-1: %v (%v)", pp, s.Nodes)
}
// peer 0 sends kill request for peer with index <peer>
err := s.TestExchanges(p2ptest.Exchange{
Triggers: []p2ptest.Trigger{
{
Code: 2,
Msg: &kill{s.Nodes[peer].ID()},
Peer: s.Nodes[0].ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
// the peer not killed sends a drop request
err = s.TestExchanges(p2ptest.Exchange{
Triggers: []p2ptest.Trigger{
{
Code: 3,
Msg: &drop{},
Peer: s.Nodes[(peer+1)%2].ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
// check the actual discconnect errors on the individual peers
var disconnects []*p2ptest.Disconnect
for i, err := range errs {
disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
}
if err := s.TestDisconnected(disconnects...); err != nil {
t.Fatal(err)
}
// test if disconnected peers have been removed from peerPool
if pp.Has(s.Nodes[peer].ID()) {
t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.Nodes)
}
}
func TestMultiplePeersDropSelf(t *testing.T) {
runMultiplePeers(t, 0,
fmt.Errorf("subprotocol error"),
fmt.Errorf("Message handler error: (msg code 3): dropped"),
)
}
func TestMultiplePeersDropOther(t *testing.T) {
runMultiplePeers(t, 1,
fmt.Errorf("Message handler error: (msg code 3): dropped"),
fmt.Errorf("subprotocol error"),
)
}
//dummy implementation of a MsgReadWriter
//this allows for quick and easy unit tests without
//having to build up the complete protocol
type dummyRW struct {
msg interface{}
size uint32
code uint64
}
func (d *dummyRW) WriteMsg(msg p2p.Msg) error {
return nil
}
func (d *dummyRW) ReadMsg() (p2p.Msg, error) {
enc := bytes.NewReader(d.getDummyMsg())
return p2p.Msg{
Code: d.code,
Size: d.size,
Payload: enc,
ReceivedAt: time.Now(),
}, nil
}
func (d *dummyRW) getDummyMsg() []byte {
r, _ := rlp.EncodeToBytes(d.msg)
var b bytes.Buffer
wmsg := WrappedMsg{
Context: b.Bytes(),
Size: uint32(len(r)),
Payload: r,
}
rr, _ := rlp.EncodeToBytes(wmsg)
d.size = uint32(len(rr))
return rr
}

View File

@ -0,0 +1,83 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package protocols
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
)
//TestReporter tests that the metrics being collected for p2p accounting
//are being persisted and available after restart of a node.
//It simulates restarting by just recreating the DB as if the node had restarted.
func TestReporter(t *testing.T) {
//create a test directory
dir, err := ioutil.TempDir("", "reporter-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
//setup the metrics
log.Debug("Setting up metrics first time")
reportInterval := 2 * time.Millisecond
metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
log.Debug("Done.")
//change metrics
mBalanceCredit.Inc(12)
mBytesCredit.Inc(34)
mMsgDebit.Inc(9)
//store expected metrics
expectedBalanceCredit := mBalanceCredit.Count()
expectedBytesCredit := mBytesCredit.Count()
expectedMsgDebit := mMsgDebit.Count()
//give the reporter time to write the metrics to DB
time.Sleep(20 * time.Millisecond)
//close the DB also, or we can't create a new one
metrics.Close()
//clear the metrics - this effectively simulates the node having shut down...
mBalanceCredit.Clear()
mBytesCredit.Clear()
mMsgDebit.Clear()
//setup the metrics again
log.Debug("Setting up metrics second time")
metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
defer metrics.Close()
log.Debug("Done.")
//now check the metrics, they should have the same value as before "shutdown"
if mBalanceCredit.Count() != expectedBalanceCredit {
t.Fatalf("Expected counter to be %d, but is %d", expectedBalanceCredit, mBalanceCredit.Count())
}
if mBytesCredit.Count() != expectedBytesCredit {
t.Fatalf("Expected counter to be %d, but is %d", expectedBytesCredit, mBytesCredit.Count())
}
if mMsgDebit.Count() != expectedMsgDebit {
t.Fatalf("Expected counter to be %d, but is %d", expectedMsgDebit, mMsgDebit.Count())
}
}

View File

@ -28,10 +28,10 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pss"
)

View File

@ -29,7 +29,7 @@
// "fmt"
// "os"
// pss "github.com/ethersphere/swarm/pss/client"
// "github.com/ethereum/go-ethereum/p2p/protocols"
// "github.com/ethersphere/swarm/p2p/protocols"
// "github.com/ethereum/go-ethereum/p2p"
// "github.com/ethersphere/swarm/pot"
// "github.com/ethersphere/swarm/log"

View File

@ -9,9 +9,9 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
)

View File

@ -24,8 +24,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
)
// Generic ping protocol implementation for

View File

@ -25,9 +25,9 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
)
const (

View File

@ -32,11 +32,11 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
"github.com/ethersphere/swarm/storage"
"golang.org/x/crypto/sha3"

View File

@ -41,12 +41,12 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
"github.com/ethersphere/swarm/state"
)

View File

@ -23,8 +23,8 @@ import (
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
)

View File

@ -27,8 +27,8 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
colorable "github.com/mattn/go-colorable"
)

View File

@ -40,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/api"
@ -51,6 +50,7 @@ import (
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/stream"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pss"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"

6
vendor/vendor.json vendored
View File

@ -548,12 +548,6 @@
"revision": "7a22da98b9f81d206eb65d1fa4f5e773d888bac3",
"revisionTime": "2019-05-31T09:30:28Z"
},
{
"checksumSHA1": "T77w498ZNEY5giuaY6In8CFOSZw=",
"path": "github.com/ethereum/go-ethereum/p2p/protocols",
"revision": "7a22da98b9f81d206eb65d1fa4f5e773d888bac3",
"revisionTime": "2019-05-31T09:30:28Z"
},
{
"checksumSHA1": "F0b72D7R9g/viwLv0a+0aQmJ3ZI=",
"path": "github.com/ethereum/go-ethereum/p2p/simulations",