core/rawdb, core/state/snapshot: runtime snapshot generation

This commit is contained in:
Péter Szilágyi
2019-11-26 09:48:29 +02:00
parent f300c0df01
commit 351a5903b0
21 changed files with 1551 additions and 486 deletions

View File

@ -18,12 +18,13 @@ package snapshot
import (
"bytes"
"fmt"
"encoding/binary"
"math/big"
"time"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
@ -40,103 +41,122 @@ var (
emptyCode = crypto.Keccak256Hash(nil)
)
// wipeSnapshot iterates over the entire key-value database and deletes all the
// data associated with the snapshot (accounts, storage, metadata). After all is
// done, the snapshot range of the database is compacted to free up unused data
// blocks.
func wipeSnapshot(db ethdb.KeyValueStore) error {
// Batch deletions together to avoid holding an iterator for too long
var (
batch = db.NewBatch()
items int
)
// Iterate over the snapshot key-range and delete all of them
log.Info("Deleting previous snapshot leftovers")
start, logged := time.Now(), time.Now()
it := db.NewIteratorWithStart(rawdb.StateSnapshotPrefix)
for it.Next() {
// Skip any keys with the correct prefix but wrong lenth (trie nodes)
key := it.Key()
if !bytes.HasPrefix(key, rawdb.StateSnapshotPrefix) {
break
}
if len(key) != len(rawdb.StateSnapshotPrefix)+common.HashLength && len(key) != len(rawdb.StateSnapshotPrefix)+2*common.HashLength {
continue
}
// Delete the key and periodically recreate the batch and iterator
batch.Delete(key)
items++
if items%10000 == 0 {
// Batch too large (or iterator too long lived, flush and recreate)
it.Release()
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
it = db.NewIteratorWithStart(key)
if time.Since(logged) > 8*time.Second {
log.Info("Deleting previous snapshot leftovers", "wiped", items, "elapsed", time.Since(start))
logged = time.Now()
}
}
}
it.Release()
rawdb.DeleteSnapshotRoot(batch)
if err := batch.Write(); err != nil {
return err
}
log.Info("Deleted previous snapshot leftovers", "wiped", items, "elapsed", time.Since(start))
// Compact the snapshot section of the database to get rid of unused space
log.Info("Compacting snapshot area in database")
start = time.Now()
end := common.CopyBytes(rawdb.StateSnapshotPrefix)
end[len(end)-1]++
if err := db.Compact(rawdb.StateSnapshotPrefix, end); err != nil {
return err
}
log.Info("Compacted snapshot area in database", "elapsed", time.Since(start))
return nil
// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes.
type generatorStats struct {
wiping chan struct{} // Notification channel if wiping is in progress
origin uint64 // Origin prefix where generation started
start time.Time // Timestamp when generation started
accounts uint64 // Number of accounts indexed
slots uint64 // Number of storage slots indexed
storage common.StorageSize // Account and storage slot size
}
// generateSnapshot regenerates a brand new snapshot based on an existing state database and head block.
func generateSnapshot(db ethdb.KeyValueStore, journal string, root common.Hash) (snapshot, error) {
// Wipe any previously existing snapshot from the database
if err := wipeSnapshot(db); err != nil {
return nil, err
}
// Iterate the entire storage trie and re-generate the state snapshot
var (
accountCount int
storageCount int
storageNodes int
accountSize common.StorageSize
storageSize common.StorageSize
logged time.Time
)
batch := db.NewBatch()
triedb := trie.NewDatabase(db)
// Log creates an contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) Log(msg string, marker []byte) {
var ctx []interface{}
accTrie, err := trie.NewSecure(root, triedb)
if err != nil {
return nil, err
// Figure out whether we're after or within an account
switch len(marker) {
case common.HashLength:
ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
case 2 * common.HashLength:
ctx = append(ctx, []interface{}{
"in", common.BytesToHash(marker[:common.HashLength]),
"at", common.BytesToHash(marker[common.HashLength:]),
}...)
}
accIt := trie.NewIterator(accTrie.NodeIterator(nil))
// Add the usual measurements
ctx = append(ctx, []interface{}{
"accounts", gs.accounts,
"slots", gs.slots,
"storage", gs.storage,
"elapsed", common.PrettyDuration(time.Since(gs.start)),
}...)
// Calculate the estimated indexing time based on current stats
if len(marker) > 0 {
if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])
speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
ctx = append(ctx, []interface{}{
"eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
}...)
}
}
log.Info(msg, ctx...)
}
// generateSnapshot regenerates a brand new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, wiper chan struct{}) *diskLayer {
// Wipe any previously existing snapshot from the database if no wiper is
// currenty in progress.
if wiper == nil {
wiper = wipeSnapshot(diskdb, true)
}
// Create a new disk layer with an initialized state marker at zero
rawdb.WriteSnapshotRoot(diskdb, root)
base := &diskLayer{
diskdb: diskdb,
triedb: triedb,
root: root,
cache: fastcache.New(cache * 1024 * 1024),
genMarker: []byte{}, // Initialized but empty!
genAbort: make(chan chan *generatorStats),
}
go base.generate(&generatorStats{wiping: wiper, start: time.Now()})
return base
}
// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gethering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
// If a database wipe is in operation, wait until it's done
if stats.wiping != nil {
stats.Log("Wiper running, state snapshotting paused", dl.genMarker)
select {
// If wiper is done, resume normal mode of operation
case <-stats.wiping:
stats.wiping = nil
stats.start = time.Now()
// If generator was aboted during wipe, return
case abort := <-dl.genAbort:
abort <- stats
return
}
}
// Create an account and state iterator pointing to the current generator marker
accTrie, err := trie.NewSecure(dl.root, dl.triedb)
if err != nil {
// The account trie is missing (GC), surf the chain until one becomes available
stats.Log("Trie missing, state snapshotting paused", dl.genMarker)
abort := <-dl.genAbort
abort <- stats
return
}
stats.Log("Resuming state snapshot generation", dl.genMarker)
var accMarker []byte
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
accMarker = dl.genMarker[:common.HashLength]
}
accIt := trie.NewIterator(accTrie.NodeIterator(accMarker))
batch := dl.diskdb.NewBatch()
// Iterate from the previous marker and continue generating the state snapshot
logged := time.Now()
for accIt.Next() {
var (
curStorageCount int
curStorageNodes int
curAccountSize common.StorageSize
curStorageSize common.StorageSize
accountHash = common.BytesToHash(accIt.Key)
)
// Retrieve the current account and flatten it into the internal format
accountHash := common.BytesToHash(accIt.Key)
var acc struct {
Nonce uint64
Balance *big.Int
@ -144,63 +164,97 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, root common.Hash)
CodeHash []byte
}
if err := rlp.DecodeBytes(accIt.Value, &acc); err != nil {
return nil, err
log.Crit("Invalid account encountered during snapshot creation", "err", err)
}
data := AccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
curAccountSize += common.StorageSize(1 + common.HashLength + len(data))
rawdb.WriteAccountSnapshot(batch, accountHash, data)
if batch.ValueSize() > ethdb.IdealBatchSize {
batch.Write()
batch.Reset()
// If the account is not yet in-progress, write it out
if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) {
rawdb.WriteAccountSnapshot(batch, accountHash, data)
stats.storage += common.StorageSize(1 + common.HashLength + len(data))
stats.accounts++
}
if acc.Root != emptyRoot {
storeTrie, err := trie.NewSecure(acc.Root, triedb)
if err != nil {
return nil, err
}
storeIt := trie.NewIterator(storeTrie.NodeIterator(nil))
for storeIt.Next() {
curStorageSize += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value))
curStorageCount++
// If we've exceeded our batch allowance or termination was requested, flush to disk
var abort chan *generatorStats
select {
case abort = <-dl.genAbort:
default:
}
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
// Only write and set the marker if we actually did something useful
if batch.ValueSize() > 0 {
batch.Write()
batch.Reset()
dl.lock.Lock()
dl.genMarker = accountHash[:]
dl.lock.Unlock()
}
if abort != nil {
stats.Log("Aborting state snapshot generation", accountHash[:])
abort <- stats
return
}
}
// If the account is in-progress, continue where we left off (otherwise iterate all)
if acc.Root != emptyRoot {
storeTrie, err := trie.NewSecure(acc.Root, dl.triedb)
if err != nil {
log.Crit("Storage trie inaccessible for snapshot generation", "err", err)
}
var storeMarker []byte
if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
storeMarker = dl.genMarker[common.HashLength:]
}
storeIt := trie.NewIterator(storeTrie.NodeIterator(storeMarker))
for storeIt.Next() {
rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(storeIt.Key), storeIt.Value)
if batch.ValueSize() > ethdb.IdealBatchSize {
batch.Write()
batch.Reset()
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value))
stats.slots++
// If we've exceeded our batch allowance or termination was requested, flush to disk
var abort chan *generatorStats
select {
case abort = <-dl.genAbort:
default:
}
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
// Only write and set the marker if we actually did something useful
if batch.ValueSize() > 0 {
batch.Write()
batch.Reset()
dl.lock.Lock()
dl.genMarker = append(accountHash[:], storeIt.Key...)
dl.lock.Unlock()
}
if abort != nil {
stats.Log("Aborting state snapshot generation", append(accountHash[:], storeIt.Key...))
abort <- stats
return
}
}
}
curStorageNodes = storeIt.Nodes
}
accountCount++
storageCount += curStorageCount
accountSize += curAccountSize
storageSize += curStorageSize
storageNodes += curStorageNodes
if time.Since(logged) > 8*time.Second {
fmt.Printf("%#x: %9s + %9s (%6d slots, %6d nodes), total %9s (%d accs, %d nodes) + %9s (%d slots, %d nodes)\n", accIt.Key, curAccountSize.TerminalString(), curStorageSize.TerminalString(), curStorageCount, curStorageNodes, accountSize.TerminalString(), accountCount, accIt.Nodes, storageSize.TerminalString(), storageCount, storageNodes)
stats.Log("Generating state snapshot", accIt.Key)
logged = time.Now()
}
// Some account processed, unmark the marker
accMarker = nil
}
fmt.Printf("Totals: %9s (%d accs, %d nodes) + %9s (%d slots, %d nodes)\n", accountSize.TerminalString(), accountCount, accIt.Nodes, storageSize.TerminalString(), storageCount, storageNodes)
// Update the snapshot block marker and write any remainder data
rawdb.WriteSnapshotRoot(batch, root)
batch.Write()
batch.Reset()
// Compact the snapshot section of the database to get rid of unused space
log.Info("Compacting snapshot in chain database")
if err := db.Compact([]byte{'s'}, []byte{'s' + 1}); err != nil {
return nil, err
// Snapshot fully generated, set the marker to nil
if batch.ValueSize() > 0 {
batch.Write()
}
// New snapshot generated, construct a brand new base layer
cache := fastcache.New(512 * 1024 * 1024)
return &diskLayer{
journal: journal,
db: db,
cache: cache,
root: root,
}, nil
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
"storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start)))
dl.lock.Lock()
dl.genMarker = nil
dl.lock.Unlock()
// Someone will be looking for us, wait it out
abort := <-dl.genAbort
abort <- nil
}