core: initial version of state snapshots

This commit is contained in:
Péter Szilágyi
2019-08-06 13:40:28 +03:00
parent 2a5ed1a1d3
commit 542df8898e
30 changed files with 1635 additions and 85 deletions

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
@ -61,6 +62,10 @@ var (
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/accountreads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storagereads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
@ -135,9 +140,10 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.SnapshotTree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
rmLogsFeed event.Feed
@ -293,6 +299,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
}
// Load any existing snapshot, regenerating it if loading failed
head := bc.CurrentBlock()
if bc.snaps, err = snapshot.New(bc.db, "snapshot.rlp", head.NumberU64(), head.Root()); err != nil {
return nil, err
}
// Take ownership of this particular state
go bc.update()
return bc, nil
@ -339,7 +350,7 @@ func (bc *BlockChain) loadLastState() error {
return bc.Reset()
}
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Dangling block without a state associated, init from scratch
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
@ -401,7 +412,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
if newHeadBlock == nil {
newHeadBlock = bc.genesisBlock
} else {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
newHeadBlock = bc.genesisBlock
}
@ -524,7 +535,7 @@ func (bc *BlockChain) State() (*state.StateDB, error) {
// StateAt returns a new mutable state based on a particular point in time.
func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
return state.New(root, bc.stateCache)
return state.New(root, bc.stateCache, bc.snaps)
}
// StateCache returns the caching database underpinning the blockchain instance.
@ -576,7 +587,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
func (bc *BlockChain) repair(head **types.Block) error {
for {
// Abort if we've rewound to a head block that does have associated state
if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
if _, err := state.New((*head).Root(), bc.stateCache, bc.snaps); err == nil {
log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
return nil
}
@ -839,6 +850,10 @@ func (bc *BlockChain) Stop() {
bc.wg.Wait()
// Ensure that the entirety of the state snapshot is journalled to disk.
if err := bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
@ -1647,7 +1662,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache)
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
@ -1656,9 +1671,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
var followupInterrupt uint32
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache)
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, interrupt)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(interrupt) == 1 {
@ -1676,14 +1691,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.StorageReads + statedb.StorageUpdates
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
@ -1712,10 +1729,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
atomic.StoreUint32(&followupInterrupt, 1)
// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)
switch status {