core/state/snapshot: implement storage iterator (#20971)
* core/state/snapshot: implement storage iterator * core/state/snapshot, tests: implement helper function * core/state/snapshot: fix storage issue If an account is deleted in the tx_1 but recreated in the tx_2, the it can happen that in this diff layer, both destructedSet and storageData records this account. In this case, the storage iterator should be able to iterate the slots belong to new account but disable further iteration in deeper layers(belong to old account) * core/state/snapshot: address peter and martin's comment * core/state: address comments * core/state/snapshot: fix test
This commit is contained in:
@ -17,6 +17,8 @@
|
||||
package snapshot
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -27,80 +29,239 @@ import (
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
)
|
||||
|
||||
// conversionAccount is used for converting between full and slim format. When
|
||||
// doing this, we can consider 'balance' as a byte array, as it has already
|
||||
// been converted from big.Int into an rlp-byteslice.
|
||||
type conversionAccount struct {
|
||||
Nonce uint64
|
||||
Balance []byte
|
||||
Root []byte
|
||||
CodeHash []byte
|
||||
}
|
||||
|
||||
// SlimToFull converts data on the 'slim RLP' format into the full RLP-format
|
||||
func SlimToFull(data []byte) ([]byte, error) {
|
||||
acc := &conversionAccount{}
|
||||
if err := rlp.DecodeBytes(data, acc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(acc.Root) == 0 {
|
||||
acc.Root = emptyRoot[:]
|
||||
}
|
||||
if len(acc.CodeHash) == 0 {
|
||||
acc.CodeHash = emptyCode[:]
|
||||
}
|
||||
fullData, err := rlp.EncodeToBytes(acc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fullData, nil
|
||||
}
|
||||
|
||||
// trieKV represents a trie key-value pair
|
||||
type trieKV struct {
|
||||
key common.Hash
|
||||
value []byte
|
||||
}
|
||||
|
||||
type trieGeneratorFn func(in chan (trieKV), out chan (common.Hash))
|
||||
type (
|
||||
// trieGeneratorFn is the interface of trie generation which can
|
||||
// be implemented by different trie algorithm.
|
||||
trieGeneratorFn func(in chan (trieKV), out chan (common.Hash))
|
||||
|
||||
// GenerateTrieRoot takes an account iterator and reproduces the root hash.
|
||||
func GenerateTrieRoot(it AccountIterator) common.Hash {
|
||||
return generateTrieRoot(it, stdGenerate)
|
||||
// leafCallbackFn is the callback invoked at the leaves of the trie,
|
||||
// returns the subtrie root with the specified subtrie identifier.
|
||||
leafCallbackFn func(hash common.Hash, stat *generateStats) common.Hash
|
||||
)
|
||||
|
||||
// GenerateAccountTrieRoot takes an account iterator and reproduces the root hash.
|
||||
func GenerateAccountTrieRoot(it AccountIterator) (common.Hash, error) {
|
||||
return generateTrieRoot(it, common.Hash{}, stdGenerate, nil, &generateStats{start: time.Now()}, true)
|
||||
}
|
||||
|
||||
func generateTrieRoot(it AccountIterator, generatorFn trieGeneratorFn) common.Hash {
|
||||
// GenerateStorageTrieRoot takes a storage iterator and reproduces the root hash.
|
||||
func GenerateStorageTrieRoot(account common.Hash, it StorageIterator) (common.Hash, error) {
|
||||
return generateTrieRoot(it, account, stdGenerate, nil, &generateStats{start: time.Now()}, true)
|
||||
}
|
||||
|
||||
// VerifyState takes the whole snapshot tree as the input, traverses all the accounts
|
||||
// as well as the corresponding storages and compares the re-computed hash with the
|
||||
// original one(state root and the storage root).
|
||||
func VerifyState(snaptree *Tree, root common.Hash) error {
|
||||
acctIt, err := snaptree.AccountIterator(root, common.Hash{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
got, err := generateTrieRoot(acctIt, common.Hash{}, stdGenerate, func(account common.Hash, stat *generateStats) common.Hash {
|
||||
storageIt, err := snaptree.StorageIterator(root, account, common.Hash{})
|
||||
if err != nil {
|
||||
return common.Hash{}
|
||||
}
|
||||
hash, err := generateTrieRoot(storageIt, account, stdGenerate, nil, stat, false)
|
||||
if err != nil {
|
||||
return common.Hash{}
|
||||
}
|
||||
return hash
|
||||
}, &generateStats{start: time.Now()}, true)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if got != root {
|
||||
return fmt.Errorf("State root hash mismatch, got %x, want %x", got, root)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// generateStats is a collection of statistics gathered by the trie generator
|
||||
// for logging purposes.
|
||||
type generateStats struct {
|
||||
accounts uint64
|
||||
slots uint64
|
||||
curAccount common.Hash
|
||||
curSlot common.Hash
|
||||
start time.Time
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// progress records the progress trie generator made recently.
|
||||
func (stat *generateStats) progress(accounts, slots uint64, curAccount common.Hash, curSlot common.Hash) {
|
||||
stat.lock.Lock()
|
||||
defer stat.lock.Unlock()
|
||||
|
||||
stat.accounts += accounts
|
||||
stat.slots += slots
|
||||
if curAccount != (common.Hash{}) {
|
||||
stat.curAccount = curAccount
|
||||
}
|
||||
if curSlot != (common.Hash{}) {
|
||||
stat.curSlot = curSlot
|
||||
}
|
||||
}
|
||||
|
||||
// report prints the cumulative progress statistic smartly.
|
||||
func (stat *generateStats) report() {
|
||||
stat.lock.RLock()
|
||||
defer stat.lock.RUnlock()
|
||||
|
||||
var ctx []interface{}
|
||||
if stat.curSlot != (common.Hash{}) {
|
||||
ctx = append(ctx, []interface{}{
|
||||
"in", stat.curAccount,
|
||||
"at", stat.curSlot,
|
||||
}...)
|
||||
} else {
|
||||
ctx = append(ctx, []interface{}{"at", stat.curAccount}...)
|
||||
}
|
||||
// Add the usual measurements
|
||||
ctx = append(ctx, []interface{}{"accounts", stat.accounts}...)
|
||||
if stat.slots != 0 {
|
||||
ctx = append(ctx, []interface{}{"slots", stat.slots}...)
|
||||
}
|
||||
ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...)
|
||||
log.Info("Generating trie hash from snapshot", ctx...)
|
||||
}
|
||||
|
||||
// reportDone prints the last log when the whole generation is finished.
|
||||
func (stat *generateStats) reportDone() {
|
||||
stat.lock.RLock()
|
||||
defer stat.lock.RUnlock()
|
||||
|
||||
var ctx []interface{}
|
||||
ctx = append(ctx, []interface{}{"accounts", stat.accounts}...)
|
||||
if stat.slots != 0 {
|
||||
ctx = append(ctx, []interface{}{"slots", stat.slots}...)
|
||||
}
|
||||
ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...)
|
||||
log.Info("Generated trie hash from snapshot", ctx...)
|
||||
}
|
||||
|
||||
// generateTrieRoot generates the trie hash based on the snapshot iterator.
|
||||
// It can be used for generating account trie, storage trie or even the
|
||||
// whole state which connects the accounts and the corresponding storages.
|
||||
func generateTrieRoot(it Iterator, account common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) {
|
||||
var (
|
||||
in = make(chan trieKV) // chan to pass leaves
|
||||
out = make(chan common.Hash) // chan to collect result
|
||||
wg sync.WaitGroup
|
||||
in = make(chan trieKV) // chan to pass leaves
|
||||
out = make(chan common.Hash, 1) // chan to collect result
|
||||
stoplog = make(chan bool, 1) // 1-size buffer, works when logging is not enabled
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
// Spin up a go-routine for trie hash re-generation
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
generatorFn(in, out)
|
||||
wg.Done()
|
||||
}()
|
||||
// Feed leaves
|
||||
start := time.Now()
|
||||
logged := time.Now()
|
||||
accounts := 0
|
||||
for it.Next() {
|
||||
slimData := it.Account()
|
||||
fullData, _ := SlimToFull(slimData)
|
||||
l := trieKV{it.Hash(), fullData}
|
||||
in <- l
|
||||
if time.Since(logged) > 8*time.Second {
|
||||
log.Info("Generating trie hash from snapshot",
|
||||
"at", l.key, "accounts", accounts, "elapsed", time.Since(start))
|
||||
logged = time.Now()
|
||||
}
|
||||
accounts++
|
||||
|
||||
// Spin up a go-routine for progress logging
|
||||
if report && stats != nil {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
timer := time.NewTimer(0)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
stats.report()
|
||||
timer.Reset(time.Second * 8)
|
||||
case success := <-stoplog:
|
||||
if success {
|
||||
stats.reportDone()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
close(in)
|
||||
result := <-out
|
||||
log.Info("Generated trie hash from snapshot", "accounts", accounts, "elapsed", time.Since(start))
|
||||
wg.Wait()
|
||||
return result
|
||||
// stop is a helper function to shutdown the background threads
|
||||
// and return the re-generated trie hash.
|
||||
stop := func(success bool) common.Hash {
|
||||
close(in)
|
||||
result := <-out
|
||||
stoplog <- success
|
||||
wg.Wait()
|
||||
return result
|
||||
}
|
||||
var (
|
||||
logged = time.Now()
|
||||
processed = uint64(0)
|
||||
leaf trieKV
|
||||
last common.Hash
|
||||
)
|
||||
// Start to feed leaves
|
||||
for it.Next() {
|
||||
if account == (common.Hash{}) {
|
||||
var (
|
||||
err error
|
||||
fullData []byte
|
||||
)
|
||||
if leafCallback == nil {
|
||||
fullData, err = FullAccountRLP(it.(AccountIterator).Account())
|
||||
if err != nil {
|
||||
stop(false)
|
||||
return common.Hash{}, err
|
||||
}
|
||||
} else {
|
||||
account, err := FullAccount(it.(AccountIterator).Account())
|
||||
if err != nil {
|
||||
stop(false)
|
||||
return common.Hash{}, err
|
||||
}
|
||||
// Apply the leaf callback. Normally the callback is used to traverse
|
||||
// the storage trie and re-generate the subtrie root.
|
||||
subroot := leafCallback(it.Hash(), stats)
|
||||
if !bytes.Equal(account.Root, subroot.Bytes()) {
|
||||
stop(false)
|
||||
return common.Hash{}, fmt.Errorf("invalid subroot(%x), want %x, got %x", it.Hash(), account.Root, subroot)
|
||||
}
|
||||
fullData, err = rlp.EncodeToBytes(account)
|
||||
if err != nil {
|
||||
stop(false)
|
||||
return common.Hash{}, err
|
||||
}
|
||||
}
|
||||
leaf = trieKV{it.Hash(), fullData}
|
||||
} else {
|
||||
leaf = trieKV{it.Hash(), common.CopyBytes(it.(StorageIterator).Slot())}
|
||||
}
|
||||
in <- leaf
|
||||
|
||||
// Accumulate the generaation statistic if it's required.
|
||||
processed++
|
||||
if time.Since(logged) > 3*time.Second && stats != nil {
|
||||
if account == (common.Hash{}) {
|
||||
stats.progress(processed, 0, it.Hash(), common.Hash{})
|
||||
} else {
|
||||
stats.progress(0, processed, account, it.Hash())
|
||||
}
|
||||
logged, processed = time.Now(), 0
|
||||
}
|
||||
last = it.Hash()
|
||||
}
|
||||
// Commit the last part statistic.
|
||||
if processed > 0 && stats != nil {
|
||||
if account == (common.Hash{}) {
|
||||
stats.progress(processed, 0, last, common.Hash{})
|
||||
} else {
|
||||
stats.progress(0, processed, account, last)
|
||||
}
|
||||
}
|
||||
result := stop(true)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// stdGenerate is a very basic hexary trie builder which uses the same Trie
|
||||
|
Reference in New Issue
Block a user