swarm/pss: forwarding function refactoring (#18353)
This commit is contained in:
committed by
Anton Evangelatov
parent
e1edfe0689
commit
ca7c13ba8f
133
swarm/pss/pss.go
133
swarm/pss/pss.go
@@ -891,68 +891,97 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
|
||||
return nil
|
||||
}
|
||||
|
||||
// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
|
||||
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
|
||||
// of the peer address of the equivalent length.
|
||||
// sendFunc is a helper function that tries to send a message and returns true on success.
|
||||
// It is set here for usage in production, and optionally overridden in tests.
|
||||
var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg
|
||||
|
||||
// tries to send a message, returns true if successful
|
||||
func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
|
||||
var isPssEnabled bool
|
||||
info := sp.Info()
|
||||
for _, capability := range info.Caps {
|
||||
if capability == p.capstring {
|
||||
isPssEnabled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isPssEnabled {
|
||||
log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
|
||||
return false
|
||||
}
|
||||
|
||||
// get the protocol peer from the forwarding peer cache
|
||||
p.fwdPoolMu.RLock()
|
||||
pp := p.fwdPool[sp.Info().ID]
|
||||
p.fwdPoolMu.RUnlock()
|
||||
|
||||
err := pp.Send(context.TODO(), msg)
|
||||
if err != nil {
|
||||
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
|
||||
log.Error(err.Error())
|
||||
}
|
||||
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// Forwards a pss message to the peer(s) based on recipient address according to the algorithm
|
||||
// described below. The recipient address can be of any length, and the byte slice will be matched
|
||||
// to the MSB slice of the peer address of the equivalent length.
|
||||
//
|
||||
// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
|
||||
// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
|
||||
// partial address, it should be forwarded to all the peers matching the partial address, if there
|
||||
// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
|
||||
// forwarding fails, the node should try to forward it to the next best peer, until the message is
|
||||
// successfully forwarded to at least one peer.
|
||||
func (p *Pss) forward(msg *PssMsg) error {
|
||||
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
|
||||
|
||||
sent := 0 // number of successful sends
|
||||
to := make([]byte, addressLength)
|
||||
copy(to[:len(msg.To)], msg.To)
|
||||
neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
|
||||
|
||||
// send with kademlia
|
||||
// find the closest peer to the recipient and attempt to send
|
||||
sent := 0
|
||||
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
|
||||
info := sp.Info()
|
||||
// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
|
||||
// but the luminosity is less. here luminosity equals the number of bits given in the destination address.
|
||||
luminosityRadius := len(msg.To) * 8
|
||||
|
||||
// check if the peer is running pss
|
||||
var ispss bool
|
||||
for _, cap := range info.Caps {
|
||||
if cap == p.capstring {
|
||||
ispss = true
|
||||
break
|
||||
// proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
|
||||
pof := pot.DefaultPof(neighbourhoodDepth)
|
||||
|
||||
// soft threshold for msg broadcast
|
||||
broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
|
||||
if broadcastThreshold > luminosityRadius {
|
||||
broadcastThreshold = luminosityRadius
|
||||
}
|
||||
|
||||
var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
|
||||
|
||||
// if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
|
||||
// call below), then peers that fall in the same proximity bin as recipient address will appear
|
||||
// [at least] one bit closer, but only if these additional bits are given in the recipient address.
|
||||
if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
|
||||
broadcastThreshold++
|
||||
onlySendOnce = true
|
||||
}
|
||||
|
||||
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
|
||||
if po < broadcastThreshold && sent > 0 {
|
||||
return false // stop iterating
|
||||
}
|
||||
if sendFunc(p, sp, msg) {
|
||||
sent++
|
||||
if onlySendOnce {
|
||||
return false
|
||||
}
|
||||
if po == addressLength*8 {
|
||||
// stop iterating if successfully sent to the exact recipient (perfect match of full address)
|
||||
return false
|
||||
}
|
||||
}
|
||||
if !ispss {
|
||||
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
|
||||
return true
|
||||
}
|
||||
|
||||
// get the protocol peer from the forwarding peer cache
|
||||
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
|
||||
p.fwdPoolMu.RLock()
|
||||
pp := p.fwdPool[sp.Info().ID]
|
||||
p.fwdPoolMu.RUnlock()
|
||||
|
||||
// attempt to send the message
|
||||
err := pp.Send(context.TODO(), msg)
|
||||
if err != nil {
|
||||
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
|
||||
log.Error(err.Error())
|
||||
return true
|
||||
}
|
||||
sent++
|
||||
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
|
||||
|
||||
// continue forwarding if:
|
||||
// - if the peer is end recipient but the full address has not been disclosed
|
||||
// - if the peer address matches the partial address fully
|
||||
// - if the peer is in proxbin
|
||||
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
|
||||
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
|
||||
return true
|
||||
} else if isproxbin {
|
||||
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
|
||||
return true
|
||||
}
|
||||
// at this point we stop forwarding, and the state is as follows:
|
||||
// - the peer is end recipient and we have full address
|
||||
// - we are not in proxbin (directed routing)
|
||||
// - partial addresses don't fully match
|
||||
return false
|
||||
return true
|
||||
})
|
||||
|
||||
// if we failed to send to anyone, re-insert message in the send-queue
|
||||
if sent == 0 {
|
||||
log.Debug("unable to forward to any peers")
|
||||
if err := p.enqueue(msg); err != nil {
|
||||
|
Reference in New Issue
Block a user