Merge pull request #2106 from obscuren/out-of-bound-logs
eth/filters: added notifications for out of bound log events
This commit is contained in:
		@@ -206,12 +206,12 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
 | 
				
			|||||||
	filter.SetEndBlock(latest)
 | 
						filter.SetEndBlock(latest)
 | 
				
			||||||
	filter.SetAddresses(addresses)
 | 
						filter.SetAddresses(addresses)
 | 
				
			||||||
	filter.SetTopics(topics)
 | 
						filter.SetTopics(topics)
 | 
				
			||||||
	filter.LogsCallback = func(logs vm.Logs) {
 | 
						filter.LogCallback = func(log *vm.Log, removed bool) {
 | 
				
			||||||
		s.logMu.Lock()
 | 
							s.logMu.Lock()
 | 
				
			||||||
		defer s.logMu.Unlock()
 | 
							defer s.logMu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if queue := s.logQueue[id]; queue != nil {
 | 
							if queue := s.logQueue[id]; queue != nil {
 | 
				
			||||||
			queue.add(logs...)
 | 
								queue.add(vmlog{log, removed})
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -365,14 +365,14 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetLogs returns the logs matching the given argument.
 | 
					// GetLogs returns the logs matching the given argument.
 | 
				
			||||||
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) vm.Logs {
 | 
					func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
 | 
				
			||||||
	filter := New(s.chainDb)
 | 
						filter := New(s.chainDb)
 | 
				
			||||||
	filter.SetBeginBlock(args.FromBlock.Int64())
 | 
						filter.SetBeginBlock(args.FromBlock.Int64())
 | 
				
			||||||
	filter.SetEndBlock(args.ToBlock.Int64())
 | 
						filter.SetEndBlock(args.ToBlock.Int64())
 | 
				
			||||||
	filter.SetAddresses(args.Addresses)
 | 
						filter.SetAddresses(args.Addresses)
 | 
				
			||||||
	filter.SetTopics(args.Topics)
 | 
						filter.SetTopics(args.Topics)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return returnLogs(filter.Find())
 | 
						return toRPCLogs(filter.Find(), false)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UninstallFilter removes the filter with the given filter id.
 | 
					// UninstallFilter removes the filter with the given filter id.
 | 
				
			||||||
@@ -447,7 +447,7 @@ func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// logFilterChanged returns a collection of logs for the log filter with the given id.
 | 
					// logFilterChanged returns a collection of logs for the log filter with the given id.
 | 
				
			||||||
func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs {
 | 
					func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
 | 
				
			||||||
	s.logMu.Lock()
 | 
						s.logMu.Lock()
 | 
				
			||||||
	defer s.logMu.Unlock()
 | 
						defer s.logMu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -458,17 +458,17 @@ func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetFilterLogs returns the logs for the filter with the given id.
 | 
					// GetFilterLogs returns the logs for the filter with the given id.
 | 
				
			||||||
func (s *PublicFilterAPI) GetFilterLogs(filterId string) vm.Logs {
 | 
					func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
 | 
				
			||||||
	id, ok := s.filterMapping[filterId]
 | 
						id, ok := s.filterMapping[filterId]
 | 
				
			||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
		return returnLogs(nil)
 | 
							return toRPCLogs(nil, false)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if filter := s.filterManager.Get(id); filter != nil {
 | 
						if filter := s.filterManager.Get(id); filter != nil {
 | 
				
			||||||
		return returnLogs(filter.Find())
 | 
							return toRPCLogs(filter.Find(), false)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return returnLogs(nil)
 | 
						return toRPCLogs(nil, false)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
 | 
					// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
 | 
				
			||||||
@@ -488,28 +488,33 @@ func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
 | 
				
			|||||||
	case transactionFilterTy:
 | 
						case transactionFilterTy:
 | 
				
			||||||
		return returnHashes(s.transactionFilterChanged(id))
 | 
							return returnHashes(s.transactionFilterChanged(id))
 | 
				
			||||||
	case logFilterTy:
 | 
						case logFilterTy:
 | 
				
			||||||
		return returnLogs(s.logFilterChanged(id))
 | 
							return s.logFilterChanged(id)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return []interface{}{}
 | 
						return []interface{}{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type vmlog struct {
 | 
				
			||||||
 | 
						*vm.Log
 | 
				
			||||||
 | 
						Removed bool `json:"removed"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type logQueue struct {
 | 
					type logQueue struct {
 | 
				
			||||||
	mu sync.Mutex
 | 
						mu sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	logs    vm.Logs
 | 
						logs    []vmlog
 | 
				
			||||||
	timeout time.Time
 | 
						timeout time.Time
 | 
				
			||||||
	id      int
 | 
						id      int
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (l *logQueue) add(logs ...*vm.Log) {
 | 
					func (l *logQueue) add(logs ...vmlog) {
 | 
				
			||||||
	l.mu.Lock()
 | 
						l.mu.Lock()
 | 
				
			||||||
	defer l.mu.Unlock()
 | 
						defer l.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	l.logs = append(l.logs, logs...)
 | 
						l.logs = append(l.logs, logs...)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (l *logQueue) get() vm.Logs {
 | 
					func (l *logQueue) get() []vmlog {
 | 
				
			||||||
	l.mu.Lock()
 | 
						l.mu.Lock()
 | 
				
			||||||
	defer l.mu.Unlock()
 | 
						defer l.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -556,13 +561,16 @@ func newFilterId() (string, error) {
 | 
				
			|||||||
	return "0x" + hex.EncodeToString(subid[:]), nil
 | 
						return "0x" + hex.EncodeToString(subid[:]), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// returnLogs is a helper that will return an empty logs array case the given logs is nil, otherwise is will return the
 | 
					// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
 | 
				
			||||||
// given logs. The RPC interfaces defines that always an array is returned.
 | 
					// can hold additional information about the logs such as whether it was deleted.
 | 
				
			||||||
func returnLogs(logs vm.Logs) vm.Logs {
 | 
					// Additionally when nil is given it will by default instead create an empty slice
 | 
				
			||||||
	if logs == nil {
 | 
					// instead. This is required by the RPC specification.
 | 
				
			||||||
		return vm.Logs{}
 | 
					func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
 | 
				
			||||||
 | 
						convertedLogs := make([]vmlog, len(logs))
 | 
				
			||||||
 | 
						for i, log := range logs {
 | 
				
			||||||
 | 
							convertedLogs[i] = vmlog{Log: log, Removed: removed}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return logs
 | 
						return convertedLogs
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
 | 
					// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,7 +39,7 @@ type Filter struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	BlockCallback       func(*types.Block, vm.Logs)
 | 
						BlockCallback       func(*types.Block, vm.Logs)
 | 
				
			||||||
	TransactionCallback func(*types.Transaction)
 | 
						TransactionCallback func(*types.Transaction)
 | 
				
			||||||
	LogsCallback        func(vm.Logs)
 | 
						LogCallback         func(*vm.Log, bool)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
 | 
					// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -46,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	fs.sub = mux.Subscribe(
 | 
						fs.sub = mux.Subscribe(
 | 
				
			||||||
		//core.PendingBlockEvent{},
 | 
							//core.PendingBlockEvent{},
 | 
				
			||||||
 | 
							core.RemovedLogEvent{},
 | 
				
			||||||
		core.ChainEvent{},
 | 
							core.ChainEvent{},
 | 
				
			||||||
		core.TxPreEvent{},
 | 
							core.TxPreEvent{},
 | 
				
			||||||
		vm.Logs(nil),
 | 
							vm.Logs(nil),
 | 
				
			||||||
@@ -96,7 +97,7 @@ func (fs *FilterSystem) filterLoop() {
 | 
				
			|||||||
		case core.ChainEvent:
 | 
							case core.ChainEvent:
 | 
				
			||||||
			fs.filterMu.RLock()
 | 
								fs.filterMu.RLock()
 | 
				
			||||||
			for id, filter := range fs.filters {
 | 
								for id, filter := range fs.filters {
 | 
				
			||||||
				if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
 | 
									if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
 | 
				
			||||||
					filter.BlockCallback(ev.Block, ev.Logs)
 | 
										filter.BlockCallback(ev.Block, ev.Logs)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -105,7 +106,7 @@ func (fs *FilterSystem) filterLoop() {
 | 
				
			|||||||
		case core.TxPreEvent:
 | 
							case core.TxPreEvent:
 | 
				
			||||||
			fs.filterMu.RLock()
 | 
								fs.filterMu.RLock()
 | 
				
			||||||
			for id, filter := range fs.filters {
 | 
								for id, filter := range fs.filters {
 | 
				
			||||||
				if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
 | 
									if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
 | 
				
			||||||
					filter.TransactionCallback(ev.Tx)
 | 
										filter.TransactionCallback(ev.Tx)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -114,10 +115,20 @@ func (fs *FilterSystem) filterLoop() {
 | 
				
			|||||||
		case vm.Logs:
 | 
							case vm.Logs:
 | 
				
			||||||
			fs.filterMu.RLock()
 | 
								fs.filterMu.RLock()
 | 
				
			||||||
			for id, filter := range fs.filters {
 | 
								for id, filter := range fs.filters {
 | 
				
			||||||
				if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
 | 
									if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
 | 
				
			||||||
					msgs := filter.FilterLogs(ev)
 | 
										for _, log := range filter.FilterLogs(ev) {
 | 
				
			||||||
					if len(msgs) > 0 {
 | 
											filter.LogCallback(log, false)
 | 
				
			||||||
						filter.LogsCallback(msgs)
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								fs.filterMu.RUnlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							case core.RemovedLogEvent:
 | 
				
			||||||
 | 
								fs.filterMu.RLock()
 | 
				
			||||||
 | 
								for id, filter := range fs.filters {
 | 
				
			||||||
 | 
									if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
 | 
				
			||||||
 | 
										for _, removedLog := range ev.Logs {
 | 
				
			||||||
 | 
											filter.LogCallback(removedLog, true)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										87
									
								
								eth/filters/filter_system_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										87
									
								
								eth/filters/filter_system_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,87 @@
 | 
				
			|||||||
 | 
					package filters
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/core"
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/core/types"
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/core/vm"
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/event"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestCallbacks(t *testing.T) {
 | 
				
			||||||
 | 
						var (
 | 
				
			||||||
 | 
							mux            event.TypeMux
 | 
				
			||||||
 | 
							fs             = NewFilterSystem(&mux)
 | 
				
			||||||
 | 
							blockDone      = make(chan struct{})
 | 
				
			||||||
 | 
							txDone         = make(chan struct{})
 | 
				
			||||||
 | 
							logDone        = make(chan struct{})
 | 
				
			||||||
 | 
							removedLogDone = make(chan struct{})
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						blockFilter := &Filter{
 | 
				
			||||||
 | 
							BlockCallback: func(*types.Block, vm.Logs) {
 | 
				
			||||||
 | 
								close(blockDone)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						txFilter := &Filter{
 | 
				
			||||||
 | 
							TransactionCallback: func(*types.Transaction) {
 | 
				
			||||||
 | 
								close(txDone)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						logFilter := &Filter{
 | 
				
			||||||
 | 
							LogCallback: func(l *vm.Log, oob bool) {
 | 
				
			||||||
 | 
								if !oob {
 | 
				
			||||||
 | 
									close(logDone)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						removedLogFilter := &Filter{
 | 
				
			||||||
 | 
							LogCallback: func(l *vm.Log, oob bool) {
 | 
				
			||||||
 | 
								if oob {
 | 
				
			||||||
 | 
									close(removedLogDone)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fs.Add(blockFilter)
 | 
				
			||||||
 | 
						fs.Add(txFilter)
 | 
				
			||||||
 | 
						fs.Add(logFilter)
 | 
				
			||||||
 | 
						fs.Add(removedLogFilter)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mux.Post(core.ChainEvent{})
 | 
				
			||||||
 | 
						mux.Post(core.TxPreEvent{})
 | 
				
			||||||
 | 
						mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
 | 
				
			||||||
 | 
						mux.Post(vm.Logs{&vm.Log{}})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						const dura = 5 * time.Second
 | 
				
			||||||
 | 
						failTimer := time.NewTimer(dura)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-blockDone:
 | 
				
			||||||
 | 
						case <-failTimer.C:
 | 
				
			||||||
 | 
							t.Error("block filter failed to trigger (timeout)")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						failTimer.Reset(dura)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-txDone:
 | 
				
			||||||
 | 
						case <-failTimer.C:
 | 
				
			||||||
 | 
							t.Error("transaction filter failed to trigger (timeout)")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						failTimer.Reset(dura)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-logDone:
 | 
				
			||||||
 | 
						case <-failTimer.C:
 | 
				
			||||||
 | 
							t.Error("log filter failed to trigger (timeout)")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						failTimer.Reset(dura)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-removedLogDone:
 | 
				
			||||||
 | 
						case <-failTimer.C:
 | 
				
			||||||
 | 
							t.Error("removed log filter failed to trigger (timeout)")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user