all: bloom-filter based pruning mechanism (#21724)
* cmd, core, tests: initial state pruner
  core: fix db inspector
  cmd/geth: add verify-state
  cmd/geth: add verification tool
  core/rawdb: implement flatdb
  cmd, core: fix rebase
  core/state: use new contract code layout
  core/state/pruner: avoid deleting genesis state
  cmd/geth: add helper function
  core, cmd: fix extract genesis
  core: minor fixes
  contracts: remove useless
  core/state/snapshot: plugin stacktrie
  core: polish
  core/state/snapshot: iterate storage concurrently
  core/state/snapshot: fix iteration
  core: add comments
  core/state/snapshot: polish code
  core/state: polish
  core/state/snapshot: rebase
  core/rawdb: add comments
  core/rawdb: fix tests
  core/rawdb: improve tests
  core/state/snapshot: fix concurrent iteration
  core/state: run pruning during the recovery
  core, trie: implement martin's idea
  core, eth: delete flatdb and polish pruner
  trie: fix import
  core/state/pruner: add log
  core/state/pruner: fix issues
  core/state/pruner: don't read back
  core/state/pruner: fix contract code write
  core/state/pruner: check root node presence
  cmd, core: polish log
  core/state: use HEAD-127 as the target
  core/state/snapshot: improve tests
  cmd/geth: fix verification tool
  cmd/geth: use HEAD as the verification default target
  all: replace the bloomfilter with martin's fork
  cmd, core: polish code
  core, cmd: forcibly delete state root
  core/state/pruner: add hash64
  core/state/pruner: fix blacklist
  core/state: remove blacklist
  cmd, core: delete trie clean cache before pruning
  cmd, core: fix lint
  cmd, core: fix rebase
  core/state: fix the special case for clique networks
  core/state/snapshot: remove useless code
  core/state/pruner: capping the snapshot after pruning
  cmd, core, eth: fixes
  core/rawdb: update db inspector
  cmd/geth: polish code
  core/state/pruner: fsync bloom filter
  cmd, core: print warning log
  core/state/pruner: adjust the parameters for bloom filter
  cmd, core: create the bloom filter by size
  core: polish
  core/state/pruner: sanitize invalid bloomfilter size
  cmd: address comments
  cmd/geth: address comments
  cmd/geth: address comment
  core/state/pruner: address comments
  core/state/pruner: rename homedir to datadir
  cmd, core: address comments
  core/state/pruner: address comment
  core/state: address comments
  core, cmd, tests: address comments
  core: address comments
  core/state/pruner: release the iterator after each commit
  core/state/pruner: improve pruner
  cmd, core: adjust bloom paramters
  core/state/pruner: fix lint
  core/state/pruner: fix tests
  core: fix rebase
  core/state/pruner: remove atomic rename
  core/state/pruner: address comments
  all: run go mod tidy
  core/state/pruner: avoid false-positive for the middle state roots
  core/state/pruner: add checks for middle roots
  cmd/geth: replace crit with error
* core/state/pruner: fix lint
* core: drop legacy bloom filter
* core/state/snapshot: improve pruner
* core/state/snapshot: polish concurrent logs to report ETA vs. hashes
* core/state/pruner: add progress report for pruning and compaction too
* core: fix snapshot test API
* core/state: fix some pruning logs
* core/state/pruner: support recovering from bloom flush fail

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
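Before the diff itself, a minimal illustrative Go sketch of the mechanism this change introduces. It is not code from the PR: the package, type and function names (example, miniHasher, sweep) are invented, and only the "sweep" half of the idea is shown; marking the live state is done by the snapshot-driven trie regeneration in core/state/pruner below.

package example

import (
	"encoding/binary"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	bloomfilter "github.com/holiman/bloomfilter/v2"
)

// miniHasher mirrors the stateBloomHasher trick from core/state/pruner/bloom.go:
// a 32-byte hash is already uniformly distributed, so its first 8 bytes serve
// as the 64-bit hash the bloom filter library expects.
type miniHasher []byte

func (h miniHasher) Write(p []byte) (n int, err error) { panic("not implemented") }
func (h miniHasher) Sum(b []byte) []byte               { panic("not implemented") }
func (h miniHasher) Reset()                            { panic("not implemented") }
func (h miniHasher) BlockSize() int                    { panic("not implemented") }
func (h miniHasher) Size() int                         { return 8 }
func (h miniHasher) Sum64() uint64                     { return binary.BigEndian.Uint64(h) }

// sweep deletes every 32-byte (trie node) key that was never marked as live in
// the bloom filter. False positives merely leave a few stale nodes behind.
func sweep(db ethdb.Database, live *bloomfilter.Filter) error {
	it := db.NewIterator(nil, nil)
	defer it.Release()

	batch := db.NewBatch()
	for it.Next() {
		key := it.Key()
		if len(key) != common.HashLength {
			continue // only plain trie nodes in this simplified sketch
		}
		if live.Contains(miniHasher(key)) {
			continue // probably part of the live state, keep it
		}
		if err := batch.Delete(key); err != nil {
			return err
		}
	}
	return batch.Write()
}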
@@ -372,7 +372,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
			log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
			recover = true
		}
-		bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, recover)
+		bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover)
	}
	// Take ownership of this particular state
	go bc.update()
@@ -31,7 +31,6 @@ import (
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/ethash"
	"github.com/ethereum/go-ethereum/core/rawdb"
-	"github.com/ethereum/go-ethereum/core/state/snapshot"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/core/vm"
	"github.com/ethereum/go-ethereum/ethdb"
@@ -163,7 +162,7 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
	}

	// Check the snapshot, ensure it's integrated
-	if err := snapshot.VerifyState(chain.snaps, block.Root()); err != nil {
+	if err := chain.snaps.Verify(block.Root()); err != nil {
		t.Errorf("The disk layer is not integrated %v", err)
	}
}
@@ -171,7 +171,6 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig
		}
		return genesis.Config, block.Hash(), nil
	}

	// We have the genesis block in database(perhaps in ancient database)
	// but the corresponding state is missing.
	header := rawdb.ReadHeader(db, stored, 0)
@@ -190,7 +189,6 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig
		}
		return genesis.Config, block.Hash(), nil
	}

	// Check whether the genesis block is already written.
	if genesis != nil {
		hash := genesis.ToBlock(nil).Hash()
@@ -198,7 +196,6 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig
			return genesis.Config, hash, &GenesisMismatchError{stored, hash}
		}
	}

	// Get the existing chain configuration.
	newcfg := genesis.configOrDefault(stored)
	if err := newcfg.CheckConfigForkOrder(); err != nil {
@@ -216,7 +213,6 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig
	if genesis == nil && stored != params.MainnetGenesisHash {
		return storedcfg, stored, nil
	}

	// Check config compatibility and write the config. Compatibility errors
	// are returned to the caller unless we're already at block zero.
	height := rawdb.ReadHeaderNumber(db, rawdb.ReadHeadHeaderHash(db))
@@ -335,7 +335,7 @@ func InspectDatabase(db ethdb.Database) error {
			hashNumPairings.Add(size)
		case len(key) == common.HashLength:
			tries.Add(size)
-		case bytes.HasPrefix(key, codePrefix) && len(key) == len(codePrefix)+common.HashLength:
+		case bytes.HasPrefix(key, CodePrefix) && len(key) == len(CodePrefix)+common.HashLength:
			codes.Add(size)
		case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength):
			txLookups.Add(size)
@@ -347,15 +347,26 @@ func InspectDatabase(db ethdb.Database) error {
			preimages.Add(size)
		case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
			bloomBits.Add(size)
+		case bytes.HasPrefix(key, BloomBitsIndexPrefix):
+			bloomBits.Add(size)
		case bytes.HasPrefix(key, []byte("clique-")) && len(key) == 7+common.HashLength:
			cliqueSnaps.Add(size)
-		case bytes.HasPrefix(key, []byte("cht-")) && len(key) == 4+common.HashLength:
+		case bytes.HasPrefix(key, []byte("cht-")) ||
+			bytes.HasPrefix(key, []byte("chtIndexV2-")) ||
+			bytes.HasPrefix(key, []byte("chtRootV2-")): // Canonical hash trie
			chtTrieNodes.Add(size)
-		case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength:
+		case bytes.HasPrefix(key, []byte("blt-")) ||
+			bytes.HasPrefix(key, []byte("bltIndex-")) ||
+			bytes.HasPrefix(key, []byte("bltRoot-")): // Bloomtrie sub
			bloomTrieNodes.Add(size)
		default:
			var accounted bool
-			for _, meta := range [][]byte{databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey, uncleanShutdownKey, badBlockKey} {
+			for _, meta := range [][]byte{
+				databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey,
+				fastTrieProgressKey, snapshotRootKey, snapshotJournalKey, snapshotGeneratorKey,
+				snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey,
+				badBlockKey,
+			} {
				if bytes.Equal(key, meta) {
					metadata.Add(size)
					accounted = true
@@ -85,7 +85,7 @@ var (
	bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits

	SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value
	SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
-	codePrefix            = []byte("c") // codePrefix + code hash -> account code
+	CodePrefix            = []byte("c") // CodePrefix + code hash -> account code

	preimagePrefix = []byte("secure-key-")      // preimagePrefix + hash -> preimage
	configPrefix   = []byte("ethereum-config-") // config prefix for the db
@@ -209,16 +209,16 @@ func preimageKey(hash common.Hash) []byte {
	return append(preimagePrefix, hash.Bytes()...)
}

-// codeKey = codePrefix + hash
+// codeKey = CodePrefix + hash
func codeKey(hash common.Hash) []byte {
-	return append(codePrefix, hash.Bytes()...)
+	return append(CodePrefix, hash.Bytes()...)
}

// IsCodeKey reports whether the given byte slice is the key of contract code,
// if so return the raw code hash as well.
func IsCodeKey(key []byte) (bool, []byte) {
-	if bytes.HasPrefix(key, codePrefix) && len(key) == common.HashLength+len(codePrefix) {
-		return true, key[len(codePrefix):]
+	if bytes.HasPrefix(key, CodePrefix) && len(key) == common.HashLength+len(CodePrefix) {
+		return true, key[len(CodePrefix):]
	}
	return false, nil
}
core/state/pruner/bloom.go (new file, 132 lines)
@@ -0,0 +1,132 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pruner

import (
	"encoding/binary"
	"errors"
	"os"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/log"
	bloomfilter "github.com/holiman/bloomfilter/v2"
)

// stateBloomHasher is a wrapper around a byte blob to satisfy the interface API
// requirements of the bloom library used. It's used to convert a trie hash or
// contract code hash into a 64 bit mini hash.
type stateBloomHasher []byte

func (f stateBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") }
func (f stateBloomHasher) Sum(b []byte) []byte               { panic("not implemented") }
func (f stateBloomHasher) Reset()                            { panic("not implemented") }
func (f stateBloomHasher) BlockSize() int                    { panic("not implemented") }
func (f stateBloomHasher) Size() int                         { return 8 }
func (f stateBloomHasher) Sum64() uint64                     { return binary.BigEndian.Uint64(f) }

// stateBloom is a bloom filter used during the state conversion (snapshot->state).
// The keys of all generated entries will be recorded here so that in the pruning
// stage the entries belonging to the specific version can be exempted from deletion.
//
// False positives are allowed here. A "false-positive" entry is one that doesn't
// actually belong to the specific version but is nevertheless not deleted in the
// pruning. The downside of the false-positive allowance is that we may leave some
// "dangling" nodes in the disk. But in practice it's very unlikely that a dangling
// node is a state root, so in theory this pruned state shouldn't be visited anymore.
// Another potential issue is fast sync: if we do another fast sync upon the pruned
// database, the missing nodes will stop the expansion during the syncing.
// TODO address it @rjl493456442 @holiman @karalabe.
//
// After the entire state is generated, the bloom filter should be persisted into
// the disk. It indicates the whole generation procedure is finished.
type stateBloom struct {
	bloom *bloomfilter.Filter
}

// newStateBloomWithSize creates a brand new state bloom for state generation.
// The bloom filter will be created with the passed-in bloom filter size. According
// to https://hur.st/bloomfilter/?n=600000000&p=&m=2048MB&k=4, the parameters
// are picked so that the false-positive rate for mainnet is low enough.
func newStateBloomWithSize(size uint64) (*stateBloom, error) {
	bloom, err := bloomfilter.New(size*1024*1024*8, 4)
	if err != nil {
		return nil, err
	}
	log.Info("Initialized state bloom", "size", common.StorageSize(float64(bloom.M()/8)))
	return &stateBloom{bloom: bloom}, nil
}

// NewStateBloomFromDisk loads the state bloom from the given file.
// In this case the assumption is that the bloom filter is complete.
func NewStateBloomFromDisk(filename string) (*stateBloom, error) {
	bloom, _, err := bloomfilter.ReadFile(filename)
	if err != nil {
		return nil, err
	}
	return &stateBloom{bloom: bloom}, nil
}

// Commit flushes the bloom filter content into the disk and marks the bloom
// as complete.
func (bloom *stateBloom) Commit(filename, tempname string) error {
	// Write the bloom out into a temporary file
	_, err := bloom.bloom.WriteFile(tempname)
	if err != nil {
		return err
	}
	// Ensure the file is synced to disk
	f, err := os.Open(tempname)
	if err != nil {
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	f.Close()

	// Move the temporary file into its final location
	return os.Rename(tempname, filename)
}

// Put implements the KeyValueWriter interface. But here only the key is needed.
func (bloom *stateBloom) Put(key []byte, value []byte) error {
	// If the key length is not 32 bytes, ensure it's a contract code
	// entry with the new scheme.
	if len(key) != common.HashLength {
		isCode, codeKey := rawdb.IsCodeKey(key)
		if !isCode {
			return errors.New("invalid entry")
		}
		bloom.bloom.Add(stateBloomHasher(codeKey))
		return nil
	}
	bloom.bloom.Add(stateBloomHasher(key))
	return nil
}

// Delete removes the key from the key-value data store.
func (bloom *stateBloom) Delete(key []byte) error { panic("not supported") }

// Contain is the wrapper of the underlying contains function which
// reports whether the key is contained.
// - If it says yes, the key may be contained
// - If it says no, the key is definitely not contained.
func (bloom *stateBloom) Contain(key []byte) (bool, error) {
	return bloom.bloom.Contains(stateBloomHasher(key)), nil
}
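For orientation, a hypothetical snippet (not part of the change) of how this filter is meant to be used from inside the pruner package, reusing its imports. The function name, size and file names are illustrative; the real file name produced by bloomFilterName also embeds the target state root.

func exampleStateBloomUsage() error {
	// Create a 256 MB filter; the pruner sanitizes anything smaller than this.
	bloom, err := newStateBloomWithSize(256)
	if err != nil {
		return err
	}
	// While re-generating the target state, every trie node hash (and code hash)
	// is recorded through the KeyValueWriter interface.
	node := common.HexToHash("0xabcdef") // hypothetical trie node hash
	if err := bloom.Put(node.Bytes(), nil); err != nil {
		return err
	}
	// During the sweep, Contain answers "maybe live" / "definitely stale".
	if live, _ := bloom.Contain(node.Bytes()); !live {
		return errors.New("an added key must always report as contained")
	}
	// Finally the filter is fsynced to disk so an interrupted prune can resume.
	return bloom.Commit("statebloom.bf.gz", "statebloom.bf.gz.tmp")
}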
core/state/pruner/pruner.go (new file, 537 lines)
@ -0,0 +1,537 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package pruner
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/state/snapshot"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
)
|
||||
|
||||
const (
|
||||
// stateBloomFilePrefix is the filename prefix of state bloom filter.
|
||||
stateBloomFilePrefix = "statebloom"
|
||||
|
||||
// stateBloomFileSuffix is the filename suffix of state bloom filter.
|
||||
stateBloomFileSuffix = "bf.gz"
|
||||
|
||||
// stateBloomFileTempSuffix is the filename suffix of state bloom filter
|
||||
// while it is being written out to detect write aborts.
|
||||
stateBloomFileTempSuffix = ".tmp"
|
||||
|
||||
// rangeCompactionThreshold is the minimal deleted entry number for
|
||||
// triggering range compaction. It's a quite arbitrary number but just
|
||||
// to avoid triggering range compaction because of small deletion.
|
||||
rangeCompactionThreshold = 100000
|
||||
)
|
||||
|
||||
var (
|
||||
// emptyRoot is the known root hash of an empty trie.
|
||||
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
|
||||
|
||||
// emptyCode is the known hash of the empty EVM bytecode.
|
||||
emptyCode = crypto.Keccak256(nil)
|
||||
)
|
||||
|
||||
// Pruner is an offline tool to prune the stale state with the
|
||||
// help of the snapshot. The workflow of pruner is very simple:
|
||||
//
|
||||
// - iterate the snapshot, reconstruct the relevant state
|
||||
// - iterate the database, delete all other state entries which
|
||||
// don't belong to the target state and the genesis state
|
||||
//
|
||||
// It can take several hours(around 2 hours for mainnet) to finish
|
||||
// the whole pruning work. It's recommended to run this offline tool
|
||||
// periodically in order to release the disk usage and improve the
|
||||
// disk read performance to some extent.
|
||||
type Pruner struct {
|
||||
db ethdb.Database
|
||||
stateBloom *stateBloom
|
||||
datadir string
|
||||
trieCachePath string
|
||||
headHeader *types.Header
|
||||
snaptree *snapshot.Tree
|
||||
}
|
||||
|
||||
// NewPruner creates the pruner instance.
|
||||
func NewPruner(db ethdb.Database, headHeader *types.Header, datadir, trieCachePath string, bloomSize uint64) (*Pruner, error) {
|
||||
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headHeader.Root, false, false, false)
|
||||
if err != nil {
|
||||
return nil, err // The relevant snapshot(s) might not exist
|
||||
}
|
||||
// Sanitize the bloom filter size if it's too small.
|
||||
if bloomSize < 256 {
|
||||
log.Warn("Sanitizing bloomfilter size", "provided(MB)", bloomSize, "updated(MB)", 256)
|
||||
bloomSize = 256
|
||||
}
|
||||
stateBloom, err := newStateBloomWithSize(bloomSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Pruner{
|
||||
db: db,
|
||||
stateBloom: stateBloom,
|
||||
datadir: datadir,
|
||||
trieCachePath: trieCachePath,
|
||||
headHeader: headHeader,
|
||||
snaptree: snaptree,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func prune(maindb ethdb.Database, stateBloom *stateBloom, middleStateRoots map[common.Hash]struct{}, start time.Time) error {
|
||||
// Delete all stale trie nodes in the disk. With the help of the state bloom
// the trie nodes (and codes) belonging to the active state will be filtered
// out. A very small part of the stale tries will also be filtered because of
// the false-positive rate of the bloom filter. But the assumption is held here
// that the false-positive rate is low enough (~0.05%). The probability of a
// dangling node being a state root is super low, so the dangling nodes in
// theory will never be visited again.
|
||||
var (
|
||||
count int
|
||||
size common.StorageSize
|
||||
pstart = time.Now()
|
||||
logged = time.Now()
|
||||
batch = maindb.NewBatch()
|
||||
iter = maindb.NewIterator(nil, nil)
|
||||
)
|
||||
for iter.Next() {
|
||||
key := iter.Key()
|
||||
|
||||
// All state entries that don't belong to the target state or the genesis are deleted here:
|
||||
// - trie node
|
||||
// - legacy contract code
|
||||
// - new-scheme contract code
|
||||
isCode, codeKey := rawdb.IsCodeKey(key)
|
||||
if len(key) == common.HashLength || isCode {
|
||||
checkKey := key
|
||||
if isCode {
|
||||
checkKey = codeKey
|
||||
}
|
||||
if _, exist := middleStateRoots[common.BytesToHash(checkKey)]; exist {
|
||||
log.Debug("Forcibly delete the middle state roots", "hash", common.BytesToHash(checkKey))
|
||||
} else {
|
||||
if ok, err := stateBloom.Contain(checkKey); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
count += 1
|
||||
size += common.StorageSize(len(key) + len(iter.Value()))
|
||||
batch.Delete(key)
|
||||
|
||||
var eta time.Duration // Realistically will never remain uninited
|
||||
if done := binary.BigEndian.Uint64(key[:8]); done > 0 {
|
||||
var (
|
||||
left = math.MaxUint64 - binary.BigEndian.Uint64(key[:8])
|
||||
speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
|
||||
)
|
||||
eta = time.Duration(left/speed) * time.Millisecond
|
||||
}
|
||||
if time.Since(logged) > 8*time.Second {
|
||||
log.Info("Pruning state data", "nodes", count, "size", size,
|
||||
"elapsed", common.PrettyDuration(time.Since(pstart)), "eta", common.PrettyDuration(eta))
|
||||
logged = time.Now()
|
||||
}
|
||||
// Recreate the iterator after every batch commit in order
|
||||
// to allow the underlying compactor to delete the entries.
|
||||
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
||||
batch.Write()
|
||||
batch.Reset()
|
||||
|
||||
iter.Release()
|
||||
iter = maindb.NewIterator(nil, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
if batch.ValueSize() > 0 {
|
||||
batch.Write()
|
||||
batch.Reset()
|
||||
}
|
||||
iter.Release()
|
||||
log.Info("Pruned state data", "nodes", count, "size", size, "elapsed", common.PrettyDuration(time.Since(pstart)))
|
||||
|
||||
// Start compactions, will remove the deleted data from the disk immediately.
|
||||
// Note for small pruning, the compaction is skipped.
|
||||
if count >= rangeCompactionThreshold {
|
||||
cstart := time.Now()
|
||||
|
||||
for b := byte(0); b < byte(16); b++ {
|
||||
log.Info("Compacting database", "range", fmt.Sprintf("%#x-%#x", b, b+1), "elapsed", common.PrettyDuration(time.Since(cstart)))
|
||||
if err := maindb.Compact([]byte{b}, []byte{b + 1}); err != nil {
|
||||
log.Error("Database compaction failed", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Info("Database compaction finished", "elapsed", common.PrettyDuration(time.Since(cstart)))
|
||||
}
|
||||
log.Info("State pruning successful", "pruned", size, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prune deletes all historical state nodes except the nodes belong to the
|
||||
// specified state version. If user doesn't specify the state version, use
|
||||
// the bottom-most snapshot diff layer as the target.
|
||||
func (p *Pruner) Prune(root common.Hash) error {
|
||||
// If the state bloom filter is already committed previously,
|
||||
// reuse it for pruning instead of generating a new one. This is
// mandatory because a part of the state may already have been deleted,
// so the recovery procedure is necessary.
|
||||
_, stateBloomRoot, err := findBloomFilter(p.datadir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stateBloomRoot != (common.Hash{}) {
|
||||
return RecoverPruning(p.datadir, p.db, p.trieCachePath)
|
||||
}
|
||||
// If the target state root is not specified, use the HEAD-127 as the
|
||||
// target. The reason for picking it is:
|
||||
// - in most of the normal cases, the related state is available
|
||||
// - the probability of this layer being reorged is very low
|
||||
var layers []snapshot.Snapshot
|
||||
if root == (common.Hash{}) {
|
||||
// Retrieve all snapshot layers from the current HEAD.
|
||||
// In theory there are 128 difflayers + 1 disk layer present,
|
||||
// so 128 diff layers are expected to be returned.
|
||||
layers = p.snaptree.Snapshots(p.headHeader.Root, 128, true)
|
||||
if len(layers) != 128 {
|
||||
// Reject if the accumulated diff layers are less than 128. It
|
||||
// means in most of normal cases, there is no associated state
|
||||
// with bottom-most diff layer.
|
||||
return errors.New("the snapshot difflayers are less than 128")
|
||||
}
|
||||
// Use the bottom-most diff layer as the target
|
||||
root = layers[len(layers)-1].Root()
|
||||
}
|
||||
// Ensure the root is really present. The weak assumption
|
||||
// is the presence of root can indicate the presence of the
|
||||
// entire trie.
|
||||
if blob := rawdb.ReadTrieNode(p.db, root); len(blob) == 0 {
|
||||
// The special case is for clique-based networks (rinkeby, goerli
// and some other private networks): it's possible that two
// consecutive blocks will have the same root. In this case the snapshot
// difflayer won't be created. So HEAD-127 may not be paired with the
// HEAD-127 layer. Instead the paired layer is higher than the
|
||||
// bottom-most diff layer. Try to find the bottom-most snapshot
|
||||
// layer with state available.
|
||||
//
|
||||
// Note HEAD and HEAD-1 are ignored. Usually there is the associated
// state available, but we don't want to use the topmost state
|
||||
// as the pruning target.
|
||||
var found bool
|
||||
for i := len(layers) - 2; i >= 2; i-- {
|
||||
if blob := rawdb.ReadTrieNode(p.db, layers[i].Root()); len(blob) != 0 {
|
||||
root = layers[i].Root()
|
||||
found = true
|
||||
log.Info("Selecting middle-layer as the pruning target", "root", root, "depth", i)
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
if len(layers) > 0 {
|
||||
return errors.New("no snapshot paired state")
|
||||
}
|
||||
return fmt.Errorf("associated state[%x] is not present", root)
|
||||
}
|
||||
} else {
|
||||
if len(layers) > 0 {
|
||||
log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.headHeader.Number.Uint64()-127)
|
||||
} else {
|
||||
log.Info("Selecting user-specified state as the pruning target", "root", root)
|
||||
}
|
||||
}
|
||||
// Before starting the pruning, delete the clean trie cache first.
// It's necessary because otherwise on the next restart we would hit the
// deleted state root in the "clean cache", so that the incomplete
// state would be picked for usage.
|
||||
deleteCleanTrieCache(p.trieCachePath)
|
||||
|
||||
// All the state roots of the middle layer should be forcibly pruned,
|
||||
// otherwise the dangling state will be left.
|
||||
middleRoots := make(map[common.Hash]struct{})
|
||||
for _, layer := range layers {
|
||||
if layer.Root() == root {
|
||||
break
|
||||
}
|
||||
middleRoots[layer.Root()] = struct{}{}
|
||||
}
|
||||
// Traverse the target state, re-construct the whole state trie and
|
||||
// commit to the given bloom filter.
|
||||
start := time.Now()
|
||||
if err := snapshot.GenerateTrie(p.snaptree, root, p.db, p.stateBloom); err != nil {
|
||||
return err
|
||||
}
|
||||
// Traverse the genesis, put all genesis state entries into the
|
||||
// bloom filter too.
|
||||
if err := extractGenesis(p.db, p.stateBloom); err != nil {
|
||||
return err
|
||||
}
|
||||
filterName := bloomFilterName(p.datadir, root)
|
||||
|
||||
log.Info("Writing state bloom to disk", "name", filterName)
|
||||
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("State bloom filter committed", "name", filterName)
|
||||
|
||||
if err := prune(p.db, p.stateBloom, middleRoots, start); err != nil {
|
||||
return err
|
||||
}
|
||||
// Pruning is done, now drop the "useless" layers from the snapshot.
|
||||
// Firstly, flushing the target layer into the disk. After that all
|
||||
// diff layers below the target will all be merged into the disk.
|
||||
if err := p.snaptree.Cap(root, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
// Secondly, flushing the snapshot journal into the disk. All diff
|
||||
// layers upon the target layer are dropped silently. Eventually the
|
||||
// entire snapshot tree is converted into a single disk layer with
|
||||
// the pruning target as the root.
|
||||
if _, err := p.snaptree.Journal(root); err != nil {
|
||||
return err
|
||||
}
|
||||
// Delete the state bloom; it marks the entire pruning procedure as
// finished. If any crash or manual exit happens before this,
// `RecoverPruning` will pick it up on the next restart to redo all
// the work.
|
||||
os.RemoveAll(filterName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RecoverPruning will resume the pruning procedure during the system restart.
|
||||
// This function is used in the following case: the user tries to prune state data, but the
// system was interrupted midway because of a crash or manual kill. In this case,
// if the bloom filter for filtering active state is already constructed, the
// pruning can be resumed. What's more, if the bloom filter is constructed, the
// pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
|
||||
// in the disk.
|
||||
func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) error {
|
||||
stateBloomPath, stateBloomRoot, err := findBloomFilter(datadir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stateBloomPath == "" {
|
||||
return nil // nothing to recover
|
||||
}
|
||||
headHeader, err := getHeadHeader(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Initialize the snapshot tree in recovery mode to handle this special case:
|
||||
// - Users run the `prune-state` command multiple times
|
||||
// - None of these `prune-state` runs is finished (e.g. interrupted manually)
// - The state bloom filter is already generated, a part of the state is deleted,
//   so that resuming the pruning here is mandatory
// - The state HEAD is rewound already because of multiple incomplete `prune-state` runs
// In this case, even though the state HEAD does not exactly match the snapshot, it
// is still feasible to recover the pruning correctly.
|
||||
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headHeader.Root, false, false, true)
|
||||
if err != nil {
|
||||
return err // The relevant snapshot(s) might not exist
|
||||
}
|
||||
stateBloom, err := NewStateBloomFromDisk(stateBloomPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("Loaded state bloom filter", "path", stateBloomPath)
|
||||
|
||||
// Before starting the pruning, delete the clean trie cache first.
// It's necessary because otherwise on the next restart we would hit the
// deleted state root in the "clean cache", so that the incomplete
// state would be picked for usage.
|
||||
deleteCleanTrieCache(trieCachePath)
|
||||
|
||||
// All the state roots of the middle layers should be forcibly pruned,
|
||||
// otherwise the dangling state will be left.
|
||||
var (
|
||||
found bool
|
||||
layers = snaptree.Snapshots(headHeader.Root, 128, true)
|
||||
middleRoots = make(map[common.Hash]struct{})
|
||||
)
|
||||
for _, layer := range layers {
|
||||
if layer.Root() == stateBloomRoot {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
middleRoots[layer.Root()] = struct{}{}
|
||||
}
|
||||
if !found {
|
||||
log.Error("Pruning target state is not existent")
|
||||
return errors.New("non-existent target state")
|
||||
}
|
||||
if err := prune(db, stateBloom, middleRoots, time.Now()); err != nil {
|
||||
return err
|
||||
}
|
||||
// Pruning is done, now drop the "useless" layers from the snapshot.
|
||||
// Firstly, flushing the target layer into the disk. After that all
|
||||
// diff layers below the target will all be merged into the disk.
|
||||
if err := snaptree.Cap(stateBloomRoot, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
// Secondly, flushing the snapshot journal into the disk. All diff
|
||||
// layers above it are dropped silently. Eventually the entire snapshot
|
||||
// tree is converted into a single disk layer with the pruning target
|
||||
// as the root.
|
||||
if _, err := snaptree.Journal(stateBloomRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
// Delete the state bloom; it marks the entire pruning procedure as
// finished. If any crash or manual exit happens before this,
// `RecoverPruning` will pick it up on the next restart to redo all
// the work.
|
||||
os.RemoveAll(stateBloomPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractGenesis loads the genesis state and commits all the state entries
|
||||
// into the given bloomfilter.
|
||||
func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
|
||||
genesisHash := rawdb.ReadCanonicalHash(db, 0)
|
||||
if genesisHash == (common.Hash{}) {
|
||||
return errors.New("missing genesis hash")
|
||||
}
|
||||
genesis := rawdb.ReadBlock(db, genesisHash, 0)
|
||||
if genesis == nil {
|
||||
return errors.New("missing genesis block")
|
||||
}
|
||||
t, err := trie.NewSecure(genesis.Root(), trie.NewDatabase(db))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accIter := t.NodeIterator(nil)
|
||||
for accIter.Next(true) {
|
||||
hash := accIter.Hash()
|
||||
|
||||
// Embedded nodes don't have hash.
|
||||
if hash != (common.Hash{}) {
|
||||
stateBloom.Put(hash.Bytes(), nil)
|
||||
}
|
||||
// If it's a leaf node, yes we are touching an account,
|
||||
// dig into the storage trie further.
|
||||
if accIter.Leaf() {
|
||||
var acc state.Account
|
||||
if err := rlp.DecodeBytes(accIter.LeafBlob(), &acc); err != nil {
|
||||
return err
|
||||
}
|
||||
if acc.Root != emptyRoot {
|
||||
storageTrie, err := trie.NewSecure(acc.Root, trie.NewDatabase(db))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storageIter := storageTrie.NodeIterator(nil)
|
||||
for storageIter.Next(true) {
|
||||
hash := storageIter.Hash()
|
||||
if hash != (common.Hash{}) {
|
||||
stateBloom.Put(hash.Bytes(), nil)
|
||||
}
|
||||
}
|
||||
if storageIter.Error() != nil {
|
||||
return storageIter.Error()
|
||||
}
|
||||
}
|
||||
if !bytes.Equal(acc.CodeHash, emptyCode) {
|
||||
stateBloom.Put(acc.CodeHash, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
return accIter.Error()
|
||||
}
|
||||
|
||||
func bloomFilterName(datadir string, hash common.Hash) string {
|
||||
return filepath.Join(datadir, fmt.Sprintf("%s.%s.%s", stateBloomFilePrefix, hash.Hex(), stateBloomFileSuffix))
|
||||
}
|
||||
|
||||
func isBloomFilter(filename string) (bool, common.Hash) {
|
||||
filename = filepath.Base(filename)
|
||||
if strings.HasPrefix(filename, stateBloomFilePrefix) && strings.HasSuffix(filename, stateBloomFileSuffix) {
|
||||
return true, common.HexToHash(filename[len(stateBloomFilePrefix)+1 : len(filename)-len(stateBloomFileSuffix)-1])
|
||||
}
|
||||
return false, common.Hash{}
|
||||
}
|
||||
|
||||
func findBloomFilter(datadir string) (string, common.Hash, error) {
|
||||
var (
|
||||
stateBloomPath string
|
||||
stateBloomRoot common.Hash
|
||||
)
|
||||
if err := filepath.Walk(datadir, func(path string, info os.FileInfo, err error) error {
|
||||
if info != nil && !info.IsDir() {
|
||||
ok, root := isBloomFilter(path)
|
||||
if ok {
|
||||
stateBloomPath = path
|
||||
stateBloomRoot = root
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return "", common.Hash{}, err
|
||||
}
|
||||
return stateBloomPath, stateBloomRoot, nil
|
||||
}
|
||||
|
||||
func getHeadHeader(db ethdb.Database) (*types.Header, error) {
|
||||
headHeaderHash := rawdb.ReadHeadBlockHash(db)
|
||||
if headHeaderHash == (common.Hash{}) {
|
||||
return nil, errors.New("empty head block hash")
|
||||
}
|
||||
headHeaderNumber := rawdb.ReadHeaderNumber(db, headHeaderHash)
|
||||
if headHeaderNumber == nil {
|
||||
return nil, errors.New("empty head block number")
|
||||
}
|
||||
headHeader := rawdb.ReadHeader(db, headHeaderHash, *headHeaderNumber)
|
||||
if headHeader == nil {
|
||||
return nil, errors.New("empty head header")
|
||||
}
|
||||
return headHeader, nil
|
||||
}
|
||||
|
||||
const warningLog = `
|
||||
|
||||
WARNING!
|
||||
|
||||
The clean trie cache is not found. Please delete it yourself after the
pruning. Remember: don't start Geth without deleting the clean trie cache,
otherwise the entire database may be damaged!
|
||||
|
||||
Check the command description "geth snapshot prune-state --help" for more details.
|
||||
`
|
||||
|
||||
func deleteCleanTrieCache(path string) {
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
log.Warn(warningLog)
|
||||
return
|
||||
}
|
||||
os.RemoveAll(path)
|
||||
log.Info("Deleted trie clean cache", "path", path)
|
||||
}
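For context, a minimal sketch (assuming the usual go-ethereum imports; this is not the actual cmd/geth wiring behind `geth snapshot prune-state`) of how the pruner above might be driven. Function and variable names are placeholders:

func pruneExample(db ethdb.Database, headHeader *types.Header, datadir, trieCachePath string, root common.Hash) error {
	// If a committed state bloom survives from an earlier run, part of the state
	// may already be gone, so that prune has to be resumed before anything else.
	if err := pruner.RecoverPruning(datadir, db, trieCachePath); err != nil {
		return err
	}
	p, err := pruner.NewPruner(db, headHeader, datadir, trieCachePath, 2048) // 2048 MB bloom
	if err != nil {
		return err
	}
	// An empty root selects the bottom-most snapshot diff layer (HEAD-127) as the target.
	return p.Prune(root)
}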
|
@ -18,12 +18,17 @@ package snapshot
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb/memorydb"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
@ -38,46 +43,56 @@ type trieKV struct {
|
||||
type (
|
||||
// trieGeneratorFn is the interface of trie generation which can
|
||||
// be implemented by different trie algorithm.
|
||||
trieGeneratorFn func(in chan trieKV, out chan common.Hash)
|
||||
trieGeneratorFn func(db ethdb.KeyValueWriter, in chan (trieKV), out chan (common.Hash))
|
||||
|
||||
// leafCallbackFn is the callback invoked at the leaves of the trie,
|
||||
// returns the subtrie root with the specified subtrie identifier.
|
||||
leafCallbackFn func(hash common.Hash, stat *generateStats) common.Hash
|
||||
leafCallbackFn func(db ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error)
|
||||
)
|
||||
|
||||
// GenerateAccountTrieRoot takes an account iterator and reproduces the root hash.
|
||||
func GenerateAccountTrieRoot(it AccountIterator) (common.Hash, error) {
|
||||
return generateTrieRoot(it, common.Hash{}, stdGenerate, nil, &generateStats{start: time.Now()}, true)
|
||||
return generateTrieRoot(nil, it, common.Hash{}, stackTrieGenerate, nil, newGenerateStats(), true)
|
||||
}
|
||||
|
||||
// GenerateStorageTrieRoot takes a storage iterator and reproduces the root hash.
|
||||
func GenerateStorageTrieRoot(account common.Hash, it StorageIterator) (common.Hash, error) {
|
||||
return generateTrieRoot(it, account, stdGenerate, nil, &generateStats{start: time.Now()}, true)
|
||||
return generateTrieRoot(nil, it, account, stackTrieGenerate, nil, newGenerateStats(), true)
|
||||
}
|
||||
|
||||
// VerifyState takes the whole snapshot tree as the input, traverses all the accounts
|
||||
// as well as the corresponding storages and compares the re-computed hash with the
|
||||
// original one(state root and the storage root).
|
||||
func VerifyState(snaptree *Tree, root common.Hash) error {
|
||||
// GenerateTrie takes the whole snapshot tree as the input, traverses all the
|
||||
// accounts as well as the corresponding storages and regenerates the whole state
|
||||
// (account trie + all storage tries).
|
||||
func GenerateTrie(snaptree *Tree, root common.Hash, src ethdb.Database, dst ethdb.KeyValueWriter) error {
|
||||
// Traverse all state by snapshot, re-generate the whole state trie
|
||||
acctIt, err := snaptree.AccountIterator(root, common.Hash{})
|
||||
if err != nil {
|
||||
return err
|
||||
return err // The required snapshot might not exist.
|
||||
}
|
||||
defer acctIt.Release()
|
||||
|
||||
got, err := generateTrieRoot(acctIt, common.Hash{}, stdGenerate, func(account common.Hash, stat *generateStats) common.Hash {
|
||||
storageIt, err := snaptree.StorageIterator(root, account, common.Hash{})
|
||||
got, err := generateTrieRoot(dst, acctIt, common.Hash{}, stackTrieGenerate, func(dst ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error) {
|
||||
// Migrate the code first, commit the contract code into the tmp db.
|
||||
if codeHash != emptyCode {
|
||||
code := rawdb.ReadCode(src, codeHash)
|
||||
if len(code) == 0 {
|
||||
return common.Hash{}, errors.New("failed to read contract code")
|
||||
}
|
||||
rawdb.WriteCode(dst, codeHash, code)
|
||||
}
|
||||
// Then migrate all storage trie nodes into the tmp db.
|
||||
storageIt, err := snaptree.StorageIterator(root, accountHash, common.Hash{})
|
||||
if err != nil {
|
||||
return common.Hash{}
|
||||
return common.Hash{}, err
|
||||
}
|
||||
defer storageIt.Release()
|
||||
|
||||
hash, err := generateTrieRoot(storageIt, account, stdGenerate, nil, stat, false)
|
||||
hash, err := generateTrieRoot(dst, storageIt, accountHash, stackTrieGenerate, nil, stat, false)
|
||||
if err != nil {
|
||||
return common.Hash{}
|
||||
return common.Hash{}, err
|
||||
}
|
||||
return hash
|
||||
}, &generateStats{start: time.Now()}, true)
|
||||
return hash, nil
|
||||
}, newGenerateStats(), true)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
@ -91,23 +106,64 @@ func VerifyState(snaptree *Tree, root common.Hash) error {
|
||||
// generateStats is a collection of statistics gathered by the trie generator
|
||||
// for logging purposes.
|
||||
type generateStats struct {
|
||||
accounts uint64
|
||||
slots uint64
|
||||
curAccount common.Hash
|
||||
curSlot common.Hash
|
||||
start time.Time
|
||||
lock sync.RWMutex
|
||||
head common.Hash
|
||||
start time.Time
|
||||
|
||||
accounts uint64 // Number of accounts done (including those being crawled)
|
||||
slots uint64 // Number of storage slots done (including those being crawled)
|
||||
|
||||
slotsStart map[common.Hash]time.Time // Start time for account slot crawling
|
||||
slotsHead map[common.Hash]common.Hash // Slot head for accounts being crawled
|
||||
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// progress records the progress trie generator made recently.
|
||||
func (stat *generateStats) progress(accounts, slots uint64, curAccount common.Hash, curSlot common.Hash) {
|
||||
// newGenerateStats creates a new generator stats.
|
||||
func newGenerateStats() *generateStats {
|
||||
return &generateStats{
|
||||
slotsStart: make(map[common.Hash]time.Time),
|
||||
slotsHead: make(map[common.Hash]common.Hash),
|
||||
start: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// progressAccounts updates the generator stats for the account range.
|
||||
func (stat *generateStats) progressAccounts(account common.Hash, done uint64) {
|
||||
stat.lock.Lock()
|
||||
defer stat.lock.Unlock()
|
||||
|
||||
stat.accounts += accounts
|
||||
stat.slots += slots
|
||||
stat.curAccount = curAccount
|
||||
stat.curSlot = curSlot
|
||||
stat.accounts += done
|
||||
stat.head = account
|
||||
}
|
||||
|
||||
// finishAccounts updates the generator stats for the finished account range.
|
||||
func (stat *generateStats) finishAccounts(done uint64) {
|
||||
stat.lock.Lock()
|
||||
defer stat.lock.Unlock()
|
||||
|
||||
stat.accounts += done
|
||||
}
|
||||
|
||||
// progressContract updates the generator stats for a specific in-progress contract.
|
||||
func (stat *generateStats) progressContract(account common.Hash, slot common.Hash, done uint64) {
|
||||
stat.lock.Lock()
|
||||
defer stat.lock.Unlock()
|
||||
|
||||
stat.slots += done
|
||||
stat.slotsHead[account] = slot
|
||||
if _, ok := stat.slotsStart[account]; !ok {
|
||||
stat.slotsStart[account] = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// finishContract updates the generator stats for a specific just-finished contract.
|
||||
func (stat *generateStats) finishContract(account common.Hash, done uint64) {
|
||||
stat.lock.Lock()
|
||||
defer stat.lock.Unlock()
|
||||
|
||||
stat.slots += done
|
||||
delete(stat.slotsHead, account)
|
||||
delete(stat.slotsStart, account)
|
||||
}
|
||||
|
||||
// report prints the cumulative progress statistic smartly.
|
||||
@ -115,22 +171,39 @@ func (stat *generateStats) report() {
|
||||
stat.lock.RLock()
|
||||
defer stat.lock.RUnlock()
|
||||
|
||||
var ctx []interface{}
|
||||
if stat.curSlot != (common.Hash{}) {
|
||||
ctx = append(ctx, []interface{}{
|
||||
"in", stat.curAccount,
|
||||
"at", stat.curSlot,
|
||||
}...)
|
||||
} else {
|
||||
ctx = append(ctx, []interface{}{"at", stat.curAccount}...)
|
||||
ctx := []interface{}{
|
||||
"accounts", stat.accounts,
|
||||
"slots", stat.slots,
|
||||
"elapsed", common.PrettyDuration(time.Since(stat.start)),
|
||||
}
|
||||
// Add the usual measurements
|
||||
ctx = append(ctx, []interface{}{"accounts", stat.accounts}...)
|
||||
if stat.slots != 0 {
|
||||
ctx = append(ctx, []interface{}{"slots", stat.slots}...)
|
||||
if stat.accounts > 0 {
|
||||
// If there's progress on the account trie, estimate the time to finish crawling it
|
||||
if done := binary.BigEndian.Uint64(stat.head[:8]) / stat.accounts; done > 0 {
|
||||
var (
|
||||
left = (math.MaxUint64 - binary.BigEndian.Uint64(stat.head[:8])) / stat.accounts
|
||||
speed = done/uint64(time.Since(stat.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
|
||||
eta = time.Duration(left/speed) * time.Millisecond
|
||||
)
|
||||
// If there are large contract crawls in progress, estimate their finish time
|
||||
for acc, head := range stat.slotsHead {
|
||||
start := stat.slotsStart[acc]
|
||||
if done := binary.BigEndian.Uint64(head[:8]); done > 0 {
|
||||
var (
|
||||
left = math.MaxUint64 - binary.BigEndian.Uint64(head[:8])
|
||||
speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
|
||||
)
|
||||
// Override the ETA if larger than the largest until now
|
||||
if slotETA := time.Duration(left/speed) * time.Millisecond; eta < slotETA {
|
||||
eta = slotETA
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx = append(ctx, []interface{}{
|
||||
"eta", common.PrettyDuration(eta),
|
||||
}...)
|
||||
}
|
||||
}
|
||||
ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...)
|
||||
log.Info("Generating trie hash from snapshot", ctx...)
|
||||
log.Info("Iterating state snapshot", ctx...)
|
||||
}
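The ETA estimation above (and the similar arithmetic in the pruner) can be read as a tiny standalone helper. The sketch below merely restates that arithmetic and is not part of the change; imports of binary, math, time and common are assumed:

func etaFromKeyspacePosition(head common.Hash, start time.Time) time.Duration {
	// The first 8 bytes of the current hash give the position in the 2^64 key
	// space, so progress and speed can be derived without knowing the total
	// number of entries in advance.
	done := binary.BigEndian.Uint64(head[:8])
	if done == 0 {
		return 0
	}
	left := math.MaxUint64 - done
	speed := done/uint64(time.Since(start)/time.Millisecond+1) + 1 // avoid division by zero
	return time.Duration(left/speed) * time.Millisecond
}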
|
||||
|
||||
// reportDone prints the last log when the whole generation is finished.
|
||||
@ -144,13 +217,32 @@ func (stat *generateStats) reportDone() {
|
||||
ctx = append(ctx, []interface{}{"slots", stat.slots}...)
|
||||
}
|
||||
ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...)
|
||||
log.Info("Generated trie hash from snapshot", ctx...)
|
||||
log.Info("Iterated snapshot", ctx...)
|
||||
}
|
||||
|
||||
// runReport periodically prints the progress information.
|
||||
func runReport(stats *generateStats, stop chan bool) {
|
||||
timer := time.NewTimer(0)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
stats.report()
|
||||
timer.Reset(time.Second * 8)
|
||||
case success := <-stop:
|
||||
if success {
|
||||
stats.reportDone()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// generateTrieRoot generates the trie hash based on the snapshot iterator.
|
||||
// It can be used for generating account trie, storage trie or even the
|
||||
// whole state which connects the accounts and the corresponding storages.
|
||||
func generateTrieRoot(it Iterator, account common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) {
|
||||
func generateTrieRoot(db ethdb.KeyValueWriter, it Iterator, account common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) {
|
||||
var (
|
||||
in = make(chan trieKV) // chan to pass leaves
|
||||
out = make(chan common.Hash, 1) // chan to collect result
|
||||
@ -161,46 +253,43 @@ func generateTrieRoot(it Iterator, account common.Hash, generatorFn trieGenerato
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
generatorFn(in, out)
|
||||
generatorFn(db, in, out)
|
||||
}()
|
||||
|
||||
// Spin up a go-routine for progress logging
|
||||
if report && stats != nil {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
timer := time.NewTimer(0)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
stats.report()
|
||||
timer.Reset(time.Second * 8)
|
||||
case success := <-stoplog:
|
||||
if success {
|
||||
stats.reportDone()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
runReport(stats, stoplog)
|
||||
}()
|
||||
}
|
||||
// Create a semaphore to assign tasks and collect results through. We'll pre-
|
||||
// fill it with nils, thus using the same channel for both limiting concurrent
|
||||
// processing and gathering results.
|
||||
threads := runtime.NumCPU()
|
||||
results := make(chan error, threads)
|
||||
for i := 0; i < threads; i++ {
|
||||
results <- nil // fill the semaphore
|
||||
}
|
||||
// stop is a helper function to shutdown the background threads
|
||||
// and return the re-generated trie hash.
|
||||
stop := func(success bool) common.Hash {
|
||||
stop := func(fail error) (common.Hash, error) {
|
||||
close(in)
|
||||
result := <-out
|
||||
stoplog <- success
|
||||
for i := 0; i < threads; i++ {
|
||||
if err := <-results; err != nil && fail == nil {
|
||||
fail = err
|
||||
}
|
||||
}
|
||||
stoplog <- fail == nil
|
||||
|
||||
wg.Wait()
|
||||
return result
|
||||
return result, fail
|
||||
}
|
||||
var (
|
||||
logged = time.Now()
|
||||
processed = uint64(0)
|
||||
leaf trieKV
|
||||
last common.Hash
|
||||
)
|
||||
// Start to feed leaves
|
||||
for it.Next() {
|
||||
@ -212,26 +301,35 @@ func generateTrieRoot(it Iterator, account common.Hash, generatorFn trieGenerato
|
||||
if leafCallback == nil {
|
||||
fullData, err = FullAccountRLP(it.(AccountIterator).Account())
|
||||
if err != nil {
|
||||
stop(false)
|
||||
return common.Hash{}, err
|
||||
return stop(err)
|
||||
}
|
||||
} else {
|
||||
// Wait until the semaphore allows us to continue, aborting if
|
||||
// a sub-task failed
|
||||
if err := <-results; err != nil {
|
||||
results <- nil // stop will drain the results, add a noop back for this error we just consumed
|
||||
return stop(err)
|
||||
}
|
||||
// Fetch the next account and process it concurrently
|
||||
account, err := FullAccount(it.(AccountIterator).Account())
|
||||
if err != nil {
|
||||
stop(false)
|
||||
return common.Hash{}, err
|
||||
}
|
||||
// Apply the leaf callback. Normally the callback is used to traverse
|
||||
// the storage trie and re-generate the subtrie root.
|
||||
subroot := leafCallback(it.Hash(), stats)
|
||||
if !bytes.Equal(account.Root, subroot.Bytes()) {
|
||||
stop(false)
|
||||
return common.Hash{}, fmt.Errorf("invalid subroot(%x), want %x, got %x", it.Hash(), account.Root, subroot)
|
||||
return stop(err)
|
||||
}
|
||||
go func(hash common.Hash) {
|
||||
subroot, err := leafCallback(db, hash, common.BytesToHash(account.CodeHash), stats)
|
||||
if err != nil {
|
||||
results <- err
|
||||
return
|
||||
}
|
||||
if !bytes.Equal(account.Root, subroot.Bytes()) {
|
||||
results <- fmt.Errorf("invalid subroot(%x), want %x, got %x", it.Hash(), account.Root, subroot)
|
||||
return
|
||||
}
|
||||
results <- nil
|
||||
}(it.Hash())
|
||||
fullData, err = rlp.EncodeToBytes(account)
|
||||
if err != nil {
|
||||
stop(false)
|
||||
return common.Hash{}, err
|
||||
return stop(err)
|
||||
}
|
||||
}
|
||||
leaf = trieKV{it.Hash(), fullData}
|
||||
@ -244,32 +342,34 @@ func generateTrieRoot(it Iterator, account common.Hash, generatorFn trieGenerato
|
||||
processed++
|
||||
if time.Since(logged) > 3*time.Second && stats != nil {
|
||||
if account == (common.Hash{}) {
|
||||
stats.progress(processed, 0, it.Hash(), common.Hash{})
|
||||
stats.progressAccounts(it.Hash(), processed)
|
||||
} else {
|
||||
stats.progress(0, processed, account, it.Hash())
|
||||
stats.progressContract(account, it.Hash(), processed)
|
||||
}
|
||||
logged, processed = time.Now(), 0
|
||||
}
|
||||
last = it.Hash()
|
||||
}
|
||||
// Commit the last part statistic.
|
||||
if processed > 0 && stats != nil {
|
||||
if account == (common.Hash{}) {
|
||||
stats.progress(processed, 0, last, common.Hash{})
|
||||
stats.finishAccounts(processed)
|
||||
} else {
|
||||
stats.progress(0, processed, account, last)
|
||||
stats.finishContract(account, processed)
|
||||
}
|
||||
}
|
||||
result := stop(true)
|
||||
return result, nil
|
||||
return stop(nil)
|
||||
}
|
||||
|
||||
// stdGenerate is a very basic hexary trie builder which uses the same Trie
|
||||
// as the rest of geth, with no enhancements or optimizations
|
||||
func stdGenerate(in chan trieKV, out chan common.Hash) {
|
||||
t, _ := trie.New(common.Hash{}, trie.NewDatabase(memorydb.New()))
|
||||
func stackTrieGenerate(db ethdb.KeyValueWriter, in chan trieKV, out chan common.Hash) {
|
||||
t := trie.NewStackTrie(db)
|
||||
for leaf := range in {
|
||||
t.TryUpdate(leaf.key[:], leaf.value)
|
||||
}
|
||||
out <- t.Hash()
|
||||
var root common.Hash
|
||||
if db == nil {
|
||||
root = t.Hash()
|
||||
} else {
|
||||
root, _ = t.Commit()
|
||||
}
|
||||
out <- root
|
||||
}
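As a side note, a tiny illustrative snippet (assuming the trie and common imports; not part of the diff) of the stack trie the generator above relies on: it consumes leaves in ascending key order, exactly what the snapshot iterator provides, and yields the same root a regular hexary trie would.

func stackTrieRootExample() common.Hash {
	t := trie.NewStackTrie(nil) // nil writer: compute the root only, persist nothing
	for i := byte(1); i <= 3; i++ {
		var key [32]byte
		key[0] = i // keys must be fed in ascending order
		t.TryUpdate(key[:], []byte{i})
	}
	return t.Hash()
}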
|
||||
|
@ -178,7 +178,7 @@ type Tree struct {
|
||||
// store, on a background thread. If the memory layers from the journal is not
|
||||
// continuous with disk layer or the journal is missing, all diffs will be discarded
|
||||
// iff it's in "recovery" mode, otherwise rebuild is mandatory.
|
||||
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, recovery bool) *Tree {
|
||||
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, rebuild bool, recovery bool) (*Tree, error) {
|
||||
// Create a new, empty snapshot tree
|
||||
snap := &Tree{
|
||||
diskdb: diskdb,
|
||||
@ -192,16 +192,19 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm
|
||||
// Attempt to load a previously persisted snapshot and rebuild one if failed
|
||||
head, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
|
||||
if err != nil {
|
||||
log.Warn("Failed to load snapshot, regenerating", "err", err)
|
||||
snap.Rebuild(root)
|
||||
return snap
|
||||
if rebuild {
|
||||
log.Warn("Failed to load snapshot, regenerating", "err", err)
|
||||
snap.Rebuild(root)
|
||||
return snap, nil
|
||||
}
|
||||
return nil, err // Bail out the error, don't rebuild automatically.
|
||||
}
|
||||
// Existing snapshot loaded, seed all the layers
|
||||
for head != nil {
|
||||
snap.layers[head.Root()] = head
|
||||
head = head.Parent()
|
||||
}
|
||||
return snap
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
// waitBuild blocks until the snapshot finishes rebuilding. This method is meant
|
||||
@ -234,6 +237,39 @@ func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot {
|
||||
return t.layers[blockRoot]
|
||||
}
|
||||
|
||||
// Snapshots returns all visited layers from the topmost layer with specific
|
||||
// root and traverses downward. The layer amount is limited by the given number.
|
||||
// If nodisk is set, then disk layer is excluded.
|
||||
func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
if limits == 0 {
|
||||
return nil
|
||||
}
|
||||
layer := t.layers[root]
|
||||
if layer == nil {
|
||||
return nil
|
||||
}
|
||||
var ret []Snapshot
|
||||
for {
|
||||
if _, isdisk := layer.(*diskLayer); isdisk && nodisk {
|
||||
break
|
||||
}
|
||||
ret = append(ret, layer)
|
||||
limits -= 1
|
||||
if limits == 0 {
|
||||
break
|
||||
}
|
||||
parent := layer.Parent()
|
||||
if parent == nil {
|
||||
break
|
||||
}
|
||||
layer = parent
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// Update adds a new snapshot into the tree, if that can be linked to an existing
|
||||
// old parent. It is disallowed to insert a disk layer (the origin of all).
|
||||
func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error {
|
||||
@ -681,6 +717,38 @@ func (t *Tree) StorageIterator(root common.Hash, account common.Hash, seek commo
|
||||
return newFastStorageIterator(t, root, account, seek)
|
||||
}
|
||||
|
||||
// Verify iterates the whole state(all the accounts as well as the corresponding storages)
|
||||
// with the specific root and compares the re-computed hash with the original one.
|
||||
func (t *Tree) Verify(root common.Hash) error {
|
||||
acctIt, err := t.AccountIterator(root, common.Hash{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer acctIt.Release()
|
||||
|
||||
got, err := generateTrieRoot(nil, acctIt, common.Hash{}, stackTrieGenerate, func(db ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error) {
|
||||
storageIt, err := t.StorageIterator(root, accountHash, common.Hash{})
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
defer storageIt.Release()
|
||||
|
||||
hash, err := generateTrieRoot(nil, storageIt, accountHash, stackTrieGenerate, nil, stat, false)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
return hash, nil
|
||||
}, newGenerateStats(), true)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if got != root {
|
||||
return fmt.Errorf("state root hash mismatch: got %x, want %x", got, root)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// disklayer is an internal helper function to return the disk layer.
|
||||
// The lock of snapTree is assumed to be held already.
|
||||
func (t *Tree) disklayer() *diskLayer {
|
||||
|
@ -17,6 +17,7 @@
|
||||
package snapshot
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
@ -369,3 +370,69 @@ func TestPostCapBasicDataAccess(t *testing.T) {
|
||||
t.Error("expected error capping the disk layer, got none")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSnaphots tests the functionality for retrieving the snapshot
|
||||
// with given head root and the desired depth.
|
||||
func TestSnaphots(t *testing.T) {
|
||||
// setAccount is a helper to construct a random account entry and assign it to
|
||||
// an account slot in a snapshot
|
||||
setAccount := func(accKey string) map[common.Hash][]byte {
|
||||
return map[common.Hash][]byte{
|
||||
common.HexToHash(accKey): randomAccount(),
|
||||
}
|
||||
}
|
||||
makeRoot := func(height uint64) common.Hash {
|
||||
var buffer [8]byte
|
||||
binary.BigEndian.PutUint64(buffer[:], height)
|
||||
return common.BytesToHash(buffer[:])
|
||||
}
|
||||
// Create a starting base layer and a snapshot tree out of it
|
||||
base := &diskLayer{
|
||||
diskdb: rawdb.NewMemoryDatabase(),
|
||||
root: common.HexToHash("0x01"),
|
||||
cache: fastcache.New(1024 * 500),
|
||||
}
|
||||
snaps := &Tree{
|
||||
layers: map[common.Hash]snapshot{
|
||||
base.root: base,
|
||||
},
|
||||
}
|
||||
// Construct the snapshots with 128 layers
|
||||
var (
|
||||
last = common.HexToHash("0x01")
|
||||
head common.Hash
|
||||
)
|
||||
// Flush another 128 layers, one diff will be flattened into the parent.
|
||||
for i := 0; i < 128; i++ {
|
||||
head = makeRoot(uint64(i + 2))
|
||||
snaps.Update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil)
|
||||
last = head
|
||||
snaps.Cap(head, 128) // 129 layers(128 diffs + 1 disk) are allowed, 129th is the persistent layer
|
||||
}
|
||||
var cases = []struct {
|
||||
headRoot common.Hash
|
||||
limit int
|
||||
nodisk bool
|
||||
expected int
|
||||
expectBottom common.Hash
|
||||
}{
|
||||
{head, 0, false, 0, common.Hash{}},
|
||||
{head, 64, false, 64, makeRoot(127 + 2 - 63)},
|
||||
{head, 128, false, 128, makeRoot(2)}, // All diff layers
|
||||
{head, 129, true, 128, makeRoot(2)}, // All diff layers
|
||||
{head, 129, false, 129, common.HexToHash("0x01")}, // All diff layers + disk layer
|
||||
}
|
||||
for _, c := range cases {
|
||||
layers := snaps.Snapshots(c.headRoot, c.limit, c.nodisk)
|
||||
if len(layers) != c.expected {
|
||||
t.Fatalf("Returned snapshot layers are mismatched, want %v, got %v", c.expected, len(layers))
|
||||
}
|
||||
if len(layers) == 0 {
|
||||
continue
|
||||
}
|
||||
bottommost := layers[len(layers)-1]
|
||||
if bottommost.Root() != c.expectBottom {
|
||||
t.Fatalf("Snapshot mismatch, want %v, get %v", c.expectBottom, bottommost.Root())
|
||||
}
|
||||
}
|
||||
}
|
||||
|