This commit is contained in:
obscuren
2015-01-05 17:10:42 +01:00
parent b0854fbff5
commit 6abf8ef78f
41 changed files with 2329 additions and 1203 deletions

View File

@ -7,7 +7,7 @@ import (
)
func TestClientIdentity(t *testing.T) {
clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", "pubkey")
clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", []byte("pubkey"))
clientString := clientIdentity.String()
expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version())
if clientString != expected {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"math/big"
@ -49,7 +50,14 @@ func encodePayload(params ...interface{}) []byte {
// For the decoding rules, please see package rlp.
func (msg Msg) Decode(val interface{}) error {
s := rlp.NewListStream(msg.Payload, uint64(msg.Size))
return s.Decode(val)
if err := s.Decode(val); err != nil {
return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err)
}
return nil
}
func (msg Msg) String() string {
return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
}
// Discard reads any remaining payload data into a black hole.

View File

@ -45,8 +45,8 @@ func (d peerAddr) String() string {
return fmt.Sprintf("%v:%d", d.IP, d.Port)
}
func (d peerAddr) RlpData() interface{} {
return []interface{}{d.IP, d.Port, d.Pubkey}
func (d *peerAddr) RlpData() interface{} {
return []interface{}{string(d.IP), d.Port, d.Pubkey}
}
// Peer represents a remote peer.
@ -426,7 +426,7 @@ func (rw *proto) WriteMsg(msg Msg) error {
}
func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error {
return rw.WriteMsg(NewMsg(code, data))
return rw.WriteMsg(NewMsg(code, data...))
}
func (rw *proto) ReadMsg() (Msg, error) {
@ -460,3 +460,25 @@ func (r *eofSignal) Read(buf []byte) (int, error) {
}
return n, err
}
func (peer *Peer) PeerList() []interface{} {
peers := peer.otherPeers()
ds := make([]interface{}, 0, len(peers))
for _, p := range peers {
p.infolock.Lock()
addr := p.listenAddr
p.infolock.Unlock()
// filter out this peer and peers that are not listening or
// have not completed the handshake.
// TODO: track previously sent peers and exclude them as well.
if p == peer || addr == nil {
continue
}
ds = append(ds, addr)
}
ourAddr := peer.ourListenAddr
if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
ds = append(ds, ourAddr)
}
return ds
}

View File

@ -30,9 +30,8 @@ var discard = Protocol{
func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) {
conn1, conn2 := net.Pipe()
id := NewSimpleClientIdentity("test", "0", "0", "public key")
peer := newPeer(conn1, protos, nil)
peer.ourID = id
peer.ourID = &peerId{}
peer.pubkeyHook = func(*peerAddr) error { return nil }
errc := make(chan error, 1)
go func() {
@ -130,7 +129,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
if err := rw.EncodeMsg(2); err == nil {
t.Error("expected error for out-of-range msg code, got nil")
}
if err := rw.EncodeMsg(1); err != nil {
if err := rw.EncodeMsg(1, "foo", "bar"); err != nil {
t.Errorf("write error: %v", err)
}
return nil
@ -148,6 +147,13 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
if msg.Code != 17 {
t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17)
}
var data []string
if err := msg.Decode(&data); err != nil {
t.Errorf("payload decode error: %v", err)
}
if !reflect.DeepEqual(data, []string{"foo", "bar"}) {
t.Errorf("payload RLP mismatch, got %#v, want %#v", data, []string{"foo", "bar"})
}
}
func TestPeerWrite(t *testing.T) {
@ -226,8 +232,8 @@ func TestPeerActivity(t *testing.T) {
}
func TestNewPeer(t *testing.T) {
id := NewSimpleClientIdentity("clientid", "version", "customid", "pubkey")
caps := []Cap{{"foo", 2}, {"bar", 3}}
id := &peerId{}
p := NewPeer(id, caps)
if !reflect.DeepEqual(p.Caps(), caps) {
t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps)

View File

@ -3,8 +3,6 @@ package p2p
import (
"bytes"
"time"
"github.com/ethereum/go-ethereum/ethutil"
)
// Protocol represents a P2P subprotocol implementation.
@ -89,20 +87,25 @@ type baseProtocol struct {
func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
bp := &baseProtocol{rw, peer}
if err := bp.doHandshake(rw); err != nil {
errc := make(chan error, 1)
go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }()
if err := bp.readHandshake(); err != nil {
return err
}
// handle write error
if err := <-errc; err != nil {
return err
}
// run main loop
quit := make(chan error, 1)
go func() {
for {
if err := bp.handle(rw); err != nil {
quit <- err
errc <- err
break
}
}
}()
return bp.loop(quit)
return bp.loop(errc)
}
var pingTimeout = 2 * time.Second
@ -166,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
case pongMsg:
case getPeersMsg:
peers := bp.peerList()
peers := bp.peer.PeerList()
// this is dangerous. the spec says that we should _delay_
// sending the response if no new information is available.
// this means that would need to send a response later when
@ -174,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
//
// TODO: add event mechanism to notify baseProtocol for new peers
if len(peers) > 0 {
return bp.rw.EncodeMsg(peersMsg, peers)
return bp.rw.EncodeMsg(peersMsg, peers...)
}
case peersMsg:
@ -193,14 +196,9 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
return nil
}
func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
// send our handshake
if err := rw.WriteMsg(bp.handshakeMsg()); err != nil {
return err
}
func (bp *baseProtocol) readHandshake() error {
// read and handle remote handshake
msg, err := rw.ReadMsg()
msg, err := bp.rw.ReadMsg()
if err != nil {
return err
}
@ -210,12 +208,10 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
if msg.Size > baseProtocolMaxMsgSize {
return newPeerError(errMisc, "message too big")
}
var hs handshake
if err := msg.Decode(&hs); err != nil {
return err
}
// validate handshake info
if hs.Version != baseProtocolVersion {
return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
@ -238,9 +234,7 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
if err := bp.peer.pubkeyHook(pa); err != nil {
return newPeerError(errPubkeyForbidden, "%v", err)
}
// TODO: remove Caps with empty name
var addr *peerAddr
if hs.ListenPort != 0 {
addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
@ -270,25 +264,3 @@ func (bp *baseProtocol) handshakeMsg() Msg {
bp.peer.ourID.Pubkey()[1:],
)
}
func (bp *baseProtocol) peerList() []ethutil.RlpEncodable {
peers := bp.peer.otherPeers()
ds := make([]ethutil.RlpEncodable, 0, len(peers))
for _, p := range peers {
p.infolock.Lock()
addr := p.listenAddr
p.infolock.Unlock()
// filter out this peer and peers that are not listening or
// have not completed the handshake.
// TODO: track previously sent peers and exclude them as well.
if p == bp.peer || addr == nil {
continue
}
ds = append(ds, addr)
}
ourAddr := bp.peer.ourListenAddr
if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
ds = append(ds, ourAddr)
}
return ds
}

View File

@ -2,12 +2,89 @@ package p2p
import (
"fmt"
"net"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/crypto"
)
type peerId struct {
pubkey []byte
}
func (self *peerId) String() string {
return fmt.Sprintf("test peer %x", self.Pubkey()[:4])
}
func (self *peerId) Pubkey() (pubkey []byte) {
pubkey = self.pubkey
if len(pubkey) == 0 {
pubkey = crypto.GenerateNewKeyPair().PublicKey
self.pubkey = pubkey
}
return
}
func newTestPeer() (peer *Peer) {
peer = NewPeer(&peerId{}, []Cap{})
peer.pubkeyHook = func(*peerAddr) error { return nil }
peer.ourID = &peerId{}
peer.listenAddr = &peerAddr{}
peer.otherPeers = func() []*Peer { return nil }
return
}
func TestBaseProtocolPeers(t *testing.T) {
cannedPeerList := []*peerAddr{
{IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}},
{IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}},
}
var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}}
rw1, rw2 := MsgPipe()
// run matcher, close pipe when addresses have arrived
addrChan := make(chan *peerAddr, len(cannedPeerList))
go func() {
for _, want := range cannedPeerList {
got := <-addrChan
t.Logf("got peer: %+v", got)
if !reflect.DeepEqual(want, got) {
t.Errorf("mismatch: got %#v, want %#v", got, want)
}
}
close(addrChan)
var own []*peerAddr
var got *peerAddr
for got = range addrChan {
own = append(own, got)
}
if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) {
t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr)
}
rw2.Close()
}()
// run first peer
peer1 := newTestPeer()
peer1.ourListenAddr = ownAddr
peer1.otherPeers = func() []*Peer {
pl := make([]*Peer, len(cannedPeerList))
for i, addr := range cannedPeerList {
pl[i] = &Peer{listenAddr: addr}
}
return pl
}
go runBaseProtocol(peer1, rw1)
// run second peer
peer2 := newTestPeer()
peer2.newPeerAddr = addrChan // feed peer suggestions into matcher
if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed {
t.Errorf("peer2 terminated with unexpected error: %v", err)
}
}
func TestBaseProtocolDisconnect(t *testing.T) {
peer := NewPeer(NewSimpleClientIdentity("p1", "", "", "foo"), nil)
peer.ourID = NewSimpleClientIdentity("p2", "", "", "bar")
peer := NewPeer(&peerId{}, nil)
peer.ourID = &peerId{}
peer.pubkeyHook = func(*peerAddr) error { return nil }
rw1, rw2 := MsgPipe()
@ -32,6 +109,7 @@ func TestBaseProtocolDisconnect(t *testing.T) {
if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil {
t.Error(err)
}
close(done)
}()

View File

@ -113,9 +113,11 @@ func (srv *Server) PeerCount() int {
// SuggestPeer injects an address into the outbound address pool.
func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
addr := &peerAddr{ip, uint64(port), nodeID}
select {
case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}:
case srv.peerConnect <- addr:
default: // don't block
srvlog.Warnf("peer suggestion %v ignored", addr)
}
}
@ -258,6 +260,7 @@ func (srv *Server) listenLoop() {
for {
select {
case slot := <-srv.peerSlots:
srvlog.Debugf("grabbed slot %v for listening", slot)
conn, err := srv.listener.Accept()
if err != nil {
srv.peerSlots <- slot
@ -330,6 +333,7 @@ func (srv *Server) dialLoop() {
case desc := <-suggest:
// candidate peer found, will dial out asyncronously
// if connection fails slot will be released
srvlog.Infof("dial %v (%v)", desc, *slot)
go srv.dialPeer(desc, *slot)
// we can watch if more peers needed in the next loop
slots = srv.peerSlots

View File

@ -11,7 +11,7 @@ import (
func startTestServer(t *testing.T, pf peerFunc) *Server {
server := &Server{
Identity: NewSimpleClientIdentity("clientIdentifier", "version", "customIdentifier", "pubkey"),
Identity: &peerId{},
MaxPeers: 10,
ListenAddr: "127.0.0.1:0",
newPeerFunc: pf,