Kademlia refactor (#17641)
* swarm/network: simplify kademlia/hive; rid interfaces * swarm, swarm/network/stream, swarm/netork/simulations,, swarm/pss: adapt to new Kad API * swarm/network: minor changes re review; add missing lock to NeighbourhoodDepthC
This commit is contained in:
committed by
Balint Gabor
parent
b06ff563a1
commit
bfce00385f
@@ -110,10 +110,10 @@ func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams
|
||||
//
|
||||
// Implements node.Service
|
||||
type Pss struct {
|
||||
network.Overlay // we can get the overlayaddress from this
|
||||
privateKey *ecdsa.PrivateKey // pss can have it's own independent key
|
||||
w *whisper.Whisper // key and encryption backend
|
||||
auxAPIs []rpc.API // builtins (handshake, test) can add APIs
|
||||
*network.Kademlia // we can get the Kademlia address from this
|
||||
privateKey *ecdsa.PrivateKey // pss can have it's own independent key
|
||||
w *whisper.Whisper // key and encryption backend
|
||||
auxAPIs []rpc.API // builtins (handshake, test) can add APIs
|
||||
|
||||
// sending and forwarding
|
||||
fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
|
||||
@@ -151,9 +151,9 @@ func (p *Pss) String() string {
|
||||
|
||||
// Creates a new Pss instance.
|
||||
//
|
||||
// In addition to params, it takes a swarm network overlay
|
||||
// In addition to params, it takes a swarm network Kademlia
|
||||
// and a FileStore storage for message cache storage.
|
||||
func NewPss(k network.Overlay, params *PssParams) (*Pss, error) {
|
||||
func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
|
||||
if params.privateKey == nil {
|
||||
return nil, errors.New("missing private key for pss")
|
||||
}
|
||||
@@ -162,7 +162,7 @@ func NewPss(k network.Overlay, params *PssParams) (*Pss, error) {
|
||||
Version: pssVersion,
|
||||
}
|
||||
ps := &Pss{
|
||||
Overlay: k,
|
||||
Kademlia: k,
|
||||
privateKey: params.privateKey,
|
||||
w: whisper.New(&whisper.DefaultConfig),
|
||||
quitC: make(chan struct{}),
|
||||
@@ -290,9 +290,9 @@ func (p *Pss) addAPI(api rpc.API) {
|
||||
p.auxAPIs = append(p.auxAPIs, api)
|
||||
}
|
||||
|
||||
// Returns the swarm overlay address of the pss node
|
||||
// Returns the swarm Kademlia address of the pss node
|
||||
func (p *Pss) BaseAddr() []byte {
|
||||
return p.Overlay.BaseAddr()
|
||||
return p.Kademlia.BaseAddr()
|
||||
}
|
||||
|
||||
// Returns the pss node's public key
|
||||
@@ -356,11 +356,11 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
|
||||
}
|
||||
if int64(pssmsg.Expire) < time.Now().Unix() {
|
||||
metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
|
||||
log.Warn("pss filtered expired message", "from", common.ToHex(p.Overlay.BaseAddr()), "to", common.ToHex(pssmsg.To))
|
||||
log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
|
||||
return nil
|
||||
}
|
||||
if p.checkFwdCache(pssmsg) {
|
||||
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Overlay.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
|
||||
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
|
||||
return nil
|
||||
}
|
||||
p.addFwdCache(pssmsg)
|
||||
@@ -442,12 +442,12 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, asy
|
||||
|
||||
// will return false if using partial address
|
||||
func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
|
||||
return bytes.Equal(msg.To, p.Overlay.BaseAddr())
|
||||
return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
|
||||
}
|
||||
|
||||
// test match of leftmost bytes in given message to node's overlay address
|
||||
// test match of leftmost bytes in given message to node's Kademlia address
|
||||
func (p *Pss) isSelfPossibleRecipient(msg *PssMsg) bool {
|
||||
local := p.Overlay.BaseAddr()
|
||||
local := p.Kademlia.BaseAddr()
|
||||
return bytes.Equal(msg.To[:], local[:len(msg.To)])
|
||||
}
|
||||
|
||||
@@ -816,14 +816,7 @@ func (p *Pss) forward(msg *PssMsg) error {
|
||||
// send with kademlia
|
||||
// find the closest peer to the recipient and attempt to send
|
||||
sent := 0
|
||||
p.Overlay.EachConn(to, 256, func(op network.OverlayConn, po int, isproxbin bool) bool {
|
||||
// we need p2p.protocols.Peer.Send
|
||||
// cast and resolve
|
||||
sp, ok := op.(senderPeer)
|
||||
if !ok {
|
||||
log.Crit("Pss cannot use kademlia peer type")
|
||||
return false
|
||||
}
|
||||
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
|
||||
info := sp.Info()
|
||||
|
||||
// check if the peer is running pss
|
||||
@@ -840,7 +833,7 @@ func (p *Pss) forward(msg *PssMsg) error {
|
||||
}
|
||||
|
||||
// get the protocol peer from the forwarding peer cache
|
||||
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), op.Address())
|
||||
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
|
||||
p.fwdPoolMu.RLock()
|
||||
pp := p.fwdPool[sp.Info().ID]
|
||||
p.fwdPoolMu.RUnlock()
|
||||
@@ -859,11 +852,11 @@ func (p *Pss) forward(msg *PssMsg) error {
|
||||
// - if the peer is end recipient but the full address has not been disclosed
|
||||
// - if the peer address matches the partial address fully
|
||||
// - if the peer is in proxbin
|
||||
if len(msg.To) < addressLength && bytes.Equal(msg.To, op.Address()[:len(msg.To)]) {
|
||||
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
|
||||
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
|
||||
return true
|
||||
} else if isproxbin {
|
||||
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(op.Address())))
|
||||
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
|
||||
return true
|
||||
}
|
||||
// at this point we stop forwarding, and the state is as follows:
|
||||
|
@@ -556,23 +556,6 @@ OUTER:
|
||||
}
|
||||
}
|
||||
|
||||
type pssTestPeer struct {
|
||||
*protocols.Peer
|
||||
addr []byte
|
||||
}
|
||||
|
||||
func (t *pssTestPeer) Address() []byte {
|
||||
return t.addr
|
||||
}
|
||||
|
||||
func (t *pssTestPeer) Update(addr network.OverlayAddr) network.OverlayAddr {
|
||||
return addr
|
||||
}
|
||||
|
||||
func (t *pssTestPeer) Off() network.OverlayAddr {
|
||||
return &pssTestPeer{}
|
||||
}
|
||||
|
||||
// forwarding should skip peers that do not have matching pss capabilities
|
||||
func TestMismatch(t *testing.T) {
|
||||
|
||||
@@ -582,7 +565,7 @@ func TestMismatch(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// initialize overlay
|
||||
// initialize kad
|
||||
baseaddr := network.RandomAddr()
|
||||
kad := network.NewKademlia((baseaddr).Over(), network.NewKadParams())
|
||||
rw := &p2p.MsgPipeRW{}
|
||||
@@ -594,10 +577,10 @@ func TestMismatch(t *testing.T) {
|
||||
Version: 0,
|
||||
}
|
||||
nid, _ := discover.HexID("0x01")
|
||||
wrongpsspeer := &pssTestPeer{
|
||||
Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(wrongpssaddr.Over()), []p2p.Cap{wrongpsscap}), rw, nil),
|
||||
addr: wrongpssaddr.Over(),
|
||||
}
|
||||
wrongpsspeer := network.NewPeer(&network.BzzPeer{
|
||||
Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(wrongpssaddr.Over()), []p2p.Cap{wrongpsscap}), rw, nil),
|
||||
BzzAddr: &network.BzzAddr{OAddr: wrongpssaddr.Over(), UAddr: nil},
|
||||
}, kad)
|
||||
|
||||
// one peer doesn't even have pss (boo!)
|
||||
nopssaddr := network.RandomAddr()
|
||||
@@ -606,16 +589,16 @@ func TestMismatch(t *testing.T) {
|
||||
Version: 1,
|
||||
}
|
||||
nid, _ = discover.HexID("0x02")
|
||||
nopsspeer := &pssTestPeer{
|
||||
Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(nopssaddr.Over()), []p2p.Cap{nopsscap}), rw, nil),
|
||||
addr: nopssaddr.Over(),
|
||||
}
|
||||
nopsspeer := network.NewPeer(&network.BzzPeer{
|
||||
Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(nopssaddr.Over()), []p2p.Cap{nopsscap}), rw, nil),
|
||||
BzzAddr: &network.BzzAddr{OAddr: nopssaddr.Over(), UAddr: nil},
|
||||
}, kad)
|
||||
|
||||
// add peers to kademlia and activate them
|
||||
// it's safe so don't check errors
|
||||
kad.Register([]network.OverlayAddr{wrongpsspeer})
|
||||
kad.Register(wrongpsspeer.BzzAddr)
|
||||
kad.On(wrongpsspeer)
|
||||
kad.Register([]network.OverlayAddr{nopsspeer})
|
||||
kad.Register(nopsspeer.BzzAddr)
|
||||
kad.On(nopsspeer)
|
||||
|
||||
// create pss
|
||||
@@ -1636,17 +1619,17 @@ func newServices(allowRaw bool) adapters.Services {
|
||||
}
|
||||
}
|
||||
|
||||
func newTestPss(privkey *ecdsa.PrivateKey, overlay network.Overlay, ppextra *PssParams) *Pss {
|
||||
func newTestPss(privkey *ecdsa.PrivateKey, kad *network.Kademlia, ppextra *PssParams) *Pss {
|
||||
|
||||
var nid discover.NodeID
|
||||
copy(nid[:], crypto.FromECDSAPub(&privkey.PublicKey))
|
||||
addr := network.NewAddrFromNodeID(nid)
|
||||
|
||||
// set up routing if kademlia is not passed to us
|
||||
if overlay == nil {
|
||||
if kad == nil {
|
||||
kp := network.NewKadParams()
|
||||
kp.MinProxBinSize = 3
|
||||
overlay = network.NewKademlia(addr.Over(), kp)
|
||||
kad = network.NewKademlia(addr.Over(), kp)
|
||||
}
|
||||
|
||||
// create pss
|
||||
@@ -1654,7 +1637,7 @@ func newTestPss(privkey *ecdsa.PrivateKey, overlay network.Overlay, ppextra *Pss
|
||||
if ppextra != nil {
|
||||
pp.SymKeyCacheCapacity = ppextra.SymKeyCacheCapacity
|
||||
}
|
||||
ps, err := NewPss(overlay, pp)
|
||||
ps, err := NewPss(kad, pp)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user