eth/downloader: concurrent receipt and state processing

This commit is contained in:
Péter Szilágyi
2015-10-07 12:14:30 +03:00
parent ab27bee25a
commit b97e34a8e4
15 changed files with 526 additions and 269 deletions

View File

@ -830,7 +830,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
}
// If there's nothing more to fetch, wait or terminate
if d.queue.PendingBlocks() == 0 {
if d.queue.InFlight() == 0 && finished {
if !d.queue.InFlightBlocks() && finished {
glog.V(logger.Debug).Infof("Block fetching completed")
return nil
}
@ -864,7 +864,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !throttled && d.queue.InFlight() == 0 && len(idles) == total {
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable
}
}
@ -1124,7 +1124,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
if d.mode == FastSync || d.mode == LightSync {
if n, err := d.insertHeaders(headers, false); err != nil {
if n, err := d.insertHeaders(headers, headerCheckFrequency); err != nil {
glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
return errInvalidChain
}
@ -1194,8 +1194,8 @@ func (d *Downloader) fetchBodies(from uint64) error {
setIdle = func(p *peer) { p.SetBlocksIdle() }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook,
fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body")
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body")
glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
return err
@ -1218,8 +1218,8 @@ func (d *Downloader) fetchReceipts(from uint64) error {
setIdle = func(p *peer) { p.SetReceiptsIdle() }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook,
fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
return err
@ -1234,15 +1234,29 @@ func (d *Downloader) fetchNodeData() error {
var (
deliver = func(packet dataPack) error {
start := time.Now()
done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states)
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
if err != nil {
// If the node data processing failed, the root hash is very wrong, abort
glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
d.cancel()
return
}
// Processing succeeded, notify state fetcher and processor of continuation
if d.queue.PendingNodeData() == 0 {
go d.process()
} else {
select {
case d.stateWakeCh <- true:
default:
}
}
// Log a message to the user and return
d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
d.syncStatsLock.Lock()
totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found)
d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown
d.syncStatsLock.Unlock()
glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start))
return err
d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
}
expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
throttle = func() bool { return false }
@ -1254,8 +1268,8 @@ func (d *Downloader) fetchNodeData() error {
setIdle = func(p *peer) { p.SetNodeDataIdle() }
)
err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData,
capacity, d.peers.ReceiptIdlePeers, setIdle, "State")
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
d.queue.CancelNodeData, capacity, d.peers.ReceiptIdlePeers, setIdle, "State")
glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
return err
@ -1265,8 +1279,9 @@ func (d *Downloader) fetchNodeData() error {
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header),
fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
// Create a ticker to detect expired retreival tasks
ticker := time.NewTicker(100 * time.Millisecond)
@ -1378,14 +1393,14 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
// If there's nothing more to fetch, wait or terminate
if pending() == 0 {
if d.queue.InFlight() == 0 && finished {
if !inFlight() && finished {
glog.V(logger.Debug).Infof("%s fetching completed", kind)
return nil
}
break
}
// Send a download request to all idle peers, until throttled
progressed, throttled := false, false
progressed, throttled, running := false, false, inFlight()
idles, total := idle()
for _, peer := range idles {
@ -1423,10 +1438,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
glog.V(logger.Error).Infof("%v: %s fetch failed, rescheduling", peer, strings.ToLower(kind))
cancel(request)
}
running = true
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && d.queue.InFlight() == 0 && len(idles) == total {
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable
}
}
@ -1514,12 +1530,12 @@ func (d *Downloader) process() {
)
switch {
case len(headers) > 0:
index, err = d.insertHeaders(headers, true)
index, err = d.insertHeaders(headers, headerCheckFrequency)
case len(receipts) > 0:
index, err = d.insertReceipts(blocks, receipts)
if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot {
err = d.commitHeadBlock(blocks[len(blocks)-1].Hash())
index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
}
default:
index, err = d.insertBlocks(blocks)

View File

@ -268,7 +268,7 @@ func (dl *downloadTester) getTd(hash common.Hash) *big.Int {
}
// insertHeaders injects a new batch of headers into the simulated chain.
func (dl *downloadTester) insertHeaders(headers []*types.Header, verify bool) (int, error) {
func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@ -1262,7 +1262,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
pending.Wait()
// Simulate a successful sync above the fork
tester.downloader.syncStatsOrigin = tester.downloader.syncStatsHeight
tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight
// Synchronise with the second fork and check boundary resets
tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)

View File

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@ -93,8 +94,10 @@ type queue struct {
stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
stateProcessors int32 // [eth/63] Number of currently running state processors
stateSchedLock sync.RWMutex // [eth/63] Lock serializing access to the state scheduler
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block-chain
@ -175,18 +178,40 @@ func (q *queue) PendingReceipts() int {
// PendingNodeData retrieves the number of node data entries pending for retrieval.
func (q *queue) PendingNodeData() int {
q.lock.RLock()
defer q.lock.RUnlock()
q.stateSchedLock.RLock()
defer q.stateSchedLock.RUnlock()
return q.stateTaskQueue.Size()
if q.stateScheduler != nil {
return q.stateScheduler.Pending()
}
return 0
}
// InFlight retrieves the number of fetch requests currently in flight.
func (q *queue) InFlight() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
q.lock.RLock()
defer q.lock.RUnlock()
return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
return len(q.blockPendPool) > 0
}
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
q.lock.RLock()
defer q.lock.RUnlock()
return len(q.receiptPendPool) > 0
}
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
q.lock.RLock()
defer q.lock.RUnlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}
// Idle returns if the queue is fully idle or has some data still inside. This
@ -199,6 +224,12 @@ func (q *queue) Idle() bool {
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
q.stateSchedLock.RLock()
if q.stateScheduler != nil {
queued += q.stateScheduler.Pending()
}
q.stateSchedLock.RUnlock()
return (queued + pending + cached) == 0
}
@ -299,12 +330,9 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
// Pivoting point of the fast sync, retrieve the state tries
q.stateSchedLock.Lock()
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
for _, hash := range q.stateScheduler.Missing(0) {
q.stateTaskPool[hash] = q.stateTaskIndex
q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
q.stateTaskIndex++
}
q.stateSchedLock.Unlock()
}
inserts = append(inserts, header)
q.headerHead = hash
@ -325,8 +353,13 @@ func (q *queue) GetHeadResult() *fetchResult {
if q.resultCache[0].Pending > 0 {
return nil
}
if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 {
return nil
if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
if len(q.stateTaskPool) > 0 {
return nil
}
if q.PendingNodeData() > 0 {
return nil
}
}
return q.resultCache[0]
}
@ -345,8 +378,13 @@ func (q *queue) TakeResults() []*fetchResult {
break
}
// The fast sync pivot block may only be processed after state fetch completes
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 {
break
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
if len(q.stateTaskPool) > 0 {
break
}
if q.PendingNodeData() > 0 {
break
}
}
// If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
@ -373,26 +411,34 @@ func (q *queue) TakeResults() []*fetchResult {
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool))
return q.reserveHashes(p, count, q.hashQueue, nil, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool))
}
// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
// any previously failed download.
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0)
// Create a task generator to fetch status-fetch tasks if all schedules ones are done
generator := func(max int) {
q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock()
for _, hash := range q.stateScheduler.Missing(max) {
q.stateTaskPool[hash] = q.stateTaskIndex
q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
q.stateTaskIndex++
}
}
return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, count)
}
// reserveHashes reserves a set of hashes for the given peer, skipping previously
// failed ones.
func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
if taskQueue.Empty() {
return nil
}
// Short circuit if the peer's already downloading something (sanity check not
// to corrupt state)
if _, ok := pendPool[p.id]; ok {
return nil
}
@ -403,6 +449,13 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPo
allowance -= len(request.Hashes)
}
}
// If there's a task generator, ask it to fill our task queue
if taskGen != nil && taskQueue.Size() < allowance {
taskGen(allowance - taskQueue.Size())
}
if taskQueue.Empty() {
return nil
}
// Retrieve a batch of hashes, skipping previously failed ones
send := make(map[common.Hash]int)
skip := make(map[common.Hash]int)
@ -809,14 +862,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
// DeliverNodeData injects a node state data retrieval response into the queue.
func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) {
func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
return 0, 0, errNoFetchesPending
return errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
@ -829,7 +882,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) {
}
// Iterate over the downloaded data and verify each of them
errs := make([]error, 0)
processed := 0
process := []trie.SyncResult{}
for _, blob := range data {
// Skip any blocks that were not requested
hash := common.BytesToHash(crypto.Sha3(blob))
@ -837,43 +890,60 @@ func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
continue
}
// Inject the next state trie item into the database
if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil {
errs = []error{err}
break
}
processed++
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob})
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
}
// Start the asynchronous node state data injection
atomic.AddInt32(&q.stateProcessors, 1)
go func() {
defer atomic.AddInt32(&q.stateProcessors, -1)
q.deliverNodeData(process, callback)
}()
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.stateTaskQueue.Push(hash, float32(index))
}
// Also enqueue any newly required state trie nodes
discovered := 0
if len(q.stateTaskPool) < maxQueuedStates {
for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) {
q.stateTaskPool[hash] = q.stateTaskIndex
q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
q.stateTaskIndex++
discovered++
}
}
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
return processed, discovered, nil
return nil
case len(errs) == len(request.Hashes):
return processed, discovered, errStaleDelivery
return errStaleDelivery
default:
return processed, discovered, fmt.Errorf("multiple failures: %v", errs)
return fmt.Errorf("multiple failures: %v", errs)
}
}
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
// Process results one by one to permit task fetches in between
for i, result := range results {
q.stateSchedLock.Lock()
if q.stateScheduler == nil {
// Syncing aborted since this async delivery started, bail out
q.stateSchedLock.Unlock()
callback(errNoFetchesPending, i)
return
}
if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
// Processing a state result failed, bail out
q.stateSchedLock.Unlock()
callback(err, i)
return
}
// Item processing succeeded, release the lock (temporarily)
q.stateSchedLock.Unlock()
}
callback(nil, len(results))
}
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) {

View File

@ -52,7 +52,7 @@ type headBlockCommitterFn func(common.Hash) error
type tdRetrievalFn func(common.Hash) *big.Int
// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
type headerChainInsertFn func([]*types.Header, bool) (int, error)
type headerChainInsertFn func([]*types.Header, int) (int, error)
// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
type blockChainInsertFn func(types.Blocks) (int, error)