core, eth: faster snapshot generation (#22504)
* eth/protocols: persist received state segments
* core: initial implementation
* core/state/snapshot: add tests
* core, eth: updates
* eth/protocols/snapshot: count flat state size
* core/state: add metrics
* core/state/snapshot: skip unnecessary deletion
* core/state/snapshot: rename
* core/state/snapshot: use the global batch
* core/state/snapshot: add logs and fix wiping
* core/state/snapshot: fix
* core/state/snapshot: save generation progress even if the batch is empty
* core/state/snapshot: fixes
* core/state/snapshot: fix initial account range length
* core/state/snapshot: fix initial account range
* eth/protocols/snap: store flat states during the healing
* eth/protocols/snap: print logs
* core/state/snapshot: refactor (#4)
* core/state/snapshot: refactor
* core/state/snapshot: tiny fix and polish

Co-authored-by: rjl493456442 <garyrong0905@gmail.com>

* core, eth: fixes
* core, eth: fix healing writer
* core, trie, eth: fix paths
* eth/protocols/snap: fix encoding
* eth, core: add debug log
* core/state/generate: release iterator asap (#5)

core/state/snapshot: less copy

core/state/snapshot: revert split loop

core/state/snapshot: handle storage becoming empty, improve test robustness

core/state: test modified codehash

core/state/snapshot: polish

* core/state/snapshot: optimize stats counter
* core, eth: add metric
* core/state/snapshot: update comments
* core/state/snapshot: improve tests
* core/state/snapshot: replace secure trie with standard trie
* core/state/snapshot: wrap return as the struct
* core/state/snapshot: skip wiping correct states
* core/state/snapshot: updates
* core/state/snapshot: fixes
* core/state/snapshot: fix panic due to reference flaw in closure
* core/state/snapshot: fix errors in state generation logic + fix log output
* core/state/snapshot: remove an error case
* core/state/snapshot: fix condition-check for exhausted snap state
* core/state/snapshot: use stackTrie for small tries
* core/state/snapshot: don't resolve small storage tries in vain
* core/state/snapshot: properly clean up storage of deleted accounts
* core/state/snapshot: avoid RLP-encoding in some cases + minor nitpicks
* core/state/snapshot: fix error (+testcase)
* core/state/snapshot: clean up tests a bit
* core/state/snapshot: work in progress on better tests
* core/state/snapshot: polish code
* core/state/snapshot: fix trie iteration abortion trigger
* core/state/snapshot: fixes flaws
* core/state/snapshot: remove panic
* core/state/snapshot: fix abort
* core/state/snapshot: more tests (plus failing testcase)
* core/state/snapshot: more testcases + fix for failing test
* core/state/snapshot: testcase for malformed data
* core/state/snapshot: some test nitpicks
* core/state/snapshot: improvements to logging
* core/state/snapshot: testcase to demo error in abortion
* core/state/snapshot: fix abortion
* cmd/geth: make verify-state report the root
* trie: fix failing test
* core/state/snapshot: add timer metrics
* core/state/snapshot: fix metrics
* core/state/snapshot: udpate tests
* eth/protocols/snap: write snapshot account even if code or state is needed
* core/state/snapshot: fix diskmore check
* core/state/snapshot: review fixes
* core/state/snapshot: improve error message
* cmd/geth: rename 'error' to 'err' in logs
* core/state/snapshot: fix some review concerns
* core/state/snapshot, eth/protocols/snap: clear snapshot marker when starting/resuming snap sync
* core: add error log
* core/state/snapshot: use proper timers for metrics collection
* core/state/snapshot: address some review concerns
* eth/protocols/snap: improved log message
* eth/protocols/snap: fix heal logs to condense infos
* core/state/snapshot: wait for generator termination before restarting
* core/state/snapshot: revert timers to counters to track total time

Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Péter Szilágyi <peterke@gmail.com>
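The central idea, as the bullets above and the comments in the diff below describe, is to persist flat snapshot entries (account and storage data keyed by hash) while snap sync is still downloading and healing state; the entries may be stale, but snapshot generation can later verify and patch them instead of rebuilding everything from the trie. A minimal sketch of that write pattern, using the public rawdb/snapshot helpers the diff relies on; the helper name persistFlatAccount and the in-memory database are illustrative only, not part of the commit:

package main

import (
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
	"github.com/ethereum/go-ethereum/ethdb"
)

// persistFlatAccount mirrors what the sync code below does for every received
// account: re-encode it in the slim snapshot format, write it into the flat
// account table and flush the batch once it grows past the ideal size.
func persistFlatAccount(batch ethdb.Batch, hash common.Hash, nonce uint64, balance *big.Int, root common.Hash, codeHash []byte) error {
	blob := snapshot.SlimAccountRLP(nonce, balance, root, codeHash)
	rawdb.WriteAccountSnapshot(batch, hash, blob)
	if batch.ValueSize() > ethdb.IdealBatchSize {
		if err := batch.Write(); err != nil {
			return err
		}
		batch.Reset()
	}
	return nil
}

func main() {
	db := rawdb.NewMemoryDatabase()
	batch := db.NewBatch()
	_ = persistFlatAccount(batch, common.HexToHash("0x01"), 1, big.NewInt(0), common.Hash{}, nil)
	_ = batch.Write() // flush whatever is left, like the deferred flush added in Sync()
}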
@@ -29,6 +29,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/state/snapshot"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
@@ -51,7 +52,7 @@ const (
 	// maxRequestSize is the maximum number of bytes to request from a remote peer.
 	maxRequestSize = 512 * 1024
 
-	// maxStorageSetRequestCountis th maximum number of contracts to request the
+	// maxStorageSetRequestCount is the maximum number of contracts to request the
 	// storage of in a single query. If this number is too low, we're not filling
 	// responses fully and waste round trip times. If it's too high, we're capping
 	// responses and waste bandwidth.
@@ -435,9 +436,14 @@ type Syncer struct {
 	bytecodeHealDups uint64 // Number of bytecodes already processed
 	bytecodeHealNops uint64 // Number of bytecodes not requested
 
-	startTime time.Time   // Time instance when snapshot sync started
-	startAcc  common.Hash // Account hash where sync started from
-	logTime   time.Time   // Time instance when status was last reported
+	stateWriter        ethdb.Batch        // Shared batch writer used for persisting raw states
+	accountHealed      uint64             // Number of accounts downloaded during the healing stage
+	accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage
+	storageHealed      uint64             // Number of storage slots downloaded during the healing stage
+	storageHealedBytes common.StorageSize // Number of raw storage bytes persisted to disk during the healing stage
+
+	startTime time.Time // Time instance when snapshot sync started
+	logTime   time.Time // Time instance when status was last reported
 
 	pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
 	lock sync.RWMutex   // Protects fields that can change outside of sync (peers, reqs, root)
@@ -477,6 +483,7 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
 		bytecodeHealReqFails: make(chan *bytecodeHealRequest),
 		trienodeHealResps:    make(chan *trienodeHealResponse),
 		bytecodeHealResps:    make(chan *bytecodeHealResponse),
+		stateWriter:          db.NewBatch(),
 	}
 }
 
@@ -544,7 +551,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 	s.lock.Lock()
 	s.root = root
 	s.healer = &healTask{
-		scheduler: state.NewStateSync(root, s.db, nil),
+		scheduler: state.NewStateSync(root, s.db, nil, s.onHealState),
 		trieTasks: make(map[common.Hash]trie.SyncPath),
 		codeTasks: make(map[common.Hash]struct{}),
 	}
@@ -560,6 +567,11 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 		log.Debug("Snapshot sync already completed")
 		return nil
 	}
+	// If sync is still not finished, we need to ensure that any marker is wiped.
+	// Otherwise, it may happen that requests for e.g. genesis-data is delivered
+	// from the snapshot data, instead of from the trie
+	snapshot.ClearSnapshotMarker(s.db)
+
 	defer func() { // Persist any progress, independent of failure
 		for _, task := range s.tasks {
 			s.forwardAccountTask(task)
@@ -569,6 +581,14 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 	}()
 
 	log.Debug("Starting snapshot sync cycle", "root", root)
+
+	// Flush out the last committed raw states
+	defer func() {
+		if s.stateWriter.ValueSize() > 0 {
+			s.stateWriter.Write()
+			s.stateWriter.Reset()
+		}
+	}()
 	defer s.report(true)
 
 	// Whether sync completed or not, disregard any future packets
@@ -1694,7 +1714,7 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
 // processStorageResponse integrates an already validated storage response
 // into the account tasks.
 func (s *Syncer) processStorageResponse(res *storageResponse) {
-	// Switch the suntask from pending to idle
+	// Switch the subtask from pending to idle
 	if res.subTask != nil {
 		res.subTask.req = nil
 	}
@@ -1826,6 +1846,14 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 			nodes++
 		}
 		it.Release()
+
+		// Persist the received storage segements. These flat state maybe
+		// outdated during the sync, but it can be fixed later during the
+		// snapshot generation.
+		for j := 0; j < len(res.hashes[i]); j++ {
+			rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
+			bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))
+		}
 	}
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to persist storage slots", "err", err)
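For reference, the 1 + 2*common.HashLength term in the size accounting above matches the on-disk layout of a flat storage entry: a one-byte table prefix followed by the account hash and the slot hash, with the value length added separately. A small sketch, assuming the exported schema prefixes in core/rawdb; it is illustrative, not part of the commit:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

func main() {
	accountHash := common.HexToHash("0x01")
	slotHash := common.HexToHash("0x02")
	// SnapshotStoragePrefix is a single byte, so the key alone accounts for
	// 1 + 2*common.HashLength of the bytes counter used in the loop above.
	key := append(append(append([]byte{}, rawdb.SnapshotStoragePrefix...), accountHash.Bytes()...), slotHash.Bytes()...)
	fmt.Println(len(key)) // 65
}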
@@ -1983,6 +2011,14 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
 	}
 	it.Release()
 
+	// Persist the received account segements. These flat state maybe
+	// outdated during the sync, but it can be fixed later during the
+	// snapshot generation.
+	for i, hash := range res.hashes {
+		blob := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
+		rawdb.WriteAccountSnapshot(batch, hash, blob)
+		bytes += common.StorageSize(1 + common.HashLength + len(blob))
+	}
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to persist accounts", "err", err)
 	}
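SlimAccountRLP, used above and again in onHealState further down, produces the "slim" snapshot encoding of an account: for the common case of an account with no storage and no code, the empty storage root and empty code hash are elided, so the flat entry is noticeably smaller than the consensus RLP. A quick illustration; the hash constant is the well-known empty trie root, and the snippet itself is not part of the commit:

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	emptyRoot := common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
	emptyCode := crypto.Keccak256(nil)
	// An externally owned account: both the storage root and the code hash are
	// elided from the slim encoding, leaving little more than nonce and balance.
	slim := snapshot.SlimAccountRLP(1, big.NewInt(0), emptyRoot, emptyCode)
	fmt.Println(len(slim))
}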
@@ -2569,6 +2605,33 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
 	return nil
 }
 
+// onHealState is a callback method to invoke when a flat state(account
+// or storage slot) is downloded during the healing stage. The flat states
+// can be persisted blindly and can be fixed later in the generation stage.
+// Note it's not concurrent safe, please handle the concurrent issue outside.
+func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
+	if len(paths) == 1 {
+		var account state.Account
+		if err := rlp.DecodeBytes(value, &account); err != nil {
+			return nil
+		}
+		blob := snapshot.SlimAccountRLP(account.Nonce, account.Balance, account.Root, account.CodeHash)
+		rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob)
+		s.accountHealed += 1
+		s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
+	}
+	if len(paths) == 2 {
+		rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value)
+		s.storageHealed += 1
+		s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
+	}
+	if s.stateWriter.ValueSize() > ethdb.IdealBatchSize {
+		s.stateWriter.Write() // It's fine to ignore the error here
+		s.stateWriter.Reset()
+	}
+	return nil
+}
+
 // hashSpace is the total size of the 256 bit hash space for accounts.
 var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
 
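The onHealState callback above is the one registered in Sync() via state.NewStateSync(root, s.db, nil, s.onHealState) in the earlier hunk. The path convention it relies on: a single path element is an account leaf (the hashed account key), two elements are a storage leaf (account hash, then slot hash). A minimal, self-contained sketch of wiring such a callback, assuming the four-argument NewStateSync signature shown in that hunk; the in-memory database and the no-op callback body are illustrative only:

package main

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/state"
)

func main() {
	db := rawdb.NewMemoryDatabase()
	onLeaf := func(paths [][]byte, leaf []byte) error {
		switch len(paths) {
		case 1:
			// account leaf: paths[0] is the hashed account key, leaf the account RLP
		case 2:
			// storage leaf: paths[0] is the account hash, paths[1] the slot hash
		}
		return nil
	}
	// nil for the optional sync bloom filter; the scheduler works without it.
	_ = state.NewStateSync(common.Hash{}, db, nil, onLeaf)
}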
@@ -2632,7 +2695,9 @@ func (s *Syncer) reportHealProgress(force bool) {
 	var (
 		trienode = fmt.Sprintf("%d@%v", s.trienodeHealSynced, s.trienodeHealBytes.TerminalString())
 		bytecode = fmt.Sprintf("%d@%v", s.bytecodeHealSynced, s.bytecodeHealBytes.TerminalString())
+		accounts = fmt.Sprintf("%d@%v", s.accountHealed, s.accountHealedBytes.TerminalString())
+		storage  = fmt.Sprintf("%d@%v", s.storageHealed, s.storageHealedBytes.TerminalString())
 	)
-	log.Info("State heal in progress", "nodes", trienode, "codes", bytecode,
-		"pending", s.healer.scheduler.Pending())
+	log.Info("State heal in progress", "accounts", accounts, "slots", storage,
+		"codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
 }