rpc, whisper, xeth: fix RPC message retrieval data race
This commit is contained in:
		| @@ -467,7 +467,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err | ||||
| 		if err := json.Unmarshal(req.Params, &args); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		*reply = api.xeth().Whisper().Messages(args.Id) | ||||
| 		*reply = api.xeth().Messages(args.Id) | ||||
|  | ||||
| 	// case "eth_register": | ||||
| 	// 	// Placeholder for actual type | ||||
|   | ||||
| @@ -73,6 +73,7 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { | ||||
| 	message := &Message{ | ||||
| 		Flags: data[0], | ||||
| 		Sent:  int64(self.Expiry - self.TTL), | ||||
| 		Hash:  self.Hash(), | ||||
| 	} | ||||
| 	data = data[1:] | ||||
|  | ||||
|   | ||||
							
								
								
									
										36
									
								
								whisper/envelope_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								whisper/envelope_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | ||||
| package whisper | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"testing" | ||||
| ) | ||||
|  | ||||
| func TestEnvelopeOpen(t *testing.T) { | ||||
| 	payload := []byte("hello world") | ||||
| 	message := NewMessage(payload) | ||||
|  | ||||
| 	envelope, err := message.Wrap(DefaultPoW, Options{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to wrap message: %v", err) | ||||
| 	} | ||||
| 	opened, err := envelope.Open(nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to open envelope: %v.", err) | ||||
| 	} | ||||
| 	if opened.Flags != message.Flags { | ||||
| 		t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) | ||||
| 	} | ||||
| 	if bytes.Compare(opened.Signature, message.Signature) != 0 { | ||||
| 		t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature) | ||||
| 	} | ||||
| 	if bytes.Compare(opened.Payload, message.Payload) != 0 { | ||||
| 		t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) | ||||
| 	} | ||||
| 	if opened.Sent != message.Sent { | ||||
| 		t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) | ||||
| 	} | ||||
|  | ||||
| 	if opened.Hash != envelope.Hash() { | ||||
| 		t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) | ||||
| 	} | ||||
| } | ||||
| @@ -8,12 +8,13 @@ import ( | ||||
| 	"math/rand" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| 	"github.com/ethereum/go-ethereum/crypto" | ||||
| 	"github.com/ethereum/go-ethereum/logger" | ||||
| 	"github.com/ethereum/go-ethereum/logger/glog" | ||||
| ) | ||||
|  | ||||
| // Message represents an end-user data packet to trasmit through the Whisper | ||||
| // Message represents an end-user data packet to transmit through the Whisper | ||||
| // protocol. These are wrapped into Envelopes that need not be understood by | ||||
| // intermediate nodes, just forwarded. | ||||
| type Message struct { | ||||
| @@ -22,7 +23,8 @@ type Message struct { | ||||
| 	Payload   []byte | ||||
| 	Sent      int64 | ||||
|  | ||||
| 	To *ecdsa.PublicKey | ||||
| 	To   *ecdsa.PublicKey // Message recipient (identity used to decode the message) | ||||
| 	Hash common.Hash      // Message envelope hash to act as a unique id in de-duplication | ||||
| } | ||||
|  | ||||
| // Options specifies the exact way a message should be wrapped into an Envelope. | ||||
|   | ||||
| @@ -1,26 +1,84 @@ | ||||
| // Contains the external API side message filter for watching, pooling and polling | ||||
| // matched whisper messages. | ||||
| // matched whisper messages, also serializing data access to avoid duplications. | ||||
|  | ||||
| package xeth | ||||
|  | ||||
| import "time" | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| ) | ||||
|  | ||||
| // whisperFilter is the message cache matching a specific filter, accumulating | ||||
| // inbound messages until the are requested by the client. | ||||
| type whisperFilter struct { | ||||
| 	id      int              // Filter identifier | ||||
| 	id  int      // Filter identifier for old message retrieval | ||||
| 	ref *Whisper // Whisper reference for old message retrieval | ||||
|  | ||||
| 	cache  []WhisperMessage         // Cache of messages not yet polled | ||||
| 	timeout time.Time        // Time when the last message batch was queries | ||||
| 	skip   map[common.Hash]struct{} // List of retrieved messages to avoid duplication | ||||
| 	update time.Time                // Time of the last message query | ||||
|  | ||||
| 	lock sync.RWMutex // Lock protecting the filter internals | ||||
| } | ||||
|  | ||||
| // newWhisperFilter creates a new serialized, poll based whisper topic filter. | ||||
| func newWhisperFilter(id int, ref *Whisper) *whisperFilter { | ||||
| 	return &whisperFilter{ | ||||
| 		id:  id, | ||||
| 		ref: ref, | ||||
|  | ||||
| 		update: time.Now(), | ||||
| 		skip:   make(map[common.Hash]struct{}), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // messages retrieves all the cached messages from the entire pool matching the | ||||
| // filter, resetting the filter's change buffer. | ||||
| func (w *whisperFilter) messages() []WhisperMessage { | ||||
| 	w.lock.Lock() | ||||
| 	defer w.lock.Unlock() | ||||
|  | ||||
| 	w.cache = nil | ||||
| 	w.update = time.Now() | ||||
|  | ||||
| 	w.skip = make(map[common.Hash]struct{}) | ||||
| 	messages := w.ref.Messages(w.id) | ||||
| 	for _, message := range messages { | ||||
| 		w.skip[message.ref.Hash] = struct{}{} | ||||
| 	} | ||||
| 	return messages | ||||
| } | ||||
|  | ||||
| // insert injects a new batch of messages into the filter cache. | ||||
| func (w *whisperFilter) insert(msgs ...WhisperMessage) { | ||||
| 	w.cache = append(w.cache, msgs...) | ||||
| func (w *whisperFilter) insert(messages ...WhisperMessage) { | ||||
| 	w.lock.Lock() | ||||
| 	defer w.lock.Unlock() | ||||
|  | ||||
| 	for _, message := range messages { | ||||
| 		if _, ok := w.skip[message.ref.Hash]; !ok { | ||||
| 			w.cache = append(w.cache, messages...) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // retrieve fetches all the cached messages from the filter. | ||||
| func (w *whisperFilter) retrieve() (messages []WhisperMessage) { | ||||
| 	w.lock.Lock() | ||||
| 	defer w.lock.Unlock() | ||||
|  | ||||
| 	messages, w.cache = w.cache, nil | ||||
| 	w.timeout = time.Now() | ||||
| 	w.update = time.Now() | ||||
|  | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // activity returns the last time instance when client requests were executed on | ||||
| // the filter. | ||||
| func (w *whisperFilter) activity() time.Time { | ||||
| 	w.lock.RLock() | ||||
| 	defer w.lock.RUnlock() | ||||
|  | ||||
| 	return w.update | ||||
| } | ||||
|   | ||||
							
								
								
									
										13
									
								
								xeth/xeth.go
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								xeth/xeth.go
									
									
									
									
									
								
							| @@ -97,7 +97,7 @@ done: | ||||
| 			} | ||||
|  | ||||
| 			for id, filter := range self.messages { | ||||
| 				if time.Since(filter.timeout) > filterTickerTime { | ||||
| 				if time.Since(filter.activity()) > filterTickerTime { | ||||
| 					self.Whisper().Unwatch(id) | ||||
| 					delete(self.messages, id) | ||||
| 				} | ||||
| @@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { | ||||
| 		p.messages[id].insert(msg) | ||||
| 	} | ||||
| 	id = p.Whisper().Watch(to, from, topics, callback) | ||||
| 	p.messages[id] = &whisperFilter{timeout: time.Now()} | ||||
| 	p.messages[id] = newWhisperFilter(id, p.Whisper()) | ||||
| 	return id | ||||
| } | ||||
|  | ||||
| @@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { | ||||
| 	if self.messages[id] != nil { | ||||
| 		return self.messages[id].retrieve() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (self *XEth) Messages(id int) []WhisperMessage { | ||||
| 	self.messagesMut.Lock() | ||||
| 	defer self.messagesMut.Unlock() | ||||
|  | ||||
| 	if self.messages[id] != nil { | ||||
| 		return self.messages[id].messages() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user