core, eth, ethdb, trie: simplify range proofs

This commit is contained in:
Péter Szilágyi
2021-04-28 23:09:15 +03:00
parent a81cf0d2b3
commit fae165a5de
12 changed files with 149 additions and 237 deletions

View File

@ -202,9 +202,8 @@ type storageResponse struct {
accounts []common.Hash // Account hashes requested, may be only partially filled
roots []common.Hash // Storage roots requested, may be only partially filled
hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range
nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes
hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range
cont bool // Whether the last storage range has a continuation
}
@ -680,12 +679,22 @@ func (s *Syncer) loadSyncStatus() {
}
s.tasks = progress.Tasks
for _, task := range s.tasks {
task.genBatch = s.db.NewBatch()
task.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
task.genTrie = trie.NewStackTrie(task.genBatch)
for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask.genBatch = s.db.NewBatch()
subtask.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
subtask.genTrie = trie.NewStackTrie(task.genBatch)
}
}
@ -729,7 +738,12 @@ func (s *Syncer) loadSyncStatus() {
// Make sure we don't overflow if the step is not a proper divisor
last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
}
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
@ -746,19 +760,14 @@ func (s *Syncer) loadSyncStatus() {
func (s *Syncer) saveSyncStatus() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
keys, bytes := task.genBatch.KeyCount(), task.genBatch.ValueSize()
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist account slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
keys, bytes := subtask.genBatch.KeyCount(), subtask.genBatch.ValueSize()
if err := subtask.genBatch.Write(); err != nil {
log.Error("Failed to persist storage slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
}
}
}
@ -1763,12 +1772,15 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask != nil {
res.subTask.req = nil
}
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var (
slots int
nodes int
bytes common.StorageSize
slots int
oldStorageBytes = s.storageBytes
)
// Iterate over all the accounts and reconstruct their storage tries from the
// delivered slots
@ -1829,7 +1841,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
r := newHashRange(lastKey, chunks)
// Our first task is the one that was just filled by this response.
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
@ -1838,7 +1855,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
genTrie: trie.NewStackTrie(batch),
})
for r.Next() {
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
@ -1883,27 +1905,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
}
}
}
// Iterate over all the reconstructed trie nodes and push them to disk
// if the contract is fully delivered. If it's chunked, the trie nodes
// will be reconstructed later.
// Iterate over all the complete contracts, reconstruct the trie nodes and
// push them to disk. If the contract is chunked, the trie nodes will be
// reconstructed later.
slots += len(res.hashes[i])
if i < len(res.hashes)-1 || res.subTask == nil {
it := res.nodes[i].NewIterator(nil, nil)
for it.Next() {
batch.Put(it.Key(), it.Value())
bytes += common.StorageSize(common.HashLength + len(it.Value()))
nodes++
tr := trie.NewStackTrie(batch)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
}
it.Release()
tr.Commit()
}
// Persist the received storage segements. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))
// If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points
@ -1926,15 +1944,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
}
}
}
if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done {
keys := res.subTask.genBatch.KeyCount()
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
bytes += common.StorageSize(keys*common.HashLength + data)
nodes += keys
}
}
// Flush anything written just now and update the stats
@ -1942,9 +1956,8 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
log.Crit("Failed to persist storage slots", "err", err)
}
s.storageSynced += uint64(slots)
s.storageBytes += bytes
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "bytes", bytes)
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
// If this delivery completed the last pending task, forward the account task
// to the next chunk
@ -2042,18 +2055,20 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// Persist the received account segements. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
var (
nodes int
bytes common.StorageSize
)
batch := s.db.NewBatch()
oldAccountBytes := s.accountBytes
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
break
}
slim := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
rawdb.WriteAccountSnapshot(batch, hash, slim)
bytes += common.StorageSize(1 + common.HashLength + len(slim))
// If the task is complete, drop it into the stack trie to generate
// account trie nodes for it
@ -2069,7 +2084,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
if err := batch.Write(); err != nil {
log.Crit("Failed to persist accounts", "err", err)
}
s.accountBytes += bytes
s.accountSynced += uint64(len(res.accounts))
// Task filling persisted, push it the chunk marker forward to the first
@ -2091,17 +2105,13 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
log.Error("Failed to commit stack account", "err", err)
}
}
if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done {
keys := task.genBatch.KeyCount()
if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()
nodes += keys
bytes += common.StorageSize(keys*common.HashLength + data)
}
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes)
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
}
// OnAccounts is a callback method to invoke when a range of accounts are
@ -2176,7 +2186,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
if len(keys) > 0 {
end = keys[len(keys)-1]
}
_, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
if err != nil {
logger.Warn("Account range failed proof", "err", err)
// Signal this request as failed, and ready for rescheduling
@ -2393,10 +2403,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
s.lock.Unlock()
// Reconstruct the partial tries from the response and verify them
var (
dbs = make([]ethdb.KeyValueStore, len(hashes))
cont bool
)
var cont bool
for i := 0; i < len(hashes); i++ {
// Convert the keys and proofs into an internal format
keys := make([][]byte, len(hashes[i]))
@ -2413,7 +2421,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(nodes) == 0 {
// No proof has been attached, the response must cover the entire key
// space and hash to the origin root.
dbs[i], _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
_, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage slots failed proof", "err", err)
@ -2428,7 +2436,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(keys) > 0 {
end = keys[len(keys)-1]
}
dbs[i], cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage range failed proof", "err", err)
@ -2444,7 +2452,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
roots: req.roots,
hashes: hashes,
slots: slots,
nodes: dbs,
cont: cont,
}
select {