p2p/discover: implement v5.1 wire protocol (#21647)
This change implements the Discovery v5.1 wire protocol and also adds an interactive test suite for this protocol.
This commit is contained in:
@ -31,6 +31,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover/v5wire"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||
@ -38,36 +39,24 @@ import (
|
||||
|
||||
const (
|
||||
lookupRequestLimit = 3 // max requests against a single node during lookup
|
||||
findnodeResultLimit = 15 // applies in FINDNODE handler
|
||||
findnodeResultLimit = 16 // applies in FINDNODE handler
|
||||
totalNodesResponseLimit = 5 // applies in waitForNodes
|
||||
nodesResponseItemLimit = 3 // applies in sendNodes
|
||||
|
||||
respTimeoutV5 = 700 * time.Millisecond
|
||||
)
|
||||
|
||||
// codecV5 is implemented by wireCodec (and testCodec).
|
||||
// codecV5 is implemented by v5wire.Codec (and testCodec).
|
||||
//
|
||||
// The UDPv5 transport is split into two objects: the codec object deals with
|
||||
// encoding/decoding and with the handshake; the UDPv5 object handles higher-level concerns.
|
||||
type codecV5 interface {
|
||||
// encode encodes a packet. The 'challenge' parameter is non-nil for calls which got a
|
||||
// WHOAREYOU response.
|
||||
encode(fromID enode.ID, fromAddr string, p packetV5, challenge *whoareyouV5) (enc []byte, authTag []byte, err error)
|
||||
// Encode encodes a packet.
|
||||
Encode(enode.ID, string, v5wire.Packet, *v5wire.Whoareyou) ([]byte, v5wire.Nonce, error)
|
||||
|
||||
// decode decodes a packet. It returns an *unknownV5 packet if decryption fails.
|
||||
// The fromNode return value is non-nil when the input contains a handshake response.
|
||||
decode(input []byte, fromAddr string) (fromID enode.ID, fromNode *enode.Node, p packetV5, err error)
|
||||
}
|
||||
|
||||
// packetV5 is implemented by all discv5 packet type structs.
|
||||
type packetV5 interface {
|
||||
// These methods provide information and set the request ID.
|
||||
name() string
|
||||
kind() byte
|
||||
setreqid([]byte)
|
||||
// handle should perform the appropriate action to handle the packet, i.e. this is the
|
||||
// place to send the response.
|
||||
handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr)
|
||||
// decode decodes a packet. It returns a *v5wire.Unknown packet if decryption fails.
|
||||
// The *enode.Node return value is non-nil when the input contains a handshake response.
|
||||
Decode([]byte, string) (enode.ID, *enode.Node, v5wire.Packet, error)
|
||||
}
|
||||
|
||||
// UDPv5 is the implementation of protocol version 5.
|
||||
@ -83,6 +72,10 @@ type UDPv5 struct {
|
||||
clock mclock.Clock
|
||||
validSchemes enr.IdentityScheme
|
||||
|
||||
// talkreq handler registry
|
||||
trlock sync.Mutex
|
||||
trhandlers map[string]func([]byte) []byte
|
||||
|
||||
// channels into dispatch
|
||||
packetInCh chan ReadPacket
|
||||
readNextCh chan struct{}
|
||||
@ -93,7 +86,7 @@ type UDPv5 struct {
|
||||
// state of dispatch
|
||||
codec codecV5
|
||||
activeCallByNode map[enode.ID]*callV5
|
||||
activeCallByAuth map[string]*callV5
|
||||
activeCallByAuth map[v5wire.Nonce]*callV5
|
||||
callQueue map[enode.ID][]*callV5
|
||||
|
||||
// shutdown stuff
|
||||
@ -106,16 +99,16 @@ type UDPv5 struct {
|
||||
// callV5 represents a remote procedure call against another node.
|
||||
type callV5 struct {
|
||||
node *enode.Node
|
||||
packet packetV5
|
||||
packet v5wire.Packet
|
||||
responseType byte // expected packet type of response
|
||||
reqid []byte
|
||||
ch chan packetV5 // responses sent here
|
||||
err chan error // errors sent here
|
||||
ch chan v5wire.Packet // responses sent here
|
||||
err chan error // errors sent here
|
||||
|
||||
// Valid for active calls only:
|
||||
authTag []byte // authTag of request packet
|
||||
handshakeCount int // # times we attempted handshake for this call
|
||||
challenge *whoareyouV5 // last sent handshake challenge
|
||||
nonce v5wire.Nonce // nonce of request packet
|
||||
handshakeCount int // # times we attempted handshake for this call
|
||||
challenge *v5wire.Whoareyou // last sent handshake challenge
|
||||
timeout mclock.Timer
|
||||
}
|
||||
|
||||
@ -152,6 +145,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
|
||||
log: cfg.Log,
|
||||
validSchemes: cfg.ValidSchemes,
|
||||
clock: cfg.Clock,
|
||||
trhandlers: make(map[string]func([]byte) []byte),
|
||||
// channels into dispatch
|
||||
packetInCh: make(chan ReadPacket, 1),
|
||||
readNextCh: make(chan struct{}, 1),
|
||||
@ -159,9 +153,9 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
|
||||
callDoneCh: make(chan *callV5),
|
||||
respTimeoutCh: make(chan *callTimeout),
|
||||
// state of dispatch
|
||||
codec: newWireCodec(ln, cfg.PrivateKey, cfg.Clock),
|
||||
codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock),
|
||||
activeCallByNode: make(map[enode.ID]*callV5),
|
||||
activeCallByAuth: make(map[string]*callV5),
|
||||
activeCallByAuth: make(map[v5wire.Nonce]*callV5),
|
||||
callQueue: make(map[enode.ID][]*callV5),
|
||||
// shutdown
|
||||
closeCtx: closeCtx,
|
||||
@ -236,6 +230,29 @@ func (t *UDPv5) LocalNode() *enode.LocalNode {
|
||||
return t.localNode
|
||||
}
|
||||
|
||||
// RegisterTalkHandler adds a handler for 'talk requests'. The handler function is called
|
||||
// whenever a request for the given protocol is received and should return the response
|
||||
// data or nil.
|
||||
func (t *UDPv5) RegisterTalkHandler(protocol string, handler func([]byte) []byte) {
|
||||
t.trlock.Lock()
|
||||
defer t.trlock.Unlock()
|
||||
t.trhandlers[protocol] = handler
|
||||
}
|
||||
|
||||
// TalkRequest sends a talk request to n and waits for a response.
|
||||
func (t *UDPv5) TalkRequest(n *enode.Node, protocol string, request []byte) ([]byte, error) {
|
||||
req := &v5wire.TalkRequest{Protocol: protocol, Message: request}
|
||||
resp := t.call(n, v5wire.TalkResponseMsg, req)
|
||||
defer t.callDone(resp)
|
||||
select {
|
||||
case respMsg := <-resp.ch:
|
||||
return respMsg.(*v5wire.TalkResponse).Message, nil
|
||||
case err := <-resp.err:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// RandomNodes returns an iterator that finds random nodes in the DHT.
|
||||
func (t *UDPv5) RandomNodes() enode.Iterator {
|
||||
if t.tab.len() == 0 {
|
||||
// All nodes were dropped, refresh. The very first query will hit this
|
||||
@ -283,16 +300,14 @@ func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
|
||||
nodes = nodesByDistance{target: target}
|
||||
err error
|
||||
)
|
||||
for i := 0; i < lookupRequestLimit && len(nodes.entries) < findnodeResultLimit; i++ {
|
||||
var r []*enode.Node
|
||||
r, err = t.findnode(unwrapNode(destNode), dists[i])
|
||||
if err == errClosed {
|
||||
return nil, err
|
||||
}
|
||||
for _, n := range r {
|
||||
if n.ID() != t.Self().ID() {
|
||||
nodes.push(wrapNode(n), findnodeResultLimit)
|
||||
}
|
||||
var r []*enode.Node
|
||||
r, err = t.findnode(unwrapNode(destNode), dists)
|
||||
if err == errClosed {
|
||||
return nil, err
|
||||
}
|
||||
for _, n := range r {
|
||||
if n.ID() != t.Self().ID() {
|
||||
nodes.push(wrapNode(n), findnodeResultLimit)
|
||||
}
|
||||
}
|
||||
return nodes.entries, err
|
||||
@ -301,15 +316,15 @@ func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
|
||||
// lookupDistances computes the distance parameter for FINDNODE calls to dest.
|
||||
// It chooses distances adjacent to logdist(target, dest), e.g. for a target
|
||||
// with logdist(target, dest) = 255 the result is [255, 256, 254].
|
||||
func lookupDistances(target, dest enode.ID) (dists []int) {
|
||||
func lookupDistances(target, dest enode.ID) (dists []uint) {
|
||||
td := enode.LogDist(target, dest)
|
||||
dists = append(dists, td)
|
||||
dists = append(dists, uint(td))
|
||||
for i := 1; len(dists) < lookupRequestLimit; i++ {
|
||||
if td+i < 256 {
|
||||
dists = append(dists, td+i)
|
||||
dists = append(dists, uint(td+i))
|
||||
}
|
||||
if td-i > 0 {
|
||||
dists = append(dists, td-i)
|
||||
dists = append(dists, uint(td-i))
|
||||
}
|
||||
}
|
||||
return dists
|
||||
@ -317,11 +332,13 @@ func lookupDistances(target, dest enode.ID) (dists []int) {
|
||||
|
||||
// ping calls PING on a node and waits for a PONG response.
|
||||
func (t *UDPv5) ping(n *enode.Node) (uint64, error) {
|
||||
resp := t.call(n, p_pongV5, &pingV5{ENRSeq: t.localNode.Node().Seq()})
|
||||
req := &v5wire.Ping{ENRSeq: t.localNode.Node().Seq()}
|
||||
resp := t.call(n, v5wire.PongMsg, req)
|
||||
defer t.callDone(resp)
|
||||
|
||||
select {
|
||||
case pong := <-resp.ch:
|
||||
return pong.(*pongV5).ENRSeq, nil
|
||||
return pong.(*v5wire.Pong).ENRSeq, nil
|
||||
case err := <-resp.err:
|
||||
return 0, err
|
||||
}
|
||||
@ -329,7 +346,7 @@ func (t *UDPv5) ping(n *enode.Node) (uint64, error) {
|
||||
|
||||
// requestENR requests n's record.
|
||||
func (t *UDPv5) RequestENR(n *enode.Node) (*enode.Node, error) {
|
||||
nodes, err := t.findnode(n, 0)
|
||||
nodes, err := t.findnode(n, []uint{0})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -339,26 +356,14 @@ func (t *UDPv5) RequestENR(n *enode.Node) (*enode.Node, error) {
|
||||
return nodes[0], nil
|
||||
}
|
||||
|
||||
// requestTicket calls REQUESTTICKET on a node and waits for a TICKET response.
|
||||
func (t *UDPv5) requestTicket(n *enode.Node) ([]byte, error) {
|
||||
resp := t.call(n, p_ticketV5, &pingV5{})
|
||||
defer t.callDone(resp)
|
||||
select {
|
||||
case response := <-resp.ch:
|
||||
return response.(*ticketV5).Ticket, nil
|
||||
case err := <-resp.err:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// findnode calls FINDNODE on a node and waits for responses.
|
||||
func (t *UDPv5) findnode(n *enode.Node, distance int) ([]*enode.Node, error) {
|
||||
resp := t.call(n, p_nodesV5, &findnodeV5{Distance: uint(distance)})
|
||||
return t.waitForNodes(resp, distance)
|
||||
func (t *UDPv5) findnode(n *enode.Node, distances []uint) ([]*enode.Node, error) {
|
||||
resp := t.call(n, v5wire.NodesMsg, &v5wire.Findnode{Distances: distances})
|
||||
return t.waitForNodes(resp, distances)
|
||||
}
|
||||
|
||||
// waitForNodes waits for NODES responses to the given call.
|
||||
func (t *UDPv5) waitForNodes(c *callV5, distance int) ([]*enode.Node, error) {
|
||||
func (t *UDPv5) waitForNodes(c *callV5, distances []uint) ([]*enode.Node, error) {
|
||||
defer t.callDone(c)
|
||||
|
||||
var (
|
||||
@ -369,11 +374,11 @@ func (t *UDPv5) waitForNodes(c *callV5, distance int) ([]*enode.Node, error) {
|
||||
for {
|
||||
select {
|
||||
case responseP := <-c.ch:
|
||||
response := responseP.(*nodesV5)
|
||||
response := responseP.(*v5wire.Nodes)
|
||||
for _, record := range response.Nodes {
|
||||
node, err := t.verifyResponseNode(c, record, distance, seen)
|
||||
node, err := t.verifyResponseNode(c, record, distances, seen)
|
||||
if err != nil {
|
||||
t.log.Debug("Invalid record in "+response.name(), "id", c.node.ID(), "err", err)
|
||||
t.log.Debug("Invalid record in "+response.Name(), "id", c.node.ID(), "err", err)
|
||||
continue
|
||||
}
|
||||
nodes = append(nodes, node)
|
||||
@ -391,7 +396,7 @@ func (t *UDPv5) waitForNodes(c *callV5, distance int) ([]*enode.Node, error) {
|
||||
}
|
||||
|
||||
// verifyResponseNode checks validity of a record in a NODES response.
|
||||
func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distance int, seen map[enode.ID]struct{}) (*enode.Node, error) {
|
||||
func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, seen map[enode.ID]struct{}) (*enode.Node, error) {
|
||||
node, err := enode.New(t.validSchemes, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -402,9 +407,10 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distance int, seen
|
||||
if c.node.UDP() <= 1024 {
|
||||
return nil, errLowPort
|
||||
}
|
||||
if distance != -1 {
|
||||
if d := enode.LogDist(c.node.ID(), node.ID()); d != distance {
|
||||
return nil, fmt.Errorf("wrong distance %d", d)
|
||||
if distances != nil {
|
||||
nd := enode.LogDist(c.node.ID(), node.ID())
|
||||
if !containsUint(uint(nd), distances) {
|
||||
return nil, errors.New("does not match any requested distance")
|
||||
}
|
||||
}
|
||||
if _, ok := seen[node.ID()]; ok {
|
||||
@ -414,20 +420,29 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distance int, seen
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// call sends the given call and sets up a handler for response packets (of type c.responseType).
|
||||
// Responses are dispatched to the call's response channel.
|
||||
func (t *UDPv5) call(node *enode.Node, responseType byte, packet packetV5) *callV5 {
|
||||
func containsUint(x uint, xs []uint) bool {
|
||||
for _, v := range xs {
|
||||
if x == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// call sends the given call and sets up a handler for response packets (of message type
|
||||
// responseType). Responses are dispatched to the call's response channel.
|
||||
func (t *UDPv5) call(node *enode.Node, responseType byte, packet v5wire.Packet) *callV5 {
|
||||
c := &callV5{
|
||||
node: node,
|
||||
packet: packet,
|
||||
responseType: responseType,
|
||||
reqid: make([]byte, 8),
|
||||
ch: make(chan packetV5, 1),
|
||||
ch: make(chan v5wire.Packet, 1),
|
||||
err: make(chan error, 1),
|
||||
}
|
||||
// Assign request ID.
|
||||
crand.Read(c.reqid)
|
||||
packet.setreqid(c.reqid)
|
||||
packet.SetRequestID(c.reqid)
|
||||
// Send call to dispatch.
|
||||
select {
|
||||
case t.callCh <- c:
|
||||
@ -482,7 +497,7 @@ func (t *UDPv5) dispatch() {
|
||||
panic("BUG: callDone for inactive call")
|
||||
}
|
||||
c.timeout.Stop()
|
||||
delete(t.activeCallByAuth, string(c.authTag))
|
||||
delete(t.activeCallByAuth, c.nonce)
|
||||
delete(t.activeCallByNode, id)
|
||||
t.sendNextCall(id)
|
||||
|
||||
@ -502,7 +517,7 @@ func (t *UDPv5) dispatch() {
|
||||
for id, c := range t.activeCallByNode {
|
||||
c.err <- errClosed
|
||||
delete(t.activeCallByNode, id)
|
||||
delete(t.activeCallByAuth, string(c.authTag))
|
||||
delete(t.activeCallByAuth, c.nonce)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -548,38 +563,37 @@ func (t *UDPv5) sendNextCall(id enode.ID) {
|
||||
// sendCall encodes and sends a request packet to the call's recipient node.
|
||||
// This performs a handshake if needed.
|
||||
func (t *UDPv5) sendCall(c *callV5) {
|
||||
if len(c.authTag) > 0 {
|
||||
// The call already has an authTag from a previous handshake attempt. Remove the
|
||||
// entry for the authTag because we're about to generate a new authTag for this
|
||||
// call.
|
||||
delete(t.activeCallByAuth, string(c.authTag))
|
||||
// The call might have a nonce from a previous handshake attempt. Remove the entry for
|
||||
// the old nonce because we're about to generate a new nonce for this call.
|
||||
if c.nonce != (v5wire.Nonce{}) {
|
||||
delete(t.activeCallByAuth, c.nonce)
|
||||
}
|
||||
|
||||
addr := &net.UDPAddr{IP: c.node.IP(), Port: c.node.UDP()}
|
||||
newTag, _ := t.send(c.node.ID(), addr, c.packet, c.challenge)
|
||||
c.authTag = newTag
|
||||
t.activeCallByAuth[string(c.authTag)] = c
|
||||
newNonce, _ := t.send(c.node.ID(), addr, c.packet, c.challenge)
|
||||
c.nonce = newNonce
|
||||
t.activeCallByAuth[newNonce] = c
|
||||
t.startResponseTimeout(c)
|
||||
}
|
||||
|
||||
// sendResponse sends a response packet to the given node.
|
||||
// This doesn't trigger a handshake even if no keys are available.
|
||||
func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet packetV5) error {
|
||||
func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet) error {
|
||||
_, err := t.send(toID, toAddr, packet, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// send sends a packet to the given node.
|
||||
func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet packetV5, c *whoareyouV5) ([]byte, error) {
|
||||
func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c *v5wire.Whoareyou) (v5wire.Nonce, error) {
|
||||
addr := toAddr.String()
|
||||
enc, authTag, err := t.codec.encode(toID, addr, packet, c)
|
||||
enc, nonce, err := t.codec.Encode(toID, addr, packet, c)
|
||||
if err != nil {
|
||||
t.log.Warn(">> "+packet.name(), "id", toID, "addr", addr, "err", err)
|
||||
return authTag, err
|
||||
t.log.Warn(">> "+packet.Name(), "id", toID, "addr", addr, "err", err)
|
||||
return nonce, err
|
||||
}
|
||||
_, err = t.conn.WriteToUDP(enc, toAddr)
|
||||
t.log.Trace(">> "+packet.name(), "id", toID, "addr", addr)
|
||||
return authTag, err
|
||||
t.log.Trace(">> "+packet.Name(), "id", toID, "addr", addr)
|
||||
return nonce, err
|
||||
}
|
||||
|
||||
// readLoop runs in its own goroutine and reads packets from the network.
|
||||
@ -617,7 +631,7 @@ func (t *UDPv5) dispatchReadPacket(from *net.UDPAddr, content []byte) bool {
|
||||
// handlePacket decodes and processes an incoming packet from the network.
|
||||
func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error {
|
||||
addr := fromAddr.String()
|
||||
fromID, fromNode, packet, err := t.codec.decode(rawpacket, addr)
|
||||
fromID, fromNode, packet, err := t.codec.Decode(rawpacket, addr)
|
||||
if err != nil {
|
||||
t.log.Debug("Bad discv5 packet", "id", fromID, "addr", addr, "err", err)
|
||||
return err
|
||||
@ -626,31 +640,32 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error {
|
||||
// Handshake succeeded, add to table.
|
||||
t.tab.addSeenNode(wrapNode(fromNode))
|
||||
}
|
||||
if packet.kind() != p_whoareyouV5 {
|
||||
// WHOAREYOU logged separately to report the sender ID.
|
||||
t.log.Trace("<< "+packet.name(), "id", fromID, "addr", addr)
|
||||
if packet.Kind() != v5wire.WhoareyouPacket {
|
||||
// WHOAREYOU logged separately to report errors.
|
||||
t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", addr)
|
||||
}
|
||||
packet.handle(t, fromID, fromAddr)
|
||||
t.handle(packet, fromID, fromAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleCallResponse dispatches a response packet to the call waiting for it.
|
||||
func (t *UDPv5) handleCallResponse(fromID enode.ID, fromAddr *net.UDPAddr, reqid []byte, p packetV5) {
|
||||
func (t *UDPv5) handleCallResponse(fromID enode.ID, fromAddr *net.UDPAddr, p v5wire.Packet) bool {
|
||||
ac := t.activeCallByNode[fromID]
|
||||
if ac == nil || !bytes.Equal(reqid, ac.reqid) {
|
||||
t.log.Debug(fmt.Sprintf("Unsolicited/late %s response", p.name()), "id", fromID, "addr", fromAddr)
|
||||
return
|
||||
if ac == nil || !bytes.Equal(p.RequestID(), ac.reqid) {
|
||||
t.log.Debug(fmt.Sprintf("Unsolicited/late %s response", p.Name()), "id", fromID, "addr", fromAddr)
|
||||
return false
|
||||
}
|
||||
if !fromAddr.IP.Equal(ac.node.IP()) || fromAddr.Port != ac.node.UDP() {
|
||||
t.log.Debug(fmt.Sprintf("%s from wrong endpoint", p.name()), "id", fromID, "addr", fromAddr)
|
||||
return
|
||||
t.log.Debug(fmt.Sprintf("%s from wrong endpoint", p.Name()), "id", fromID, "addr", fromAddr)
|
||||
return false
|
||||
}
|
||||
if p.kind() != ac.responseType {
|
||||
t.log.Debug(fmt.Sprintf("Wrong disv5 response type %s", p.name()), "id", fromID, "addr", fromAddr)
|
||||
return
|
||||
if p.Kind() != ac.responseType {
|
||||
t.log.Debug(fmt.Sprintf("Wrong discv5 response type %s", p.Name()), "id", fromID, "addr", fromAddr)
|
||||
return false
|
||||
}
|
||||
t.startResponseTimeout(ac)
|
||||
ac.ch <- p
|
||||
return true
|
||||
}
|
||||
|
||||
// getNode looks for a node record in table and database.
|
||||
@ -664,50 +679,65 @@ func (t *UDPv5) getNode(id enode.ID) *enode.Node {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UNKNOWN
|
||||
// handle processes incoming packets according to their message type.
|
||||
func (t *UDPv5) handle(p v5wire.Packet, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
switch p := p.(type) {
|
||||
case *v5wire.Unknown:
|
||||
t.handleUnknown(p, fromID, fromAddr)
|
||||
case *v5wire.Whoareyou:
|
||||
t.handleWhoareyou(p, fromID, fromAddr)
|
||||
case *v5wire.Ping:
|
||||
t.handlePing(p, fromID, fromAddr)
|
||||
case *v5wire.Pong:
|
||||
if t.handleCallResponse(fromID, fromAddr, p) {
|
||||
t.localNode.UDPEndpointStatement(fromAddr, &net.UDPAddr{IP: p.ToIP, Port: int(p.ToPort)})
|
||||
}
|
||||
case *v5wire.Findnode:
|
||||
t.handleFindnode(p, fromID, fromAddr)
|
||||
case *v5wire.Nodes:
|
||||
t.handleCallResponse(fromID, fromAddr, p)
|
||||
case *v5wire.TalkRequest:
|
||||
t.handleTalkRequest(p, fromID, fromAddr)
|
||||
case *v5wire.TalkResponse:
|
||||
t.handleCallResponse(fromID, fromAddr, p)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *unknownV5) name() string { return "UNKNOWN/v5" }
|
||||
func (p *unknownV5) kind() byte { return p_unknownV5 }
|
||||
func (p *unknownV5) setreqid(id []byte) {}
|
||||
|
||||
func (p *unknownV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
challenge := &whoareyouV5{AuthTag: p.AuthTag}
|
||||
// handleUnknown initiates a handshake by responding with WHOAREYOU.
|
||||
func (t *UDPv5) handleUnknown(p *v5wire.Unknown, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
challenge := &v5wire.Whoareyou{Nonce: p.Nonce}
|
||||
crand.Read(challenge.IDNonce[:])
|
||||
if n := t.getNode(fromID); n != nil {
|
||||
challenge.node = n
|
||||
challenge.Node = n
|
||||
challenge.RecordSeq = n.Seq()
|
||||
}
|
||||
t.sendResponse(fromID, fromAddr, challenge)
|
||||
}
|
||||
|
||||
// WHOAREYOU
|
||||
|
||||
func (p *whoareyouV5) name() string { return "WHOAREYOU/v5" }
|
||||
func (p *whoareyouV5) kind() byte { return p_whoareyouV5 }
|
||||
func (p *whoareyouV5) setreqid(id []byte) {}
|
||||
|
||||
func (p *whoareyouV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
c, err := p.matchWithCall(t, p.AuthTag)
|
||||
if err != nil {
|
||||
t.log.Debug("Invalid WHOAREYOU/v5", "addr", fromAddr, "err", err)
|
||||
return
|
||||
}
|
||||
// Resend the call that was answered by WHOAREYOU.
|
||||
t.log.Trace("<< "+p.name(), "id", c.node.ID(), "addr", fromAddr)
|
||||
c.handshakeCount++
|
||||
c.challenge = p
|
||||
p.node = c.node
|
||||
t.sendCall(c)
|
||||
}
|
||||
|
||||
var (
|
||||
errChallengeNoCall = errors.New("no matching call")
|
||||
errChallengeTwice = errors.New("second handshake")
|
||||
)
|
||||
|
||||
// matchWithCall checks whether the handshake attempt matches the active call.
|
||||
func (p *whoareyouV5) matchWithCall(t *UDPv5, authTag []byte) (*callV5, error) {
|
||||
c := t.activeCallByAuth[string(authTag)]
|
||||
// handleWhoareyou resends the active call as a handshake packet.
|
||||
func (t *UDPv5) handleWhoareyou(p *v5wire.Whoareyou, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
c, err := t.matchWithCall(fromID, p.Nonce)
|
||||
if err != nil {
|
||||
t.log.Debug("Invalid "+p.Name(), "addr", fromAddr, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Resend the call that was answered by WHOAREYOU.
|
||||
t.log.Trace("<< "+p.Name(), "id", c.node.ID(), "addr", fromAddr)
|
||||
c.handshakeCount++
|
||||
c.challenge = p
|
||||
p.Node = c.node
|
||||
t.sendCall(c)
|
||||
}
|
||||
|
||||
// matchWithCall checks whether a handshake attempt matches the active call.
|
||||
func (t *UDPv5) matchWithCall(fromID enode.ID, nonce v5wire.Nonce) (*callV5, error) {
|
||||
c := t.activeCallByAuth[nonce]
|
||||
if c == nil {
|
||||
return nil, errChallengeNoCall
|
||||
}
|
||||
@ -717,14 +747,9 @@ func (p *whoareyouV5) matchWithCall(t *UDPv5, authTag []byte) (*callV5, error) {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// PING
|
||||
|
||||
func (p *pingV5) name() string { return "PING/v5" }
|
||||
func (p *pingV5) kind() byte { return p_pingV5 }
|
||||
func (p *pingV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *pingV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.sendResponse(fromID, fromAddr, &pongV5{
|
||||
// handlePing sends a PONG response.
|
||||
func (t *UDPv5) handlePing(p *v5wire.Ping, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.sendResponse(fromID, fromAddr, &v5wire.Pong{
|
||||
ReqID: p.ReqID,
|
||||
ToIP: fromAddr.IP,
|
||||
ToPort: uint16(fromAddr.Port),
|
||||
@ -732,121 +757,81 @@ func (p *pingV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
})
|
||||
}
|
||||
|
||||
// PONG
|
||||
|
||||
func (p *pongV5) name() string { return "PONG/v5" }
|
||||
func (p *pongV5) kind() byte { return p_pongV5 }
|
||||
func (p *pongV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *pongV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.localNode.UDPEndpointStatement(fromAddr, &net.UDPAddr{IP: p.ToIP, Port: int(p.ToPort)})
|
||||
t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
|
||||
// handleFindnode returns nodes to the requester.
|
||||
func (t *UDPv5) handleFindnode(p *v5wire.Findnode, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
nodes := t.collectTableNodes(fromAddr.IP, p.Distances, findnodeResultLimit)
|
||||
for _, resp := range packNodes(p.ReqID, nodes) {
|
||||
t.sendResponse(fromID, fromAddr, resp)
|
||||
}
|
||||
}
|
||||
|
||||
// FINDNODE
|
||||
|
||||
func (p *findnodeV5) name() string { return "FINDNODE/v5" }
|
||||
func (p *findnodeV5) kind() byte { return p_findnodeV5 }
|
||||
func (p *findnodeV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *findnodeV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
if p.Distance == 0 {
|
||||
t.sendNodes(fromID, fromAddr, p.ReqID, []*enode.Node{t.Self()})
|
||||
return
|
||||
}
|
||||
if p.Distance > 256 {
|
||||
p.Distance = 256
|
||||
}
|
||||
// Get bucket entries.
|
||||
t.tab.mutex.Lock()
|
||||
nodes := unwrapNodes(t.tab.bucketAtDistance(int(p.Distance)).entries)
|
||||
t.tab.mutex.Unlock()
|
||||
if len(nodes) > findnodeResultLimit {
|
||||
nodes = nodes[:findnodeResultLimit]
|
||||
}
|
||||
t.sendNodes(fromID, fromAddr, p.ReqID, nodes)
|
||||
}
|
||||
|
||||
// sendNodes sends the given records in one or more NODES packets.
|
||||
func (t *UDPv5) sendNodes(toID enode.ID, toAddr *net.UDPAddr, reqid []byte, nodes []*enode.Node) {
|
||||
// TODO livenessChecks > 1
|
||||
// TODO CheckRelayIP
|
||||
total := uint8(math.Ceil(float64(len(nodes)) / 3))
|
||||
resp := &nodesV5{ReqID: reqid, Total: total, Nodes: make([]*enr.Record, 3)}
|
||||
sent := false
|
||||
for len(nodes) > 0 {
|
||||
items := min(nodesResponseItemLimit, len(nodes))
|
||||
resp.Nodes = resp.Nodes[:items]
|
||||
for i := 0; i < items; i++ {
|
||||
resp.Nodes[i] = nodes[i].Record()
|
||||
// collectTableNodes creates a FINDNODE result set for the given distances.
|
||||
func (t *UDPv5) collectTableNodes(rip net.IP, distances []uint, limit int) []*enode.Node {
|
||||
var nodes []*enode.Node
|
||||
var processed = make(map[uint]struct{})
|
||||
for _, dist := range distances {
|
||||
// Reject duplicate / invalid distances.
|
||||
_, seen := processed[dist]
|
||||
if seen || dist > 256 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Get the nodes.
|
||||
var bn []*enode.Node
|
||||
if dist == 0 {
|
||||
bn = []*enode.Node{t.Self()}
|
||||
} else if dist <= 256 {
|
||||
t.tab.mutex.Lock()
|
||||
bn = unwrapNodes(t.tab.bucketAtDistance(int(dist)).entries)
|
||||
t.tab.mutex.Unlock()
|
||||
}
|
||||
processed[dist] = struct{}{}
|
||||
|
||||
// Apply some pre-checks to avoid sending invalid nodes.
|
||||
for _, n := range bn {
|
||||
// TODO livenessChecks > 1
|
||||
if netutil.CheckRelayIP(rip, n.IP()) != nil {
|
||||
continue
|
||||
}
|
||||
nodes = append(nodes, n)
|
||||
if len(nodes) >= limit {
|
||||
return nodes
|
||||
}
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// packNodes creates NODES response packets for the given node list.
|
||||
func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
|
||||
if len(nodes) == 0 {
|
||||
return []*v5wire.Nodes{{ReqID: reqid, Total: 1}}
|
||||
}
|
||||
|
||||
total := uint8(math.Ceil(float64(len(nodes)) / 3))
|
||||
var resp []*v5wire.Nodes
|
||||
for len(nodes) > 0 {
|
||||
p := &v5wire.Nodes{ReqID: reqid, Total: total}
|
||||
items := min(nodesResponseItemLimit, len(nodes))
|
||||
for i := 0; i < items; i++ {
|
||||
p.Nodes = append(p.Nodes, nodes[i].Record())
|
||||
}
|
||||
t.sendResponse(toID, toAddr, resp)
|
||||
nodes = nodes[items:]
|
||||
sent = true
|
||||
resp = append(resp, p)
|
||||
}
|
||||
// Ensure at least one response is sent.
|
||||
if !sent {
|
||||
resp.Total = 1
|
||||
resp.Nodes = nil
|
||||
t.sendResponse(toID, toAddr, resp)
|
||||
return resp
|
||||
}
|
||||
|
||||
// handleTalkRequest runs the talk request handler of the requested protocol.
|
||||
func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.trlock.Lock()
|
||||
handler := t.trhandlers[p.Protocol]
|
||||
t.trlock.Unlock()
|
||||
|
||||
var response []byte
|
||||
if handler != nil {
|
||||
response = handler(p.Message)
|
||||
}
|
||||
}
|
||||
|
||||
// NODES
|
||||
|
||||
func (p *nodesV5) name() string { return "NODES/v5" }
|
||||
func (p *nodesV5) kind() byte { return p_nodesV5 }
|
||||
func (p *nodesV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *nodesV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
|
||||
}
|
||||
|
||||
// REQUESTTICKET
|
||||
|
||||
func (p *requestTicketV5) name() string { return "REQUESTTICKET/v5" }
|
||||
func (p *requestTicketV5) kind() byte { return p_requestTicketV5 }
|
||||
func (p *requestTicketV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *requestTicketV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.sendResponse(fromID, fromAddr, &ticketV5{ReqID: p.ReqID})
|
||||
}
|
||||
|
||||
// TICKET
|
||||
|
||||
func (p *ticketV5) name() string { return "TICKET/v5" }
|
||||
func (p *ticketV5) kind() byte { return p_ticketV5 }
|
||||
func (p *ticketV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *ticketV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
|
||||
}
|
||||
|
||||
// REGTOPIC
|
||||
|
||||
func (p *regtopicV5) name() string { return "REGTOPIC/v5" }
|
||||
func (p *regtopicV5) kind() byte { return p_regtopicV5 }
|
||||
func (p *regtopicV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *regtopicV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.sendResponse(fromID, fromAddr, ®confirmationV5{ReqID: p.ReqID, Registered: false})
|
||||
}
|
||||
|
||||
// REGCONFIRMATION
|
||||
|
||||
func (p *regconfirmationV5) name() string { return "REGCONFIRMATION/v5" }
|
||||
func (p *regconfirmationV5) kind() byte { return p_regconfirmationV5 }
|
||||
func (p *regconfirmationV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *regconfirmationV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
|
||||
}
|
||||
|
||||
// TOPICQUERY
|
||||
|
||||
func (p *topicqueryV5) name() string { return "TOPICQUERY/v5" }
|
||||
func (p *topicqueryV5) kind() byte { return p_topicqueryV5 }
|
||||
func (p *topicqueryV5) setreqid(id []byte) { p.ReqID = id }
|
||||
|
||||
func (p *topicqueryV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
|
||||
resp := &v5wire.TalkResponse{ReqID: p.ReqID, Message: response}
|
||||
t.sendResponse(fromID, fromAddr, resp)
|
||||
}
|
||||
|
Reference in New Issue
Block a user