core/state: convert prefetcher to concurrent per-trie loader

Péter Szilágyi
2021-01-08 15:01:49 +02:00
parent 1e1865b73f
commit 42f9f1f073
9 changed files with 385 additions and 282 deletions


@@ -17,233 +17,318 @@
package state
import (
	"sync"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)
var (
	// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
	triePrefetchMetricsPrefix = "trie/prefetch/"
)
// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
//
// Note, the prefetcher's API is not thread safe.
type triePrefetcher struct {
	db       Database                    // Database to fetch trie nodes through
	root     common.Hash                 // Root hash of the account trie for metrics
	fetches  map[common.Hash]Trie        // Partially or fully fetched tries
	fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

	deliveryMissMeter metrics.Meter
	accountLoadMeter  metrics.Meter
	accountDupMeter   metrics.Meter
	accountSkipMeter  metrics.Meter
	accountWasteMeter metrics.Meter
	storageLoadMeter  metrics.Meter
	storageDupMeter   metrics.Meter
	storageSkipMeter  metrics.Meter
	storageWasteMeter metrics.Meter
}
// newTriePrefetcher creates a prefetcher rooted at the given state root, with
// its metrics published under the given namespace.
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
	prefix := triePrefetchMetricsPrefix + namespace
	p := &triePrefetcher{
		db:       db,
		root:     root,
		fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map

		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
		accountLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
		accountDupMeter:   metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
		accountSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
		accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
		storageLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
		storageDupMeter:   metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
		storageSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
		storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
	}
	return p
}
// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
	for _, fetcher := range p.fetchers {
		fetcher.abort() // safe to do multiple times

		if metrics.Enabled {
			if fetcher.root == p.root {
				p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
				p.accountDupMeter.Mark(int64(fetcher.dups))
				p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))

				for _, key := range fetcher.used {
					delete(fetcher.seen, string(key))
				}
				p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
			} else {
				p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
				p.storageDupMeter.Mark(int64(fetcher.dups))
				p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

				for _, key := range fetcher.used {
					delete(fetcher.seen, string(key))
				}
				p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
			}
		}
	}
	// Clear out all fetchers (will crash on a second call, deliberate)
	p.fetchers = nil
}
// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
// already loaded will be copied over, but no goroutines will be started. This
// is mostly used in the miner which creates a copy of its actively mutated
// state to be sealed while it may further mutate the state.
func (p *triePrefetcher) copy() *triePrefetcher {
	copy := &triePrefetcher{
		db:      p.db,
		root:    p.root,
		fetches: make(map[common.Hash]Trie), // Inactive prefetchers use the fetches map

		deliveryMissMeter: p.deliveryMissMeter,
		accountLoadMeter:  p.accountLoadMeter,
		accountDupMeter:   p.accountDupMeter,
		accountSkipMeter:  p.accountSkipMeter,
		accountWasteMeter: p.accountWasteMeter,
		storageLoadMeter:  p.storageLoadMeter,
		storageDupMeter:   p.storageDupMeter,
		storageSkipMeter:  p.storageSkipMeter,
		storageWasteMeter: p.storageWasteMeter,
	}
	// If the prefetcher is already a copy, duplicate the data
	if p.fetches != nil {
		for root, fetch := range p.fetches {
			copy.fetches[root] = p.db.CopyTrie(fetch)
		}
		return copy
	}
	// Otherwise we're copying an active fetcher, retrieve the current states
	for root, fetcher := range p.fetchers {
		copy.fetches[root] = fetcher.peek()
	}
	return copy
}
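One way to read the two branches: a copy is always inactive (its fetches map is non-nil), so copying a copy duplicates the tries directly, while copying a live prefetcher snapshots each subfetcher via peek. A hedged in-package sketch (hypothetical helper name, assuming an open Database and known root):

// minerSnapshotSketch is illustration only, not part of the commit.
func minerSnapshotSketch(db Database, root common.Hash, keys [][]byte) {
	live := newTriePrefetcher(db, root, "example") // active: uses the fetchers map
	live.prefetch(root, keys)

	snapshot := live.copy()       // inactive: data frozen into the fetches map
	snapshot.prefetch(root, keys) // no-op, copies never schedule new work

	_ = snapshot.trie(root) // served from the deep copies, if root was loaded
}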
// prefetch schedules a batch of trie items to prefetch.
func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) {
	// If the prefetcher is an inactive one, bail out
	if p.fetches != nil {
		return
	}
	// Active fetcher, schedule the retrievals
	fetcher := p.fetchers[root]
	if fetcher == nil {
		fetcher = newSubfetcher(p.db, root)
		p.fetchers[root] = fetcher
	}
	fetcher.schedule(keys)
}
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
// have it.
func (p *triePrefetcher) trie(root common.Hash) Trie {
	// If the prefetcher is inactive, return from existing deep copies
	if p.fetches != nil {
		trie := p.fetches[root]
		if trie == nil {
			p.deliveryMissMeter.Mark(1)
			return nil
		}
		return p.db.CopyTrie(trie)
	}
	// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
	fetcher := p.fetchers[root]
	if fetcher == nil {
		p.deliveryMissMeter.Mark(1)
		return nil
	}
	// Interrupt the prefetcher if it's by any chance still running and return
	// a copy of any pre-loaded trie.
	fetcher.abort() // safe to do multiple times

	trie := fetcher.peek()
	if trie == nil {
		p.deliveryMissMeter.Mark(1)
		return nil
	}
	return trie
}
// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
	if fetcher := p.fetchers[root]; fetcher != nil {
		fetcher.used = used
	}
}
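Taken together, the methods above replace the old channel-driven Resume/Pause/PrefetchAddresses/PrefetchStorage/GetTrie API. A minimal end-to-end sketch of the new call sequence (hypothetical in-package helper, assuming an open Database and a known state root):

// prefetcherLifecycleSketch is illustration only, not part of the commit.
func prefetcherLifecycleSketch(db Database, root common.Hash, keys [][]byte) {
	p := newTriePrefetcher(db, root, "example")

	// Schedule keys for background loading; returns immediately.
	p.prefetch(root, keys)

	// Later, grab the preloaded trie. This aborts the subfetcher and returns
	// nil (a delivery miss) if nothing was prefetched for this root.
	if trie := p.trie(root); trie != nil {
		_ = trie // use instead of a cold db.OpenTrie(root)
	}

	// Report the keys that were actually useful, then tear down and publish
	// the load/dup/skip/waste metrics.
	p.used(root, keys)
	p.close()
}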
// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
// single trie. It is spawned when a new root is encountered and lives until the
// main prefetcher is paused and either all requested items are processed or if
// the trie being worked on is retrieved from the prefetcher.
type subfetcher struct {
	db   Database    // Database to load trie nodes through
	root common.Hash // Root hash of the trie to prefetch
	trie Trie        // Trie being populated with nodes

	tasks [][]byte   // Items queued up for retrieval
	lock  sync.Mutex // Lock protecting the task queue

	wake chan struct{}  // Wake channel if a new task is scheduled
	stop chan struct{}  // Channel to interrupt processing
	term chan struct{}  // Channel to signal interruption
	copy chan chan Trie // Channel to request a copy of the current trie

	seen map[string]struct{} // Tracks the entries already loaded
	dups int                 // Number of duplicate preload tasks
	used [][]byte            // Tracks the entries used in the end
}
// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, root common.Hash) *subfetcher {
	sf := &subfetcher{
		db:   db,
		root: root,
		wake: make(chan struct{}, 1),
		stop: make(chan struct{}),
		term: make(chan struct{}),
		copy: make(chan chan Trie),
		seen: make(map[string]struct{}),
	}
	go sf.loop()
	return sf
}
// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) {
	// Append the tasks to the current queue
	sf.lock.Lock()
	sf.tasks = append(sf.tasks, keys...)
	sf.lock.Unlock()

	// Notify the prefetcher, it's fine if it's already terminated
	select {
	case sf.wake <- struct{}{}:
	default:
	}
}
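The send above never blocks because wake is created with a buffer of one, so any burst of schedule calls collapses into a single pending wakeup; the loop then drains the whole task queue in one pass. A standalone illustration of the idiom (not part of the commit):

package main

import "fmt"

// Illustration only: the coalescing wakeup idiom used by schedule.
func main() {
	wake := make(chan struct{}, 1)

	notify := func() {
		select {
		case wake <- struct{}{}:
		default: // a wakeup is already pending, drop this one
		}
	}
	notify()
	notify() // coalesced: only one token sits in the channel

	<-wake
	select {
	case <-wake:
		fmt.Println("unexpected second wakeup")
	default:
		fmt.Println("both notifies coalesced into one wakeup")
	}
}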
// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
// is currently.
func (sf *subfetcher) peek() Trie {
	ch := make(chan Trie)
	select {
	case sf.copy <- ch:
		// Subfetcher still alive, return copy from it
		return <-ch

	case <-sf.term:
		// Subfetcher already terminated, return a copy directly
		if sf.trie == nil {
			return nil
		}
		return sf.db.CopyTrie(sf.trie)
	}
}
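peek works whether the worker is alive or gone: it either hands its reply channel to the live loop, or observes the closed term channel and reads the result directly. A standalone illustration of that request-or-terminated pattern, with hypothetical names (not part of the commit):

package main

import "fmt"

// Illustration only: request a value from a worker, falling back when it died.
func main() {
	copyCh := make(chan chan string)
	term := make(chan struct{})

	go func() {
		defer close(term) // signal termination however we exit
		ch := <-copyCh    // serve exactly one request, then quit
		ch <- "served by live worker"
	}()

	request := func() string {
		ch := make(chan string)
		select {
		case copyCh <- ch:
			return <-ch
		case <-term:
			return "worker already terminated"
		}
	}
	fmt.Println(request()) // served by live worker
	fmt.Println(request()) // worker already terminated
}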
// abort interrupts the subfetcher immediately. It is safe to call abort multiple
// times but it is not thread safe.
func (sf *subfetcher) abort() {
	select {
	case <-sf.stop:
	default:
		close(sf.stop)
	}
	<-sf.term
}
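abort is idempotent because close(sf.stop) runs at most once behind the select guard, and it is synchronous because it waits on term, which the loop closes on exit. A standalone illustration of the stop/term pair (not part of the commit):

package main

import "fmt"

// Illustration only: guarded close plus termination wait, as used by abort.
func main() {
	stop := make(chan struct{})
	term := make(chan struct{})

	go func() {
		defer close(term) // signal full termination, however we exit
		<-stop            // wait for an interrupt request
	}()

	abort := func() {
		select {
		case <-stop: // already closed, don't close again
		default:
			close(stop)
		}
		<-term // block until the worker has really exited
	}
	abort()
	abort() // second call is a no-op, just re-reads the closed channels
	fmt.Println("worker fully terminated")
}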
// loop waits for new tasks to be scheduled and keeps loading them until it runs
// out of tasks or its underlying trie is retrieved for committing.
func (sf *subfetcher) loop() {
	// No matter how the loop stops, signal anyone waiting that it's terminated
	defer close(sf.term)

	// Start by opening the trie and stop processing if it fails
	trie, err := sf.db.OpenTrie(sf.root)
	if err != nil {
		log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
		return
	}
	sf.trie = trie

	// Trie opened successfully, keep prefetching items
	for {
		select {
		case <-sf.wake:
			// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
			sf.lock.Lock()
			tasks := sf.tasks
			sf.tasks = nil
			sf.lock.Unlock()

			// Prefetch any tasks until the loop is interrupted
			for i, task := range tasks {
				select {
				case <-sf.stop:
					// If termination is requested, add any leftover back and return
					sf.lock.Lock()
					sf.tasks = append(sf.tasks, tasks[i:]...)
					sf.lock.Unlock()
					return

				case ch := <-sf.copy:
					// Somebody wants a copy of the current trie, grant them
					ch <- sf.db.CopyTrie(sf.trie)

				default:
					// No termination request yet, prefetch the next entry
					taskid := string(task)
					if _, ok := sf.seen[taskid]; ok {
						sf.dups++
					} else {
						sf.trie.TryGet(task)
						sf.seen[taskid] = struct{}{}
					}
				}
			}

		case ch := <-sf.copy:
			// Somebody wants a copy of the current trie, grant them
			ch <- sf.db.CopyTrie(sf.trie)

		case <-sf.stop:
			// Termination is requested, abort and leave remaining tasks
			return
		}
	}
}