core, trie: intermediate mempool between trie and database (#15857)

This commit reduces database I/O by not writing every state trie to disk.
This commit is contained in:
Péter Szilágyi
2018-02-05 18:40:32 +02:00
committed by Felix Lange
parent 59336283c0
commit 55599ee95d
69 changed files with 1958 additions and 1164 deletions

View File

@ -20,7 +20,6 @@ import (
"fmt"
"hash"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@ -294,6 +293,9 @@ func (s *stateSync) loop() error {
case <-s.cancel:
return errCancelStateFetch
case <-s.d.cancelCh:
return errCancelStateFetch
case req := <-s.deliver:
// Response, disconnect or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
@ -304,15 +306,11 @@ func (s *stateSync) loop() error {
s.d.dropPeer(req.peer.id)
}
// Process all the received blobs and check for stale delivery
stale, err := s.process(req)
if err != nil {
if err := s.process(req); err != nil {
log.Warn("Node data write error", "err", err)
return err
}
// The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery)
if !stale {
req.peer.SetNodeDataIdle(len(req.response))
}
req.peer.SetNodeDataIdle(len(req.response))
}
}
return s.commit(true)
@ -352,6 +350,7 @@ func (s *stateSync) assignTasks() {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
case <-s.cancel:
case <-s.d.cancelCh:
}
}
}
@ -390,7 +389,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
func (s *stateSync) process(req *stateReq) (bool, error) {
func (s *stateSync) process(req *stateReq) error {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected := 0, 0
@ -401,7 +400,7 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
}(time.Now())
// Iterate over all the delivered data and inject one-by-one into the trie
progress, stale := false, len(req.response) > 0
progress := false
for _, blob := range req.response {
prog, hash, err := s.processNodeData(blob)
@ -415,20 +414,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
case trie.ErrAlreadyProcessed:
duplicate++
default:
return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
// If the node delivered a requested item, mark the delivery non-stale
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
stale = false
}
}
// If we're inside the critical section, reset fail counter since we progressed.
if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
}
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.tasks {
@ -441,12 +432,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.tasks[hash] = task
}
return stale, nil
return nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote