296 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			296 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package p2p
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/ethutil"
 | 
						|
)
 | 
						|
 | 
						|
// Protocol represents a P2P subprotocol implementation.
 | 
						|
type Protocol struct {
 | 
						|
	// Name should contain the official protocol name,
 | 
						|
	// often a three-letter word.
 | 
						|
	Name string
 | 
						|
 | 
						|
	// Version should contain the version number of the protocol.
 | 
						|
	Version uint
 | 
						|
 | 
						|
	// Length should contain the number of message codes used
 | 
						|
	// by the protocol.
 | 
						|
	Length uint64
 | 
						|
 | 
						|
	// Run is called in a new groutine when the protocol has been
 | 
						|
	// negotiated with a peer. It should read and write messages from
 | 
						|
	// rw. The Payload for each message must be fully consumed.
 | 
						|
	//
 | 
						|
	// The peer connection is closed when Start returns. It should return
 | 
						|
	// any protocol-level error (such as an I/O error) that is
 | 
						|
	// encountered.
 | 
						|
	Run func(peer *Peer, rw MsgReadWriter) error
 | 
						|
}
 | 
						|
 | 
						|
func (p Protocol) cap() Cap {
 | 
						|
	return Cap{p.Name, p.Version}
 | 
						|
}
 | 
						|
 | 
						|
const (
 | 
						|
	baseProtocolVersion    = 2
 | 
						|
	baseProtocolLength     = uint64(16)
 | 
						|
	baseProtocolMaxMsgSize = 10 * 1024 * 1024
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	// devp2p message codes
 | 
						|
	handshakeMsg = 0x00
 | 
						|
	discMsg      = 0x01
 | 
						|
	pingMsg      = 0x02
 | 
						|
	pongMsg      = 0x03
 | 
						|
	getPeersMsg  = 0x04
 | 
						|
	peersMsg     = 0x05
 | 
						|
)
 | 
						|
 | 
						|
// handshake is the structure of a handshake list.
 | 
						|
type handshake struct {
 | 
						|
	Version    uint64
 | 
						|
	ID         string
 | 
						|
	Caps       []Cap
 | 
						|
	ListenPort uint64
 | 
						|
	NodeID     []byte
 | 
						|
}
 | 
						|
 | 
						|
func (h *handshake) String() string {
 | 
						|
	return h.ID
 | 
						|
}
 | 
						|
func (h *handshake) Pubkey() []byte {
 | 
						|
	return h.NodeID
 | 
						|
}
 | 
						|
 | 
						|
// Cap is the structure of a peer capability.
 | 
						|
type Cap struct {
 | 
						|
	Name    string
 | 
						|
	Version uint
 | 
						|
}
 | 
						|
 | 
						|
func (cap Cap) RlpData() interface{} {
 | 
						|
	return []interface{}{cap.Name, cap.Version}
 | 
						|
}
 | 
						|
 | 
						|
type capsByName []Cap
 | 
						|
 | 
						|
func (cs capsByName) Len() int           { return len(cs) }
 | 
						|
func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
 | 
						|
func (cs capsByName) Swap(i, j int)      { cs[i], cs[j] = cs[j], cs[i] }
 | 
						|
 | 
						|
type baseProtocol struct {
 | 
						|
	rw   MsgReadWriter
 | 
						|
	peer *Peer
 | 
						|
}
 | 
						|
 | 
						|
