les: light client protocol and API

This commit is contained in:
Zsolt Felfoldi
2016-10-14 05:51:29 +02:00
committed by Felix Lange
parent 760fd65487
commit 9f8d192991
41 changed files with 5860 additions and 33 deletions

View File

@ -61,12 +61,14 @@ type PublicFilterAPI struct {
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
mux: mux,
chainDb: chainDb,
events: NewEventSystem(mux),
filters: make(map[rpc.ID]*filter),
backend: backend,
useMipMap: !lightMode,
mux: backend.EventMux(),
chainDb: backend.ChainDb(),
events: NewEventSystem(backend.EventMux(), backend, lightMode),
filters: make(map[rpc.ID]*filter),
}
go api.timeoutLoop()

View File

@ -207,11 +207,15 @@ Logs:
return ret
}
func (f *Filter) bloomFilter(block *types.Block) bool {
if len(f.addresses) > 0 {
func (f *Filter) bloomFilter(bloom types.Bloom) bool {
return bloomFilter(bloom, f.addresses, f.topics)
}
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
for _, addr := range f.addresses {
if types.BloomLookup(block.Bloom(), addr) {
for _, addr := range addresses {
if types.BloomLookup(bloom, addr) {
included = true
break
}
@ -222,7 +226,7 @@ func (f *Filter) bloomFilter(block *types.Block) bool {
}
}
for _, sub := range f.topics {
for _, sub := range topics {
var included bool
for _, topic := range sub {
if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) {

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
// Type determines the kind of filter and is used to put the filter in to
@ -95,6 +96,9 @@ type subscription struct {
type EventSystem struct {
mux *event.TypeMux
sub event.Subscription
backend Backend
lightMode bool
lastHead *types.Header
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
}
@ -105,9 +109,11 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux) *EventSystem {
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
}
@ -235,7 +241,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
func broadcast(filters filterIndex, ev *event.Event) {
func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
if ev == nil {
return
}
@ -279,9 +285,79 @@ func broadcast(filters filterIndex, ev *event.Event) {
f.headers <- e.Block.Header()
}
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
})
}
}
}
func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
if oldh == nil {
return
}
newh := newHeader
// find common ancestor, create list of rolled back and new block hashes
var oldHeaders, newHeaders []*types.Header
for oldh.Hash() != newh.Hash() {
if oldh.GetNumberU64() >= newh.GetNumberU64() {
oldHeaders = append(oldHeaders, oldh)
oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
}
if oldh.GetNumberU64() < newh.GetNumberU64() {
newHeaders = append(newHeaders, newh)
newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
if newh == nil {
// happens when CHT syncing, nothing to do
newh = oldh
}
}
}
// roll back old blocks
for _, h := range oldHeaders {
callBack(h, true)
}
// check new blocks (array is in reverse order)
for i := len(newHeaders) - 1; i >= 0; i-- {
callBack(newHeaders[i], false)
}
}
// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log {
//fmt.Println("lightFilterLogs", header.Number.Uint64(), remove)
if bloomFilter(header.Bloom, addresses, topics) {
//fmt.Println("bloom match")
// Get the logs of the block
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
receipts, err := es.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil
}
var unfiltered []Log
for _, receipt := range receipts {
rl := make([]Log, len(receipt.Logs))
for i, l := range receipt.Logs {
rl[i] = Log{l, remove}
}
unfiltered = append(unfiltered, rl...)
}
logs := filterLogs(unfiltered, addresses, topics)
//fmt.Println("found", len(logs))
return logs
}
return nil
}
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
var (
@ -294,7 +370,7 @@ func (es *EventSystem) eventLoop() {
if !active { // system stopped
return
}
broadcast(index, ev)
es.broadcast(index, ev)
case f := <-es.install:
if _, found := index[f.typ]; !found {
index[f.typ] = make(map[rpc.ID]*subscription)

View File

@ -32,9 +32,10 @@ import (
)
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
api = NewPublicFilterAPI(db, mux)
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
)
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.