rpc: refactor subscriptions and filters

This commit is contained in:
Bas van Kervel
2016-07-27 17:47:46 +02:00
committed by Bas van Kervel
parent 3b39d4d1c1
commit 47ff813012
15 changed files with 1304 additions and 1255 deletions

View File

@ -17,292 +17,414 @@
package filters
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"time"
"golang.org/x/net/context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
var (
filterTickerTime = 5 * time.Minute
deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
)
// byte will be inferred
const (
unknownFilterTy = iota
blockFilterTy
transactionFilterTy
logFilterTy
)
// filter is a helper struct that holds meta information over the filter type
// and associated subscription in the event system.
type filter struct {
typ Type
deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
crit FilterCriteria
logs []Log
s *Subscription // associated subscription in event system
}
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
filterManager *FilterSystem
filterMapMu sync.RWMutex
filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
logMu sync.RWMutex
logQueue map[int]*logQueue
blockMu sync.RWMutex
blockQueue map[int]*hashQueue
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
svc := &PublicFilterAPI{
mux: mux,
chainDb: chainDb,
filterManager: NewFilterSystem(mux),
filterMapping: make(map[string]int),
logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue),
api := &PublicFilterAPI{
mux: mux,
chainDb: chainDb,
events: NewEventSystem(mux),
filters: make(map[rpc.ID]*filter),
}
go svc.start()
return svc
go api.timeoutLoop()
return api
}
// Stop quits the work loop.
func (s *PublicFilterAPI) Stop() {
close(s.quit)
}
// start the work loop, wait and process events.
func (s *PublicFilterAPI) start() {
timer := time.NewTicker(2 * time.Second)
defer timer.Stop()
done:
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
// Tt is started when the api is created.
func (api *PublicFilterAPI) timeoutLoop() {
ticker := time.NewTicker(5 * time.Minute)
for {
select {
case <-timer.C:
s.filterManager.Lock() // lock order like filterLoop()
s.logMu.Lock()
for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.logQueue, id)
}
<-ticker.C
api.filtersMu.Lock()
for id, f := range api.filters {
select {
case <-f.deadline.C:
f.s.Unsubscribe()
delete(api.filters, id)
default:
continue
}
s.logMu.Unlock()
s.blockMu.Lock()
for id, filter := range s.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.blockQueue, id)
}
}
s.blockMu.Unlock()
s.transactionMu.Lock()
for id, filter := range s.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.transactionQueue, id)
}
}
s.transactionMu.Unlock()
s.filterManager.Unlock()
case <-s.quit:
break done
}
api.filtersMu.Unlock()
}
}
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
// protect filterManager.Add() and setting of filter fields
s.filterManager.Lock()
defer s.filterManager.Unlock()
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used throug the
// `eth_getFilterChanges` polling method that is also used for log filters.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan common.Hash)
pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
)
externalId, err := newFilterId()
if err != nil {
return "", err
}
api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filtersMu.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
}
s.blockMu.Lock()
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
s.blockMu.Unlock()
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if queue := s.blockQueue[id]; queue != nil {
queue.add(block.Hash())
}
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
// protect filterManager.Add() and setting of filter fields
s.filterManager.Lock()
defer s.filterManager.Unlock()
externalId, err := newFilterId()
if err != nil {
return "", err
}
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
}
s.transactionMu.Lock()
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
s.transactionMu.Unlock()
filter.TransactionCallback = func(tx *types.Transaction) {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
if queue := s.transactionQueue[id]; queue != nil {
queue.add(tx.Hash())
}
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
// protect filterManager.Add() and setting of filter fields
s.filterManager.Lock()
defer s.filterManager.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, LogFilter)
if err != nil {
return 0, err
}
s.logMu.Lock()
s.logQueue[id] = &logQueue{timeout: time.Now()}
s.logMu.Unlock()
filter.SetBeginBlock(earliest)
filter.SetEndBlock(latest)
filter.SetAddresses(addresses)
filter.SetTopics(topics)
filter.LogCallback = func(log *vm.Log, removed bool) {
if callback != nil {
callback(log, removed)
} else {
s.logMu.Lock()
defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
queue.add(vmlog{log, removed})
go func() {
for {
select {
case ph := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
api.filtersMu.Lock()
delete(api.filters, pendingTxSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return pendingTxSub.ID
}
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
return id, nil
rpcSub := notifier.CreateSubscription()
go func() {
txHashes := make(chan common.Hash)
pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
for {
select {
case h := <-txHashes:
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return
case <-notifier.Closed():
pendingTxSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
// It is part of the filter package since polling goes with eth_getFilterChanges.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
var (
headers = make(chan *types.Header)
headerSub = api.events.SubscribeNewHeads(headers)
)
api.filtersMu.Lock()
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case h := <-headers:
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
}
api.filtersMu.Unlock()
case <-headerSub.Err():
api.filtersMu.Lock()
delete(api.filters, headerSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return headerSub.ID
}
// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers)
for {
select {
case h := <-headers:
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
headersSub.Unsubscribe()
return
case <-notifier.Closed():
headersSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
var (
externalId string
subscription rpc.Subscription
err error
)
rpcSub := notifier.CreateSubscription()
if externalId, err = newFilterId(); err != nil {
return nil, err
}
go func() {
matchedLogs := make(chan []Log)
logsSub := api.events.SubscribeLogs(crit, matchedLogs)
// uninstall filter when subscription is unsubscribed/cancelled
if subscription, err = notifier.NewSubscription(func(string) {
s.UninstallFilter(externalId)
}); err != nil {
return nil, err
}
notifySubscriber := func(log *vm.Log, removed bool) {
rpcLog := toRPCLogs(vm.Logs{log}, removed)
if err := subscription.Notify(rpcLog); err != nil {
subscription.Cancel()
for {
select {
case logs := <-matchedLogs:
for _, log := range logs {
notifier.Notify(rpcSub.ID, &log)
}
case <-rpcSub.Err(): // client send an unsubscribe request
logsSub.Unsubscribe()
return
case <-notifier.Closed(): // connection dropped
logsSub.Unsubscribe()
return
}
}
}
}()
// from and to block number are not used since subscriptions don't allow you to travel to "time"
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
} else {
id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
}
if err != nil {
subscription.Cancel()
return nil, err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return subscription, err
return rpcSub, nil
}
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
FromBlock rpc.BlockNumber
ToBlock rpc.BlockNumber
// FilterCriteria represents a request to create a new filter.
type FilterCriteria struct {
FromBlock *big.Int
ToBlock *big.Int
Addresses []common.Address
Topics [][]common.Hash
}
// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
var (
logs = make(chan []Log)
logsSub = api.events.SubscribeLogs(crit, logs)
)
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
api.filtersMu.Lock()
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case l := <-logs:
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
}
api.filtersMu.Unlock()
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, logsSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return logsSub.ID
}
// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log {
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
filter := New(api.chainDb)
filter.SetBeginBlock(crit.FromBlock.Int64())
filter.SetEndBlock(crit.ToBlock.Int64())
filter.SetAddresses(crit.Addresses)
filter.SetTopics(crit.Topics)
return returnLogs(filter.Find())
}
// UninstallFilter removes the filter with the given filter id.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMu.Lock()
f, found := api.filters[id]
if found {
delete(api.filters, id)
}
api.filtersMu.Unlock()
if found {
f.s.Unsubscribe()
}
return found
}
// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log {
api.filtersMu.Lock()
f, found := api.filters[id]
api.filtersMu.Unlock()
if !found || f.typ != LogsSubscription {
return []Log{}
}
filter := New(api.chainDb)
filter.SetBeginBlock(f.crit.FromBlock.Int64())
filter.SetEndBlock(f.crit.ToBlock.Int64())
filter.SetAddresses(f.crit.Addresses)
filter.SetTopics(f.crit.Topics)
return returnLogs(filter.Find())
}
// GetFilterChanges returns the logs for the filter with the given id since
// last time is was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log. If the filter could not be found
// []interface{}{} is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(deadline)
switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes)
case PendingLogsSubscription, LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs)
}
}
return []interface{}{}
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}
// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
// otherwise the given logs array is returned.
func returnLogs(logs []Log) []Log {
if logs == nil {
return []Log{}
}
return logs
}
// UnmarshalJSON sets *args fields with given data.
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
@ -316,15 +438,15 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
}
if raw.From == nil || raw.From.Int64() < 0 {
args.FromBlock = rpc.LatestBlockNumber
args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
args.FromBlock = *raw.From
args.FromBlock = big.NewInt(raw.From.Int64())
}
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
args.ToBlock = rpc.LatestBlockNumber
args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
args.ToBlock = *raw.ToBlock
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}
args.Addresses = []common.Address{}
@ -414,255 +536,3 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
return nil
}
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
} else {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
}
if err != nil {
return "", err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// GetLogs returns the logs matching the given argument.
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
filter := New(s.chainDb)
filter.SetBeginBlock(args.FromBlock.Int64())
filter.SetEndBlock(args.ToBlock.Int64())
filter.SetAddresses(args.Addresses)
filter.SetTopics(args.Topics)
return toRPCLogs(filter.Find(), false)
}
// UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
s.filterManager.Lock()
defer s.filterManager.Unlock()
s.filterMapMu.Lock()
id, ok := s.filterMapping[filterId]
if !ok {
s.filterMapMu.Unlock()
return false
}
delete(s.filterMapping, filterId)
s.filterMapMu.Unlock()
s.filterManager.Remove(id)
s.logMu.Lock()
if _, ok := s.logQueue[id]; ok {
delete(s.logQueue, id)
s.logMu.Unlock()
return true
}
s.logMu.Unlock()
s.blockMu.Lock()
if _, ok := s.blockQueue[id]; ok {
delete(s.blockQueue, id)
s.blockMu.Unlock()
return true
}
s.blockMu.Unlock()
s.transactionMu.Lock()
if _, ok := s.transactionQueue[id]; ok {
delete(s.transactionQueue, id)
s.transactionMu.Unlock()
return true
}
s.transactionMu.Unlock()
return false
}
// getFilterType is a helper utility that determine the type of filter for the given filter id.
func (s *PublicFilterAPI) getFilterType(id int) byte {
if _, ok := s.blockQueue[id]; ok {
return blockFilterTy
} else if _, ok := s.transactionQueue[id]; ok {
return transactionFilterTy
} else if _, ok := s.logQueue[id]; ok {
return logFilterTy
}
return unknownFilterTy
}
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.blockQueue[id] != nil {
return s.blockQueue[id].get()
}
return nil
}
// transactionFilterChanged returns a collection of transaction hashes for the pending
// transaction filter with the given id.
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.transactionQueue[id] != nil {
return s.transactionQueue[id].get()
}
return nil
}
// logFilterChanged returns a collection of logs for the log filter with the given id.
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
s.logMu.Lock()
defer s.logMu.Unlock()
if s.logQueue[id] != nil {
return s.logQueue[id].get()
}
return nil
}
// GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
s.filterMapMu.RLock()
id, ok := s.filterMapping[filterId]
s.filterMapMu.RUnlock()
if !ok {
return toRPCLogs(nil, false)
}
if filter := s.filterManager.Get(id); filter != nil {
return toRPCLogs(filter.Find(), false)
}
return toRPCLogs(nil, false)
}
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
s.filterMapMu.RLock()
id, ok := s.filterMapping[filterId]
s.filterMapMu.RUnlock()
if !ok { // filter not found
return []interface{}{}
}
switch s.getFilterType(id) {
case blockFilterTy:
return returnHashes(s.blockFilterChanged(id))
case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id))
case logFilterTy:
return s.logFilterChanged(id)
}
return []interface{}{}
}
type vmlog struct {
*vm.Log
Removed bool `json:"removed"`
}
type logQueue struct {
mu sync.Mutex
logs []vmlog
timeout time.Time
id int
}
func (l *logQueue) add(logs ...vmlog) {
l.mu.Lock()
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
}
func (l *logQueue) get() []vmlog {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
return tmp
}
type hashQueue struct {
mu sync.Mutex
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
l.mu.Lock()
defer l.mu.Unlock()
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil
return tmp
}
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
// causing the affected DApp to miss data.
func newFilterId() (string, error) {
var subid [16]byte
n, _ := rand.Read(subid[:])
if n != 16 {
return "", errors.New("Unable to generate filter id")
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
// can hold additional information about the logs such as whether it was deleted.
// Additionally when nil is given it will by default instead create an empty slice
// instead. This is required by the RPC specification.
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
convertedLogs := make([]vmlog, len(logs))
for i, log := range logs {
convertedLogs[i] = vmlog{Log: log, Removed: removed}
}
return convertedLogs
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
// return the given hashes. The RPC interfaces defines that always an array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package filters_test
package filters
import (
"encoding/json"
@ -22,7 +22,6 @@ import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
)
@ -39,14 +38,14 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
)
// default values
var test0 filters.NewFilterArgs
var test0 FilterCriteria
if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
t.Fatal(err)
}
if test0.FromBlock != rpc.LatestBlockNumber {
if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
}
if test0.ToBlock != rpc.LatestBlockNumber {
if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
}
if len(test0.Addresses) != 0 {
@ -57,20 +56,20 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// from, to block number
var test1 filters.NewFilterArgs
var test1 FilterCriteria
vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock)
if err := json.Unmarshal([]byte(vector), &test1); err != nil {
t.Fatal(err)
}
if test1.FromBlock != fromBlock {
if test1.FromBlock.Int64() != fromBlock.Int64() {
t.Fatalf("expected FromBlock %d, got %d", fromBlock, test1.FromBlock)
}
if test1.ToBlock != toBlock {
if test1.ToBlock.Int64() != toBlock.Int64() {
t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock)
}
// single address
var test2 filters.NewFilterArgs
var test2 FilterCriteria
vector = fmt.Sprintf(`{"address": "%s"}`, address0.Hex())
if err := json.Unmarshal([]byte(vector), &test2); err != nil {
t.Fatal(err)
@ -83,7 +82,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// multiple address
var test3 filters.NewFilterArgs
var test3 FilterCriteria
vector = fmt.Sprintf(`{"address": ["%s", "%s"]}`, address0.Hex(), address1.Hex())
if err := json.Unmarshal([]byte(vector), &test3); err != nil {
t.Fatal(err)
@ -99,7 +98,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// single topic
var test4 filters.NewFilterArgs
var test4 FilterCriteria
vector = fmt.Sprintf(`{"topics": ["%s"]}`, topic0.Hex())
if err := json.Unmarshal([]byte(vector), &test4); err != nil {
t.Fatal(err)
@ -115,7 +114,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// test multiple "AND" topics
var test5 filters.NewFilterArgs
var test5 FilterCriteria
vector = fmt.Sprintf(`{"topics": ["%s", "%s"]}`, topic0.Hex(), topic1.Hex())
if err := json.Unmarshal([]byte(vector), &test5); err != nil {
t.Fatal(err)
@ -137,7 +136,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// test optional topic
var test6 filters.NewFilterArgs
var test6 FilterCriteria
vector = fmt.Sprintf(`{"topics": ["%s", null, "%s"]}`, topic0.Hex(), topic2.Hex())
if err := json.Unmarshal([]byte(vector), &test6); err != nil {
t.Fatal(err)
@ -165,7 +164,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// test OR topics
var test7 filters.NewFilterArgs
var test7 FilterCriteria
vector = fmt.Sprintf(`{"topics": [["%s", "%s"], null, ["%s", null]]}`, topic0.Hex(), topic1.Hex(), topic2.Hex())
if err := json.Unmarshal([]byte(vector), &test7); err != nil {
t.Fatal(err)

View File

@ -23,15 +23,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
)
type AccountChange struct {
Address, StateAddress []byte
}
// Filtering interface
// Filter can be used to retrieve and filter logs
type Filter struct {
created time.Time
@ -39,70 +34,72 @@ type Filter struct {
begin, end int64
addresses []common.Address
topics [][]common.Hash
BlockCallback func(*types.Block, vm.Logs)
TransactionCallback func(*types.Transaction)
LogCallback func(*vm.Log, bool)
}
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
// is interesting or not.
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
func New(db ethdb.Database) *Filter {
return &Filter{db: db}
}
// Set the earliest and latest block for filtering.
// SetBeginBlock sets the earliest block for filtering.
// -1 = latest block (i.e., the current block)
// hash = particular hash from-to
func (self *Filter) SetBeginBlock(begin int64) {
self.begin = begin
func (f *Filter) SetBeginBlock(begin int64) {
f.begin = begin
}
func (self *Filter) SetEndBlock(end int64) {
self.end = end
// SetEndBlock sets the latest block for filtering.
func (f *Filter) SetEndBlock(end int64) {
f.end = end
}
func (self *Filter) SetAddresses(addr []common.Address) {
self.addresses = addr
// SetAddresses matches only logs that are generated from addresses that are included
// in the given addresses.
func (f *Filter) SetAddresses(addr []common.Address) {
f.addresses = addr
}
func (self *Filter) SetTopics(topics [][]common.Hash) {
self.topics = topics
// SetTopics matches only logs that have topics matching the given topics.
func (f *Filter) SetTopics(topics [][]common.Hash) {
f.topics = topics
}
// Run filters logs with the current parameters set
func (self *Filter) Find() vm.Logs {
latestHash := core.GetHeadBlockHash(self.db)
latestBlock := core.GetBlock(self.db, latestHash, core.GetBlockNumber(self.db, latestHash))
func (f *Filter) Find() []Log {
latestHash := core.GetHeadBlockHash(f.db)
latestBlock := core.GetBlock(f.db, latestHash, core.GetBlockNumber(f.db, latestHash))
if latestBlock == nil {
return vm.Logs{}
return []Log{}
}
var beginBlockNo uint64 = uint64(self.begin)
if self.begin == -1 {
var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
beginBlockNo = latestBlock.NumberU64()
}
var endBlockNo uint64 = uint64(self.end)
if self.end == -1 {
endBlockNo := uint64(f.end)
if f.end == -1 {
endBlockNo = latestBlock.NumberU64()
}
// if no addresses are present we can't make use of fast search which
// uses the mipmap bloom filters to check for fast inclusion and uses
// higher range probability in order to ensure at least a false positive
if len(self.addresses) == 0 {
return self.getLogs(beginBlockNo, endBlockNo)
if len(f.addresses) == 0 {
return f.getLogs(beginBlockNo, endBlockNo)
}
return self.mipFind(beginBlockNo, endBlockNo, 0)
return f.mipFind(beginBlockNo, endBlockNo, 0)
}
func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) {
level := core.MIPMapLevels[depth]
// normalise numerator so we can work in level specific batches and
// work with the proper range checks
for num := start / level * level; num <= end; num += level {
// find addresses in bloom filters
bloom := core.GetMipmapBloom(self.db, num, level)
for _, addr := range self.addresses {
bloom := core.GetMipmapBloom(f.db, num, level)
for _, addr := range f.addresses {
if bloom.TestBytes(addr[:]) {
// range check normalised values and make sure that
// we're resolving the correct range instead of the
@ -110,9 +107,9 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
start := uint64(math.Max(float64(num), float64(start)))
end := uint64(math.Min(float64(num+level-1), float64(end)))
if depth+1 == len(core.MIPMapLevels) {
logs = append(logs, self.getLogs(start, end)...)
logs = append(logs, f.getLogs(start, end)...)
} else {
logs = append(logs, self.mipFind(start, end, depth+1)...)
logs = append(logs, f.mipFind(start, end, depth+1)...)
}
// break so we don't check the same range for each
// possible address. Checks on multiple addresses
@ -125,12 +122,15 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
return logs
}
func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) {
func (f *Filter) getLogs(start, end uint64) (logs []Log) {
var block *types.Block
for i := start; i <= end; i++ {
var block *types.Block
hash := core.GetCanonicalHash(self.db, i)
hash := core.GetCanonicalHash(f.db, i)
if hash != (common.Hash{}) {
block = core.GetBlock(self.db, hash, i)
block = core.GetBlock(f.db, hash, i)
} else { // block not found
return logs
}
if block == nil { // block not found/written
return logs
@ -138,16 +138,20 @@ func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) {
// Use bloom filtering to see if this block is interesting given the
// current parameters
if self.bloomFilter(block) {
if f.bloomFilter(block) {
// Get the logs of the block
var (
receipts = core.GetBlockReceipts(self.db, block.Hash(), i)
unfiltered vm.Logs
receipts = core.GetBlockReceipts(f.db, block.Hash(), i)
unfiltered []Log
)
for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs...)
rl := make([]Log, len(receipt.Logs))
for i, l := range receipt.Logs {
rl[i] = Log{l, false}
}
unfiltered = append(unfiltered, rl...)
}
logs = append(logs, self.FilterLogs(unfiltered)...)
logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...)
}
}
@ -164,26 +168,25 @@ func includes(addresses []common.Address, a common.Address) bool {
return false
}
func (self *Filter) FilterLogs(logs vm.Logs) vm.Logs {
var ret vm.Logs
func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log {
var ret []Log
// Filter the logs for interesting stuff
Logs:
for _, log := range logs {
if len(self.addresses) > 0 && !includes(self.addresses, log.Address) {
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
logTopics := make([]common.Hash, len(self.topics))
logTopics := make([]common.Hash, len(topics))
copy(logTopics, log.Topics)
// If the to filtered topics is greater than the amount of topics in
// logs, skip.
if len(self.topics) > len(log.Topics) {
// If the to filtered topics is greater than the amount of topics in logs, skip.
if len(topics) > len(log.Topics) {
continue Logs
}
for i, topics := range self.topics {
for i, topics := range topics {
var match bool
for _, topic := range topics {
// common.Hash{} is a match all (wildcard)
@ -196,7 +199,6 @@ Logs:
if !match {
continue Logs
}
}
ret = append(ret, log)
@ -205,10 +207,10 @@ Logs:
return ret
}
func (self *Filter) bloomFilter(block *types.Block) bool {
if len(self.addresses) > 0 {
func (f *Filter) bloomFilter(block *types.Block) bool {
if len(f.addresses) > 0 {
var included bool
for _, addr := range self.addresses {
for _, addr := range f.addresses {
if types.BloomLookup(block.Bloom(), addr) {
included = true
break
@ -220,7 +222,7 @@ func (self *Filter) bloomFilter(block *types.Block) bool {
}
}
for _, sub := range self.topics {
for _, sub := range f.topics {
var included bool
for _, topic := range sub {
if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) {

View File

@ -14,179 +14,305 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// package filters implements an ethereum filtering system for block,
// Package filters implements an ethereum filtering system for block,
// transactions and log events.
package filters
import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// FilterType determines the type of filter and is used to put the filter in to
// Type determines the kind of filter and is used to put the filter in to
// the correct bucket when added.
type FilterType byte
type Type byte
const (
ChainFilter FilterType = iota // new block events filter
PendingTxFilter // pending transaction filter
LogFilter // new or removed log filter
PendingLogFilter // pending log filter
// UnknownSubscription indicates an unkown subscription type
UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription
// PendingLogsSubscription queries for logs for the pending block
PendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
)
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
chainFilters map[int]*Filter
pendingTxFilters map[int]*Filter
logFilters map[int]*Filter
pendingLogFilters map[int]*Filter
// generic is an ugly hack for Get
generic map[int]*Filter
sub event.Subscription
// Log is a helper that can hold additional information about vm.Log
// necessary for the RPC interface.
type Log struct {
*vm.Log
Removed bool `json:"removed"`
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
chainFilters: make(map[int]*Filter),
pendingTxFilters: make(map[int]*Filter),
logFilters: make(map[int]*Filter),
pendingLogFilters: make(map[int]*Filter),
generic: make(map[int]*Filter),
func (l *Log) MarshalJSON() ([]byte, error) {
fields := map[string]interface{}{
"address": l.Address,
"data": fmt.Sprintf("0x%x", l.Data),
"blockNumber": fmt.Sprintf("%#x", l.BlockNumber),
"logIndex": fmt.Sprintf("%#x", l.Index),
"blockHash": l.BlockHash,
"transactionHash": l.TxHash,
"transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
"topics": l.Topics,
"removed": l.Removed,
}
fs.sub = mux.Subscribe(
core.PendingLogsEvent{},
core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
)
go fs.filterLoop()
return fs
return json.Marshal(fields)
}
// Stop quits the filter loop required for polling events
func (fs *FilterSystem) Stop() {
fs.sub.Unsubscribe()
type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit FilterCriteria
logs chan []Log
hashes chan common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
// Acquire filter system maps lock, required to force lock acquisition
// sequence with filterMu acquired first to avoid deadlocks by callbacks
func (fs *FilterSystem) Lock() {
fs.filterMu.Lock()
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
mux *event.TypeMux
sub event.Subscription
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
}
// Release filter system maps lock
func (fs *FilterSystem) Unlock() {
fs.filterMu.Unlock()
}
// Add adds a filter to the filter manager
// Expects filterMu to be locked.
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
id := fs.filterId
filter.created = time.Now()
switch filterType {
case ChainFilter:
fs.chainFilters[id] = filter
case PendingTxFilter:
fs.pendingTxFilters[id] = filter
case LogFilter:
fs.logFilters[id] = filter
case PendingLogFilter:
fs.pendingLogFilters[id] = filter
default:
return 0, fmt.Errorf("unknown filter type %v", filterType)
// NewEventSystem creates a new manager that listens for event on the given mux,
// parses and filters them. It uses the all map to retrieve filter changes. The
// work loop holds its own index that is used to forward events to filters.
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux) *EventSystem {
m := &EventSystem{
mux: mux,
install: make(chan *subscription),
uninstall: make(chan *subscription),
}
fs.generic[id] = filter
fs.filterId++
go m.eventLoop()
return id, nil
return m
}
// Remove removes a filter by filter id
// Expects filterMu to be locked.
func (fs *FilterSystem) Remove(id int) {
delete(fs.chainFilters, id)
delete(fs.pendingTxFilters, id)
delete(fs.logFilters, id)
delete(fs.pendingLogFilters, id)
delete(fs.generic, id)
// Subscription is created when the client registers itself for a particular event.
type Subscription struct {
ID rpc.ID
f *subscription
es *EventSystem
unsubOnce sync.Once
}
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
return fs.generic[id]
// Err returns a channel that is closed when unsubscribed.
func (sub *Subscription) Err() <-chan error {
return sub.f.err
}
// filterLoop waits for specific events from ethereum and fires their handlers
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
for event := range fs.sub.Chan() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
for _, filter := range fs.chainFilters {
if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
// Unsubscribe uninstalls the subscription from the event broadcast loop.
func (sub *Subscription) Unsubscribe() {
sub.unsubOnce.Do(func() {
uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
// the eventLoop broadcast method to deadlock when writing to the
// filter event channel while the subscription loop is waiting for
// this method to return (and thus not reading these events).
select {
case sub.es.uninstall <- sub.f:
break uninstallLoop
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.headers:
}
fs.filterMu.RUnlock()
case core.TxPreEvent:
fs.filterMu.RLock()
for _, filter := range fs.pendingTxFilters {
if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
fs.filterMu.RUnlock()
}
case vm.Logs:
fs.filterMu.RLock()
for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
// wait for filter to be uninstalled in work loop before returning
// this ensures that the manager won't use the event channel which
// will probably be closed by the client asap after this method returns.
<-sub.Err()
})
}
// subscribe installs the subscription in the event broadcast loop.
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
es.install <- sub
<-sub.installed
return &Subscription{ID: sub.id, f: sub, es: es}
}
// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribePendingLogs creates a subscription that will write pending logs matching the
// given criteria to the given channel.
func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeNewHeads creates a subscription that writes the header of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []Log),
hashes: make(chan common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
func broadcast(filters filterIndex, ev *event.Event) {
if ev == nil {
return
}
switch e := ev.Data.(type) {
case vm.Logs:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
fs.filterMu.RUnlock()
case core.RemovedLogsEvent:
fs.filterMu.RLock()
for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range filter.FilterLogs(ev.Logs) {
filter.LogCallback(removedLog, true)
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
fs.filterMu.RUnlock()
case core.PendingLogsEvent:
fs.filterMu.RLock()
for _, filter := range fs.pendingLogFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, pendingLog := range ev.Logs {
filter.LogCallback(pendingLog, false)
}
}
case core.PendingLogsEvent:
for _, f := range filters[PendingLogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
fs.filterMu.RUnlock()
}
case core.TxPreEvent:
for _, f := range filters[PendingTransactionsSubscription] {
if ev.Time.After(f.created) {
f.hashes <- e.Tx.Hash()
}
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
if ev.Time.After(f.created) {
f.headers <- e.Block.Header()
}
}
}
}
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
)
for {
select {
case ev, active := <-sub.Chan():
if !active { // system stopped
return
}
broadcast(index, ev)
case f := <-es.install:
if _, found := index[f.typ]; !found {
index[f.typ] = make(map[rpc.ID]*subscription)
}
index[f.typ][f.id] = f
close(f.installed)
case f := <-es.uninstall:
delete(index[f.typ], f.id)
close(f.err)
}
}
}
// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
func convertLogs(in vm.Logs, removed bool) []Log {
logs := make([]Log, len(in))
for i, l := range in {
logs[i] = Log{l, removed}
}
return logs
}

View File

@ -17,101 +17,310 @@
package filters
import (
"math/big"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
func TestCallbacks(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
api = NewPublicFilterAPI(db, mux)
)
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2])
// - one that is created after the second cutoff moment (blockHashes[cutoff2:])
func TestBlockSubscription(t *testing.T) {
t.Parallel()
var (
mux event.TypeMux
fs = NewFilterSystem(&mux)
blockDone = make(chan struct{})
txDone = make(chan struct{})
logDone = make(chan struct{})
removedLogDone = make(chan struct{})
pendingLogDone = make(chan struct{})
genesis = core.WriteGenesisBlockForTesting(db)
chain, _ = core.GenerateChain(nil, genesis, db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
)
blockFilter := &Filter{
BlockCallback: func(*types.Block, vm.Logs) {
close(blockDone)
},
for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
}
txFilter := &Filter{
TransactionCallback: func(*types.Transaction) {
close(txDone)
},
}
logFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if !oob {
close(logDone)
chan0 := make(chan *types.Header)
sub0 := api.events.SubscribeNewHeads(chan0)
chan1 := make(chan *types.Header)
sub1 := api.events.SubscribeNewHeads(chan1)
go func() { // simulate client
i1, i2 := 0, 0
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case header := <-chan0:
if chainEvents[i1].Hash != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
}
i1++
case header := <-chan1:
if chainEvents[i2].Hash != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
}
i2++
}
},
}
removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if oob {
close(removedLogDone)
}
},
}
pendingLogFilter := &Filter{
LogCallback: func(*vm.Log, bool) {
close(pendingLogDone)
},
}
sub0.Unsubscribe()
sub1.Unsubscribe()
}()
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
mux.Post(e)
}
fs.Add(blockFilter, ChainFilter)
fs.Add(txFilter, PendingTxFilter)
fs.Add(logFilter, LogFilter)
fs.Add(removedLogFilter, LogFilter)
fs.Add(pendingLogFilter, PendingLogFilter)
<-sub0.Err()
<-sub1.Err()
}
mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{})
mux.Post(vm.Logs{&vm.Log{}})
mux.Post(core.RemovedLogsEvent{Logs: vm.Logs{&vm.Log{}}})
mux.Post(core.PendingLogsEvent{Logs: vm.Logs{&vm.Log{}}})
// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilter(t *testing.T) {
t.Parallel()
const dura = 5 * time.Second
failTimer := time.NewTimer(dura)
select {
case <-blockDone:
case <-failTimer.C:
t.Error("block filter failed to trigger (timeout)")
var (
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
}
hashes []common.Hash
)
fid0 := api.NewPendingTransactionFilter()
time.Sleep(1 * time.Second)
for _, tx := range transactions {
ev := core.TxPreEvent{Tx: tx}
mux.Post(ev)
}
failTimer.Reset(dura)
select {
case <-txDone:
case <-failTimer.C:
t.Error("transaction filter failed to trigger (timeout)")
for {
h := api.GetFilterChanges(fid0).([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
break
}
time.Sleep(100 * time.Millisecond)
}
failTimer.Reset(dura)
select {
case <-logDone:
case <-failTimer.C:
t.Error("log filter failed to trigger (timeout)")
}
failTimer.Reset(dura)
select {
case <-removedLogDone:
case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)")
}
failTimer.Reset(dura)
select {
case <-pendingLogDone:
case <-failTimer.C:
t.Error("pending log filter failed to trigger (timeout)")
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
}
}
}
// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
allLogs = vm.Logs{
// Note, these are used for comparison of the test cases.
vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0),
vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1),
vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1),
vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 2),
vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3),
}
testCases = []struct {
crit FilterCriteria
expected vm.Logs
id rpc.ID
}{
// match all
{FilterCriteria{}, allLogs, ""},
// match none due to no matching addresses
{FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""},
// match logs based on addresses, ignore topics
{FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
// match none due to no matching topics (match with address)
{FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""},
// match logs based on addresses and topics
{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""},
// match logs based on multiple addresses and "or" topics
{FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""},
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""},
}
err error
)
// create all filters
for i := range testCases {
testCases[i].id = api.NewFilter(testCases[i].crit)
}
// raise events
time.Sleep(1 * time.Second)
if err = mux.Post(allLogs); err != nil {
t.Fatal(err)
}
for i, tt := range testCases {
var fetched []Log
for { // fetch all expected logs
fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...)
if len(fetched) >= len(tt.expected) {
break
}
time.Sleep(100 * time.Millisecond)
}
if len(fetched) != len(tt.expected) {
t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
return
}
for l := range fetched {
if fetched[l].Removed {
t.Errorf("expected log not to be removed for log %d in case %d", l, i)
}
if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
t.Errorf("invalid log on index %d for case %d", l, i)
}
}
}
}
// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
allLogs = []core.PendingLogsEvent{
core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0)}},
core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1)}},
core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 2)}},
core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3)}},
core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 4)}},
core.PendingLogsEvent{Logs: vm.Logs{
vm.NewLog(thirdAddress, []common.Hash{firstTopic}, []byte(""), 5),
vm.NewLog(thirdAddress, []common.Hash{thirdTopic}, []byte(""), 5),
vm.NewLog(thirdAddress, []common.Hash{forthTopic}, []byte(""), 5),
vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 5),
}},
}
convertLogs = func(pl []core.PendingLogsEvent) vm.Logs {
var logs vm.Logs
for _, l := range pl {
logs = append(logs, l.Logs...)
}
return logs
}
testCases = []struct {
crit FilterCriteria
expected vm.Logs
c chan []Log
sub *Subscription
}{
// match all
{FilterCriteria{}, convertLogs(allLogs), nil, nil},
// match none due to no matching addresses
{FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{[]common.Hash{}}}, vm.Logs{}, nil, nil},
// match logs based on addresses, ignore topics
{FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
// match none due to no matching topics (match with address)
{FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, nil, nil},
// match logs based on addresses and topics
{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
// match logs based on multiple addresses and "or" topics
{FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
// multiple pending logs, should match only 2 topics from the logs in block 5
{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, forthTopic}}}, vm.Logs{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
}
)
// create all subscriptions, this ensures all subscriptions are created before the events are posted.
// on slow machines this could otherwise lead to missing events when the subscription is created after
// (some) events are posted.
for i := range testCases {
testCases[i].c = make(chan []Log)
testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c)
}
for n, test := range testCases {
i := n
tt := test
go func() {
var fetched []Log
fetchLoop:
for {
logs := <-tt.c
fetched = append(fetched, logs...)
if len(fetched) >= len(tt.expected) {
break fetchLoop
}
}
if len(fetched) != len(tt.expected) {
t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
}
for l := range fetched {
if fetched[l].Removed {
t.Errorf("expected log not to be removed for log %d in case %d", l, i)
}
if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
t.Errorf("invalid log on index %d for case %d", l, i)
}
}
}()
}
// raise events
time.Sleep(1 * time.Second)
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)
}
}
}