func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
 | 
						|
	bp := &baseProtocol{rw, peer}
 | 
						|
	if err := bp.doHandshake(rw); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	// run main loop
 | 
						|
	quit := make(chan error, 1)
 | 
						|
	go func() {
 | 
						|
		for {
 | 
						|
			if err := bp.handle(rw); err != nil {
 | 
						|
				quit <- err
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return bp.loop(quit)
 | 
						|
}
 | 
						|
 | 
						|
var pingTimeout = 2 * time.Second
 | 
						|
 | 
						|
func (bp *baseProtocol) loop(quit <-chan error) error {
 | 
						|
	ping := time.NewTimer(pingTimeout)
 | 
						|
	activity := bp.peer.activity.Subscribe(time.Time{})
 | 
						|
	lastActive := time.Time{}
 | 
						|
	defer ping.Stop()
 | 
						|
	defer activity.Unsubscribe()
 | 
						|
 | 
						|
	getPeersTick := time.NewTicker(10 * time.Second)
 | 
						|
	defer getPeersTick.Stop()
 | 
						|
	err := bp.rw.EncodeMsg(getPeersMsg)
 | 
						|
 | 
						|
	for err == nil {
 | 
						|
		select {
 | 
						|
		case err = <-quit:
 | 
						|
			return err
 | 
						|
		case <-getPeersTick.C:
 | 
						|
			err = bp.rw.EncodeMsg(getPeersMsg)
 | 
						|
		case event := <-activity.Chan():
 | 
						|
			ping.Reset(pingTimeout)
 | 
						|
			lastActive = event.(time.Time)
 | 
						|
		case t := <-ping.C:
 | 
						|
			if lastActive.Add(pingTimeout * 2).Before(t) {
 | 
						|
				err = newPeerError(errPingTimeout, "")
 | 
						|
			} else if lastActive.Add(pingTimeout).Before(t) {
 | 
						|
				err = bp.rw.EncodeMsg(pingMsg)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (bp *baseProtocol) handle(rw MsgReadWriter) error {
 | 
						|
	msg, err := rw.ReadMsg()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if msg.Size > baseProtocolMaxMsgSize {
 | 
						|
		return newPeerError(errMisc, "message too big")
 | 
						|
	}
 | 
						|
	// make sure that the payload has been fully consumed
 | 
						|
	defer msg.Discard()
 | 
						|
 | 
						|
	switch msg.Code {
 | 
						|
	case handshakeMsg:
 | 
						|
		return newPeerError(errProtocolBreach, "extra handshake received")
 | 
						|
 | 
						|
	case discMsg:
 | 
						|
		var reason DiscReason
 | 
						|
		if err := msg.Decode(&reason); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		bp.peer.Disconnect(reason)
 | 
						|
		return nil
 | 
						|
 | 
						|
	case pingMsg:
 | 
						|
		return bp.rw.EncodeMsg(pongMsg)
 | 
						|
 | 
						|
	case pongMsg:
 | 
						|
 | 
						|
	case getPeersMsg:
 | 
						|
		peers := bp.peerList()
 | 
						|
		// this is dangerous. the spec says that we should _delay_
 | 
						|
		// sending the response if no new information is available.
 | 
						|
		// this means that would need to send a response later when
 | 
						|
		// new peers become available.
 | 
						|
		//
 | 
						|
		// TODO: add event mechanism to notify baseProtocol for new peers
 | 
						|
		if len(peers) > 0 {
 | 
						|
			return bp.rw.EncodeMsg(peersMsg, peers)
 | 
						|
		}
 | 
						|
 | 
						|
	case peersMsg:
 | 
						|
		var peers []*peerAddr
 | 
						|
		if err := msg.Decode(&peers); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		for _, addr := range peers {
 | 
						|
			bp.peer.Debugf("received peer suggestion: %v", addr)
 | 
						|
			bp.peer.newPeerAddr <- addr
 | 
						|
		}
 | 
						|
 | 
						|
	default:
 | 
						|
		return newPeerError(errInvalidMsgCode, "unknown message code %v", msg.Code)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
 | 
						|
	// send our handshake
 | 
						|
	if err := rw.WriteMsg(bp.handshakeMsg()); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// read and handle remote handshake
 | 
						|
	msg, err := rw.ReadMsg()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if msg.Code != handshakeMsg {
 | 
						|
		return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code)
 | 
						|
	}
 | 
						|
	if msg.Size > baseProtocolMaxMsgSize {
 | 
						|
		return newPeerError(errMisc, "message too big")
 | 
						|
	}
 | 
						|
 | 
						|
	var hs handshake
 | 
						|
	if err := msg.Decode(&hs); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// validate handshake info
 | 
						|
	if hs.Version != baseProtocolVersion {
 | 
						|
		return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
 | 
						|
			baseProtocolVersion, hs.Version)
 | 
						|
	}
 | 
						|
	if len(hs.NodeID) == 0 {
 | 
						|
		return newPeerError(errPubkeyMissing, "")
 | 
						|
	}
 | 
						|
	if len(hs.NodeID) != 64 {
 | 
						|
		return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8)
 | 
						|
	}
 | 
						|
	if da := bp.peer.dialAddr; da != nil {
 | 
						|
		// verify that the peer we wanted to connect to
 | 
						|
		// actually holds the target public key.
 | 
						|
		if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) {
 | 
						|
			return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
 | 
						|
	if err := bp.peer.pubkeyHook(pa); err != nil {
 | 
						|
		return newPeerError(errPubkeyForbidden, "%v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO: remove Caps with empty name
 | 
						|
 | 
						|
	var addr *peerAddr
 | 
						|
	if hs.ListenPort != 0 {
 | 
						|
		addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
 | 
						|
		addr.Port = hs.ListenPort
 | 
						|
	}
 | 
						|
	bp.peer.setHandshakeInfo(&hs, addr, hs.Caps)
 | 
						|
	bp.peer.startSubprotocols(hs.Caps)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (bp *baseProtocol) handshakeMsg() Msg {
 | 
						|
	var (
 | 
						|
		port uint64
 | 
						|
		caps []interface{}
 | 
						|
	)
 | 
						|
	if bp.peer.ourListenAddr != nil {
 | 
						|
		port = bp.peer.ourListenAddr.Port
 | 
						|
	}
 | 
						|
	for _, proto := range bp.peer.protocols {
 | 
						|
		caps = append(caps, proto.cap())
 | 
						|
	}
 | 
						|
	return NewMsg(handshakeMsg,
 | 
						|
		baseProtocolVersion,
 | 
						|
		bp.peer.ourID.String(),
 | 
						|
		caps,
 | 
						|
		port,
 | 
						|
		bp.peer.ourID.Pubkey()[1:],
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (bp *baseProtocol) peerList() []ethutil.RlpEncodable {
 | 
						|
	peers := bp.peer.otherPeers()
 | 
						|
	ds := make([]ethutil.RlpEncodable, 0, len(peers))
 | 
						|
	for _, p := range peers {
 | 
						|
		p.infolock.Lock()
 | 
						|
		addr := p.listenAddr
 | 
						|
		p.infolock.Unlock()
 | 
						|
		// filter out this peer and peers that are not listening or
 | 
						|
		// have not completed the handshake.
 | 
						|
		// TODO: track previously sent peers and exclude them as well.
 | 
						|
		if p == bp.peer || addr == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		ds = append(ds, addr)
 | 
						|
	}
 | 
						|
	ourAddr := bp.peer.ourListenAddr
 | 
						|
	if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
 | 
						|
		ds = append(ds, ourAddr)
 | 
						|
	}
 | 
						|
	return ds
 | 
						|
}
 |