whisper/whisperv6: implement pow/bloom exchange protocol (#15802)

This is the main feature of v6.
This commit is contained in:
gluk256
2018-01-12 13:11:22 +02:00
committed by Felix Lange
parent 56152b31ac
commit fd869dc839
7 changed files with 452 additions and 76 deletions

View File

@@ -48,9 +48,12 @@ type Statistics struct {
}
const (
minPowIdx = iota // Minimal PoW required by the whisper node
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
overflowIdx = iota // Indicator of message queue overflow
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
overflowIdx // Indicator of message queue overflow
minPowIdx // Minimal PoW required by the whisper node
minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time
bloomFilterIdx // Bloom filter for topics of interest for this node
bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time
)
// Whisper represents a dark communication interface through the Ethereum
@@ -76,7 +79,7 @@ type Whisper struct {
settings syncmap.Map // holds configuration settings that can be dynamically changed
reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
@@ -91,15 +94,15 @@ func New(cfg *Config) *Whisper {
}
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
reactionAllowance: SynchAllowance,
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance,
}
whisper.filters = NewFilters(whisper)
@@ -126,11 +129,55 @@ func New(cfg *Config) *Whisper {
return whisper
}
// MinPow returns the PoW value required by this node.
func (w *Whisper) MinPow() float64 {
val, _ := w.settings.Load(minPowIdx)
val, exist := w.settings.Load(minPowIdx)
if !exist || val == nil {
return DefaultMinimumPoW
}
v, ok := val.(float64)
if !ok {
log.Error("Error loading minPowIdx, using default")
return DefaultMinimumPoW
}
return v
}
// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
// time after PoW was changed. If sufficient time have elapsed or no change of PoW
// have ever occurred, the return value will be the same as return value of MinPow().
func (w *Whisper) MinPowTolerance() float64 {
val, exist := w.settings.Load(minPowToleranceIdx)
if !exist || val == nil {
return DefaultMinimumPoW
}
return val.(float64)
}
// BloomFilter returns the aggregated bloom filter for all the topics of interest.
// The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will
// be disconnected.
func (w *Whisper) BloomFilter() []byte {
val, exist := w.settings.Load(bloomFilterIdx)
if !exist || val == nil {
return nil
}
return val.([]byte)
}
// BloomFilterTolerance returns the bloom filter which is tolerated for a limited
// time after new bloom was advertised to the peers. If sufficient time have elapsed
// or no change of bloom filter have ever occurred, the return value will be the same
// as return value of BloomFilter().
func (w *Whisper) BloomFilterTolerance() []byte {
val, exist := w.settings.Load(bloomFilterToleranceIdx)
if !exist || val == nil {
return nil
}
return val.([]byte)
}
// MaxMessageSize returns the maximum accepted message size.
func (w *Whisper) MaxMessageSize() uint32 {
val, _ := w.settings.Load(maxMsgSizeIdx)
@@ -180,18 +227,40 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
return nil
}
// SetBloomFilter sets the new bloom filter
func (w *Whisper) SetBloomFilter(bloom []byte) error {
if len(bloom) != bloomFilterSize {
return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
}
b := make([]byte, bloomFilterSize)
copy(b, bloom)
w.settings.Store(bloomFilterIdx, b)
w.notifyPeersAboutBloomFilterChange(b)
go func() {
// allow some time before all the peers have processed the notification
time.Sleep(time.Duration(w.syncAllowance) * time.Second)
w.settings.Store(bloomFilterToleranceIdx, b)
}()
return nil
}
// SetMinimumPoW sets the minimal PoW required by this node
func (w *Whisper) SetMinimumPoW(val float64) error {
if val < 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
w.settings.Store(minPowIdx, val)
w.notifyPeersAboutPowRequirementChange(val)
go func() {
// allow some time before all the peers have processed the notification
time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
w.settings.Store(minPowIdx, val)
time.Sleep(time.Duration(w.syncAllowance) * time.Second)
w.settings.Store(minPowToleranceIdx, val)
}()
return nil
@@ -199,21 +268,13 @@ func (w *Whisper) SetMinimumPoW(val float64) error {
// SetMinimumPoW sets the minimal PoW in test environment
func (w *Whisper) SetMinimumPowTest(val float64) {
w.notifyPeersAboutPowRequirementChange(val)
w.settings.Store(minPowIdx, val)
w.notifyPeersAboutPowRequirementChange(val)
w.settings.Store(minPowToleranceIdx, val)
}
func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
arr := make([]*Peer, len(w.peers))
i := 0
w.peerMu.Lock()
for p := range w.peers {
arr[i] = p
i++
}
w.peerMu.Unlock()
arr := w.getPeers()
for _, p := range arr {
err := p.notifyAboutPowRequirementChange(pow)
if err != nil {
@@ -221,11 +282,37 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
err = p.notifyAboutPowRequirementChange(pow)
}
if err != nil {
log.Warn("oversized message received", "peer", p.ID(), "error", err)
log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err)
}
}
}
func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) {
arr := w.getPeers()
for _, p := range arr {
err := p.notifyAboutBloomFilterChange(bloom)
if err != nil {
// allow one retry
err = p.notifyAboutBloomFilterChange(bloom)
}
if err != nil {
log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err)
}
}
}
func (w *Whisper) getPeers() []*Peer {
arr := make([]*Peer, len(w.peers))
i := 0
w.peerMu.Lock()
for p := range w.peers {
arr[i] = p
i++
}
w.peerMu.Unlock()
return arr
}
// getPeer retrieves peer by ID
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
w.peerMu.Lock()
@@ -459,7 +546,28 @@ func (w *Whisper) GetSymKey(id string) ([]byte, error) {
// Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages.
func (w *Whisper) Subscribe(f *Filter) (string, error) {
return w.filters.Install(f)
s, err := w.filters.Install(f)
if err == nil {
w.updateBloomFilter(f)
}
return s, err
}
// updateBloomFilter recalculates the new value of bloom filter,
// and informs the peers if necessary.
func (w *Whisper) updateBloomFilter(f *Filter) {
aggregate := make([]byte, bloomFilterSize)
for _, t := range f.Topics {
top := BytesToTopic(t)
b := TopicToBloom(top)
aggregate = addBloom(aggregate, b)
}
if !bloomFilterMatch(w.BloomFilter(), aggregate) {
// existing bloom filter must be updated
aggregate = addBloom(w.BloomFilter(), aggregate)
w.SetBloomFilter(aggregate)
}
}
// GetFilter returns the filter by id.
@@ -592,7 +700,21 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
p.powRequirement = f
case bloomFilterExCode:
// to be implemented
var bloom []byte
err := packet.Decode(&bloom)
if err == nil && len(bloom) != bloomFilterSize {
err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
}
if err != nil {
log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid bloom filter exchange message")
}
if isFullNode(bloom) {
p.bloomFilter = nil
} else {
p.bloomFilter = bloom
}
case p2pMessageCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
@@ -633,7 +755,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
sent := envelope.Expiry - envelope.TTL
if sent > now {
if sent-SynchAllowance > now {
if sent-DefaultSyncAllowance > now {
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
} else {
// recalculate PoW, adjusted for the time difference, plus one second for latency
@@ -642,7 +764,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
}
if envelope.Expiry < now {
if envelope.Expiry+SynchAllowance*2 < now {
if envelope.Expiry+DefaultSyncAllowance*2 < now {
return false, fmt.Errorf("very old message")
} else {
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
@@ -655,11 +777,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
}
if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error for now
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by MinPowTolerance()
// for a short period of peer synchronization.
if envelope.PoW() < wh.MinPowTolerance() {
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
}
}
// once the status message includes the PoW requirement, an error should be returned here:
//return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by BloomFilterTolerance()
// for a short period of peer synchronization.
if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) {
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
envelope.Hash().Hex(), wh.BloomFilter(), envelope.Bloom(), envelope.Topic)
}
}
hash := envelope.Hash()
@@ -897,3 +1030,40 @@ func GenerateRandomID() (id string, err error) {
id = common.Bytes2Hex(buf)
return id, err
}
func isFullNode(bloom []byte) bool {
if bloom == nil {
return true
}
for _, b := range bloom {
if b != 255 {
return false
}
}
return true
}
func bloomFilterMatch(filter, sample []byte) bool {
if filter == nil {
// full node, accepts all messages
return true
}
for i := 0; i < bloomFilterSize; i++ {
f := filter[i]
s := sample[i]
if (f | s) != f {
return false
}
}
return true
}
func addBloom(a, b []byte) []byte {
c := make([]byte, bloomFilterSize)
for i := 0; i < bloomFilterSize; i++ {
c[i] = a[i] | b[i]
}
return c
}