whisperv5: integrate whisper and add whisper RPC simulator

This commit is contained in:
Bas van Kervel
2017-06-13 11:49:07 +02:00
parent 80f7c6c299
commit b58a501673
27 changed files with 1311 additions and 555 deletions

View File

@ -26,6 +26,8 @@ import (
"sync"
"time"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@ -44,6 +46,38 @@ type Statistics struct {
totalMessagesCleared int
}
type settingType byte
type settingsMap map[settingType]interface{}
const (
minPowIdx settingType = iota // Minimal PoW required by the whisper node
maxMsgSizeIdx settingType = iota // Maximal message length allowed by the whisper node
OverflowIdx settingType = iota // Indicator of message queue overflow
)
type settingsVault struct {
vaultMu sync.Mutex
vault atomic.Value
}
func (s *settingsVault) get(idx settingType) interface{} {
m := s.vault.Load().(settingsMap)
return m[idx]
}
func (s *settingsVault) store(idx settingType, val interface{}) {
s.vaultMu.Lock()
defer s.vaultMu.Unlock()
m1 := s.vault.Load().(settingsMap)
m2 := make(settingsMap)
for k, v := range m1 {
m2[k] = v
}
m2[idx] = val
s.vault.Store(m2)
}
// Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Whisper struct {
@ -54,28 +88,27 @@ type Whisper struct {
symKeys map[string][]byte // Symmetric key storage
keyMu sync.RWMutex // Mutex associated with key storages
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
expirations map[uint32]*set.SetNonTS // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
peers map[*Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set
peers map[*Peer]struct{} // Set of currently active peers
messageQueue chan *Envelope // Message queue for normal whisper messages
p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
quit chan struct{} // Channel used for graceful exit
minPoW float64 // Minimal PoW required by the whisper node
maxMsgLength int // Maximal message length allowed by the whisper node
overflow bool // Indicator of message queue overflow
settings settingsVault // holds configuration settings that can be dynamically changed
stats Statistics // Statistics of whisper node
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
mailServer MailServer // MailServer interface
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
func New() *Whisper {
func New(cfg *Config) *Whisper {
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
@ -85,22 +118,47 @@ func New() *Whisper {
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
minPoW: DefaultMinimumPoW,
maxMsgLength: DefaultMaxMessageLength,
}
whisper.filters = NewFilters(whisper)
whisper.settings.vault.Store(make(settingsMap))
whisper.settings.store(minPowIdx, cfg.MinimumAcceptedPOW)
whisper.settings.store(maxMsgSizeIdx, cfg.MaxMessageSize)
whisper.settings.store(OverflowIdx, false)
// p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{
Name: ProtocolName,
Version: uint(ProtocolVersion),
Length: NumberOfMessageCodes,
Run: whisper.HandlePeer,
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": ProtocolVersionStr,
"maxMessageSize": whisper.MaxMessageSize(),
"minimumPoW": whisper.MinPow(),
}
},
}
return whisper
}
func (w *Whisper) MinPow() float64 {
return w.settings.get(minPowIdx).(float64)
}
// MaxMessageSize returns the maximum accepted message size.
func (w *Whisper) MaxMessageSize() uint32 {
return w.settings.get(maxMsgSizeIdx).(uint32)
}
// Overflow returns an indication if the message queue is full.
func (w *Whisper) Overflow() bool {
return w.settings.get(OverflowIdx).(bool)
}
// APIs returns the RPC descriptors the Whisper implementation offers
func (w *Whisper) APIs() []rpc.API {
return []rpc.API{
@ -129,12 +187,12 @@ func (w *Whisper) Version() uint {
return w.protocol.Version
}
// SetMaxMessageLength sets the maximal message length allowed by this node
func (w *Whisper) SetMaxMessageLength(val int) error {
if val <= 0 {
return fmt.Errorf("invalid message length: %d", val)
// SetMaxMessageSize sets the maximal message size allowed by this node
func (w *Whisper) SetMaxMessageSize(size uint32) error {
if size > MaxMessageSize {
return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
}
w.maxMsgLength = val
w.settings.store(maxMsgSizeIdx, uint32(size))
return nil
}
@ -143,7 +201,7 @@ func (w *Whisper) SetMinimumPoW(val float64) error {
if val <= 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
w.minPoW = val
w.settings.store(minPowIdx, val)
return nil
}
@ -240,6 +298,20 @@ func (w *Whisper) DeleteKeyPair(key string) bool {
return false
}
// AddKeyPair imports a asymmetric private key and returns it identifier.
func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
id, err := GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
w.keyMu.Lock()
w.privateKeys[id] = key
w.keyMu.Unlock()
return id, nil
}
// HasKeyPair checks if the the whisper node is configured with the private key
// of the specified public pair.
func (w *Whisper) HasKeyPair(id string) bool {
@ -451,7 +523,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("message loop", "peer", p.peer.ID(), "err", err)
return err
}
if packet.Size > uint32(wh.maxMsgLength) {
if packet.Size > wh.MaxMessageSize() {
log.Warn("oversized message received", "peer", p.peer.ID())
return errors.New("oversized message received")
}
@ -532,7 +604,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
}
}
if envelope.size() > wh.maxMsgLength {
if uint32(envelope.size()) > wh.MaxMessageSize() {
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
@ -547,7 +619,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash())
}
if envelope.PoW() < wh.minPoW {
if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error
}
@ -571,7 +643,9 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
} else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size()
wh.statsMu.Unlock()
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
@ -600,13 +674,13 @@ func (w *Whisper) checkOverflow() {
queueSize := len(w.messageQueue)
if queueSize == messageQueueLimit {
if !w.overflow {
w.overflow = true
if !w.Overflow() {
w.settings.store(OverflowIdx, true)
log.Warn("message queue overflow")
}
} else if queueSize <= messageQueueLimit/2 {
if w.overflow {
w.overflow = false
if w.Overflow() {
w.settings.store(OverflowIdx, false)
log.Warn("message queue overflow fixed (back to normal)")
}
}
@ -653,6 +727,8 @@ func (w *Whisper) expire() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
w.statsMu.Lock()
defer w.statsMu.Unlock()
w.stats.reset()
now := uint32(time.Now().Unix())
for expiry, hashSet := range w.expirations {
@ -673,17 +749,11 @@ func (w *Whisper) expire() {
}
// Stats returns the whisper node statistics.
func (w *Whisper) Stats() string {
result := fmt.Sprintf("Memory usage: %d bytes. Average messages cleared per expiry cycle: %d. Total messages cleared: %d.",
w.stats.memoryUsed, w.stats.totalMessagesCleared/w.stats.cycles, w.stats.totalMessagesCleared)
if w.stats.messagesCleared > 0 {
result += fmt.Sprintf(" Latest expiry cycle cleared %d messages (%d bytes).",
w.stats.messagesCleared, w.stats.memoryCleared)
}
if w.overflow {
result += " Message queue state: overflow."
}
return result
func (w *Whisper) Stats() Statistics {
w.statsMu.Lock()
defer w.statsMu.Unlock()
return w.stats
}
// Envelopes retrieves all the messages currently pooled by the node.
@ -734,15 +804,6 @@ func (s *Statistics) reset() {
s.messagesCleared = 0
}
// ValidateKeyID checks the format of key id.
func ValidateKeyID(id string) error {
const target = keyIdSize * 2
if len(id) != target {
return fmt.Errorf("wrong size of key ID (expected %d bytes, got %d)", target, len(id))
}
return nil
}
// ValidatePublicKey checks the format of the given public key.
func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0