core, eth, trie: use common/prque (#17508)

Wenbiao Zheng authored on 2018-09-03 23:33:21 +08:00, committed by Felix Lange
parent 6fc8494620
commit 6a33954731
10 changed files with 30 additions and 327 deletions
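
The change is mechanical throughout: the downloader's priority queues switch from the external gopkg.in/karalabe/cookiejar.v2/collections/prque package (no-argument constructor, float32 priorities) to the in-tree github.com/ethereum/go-ethereum/common/prque fork (constructor takes an optional index callback, int64 priorities). A minimal sketch of the new call pattern, assuming the interface{}-based API the package had at the time of this commit:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	// Old style (cookiejar):   q := prque.New(); q.Push(item, -float32(n))
	// New style (common/prque): the constructor takes an index callback (nil here)
	// and priorities are int64, so large block numbers keep exact ordering.
	q := prque.New(nil)
	q.Push("header #100", -int64(100))
	q.Push("header #42", -int64(42))

	for !q.Empty() {
		item, prio := q.Pop() // highest priority first, i.e. lowest block number
		fmt.Println(item, "priority", prio)
	}
}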

eth/downloader/queue.go

@@ -26,10 +26,10 @@ import (
 "time"
 "github.com/ethereum/go-ethereum/common"
+"github.com/ethereum/go-ethereum/common/prque"
 "github.com/ethereum/go-ethereum/core/types"
 "github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/metrics"
-"gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
 var (
@@ -105,11 +105,11 @@ func newQueue() *queue {
 headerPendPool: make(map[string]*fetchRequest),
 headerContCh: make(chan bool),
 blockTaskPool: make(map[common.Hash]*types.Header),
-blockTaskQueue: prque.New(),
+blockTaskQueue: prque.New(nil),
 blockPendPool: make(map[string]*fetchRequest),
 blockDonePool: make(map[common.Hash]struct{}),
 receiptTaskPool: make(map[common.Hash]*types.Header),
-receiptTaskQueue: prque.New(),
+receiptTaskQueue: prque.New(nil),
 receiptPendPool: make(map[string]*fetchRequest),
 receiptDonePool: make(map[common.Hash]struct{}),
 resultCache: make([]*fetchResult, blockCacheItems),
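
About prque.New(nil) above: as I read the common/prque package, the single constructor argument is a set-index callback that, when non-nil, is called whenever an item's slot in the internal heap changes, letting callers that need to remove or relocate specific entries track their positions; the downloader only pushes and pops, so it passes nil. A hedged sketch of a non-nil callback (the task type and newTrackedQueue helper are invented for illustration):

package main

import "github.com/ethereum/go-ethereum/common/prque"

// task is a hypothetical queue entry that remembers its heap slot.
type task struct {
	number int64
	index  int // maintained by the callback below
}

// newTrackedQueue builds a queue whose entries always know where they sit
// in the heap; the downloader's queues do not need this and pass nil.
func newTrackedQueue() *prque.Prque {
	return prque.New(func(data interface{}, i int) {
		data.(*task).index = i
	})
}

func main() {
	q := newTrackedQueue()
	t := &task{number: 7}
	q.Push(t, -t.number)
	_ = t.index // updated as the heap shuffles the entry around
}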
@@ -277,7 +277,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
 }
 // Schedule all the header retrieval tasks for the skeleton assembly
 q.headerTaskPool = make(map[uint64]*types.Header)
-q.headerTaskQueue = prque.New()
+q.headerTaskQueue = prque.New(nil)
 q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
 q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
 q.headerProced = 0
@@ -288,7 +288,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
 index := from + uint64(i*MaxHeaderFetch)
 q.headerTaskPool[index] = header
-q.headerTaskQueue.Push(index, -float32(index))
+q.headerTaskQueue.Push(index, -int64(index))
 }
 }
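
The -int64(index) idiom is the same trick the old -float32 code played: Prque pops the highest priority first, so negating the block or skeleton number turns it into a min-queue that always hands out the lowest outstanding number. A small sketch of the effect, with made-up batch indexes:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	q := prque.New(nil)
	for _, n := range []uint64{576, 192, 384} { // skeleton batch start indexes (made up)
		q.Push(n, -int64(n)) // negate so the smallest index gets the highest priority
	}
	for !q.Empty() {
		fmt.Println(q.PopItem()) // 192, 384, 576 (ascending)
	}
}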
@@ -334,11 +334,11 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 }
 // Queue the header for content retrieval
 q.blockTaskPool[hash] = header
-q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
 if q.mode == FastSync {
 q.receiptTaskPool[hash] = header
-q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 inserts = append(inserts, header)
 q.headerHead = hash
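
Besides dropping the external dependency, moving the priority from float32 to int64 also removes a precision hazard: a float32 mantissa holds 24 bits, so block numbers beyond 2^24 (about 16.7 million) stop being exactly representable and distinct headers could land on the same priority. A quick standalone demonstration of that rounding:

package main

import "fmt"

func main() {
	// float32 has a 24-bit mantissa, so adjacent block numbers past 2^24
	// round to the same value, exactly the value the old queue used as priority.
	a, b := uint64(16777216), uint64(16777217)
	fmt.Println(-float32(a) == -float32(b)) // true: two headers, one priority
	fmt.Println(-int64(a) == -int64(b))     // false: int64 keeps them distinct
}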
@@ -436,7 +436,7 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
 }
 // Merge all the skipped batches back
 for _, from := range skip {
-q.headerTaskQueue.Push(from, -float32(from))
+q.headerTaskQueue.Push(from, -int64(from))
 }
 // Assemble and return the block download request
 if send == 0 {
@@ -542,7 +542,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 }
 // Merge all the skipped headers back
 for _, header := range skip {
-taskQueue.Push(header, -float32(header.Number.Uint64()))
+taskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 if progress {
 // Wake WaitResults, resultCache was modified
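
The pop side of these queues is untouched by the diff because it never mentions the priority type: reserveHeaders keeps calling PopItem and type-asserting the result, and only the skipped entries are pushed back with the new -int64 priority. A rough sketch of that reserve-then-requeue shape (the reserve helper and the failed set are invented for illustration, not the real code):

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/core/types"
)

// reserve drains up to count headers, skips any this (hypothetical) peer has
// already failed on, and pushes the skipped ones back at their old priority.
// It mirrors the shape of reserveHeaders, not its exact code.
func reserve(taskQueue *prque.Prque, failed map[uint64]struct{}, count int) []*types.Header {
	var send, skip []*types.Header
	for len(send) < count && !taskQueue.Empty() {
		header := taskQueue.PopItem().(*types.Header)
		if _, bad := failed[header.Number.Uint64()]; bad {
			skip = append(skip, header)
			continue
		}
		send = append(send, header)
	}
	// Merge all the skipped headers back
	for _, header := range skip {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	return send
}

func main() {
	q := prque.New(nil)
	for _, n := range []int64{3, 1, 2} {
		q.Push(&types.Header{Number: big.NewInt(n)}, -n)
	}
	for _, h := range reserve(q, map[uint64]struct{}{2: {}}, 2) {
		fmt.Println("reserved header", h.Number) // headers 1 and 3; 2 went back
	}
}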
@@ -585,10 +585,10 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
 defer q.lock.Unlock()
 if request.From > 0 {
-taskQueue.Push(request.From, -float32(request.From))
+taskQueue.Push(request.From, -int64(request.From))
 }
 for _, header := range request.Headers {
-taskQueue.Push(header, -float32(header.Number.Uint64()))
+taskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 delete(pendPool, request.Peer.id)
 }
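
Note that cancel's signature keeps the type name prque.Prque; both the old cookiejar package and the new common/prque export a type of that name, so only the import path and the priority arguments change. The pattern here, repeated by Revoke, expire and deliver below, returns failed work at the priority it was scheduled with, so a retried task regains its old place in line. A condensed sketch, with fetchReq standing in for the real fetchRequest:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/core/types"
)

// fetchReq is a pared-down stand-in for the downloader's fetchRequest.
type fetchReq struct {
	From    uint64
	Headers []*types.Header
}

// requeue returns a failed request's work to its task queue at the same
// priority it was originally scheduled with, so a retried task keeps its
// place in line; cancel, Revoke, expire and deliver all follow this pattern.
func requeue(taskQueue *prque.Prque, req *fetchReq) {
	if req.From > 0 {
		taskQueue.Push(req.From, -int64(req.From))
	}
	for _, header := range req.Headers {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
}

func main() {
	q := prque.New(nil)
	requeue(q, &fetchReq{From: 4096}) // e.g. a timed-out skeleton request
	fmt.Println(q.Size(), "task(s) returned to the queue")
}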
@@ -602,13 +602,13 @@ func (q *queue) Revoke(peerID string) {
 if request, ok := q.blockPendPool[peerID]; ok {
 for _, header := range request.Headers {
-q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 delete(q.blockPendPool, peerID)
 }
 if request, ok := q.receiptPendPool[peerID]; ok {
 for _, header := range request.Headers {
-q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 delete(q.receiptPendPool, peerID)
 }
@@ -657,10 +657,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
 // Return any non satisfied requests to the pool
 if request.From > 0 {
-taskQueue.Push(request.From, -float32(request.From))
+taskQueue.Push(request.From, -int64(request.From))
 }
 for _, header := range request.Headers {
-taskQueue.Push(header, -float32(header.Number.Uint64()))
+taskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 // Add the peer to the expiry report along the number of failed requests
 expiries[id] = len(request.Headers)
@@ -731,7 +731,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 }
 miss[request.From] = struct{}{}
-q.headerTaskQueue.Push(request.From, -float32(request.From))
+q.headerTaskQueue.Push(request.From, -int64(request.From))
 return 0, errors.New("delivery not accepted")
 }
 // Clean up a successful fetch and try to deliver any sub-results
@@ -854,7 +854,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 // Return all failed or missing fetches to the queue
 for _, header := range request.Headers {
 if header != nil {
-taskQueue.Push(header, -float32(header.Number.Uint64()))
+taskQueue.Push(header, -int64(header.Number.Uint64()))
 }
 }
 // Wake up WaitResults
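
Taken together, the hunks above cover every push site in the file; the pop side needed no changes. As a closing illustration, a tiny lifecycle sketch (plain integers instead of headers) showing that work returned after a failure comes back out in its original position:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	q := prque.New(nil)
	for n := uint64(1); n <= 3; n++ {
		q.Push(n, -int64(n)) // schedule, as in Schedule/ScheduleSkeleton
	}
	first := q.PopItem().(uint64) // hand out the lowest number, as in ReserveHeaders
	fmt.Println("reserved", first)

	q.Push(first, -int64(first))                 // a failed delivery returns it
	fmt.Println("retried", q.PopItem().(uint64)) // block 1 again, still first in line
}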