eth: request id dispatcher and direct req/reply APIs (#23576)
* eth: request ID based message dispatcher
* eth: fix dispatcher cancellation, rework fetchers idleness tracker
* eth/downloader: drop peers who refuse to serve advertised chains
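The central change is a request-ID based dispatch model: every outbound request is tagged with a unique id, the matching reply is routed back to the goroutine that issued it, and a timeout or refused answer is attributed to exactly one peer instead of being discovered by periodically scanning all pending requests. A minimal sketch of that pattern follows; it is illustrative only, not the eth package's actual dispatcher, and all identifiers in it (the dispatch package, request, dispatcher, send, dispatchResponse) are invented for the example.

package dispatch

import "sync"

// request tracks a single in-flight, id-tagged network request.
type request struct {
	id     uint64
	respCh chan interface{} // the matching reply is delivered here
}

// dispatcher hands replies back to their originating requests by id.
type dispatcher struct {
	mu      sync.Mutex
	nextID  uint64
	pending map[uint64]*request
}

func newDispatcher() *dispatcher {
	return &dispatcher{pending: make(map[uint64]*request)}
}

// send allocates a fresh id and registers the request. The caller is expected
// to write the payload to the peer tagged with req.id and then wait on
// req.respCh, typically alongside a timeout and a cancellation channel.
func (d *dispatcher) send() *request {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.nextID++
	req := &request{id: d.nextID, respCh: make(chan interface{}, 1)}
	d.pending[req.id] = req
	return req
}

// dispatchResponse routes a reply to whoever is waiting on its request id.
// Unsolicited or already-expired ids are simply dropped.
func (d *dispatcher) dispatchResponse(id uint64, payload interface{}) {
	d.mu.Lock()
	req, ok := d.pending[id]
	delete(d.pending, id)
	d.mu.Unlock()

	if ok {
		req.respCh <- payload // buffered, never blocks the network reader
	}
}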
eth/downloader/queue.go
@@ -54,8 +54,8 @@ var (
 // fetchRequest is a currently running data retrieval operation.
 type fetchRequest struct {
 	Peer *peerConnection // Peer to which the request was sent
-	From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
-	Headers []*types.Header // [eth/62] Requested headers, sorted by request order
+	From uint64 // Requested chain element index (used for skeleton fills only)
+	Headers []*types.Header // Requested headers, sorted by request order
 	Time time.Time // Time when the request was made
 }
@@ -127,10 +127,12 @@ type queue struct {
 	blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
 	blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
 	blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
+	blockWakeCh chan bool // Channel to notify the block fetcher of new tasks

 	receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
 	receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
 	receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
+	receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks

 	resultCache *resultStore // Downloaded but not yet delivered fetch results
 	resultSize common.StorageSize // Approximate size of a block (exponential moving average)
@@ -146,9 +148,11 @@ type queue struct {
 func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
 	lock := new(sync.RWMutex)
 	q := &queue{
-		headerContCh: make(chan bool),
+		headerContCh: make(chan bool, 1),
 		blockTaskQueue: prque.New(nil),
+		blockWakeCh: make(chan bool, 1),
 		receiptTaskQueue: prque.New(nil),
+		receiptWakeCh: make(chan bool, 1),
 		active: sync.NewCond(lock),
 		lock: lock,
 	}
@@ -196,8 +200,8 @@ func (q *queue) PendingHeaders() int {
 	return q.headerTaskQueue.Size()
 }

-// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
-func (q *queue) PendingBlocks() int {
+// PendingBodies retrieves the number of block body requests pending for retrieval.
+func (q *queue) PendingBodies() int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -212,15 +216,6 @@ func (q *queue) PendingReceipts() int {
 	return q.receiptTaskQueue.Size()
 }

-// InFlightHeaders retrieves whether there are header fetch requests currently
-// in flight.
-func (q *queue) InFlightHeaders() bool {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	return len(q.headerPendPool) > 0
-}
-
 // InFlightBlocks retrieves whether there are block fetch requests currently in
 // flight.
 func (q *queue) InFlightBlocks() bool {
@@ -318,7 +313,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
 		}
 		// Queue for receipt retrieval
-		if q.mode == FastSync && !header.EmptyReceipts() {
+		if q.mode == SnapSync && !header.EmptyReceipts() {
 			if _, ok := q.receiptTaskPool[hash]; ok {
 				log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
 			} else {
@@ -383,6 +378,13 @@ func (q *queue) Results(block bool) []*fetchResult {
 		throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
 		throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)

+		// With results removed from the cache, wake throttled fetchers
+		for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh} {
+			select {
+			case ch <- true:
+			default:
+			}
+		}
 		// Log some info at certain times
 		if time.Since(q.lastStatLog) > 60*time.Second {
 			q.lastStatLog = time.Now()
@@ -503,7 +505,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 		// we can ask the resultcache if this header is within the
 		// "prioritized" segment of blocks. If it is not, we need to throttle

-		stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
+		stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == SnapSync)
 		if stale {
 			// Don't put back in the task queue, this item has already been
 			// delivered upstream
@@ -566,40 +568,6 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 	return request, progress, throttled
 }

-// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
-func (q *queue) CancelHeaders(request *fetchRequest) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
-}
-
-// CancelBodies aborts a body fetch request, returning all pending headers to the
-// task queue.
-func (q *queue) CancelBodies(request *fetchRequest) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
-}
-
-// CancelReceipts aborts a body fetch request, returning all pending headers to
-// the task queue.
-func (q *queue) CancelReceipts(request *fetchRequest) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
-}
-
-// Cancel aborts a fetch request, returning all pending hashes to the task queue.
-func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
-	if request.From > 0 {
-		taskQueue.Push(request.From, -int64(request.From))
-	}
-	for _, header := range request.Headers {
-		taskQueue.Push(header, -int64(header.Number.Uint64()))
-	}
-	delete(pendPool, request.Peer.id)
-}
-
 // Revoke cancels all pending requests belonging to a given peer. This method is
 // meant to be called during a peer drop to quickly reassign owned data fetches
 // to remaining nodes.
@@ -607,6 +575,10 @@ func (q *queue) Revoke(peerID string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()

+	if request, ok := q.headerPendPool[peerID]; ok {
+		q.headerTaskQueue.Push(request.From, -int64(request.From))
+		delete(q.headerPendPool, peerID)
+	}
 	if request, ok := q.blockPendPool[peerID]; ok {
 		for _, header := range request.Headers {
 			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
@@ -621,62 +593,60 @@ func (q *queue) Revoke(peerID string) {
 		}
 	}
 }

-// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
-// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
+// ExpireHeaders cancels a request that timed out and moves the pending fetch
+// task back into the queue for rescheduling.
+func (q *queue) ExpireHeaders(peer string) int {
 	q.lock.Lock()
 	defer q.lock.Unlock()

-	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
+	headerTimeoutMeter.Mark(1)
+	return q.expire(peer, q.headerPendPool, q.headerTaskQueue)
 }

 // ExpireBodies checks for in flight block body requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
+func (q *queue) ExpireBodies(peer string) int {
 	q.lock.Lock()
 	defer q.lock.Unlock()

-	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
+	bodyTimeoutMeter.Mark(1)
+	return q.expire(peer, q.blockPendPool, q.blockTaskQueue)
 }

 // ExpireReceipts checks for in flight receipt requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
+func (q *queue) ExpireReceipts(peer string) int {
 	q.lock.Lock()
 	defer q.lock.Unlock()

-	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
+	receiptTimeoutMeter.Mark(1)
+	return q.expire(peer, q.receiptPendPool, q.receiptTaskQueue)
 }

-// expire is the generic check that move expired tasks from a pending pool back
-// into a task pool, returning all entities caught with expired tasks.
+// expire is the generic check that moves a specific expired task from a pending
+// pool back into a task pool.
 //
-// Note, this method expects the queue lock to be already held. The
-// reason the lock is not obtained in here is because the parameters already need
-// to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
-	// Iterate over the expired requests and return each to the queue
-	expiries := make(map[string]int)
-	for id, request := range pendPool {
-		if time.Since(request.Time) > timeout {
-			// Update the metrics with the timeout
-			timeoutMeter.Mark(1)
-
-			// Return any non satisfied requests to the pool
-			if request.From > 0 {
-				taskQueue.Push(request.From, -int64(request.From))
-			}
-			for _, header := range request.Headers {
-				taskQueue.Push(header, -int64(header.Number.Uint64()))
-			}
-			// Add the peer to the expiry report along the number of failed requests
-			expiries[id] = len(request.Headers)
-
-			// Remove the expired requests from the pending pool directly
-			delete(pendPool, id)
-		}
+// Note, this method expects the queue lock to be already held. The reason the
+// lock is not obtained in here is that the parameters already need to access
+// the queue, so they already need a lock anyway.
+func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) int {
+	// Retrieve the request being expired and log an error if it's non-existnet,
+	// as there's no order of events that should lead to such expirations.
+	req := pendPool[peer]
+	if req == nil {
+		log.Error("Expired request does not exist", "peer", peer)
+		return 0
+	}
-	return expiries
+	delete(pendPool, peer)
+
+	// Return any non-satisfied requests to the pool
+	if req.From > 0 {
+		taskQueue.Push(req.From, -int64(req.From))
+	}
+	for _, header := range req.Headers {
+		taskQueue.Push(header, -int64(header.Number.Uint64()))
+	}
+	return len(req.Headers)
 }

 // DeliverHeaders injects a header retrieval response into the header results
@@ -684,7 +654,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
 // if they do not map correctly to the skeleton.
 //
 // If the headers are accepted, the method makes an attempt to deliver the set
-// of ready headers to the processor to keep the pipeline full. However it will
+// of ready headers to the processor to keep the pipeline full. However, it will
 // not block to prevent stalling other pending deliveries.
 func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
 	q.lock.Lock()
@@ -700,11 +670,14 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 	// Short circuit if the data was never requested
 	request := q.headerPendPool[id]
 	if request == nil {
+		headerDropMeter.Mark(int64(len(headers)))
 		return 0, errNoFetchesPending
 	}
-	headerReqTimer.UpdateSince(request.Time)
 	delete(q.headerPendPool, id)
+
+	headerReqTimer.UpdateSince(request.Time)
+	headerInMeter.Mark(int64(len(headers)))

 	// Ensure headers can be mapped onto the skeleton chain
 	target := q.headerTaskPool[request.From].Hash()
@@ -739,6 +712,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 	// If the batch of headers wasn't accepted, mark as unavailable
 	if !accepted {
 		logger.Trace("Skeleton filling not accepted", "from", request.From)
+		headerDropMeter.Mark(int64(len(headers)))

 		miss := q.headerPeerMiss[id]
 		if miss == nil {
@@ -783,6 +757,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
+
 	trieHasher := trie.NewStackTrie(nil)
 	validate := func(index int, header *types.Header) error {
 		if types.DeriveSha(types.Transactions(txLists[index]), trieHasher) != header.TxHash {
@@ -800,7 +775,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
 		result.SetBodyDone()
 	}
 	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
-		bodyReqTimer, len(txLists), validate, reconstruct)
+		bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct)
 }

 // DeliverReceipts injects a receipt retrieval response into the results queue.
@@ -809,6 +784,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
 func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
+
 	trieHasher := trie.NewStackTrie(nil)
 	validate := func(index int, header *types.Header) error {
 		if types.DeriveSha(types.Receipts(receiptList[index]), trieHasher) != header.ReceiptHash {
@@ -821,7 +797,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
 		result.SetReceiptsDone()
 	}
 	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
-		receiptReqTimer, len(receiptList), validate, reconstruct)
+		receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
 }

 // deliver injects a data retrieval response into the results queue.
@@ -830,18 +806,22 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
 // reason this lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
 func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
-	taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer,
+	taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
+	reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter,
 	results int, validate func(index int, header *types.Header) error,
 	reconstruct func(index int, result *fetchResult)) (int, error) {

 	// Short circuit if the data was never requested
 	request := pendPool[id]
 	if request == nil {
+		resDropMeter.Mark(int64(results))
 		return 0, errNoFetchesPending
 	}
-	reqTimer.UpdateSince(request.Time)
 	delete(pendPool, id)
+
+	reqTimer.UpdateSince(request.Time)
+	resInMeter.Mark(int64(results))

 	// If no data items were retrieved, mark them as unavailable for the origin peer
 	if results == 0 {
 		for _, header := range request.Headers {
@@ -883,6 +863,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
 		delete(taskPool, hashes[accepted])
 		accepted++
 	}
+	resDropMeter.Mark(int64(results - accepted))
+
 	// Return all failed or missing fetches to the queue
 	for _, header := range request.Headers[accepted:] {
 		taskQueue.Push(header, -int64(header.Number.Uint64()))
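With the reworked expiry above, a timeout is attributed to a single peer: the concurrent fetcher expires only that peer's request and learns how many tasks went back into the queue, rather than periodically sweeping every pending request against a deadline. A hypothetical caller could look roughly like this (onBodyTimeout, pid and dropPeer are made-up names for illustration, not the actual downloader hooks):

// onBodyTimeout sketches the intended call pattern: expire exactly one
// peer's in-flight body request, reschedule its tasks, then penalise it.
func onBodyTimeout(q *queue, pid string, dropPeer func(string)) {
	if rescheduled := q.ExpireBodies(pid); rescheduled > 0 {
		log.Debug("Body request timed out", "peer", pid, "rescheduled", rescheduled)
	}
	dropPeer(pid) // the peer failed to serve in time, drop it per the commit message
}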