eth/downloader: fetch data proportionally to peer capacity
@@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
 
 // ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
 // canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
 
 // ExpireBodies checks for in flight block body requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
 
 // ExpireReceipts checks for in flight receipt requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
 
 // ExpireNodeData checks for in flight node data requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
 // Note, this method expects the queue lock to be already held. The
 // reason the lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
 	// Iterate over the expired requests and return each to the queue
-	peers := []string{}
+	expiries := make(map[string]int)
 	for id, request := range pendPool {
 		if time.Since(request.Time) > timeout {
 			// Update the metrics with the timeout
@@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
 			for _, header := range request.Headers {
 				taskQueue.Push(header, -float32(header.Number.Uint64()))
 			}
-			peers = append(peers, id)
+			// Add the peer to the expiry report along with the number of failed requests
+			expirations := len(request.Hashes)
+			if expirations < len(request.Headers) {
+				expirations = len(request.Headers)
+			}
+			expiries[id] = expirations
 		}
 	}
 	// Remove the expired requests from the pending pool
-	for _, id := range peers {
+	for id, _ := range expiries {
 		delete(pendPool, id)
 	}
-	return peers
+	return expiries
 }
 
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Short circuit if the blocks were never requested
 	request := q.blockPendPool[id]
 	if request == nil {
-		return errNoFetchesPending
+		return 0, errNoFetchesPending
 	}
 	blockReqTimer.UpdateSince(request.Time)
 	delete(q.blockPendPool, id)
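Returning a map[string]int instead of a flat peer list means the caller learns not only which peers timed out but how many queued items each one failed to serve. As a caller-side sketch (not code from this commit; handleExpirations, dropPeer and the drop policy are assumptions), the report could be consumed roughly like this:

package main

import "fmt"

// handleExpirations is a hypothetical caller-side helper: it walks the
// peer -> timed-out-item count report returned by the queue's Expire* methods
// and penalises peers in proportion to how much work they let expire.
func handleExpirations(expiries map[string]int, dropPeer func(id string)) {
	for id, failed := range expiries {
		// Illustrative policy only: one expired item may just be a slow link,
		// while several suggest the peer's capacity was overestimated.
		if failed > 1 {
			fmt.Printf("peer %s timed out on %d items, dropping\n", id, failed)
			dropPeer(id)
			continue
		}
		fmt.Printf("peer %s timed out on %d item, keeping\n", id, failed)
	}
}

func main() {
	report := map[string]int{"peer-a": 1, "peer-b": 7} // e.g. the result of q.ExpireBodies(timeout)
	handleExpirations(report, func(id string) { /* disconnect logic would live here */ })
}

The per-peer count is what later allows penalties and capacity adjustments to scale with how much work a peer actually dropped.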
@@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
 		}
 	}
 	// Iterate over the downloaded blocks and add each of them
-	errs := make([]error, 0)
+	accepted, errs := 0, make([]error, 0)
 	for _, block := range blocks {
 		// Skip any blocks that were not requested
 		hash := block.Hash()
@@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
 
 		delete(request.Hashes, hash)
 		delete(q.hashPool, hash)
+		accepted++
 	}
 	// Return all failed or missing fetches to the queue
 	for hash, index := range request.Hashes {
 		q.hashQueue.Push(hash, float32(index))
 	}
 	// Wake up WaitResults
-	q.active.Signal()
+	if accepted > 0 {
+		q.active.Signal()
+	}
 	// If none of the blocks were good, it's a stale delivery
 	switch {
 	case len(errs) == 0:
-		return nil
+		return accepted, nil
 	case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
-		return errs[0]
+		return accepted, errs[0]
 	case len(errs) == len(blocks):
-		return errStaleDelivery
+		return accepted, errStaleDelivery
 	default:
-		return fmt.Errorf("multiple failures: %v", errs)
+		return accepted, fmt.Errorf("multiple failures: %v", errs)
 	}
 }
 
 // DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of block bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
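With the Deliver* methods now returning (accepted, error), the delivery path can tell a wholly stale response apart from one that contained some useful data. A hedged sketch of how a caller might react to that pair, with recordDelivery and the logging policy being illustrative assumptions rather than downloader code:

package main

import (
	"errors"
	"fmt"
	"time"
)

// errStaleDelivery mirrors the queue's sentinel error for this sketch.
var errStaleDelivery = errors.New("stale delivery")

// recordDelivery is an assumed caller-side hook reacting to the new
// (accepted, error) pair: the peer is credited only for the items that were
// actually accepted, and a stale delivery is flagged for penalisation.
func recordDelivery(peer string, accepted int, elapsed time.Duration, err error) {
	switch {
	case err == nil:
		fmt.Printf("%s delivered %d items in %v\n", peer, accepted, elapsed)
	case errors.Is(err, errStaleDelivery):
		fmt.Printf("%s sent a stale delivery, penalising\n", peer)
	default:
		// Partial failures still carry an accepted count worth crediting.
		fmt.Printf("%s delivered %d items but failed with: %v\n", peer, accepted, err)
	}
}

func main() {
	// e.g. accepted, err := q.DeliverBodies(id, txLists, uncleLists)
	recordDelivery("peer-a", 128, 800*time.Millisecond, nil)
	recordDelivery("peer-b", 0, 3*time.Second, errStaleDelivery)
}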
@@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
 }
 
 // DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
 // Note, this method expects the queue lock to be already held for writing. The
 // reason the lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
-	donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
 	// Short circuit if the data was never requested
 	request := pendPool[id]
 	if request == nil {
-		return errNoFetchesPending
+		return 0, errNoFetchesPending
 	}
 	reqTimer.UpdateSince(request.Time)
 	delete(pendPool, id)
@@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 	}
 	// Assemble each of the results with their headers and retrieved data parts
 	var (
-		failure error
-		useful  bool
+		accepted int
+		failure  error
+		useful   bool
 	)
 	for i, header := range request.Headers {
 		// Short circuit assembly if no more fetch results are found
@@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 		donePool[header.Hash()] = struct{}{}
 		q.resultCache[index].Pending--
 		useful = true
+		accepted++
 
 		// Clean up a successful fetch
 		request.Headers[i] = nil
@@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 		}
 	}
 	// Wake up WaitResults
-	q.active.Signal()
+	if accepted > 0 {
+		q.active.Signal()
+	}
 	// If none of the data was good, it's a stale delivery
 	switch {
 	case failure == nil || failure == errInvalidChain:
-		return failure
+		return accepted, failure
 	case useful:
-		return fmt.Errorf("partial failure: %v", failure)
+		return accepted, fmt.Errorf("partial failure: %v", failure)
 	default:
-		return errStaleDelivery
+		return accepted, errStaleDelivery
 	}
 }
 
 // DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Short circuit if the data was never requested
 	request := q.statePendPool[id]
 	if request == nil {
-		return errNoFetchesPending
+		return 0, errNoFetchesPending
 	}
 	stateReqTimer.UpdateSince(request.Time)
 	delete(q.statePendPool, id)
@@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 		}
 	}
 	// Iterate over the downloaded data and verify each of them
-	errs := make([]error, 0)
+	accepted, errs := 0, make([]error, 0)
 	process := []trie.SyncResult{}
 	for _, blob := range data {
-		// Skip any blocks that were not requested
+		// Skip any state trie entries that were not requested
 		hash := common.BytesToHash(crypto.Sha3(blob))
 		if _, ok := request.Hashes[hash]; !ok {
 			errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 		}
 		// Inject the next state trie item into the processing queue
 		process = append(process, trie.SyncResult{hash, blob})
+		accepted++
 
 		delete(request.Hashes, hash)
 		delete(q.stateTaskPool, hash)
@@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 	// If none of the data items were good, it's a stale delivery
 	switch {
 	case len(errs) == 0:
-		return nil
+		return accepted, nil
 	case len(errs) == len(request.Hashes):
-		return errStaleDelivery
+		return accepted, errStaleDelivery
 	default:
-		return fmt.Errorf("multiple failures: %v", errs)
+		return accepted, fmt.Errorf("multiple failures: %v", errs)
 	}
 }
 
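The accepted counts and per-peer expiry reports surfaced above are the raw inputs for the behaviour named in the commit title: asking each peer for an amount of data proportional to what it has proven able to deliver. The estimator below is a self-contained illustration of that idea, with all names and constants (the moving-average weights, target round-trip time, bounds) chosen for the example rather than taken from this change:

package main

import (
	"fmt"
	"time"
)

// capacityEstimator is an illustrative moving-average throughput tracker,
// not the downloader's actual implementation.
type capacityEstimator struct {
	itemsPerSecond float64
}

// update folds one delivery (accepted items over elapsed time) into the
// estimate, weighting history against the newest measurement.
func (c *capacityEstimator) update(accepted int, elapsed time.Duration) {
	measured := float64(accepted) / elapsed.Seconds()
	c.itemsPerSecond = 0.8*c.itemsPerSecond + 0.2*measured
}

// next sizes the following request proportionally to the estimated capacity,
// aiming to fill roughly one target round trip, clamped to sane bounds.
func (c *capacityEstimator) next(targetRTT time.Duration, floor, ceil int) int {
	want := int(c.itemsPerSecond * targetRTT.Seconds())
	if want < floor {
		return floor
	}
	if want > ceil {
		return ceil
	}
	return want
}

func main() {
	est := &capacityEstimator{itemsPerSecond: 16} // arbitrary starting guess
	// Pretend two deliveries came back, e.g. accepted counts from DeliverBodies.
	est.update(64, 500*time.Millisecond)
	est.update(96, 600*time.Millisecond)
	fmt.Println("next request size:", est.next(3*time.Second, 1, 192))
}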