whisper: refactoring (#3411)

* whisper: refactored message processing
* whisper: final polishing
* whisper: logging updated
* whisper: moved the check, changed the default PoW
* whisper: refactoring of message queuing
* whisper: refactored parameters
This commit is contained in:
gluk256
2016-12-20 00:58:01 +01:00
committed by Felix Lange
parent 64bf5bafe9
commit ba996f5e27
12 changed files with 253 additions and 66 deletions

View File

@@ -22,6 +22,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"fmt"
"runtime"
"sync"
"time"
@@ -45,7 +46,7 @@ type Whisper struct {
symKeys map[string][]byte
keyMu sync.RWMutex
envelopes map[common.Hash]*Envelope // Pool of messages currently tracked by this node
envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
messages map[common.Hash]*ReceivedMessage // Pool of successfully decrypted messages, which are not expired yet
expirations map[uint32]*set.SetNonTS // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
@@ -55,22 +56,28 @@ type Whisper struct {
mailServer MailServer
quit chan struct{}
test bool
messageQueue chan *Envelope
p2pMsgQueue chan *Envelope
quit chan struct{}
overflow bool
test bool
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
// Param s should be passed if you want to implement mail server, otherwise nil.
func NewWhisper(server MailServer) *Whisper {
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
messages: make(map[common.Hash]*ReceivedMessage),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
mailServer: server,
quit: make(chan struct{}),
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
messages: make(map[common.Hash]*ReceivedMessage),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
mailServer: server,
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
}
whisper.filters = NewFilters(whisper)
@@ -124,7 +131,7 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, data []byte) error {
return err
}
p.trusted = true
return p2p.Send(p.ws, mailRequestCode, data)
return p2p.Send(p.ws, p2pRequestCode, data)
}
func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
@@ -270,6 +277,12 @@ func (w *Whisper) Send(envelope *Envelope) error {
func (w *Whisper) Start(*p2p.Server) error {
glog.V(logger.Info).Infoln("Whisper started")
go w.update()
numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ {
go w.processQueue()
}
return nil
}
@@ -350,10 +363,10 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return fmt.Errorf("garbage received (directMessage)")
}
for _, envelope := range envelopes {
wh.postEvent(envelope, p2pCode)
wh.postEvent(envelope, true)
}
}
case mailRequestCode:
case p2pRequestCode:
// Must be processed if mail server is implemented. Otherwise ignore.
if wh.mailServer != nil {
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
@@ -382,7 +395,7 @@ func (wh *Whisper) add(envelope *Envelope) error {
if sent > now {
if sent-SynchAllowance > now {
return fmt.Errorf("message created in the future")
return fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
} else {
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.calculatePoW(sent - now + 1)
@@ -393,30 +406,31 @@ func (wh *Whisper) add(envelope *Envelope) error {
if envelope.Expiry+SynchAllowance*2 < now {
return fmt.Errorf("very old message")
} else {
glog.V(logger.Debug).Infof("expired envelope dropped [%x]", envelope.Hash())
return nil // drop envelope without error
}
}
if len(envelope.Data) > MaxMessageLength {
return fmt.Errorf("huge messages are not allowed")
return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
if len(envelope.Version) > 4 {
return fmt.Errorf("oversized Version")
return fmt.Errorf("oversized version [%x]", envelope.Hash())
}
if len(envelope.AESNonce) > AESNonceMaxLength {
// the standard AES GSM nonce size is 12,
// but const gcmStandardNonceSize cannot be accessed directly
return fmt.Errorf("oversized AESNonce")
return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash())
}
if len(envelope.Salt) > saltLength {
return fmt.Errorf("oversized Salt")
return fmt.Errorf("oversized salt [%x]", envelope.Hash())
}
if envelope.PoW() < MinimumPoW && !wh.test {
glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f", envelope.PoW())
glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash())
return nil // drop envelope without error
}
@@ -436,22 +450,59 @@ func (wh *Whisper) add(envelope *Envelope) error {
wh.poolMu.Unlock()
if alreadyCached {
glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
glog.V(logger.Detail).Infof("whisper envelope already cached [%x]\n", envelope.Hash())
} else {
wh.postEvent(envelope, messagesCode) // notify the local node about the new message
glog.V(logger.Detail).Infof("cached whisper envelope %v\n", envelope)
glog.V(logger.Detail).Infof("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope)
wh.postEvent(envelope, false) // notify the local node about the new message
}
return nil
}
// postEvent delivers the message to the watchers.
func (w *Whisper) postEvent(envelope *Envelope, messageCode uint64) {
// postEvent queues the message for further processing.
func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than
// currently supported version, we can not decrypt it,
// and therefore just ignore this message
if envelope.Ver() <= EnvelopeVersion {
// todo: review if you need an additional thread here
go w.filters.NotifyWatchers(envelope, messageCode)
if isP2P {
w.p2pMsgQueue <- envelope
} else {
w.checkOverflow()
w.messageQueue <- envelope
}
}
}
// checkOverflow checks if message queue overflow occurs and reports it if necessary.
func (w *Whisper) checkOverflow() {
queueSize := len(w.messageQueue)
if queueSize == messageQueueLimit {
if !w.overflow {
w.overflow = true
glog.V(logger.Warn).Infoln("message queue overflow")
}
} else if queueSize <= messageQueueLimit/2 {
if w.overflow {
w.overflow = false
}
}
}
// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
func (w *Whisper) processQueue() {
var e *Envelope
for {
select {
case <-w.quit:
return
case e = <-w.messageQueue:
w.filters.NotifyWatchers(e, false)
case e = <-w.p2pMsgQueue:
w.filters.NotifyWatchers(e, true)
}
}
}