eth/downloader: preallocate the block cache
This commit is contained in:
parent
1d7bf3d39f
commit
b40c796ff7
@ -341,12 +341,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
|||||||
active.getHashes(head)
|
active.getHashes(head)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// We're done, allocate the download cache and proceed pulling the blocks
|
// We're done, prepare the download cache and proceed pulling the blocks
|
||||||
offset := 0
|
offset := 0
|
||||||
if block := d.getBlock(head); block != nil {
|
if block := d.getBlock(head); block != nil {
|
||||||
offset = int(block.NumberU64() + 1)
|
offset = int(block.NumberU64() + 1)
|
||||||
}
|
}
|
||||||
d.queue.Alloc(offset)
|
d.queue.Prepare(offset)
|
||||||
finished = true
|
finished = true
|
||||||
|
|
||||||
case blockPack := <-d.blockCh:
|
case blockPack := <-d.blockCh:
|
||||||
@ -504,7 +504,7 @@ out:
|
|||||||
}
|
}
|
||||||
// Get a possible chunk. If nil is returned no chunk
|
// Get a possible chunk. If nil is returned no chunk
|
||||||
// could be returned due to no hashes available.
|
// could be returned due to no hashes available.
|
||||||
request := d.queue.Reserve(peer)
|
request := d.queue.Reserve(peer, peer.Capacity())
|
||||||
if request == nil {
|
if request == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -551,7 +551,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
|
|||||||
if peer == nil {
|
if peer == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
request := d.queue.Reserve(peer)
|
request := d.queue.Reserve(peer, MaxBlockFetch)
|
||||||
if request == nil {
|
if request == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ func TestSynchronisation(t *testing.T) {
|
|||||||
if err := tester.sync("peer", hashes[0]); err != nil {
|
if err := tester.sync("peer", hashes[0]); err != nil {
|
||||||
t.Fatalf("failed to synchronise blocks: %v", err)
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
}
|
}
|
||||||
if queued := len(tester.downloader.queue.blockCache); queued != targetBlocks {
|
if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks {
|
||||||
t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
|
t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,10 +50,11 @@ type queue struct {
|
|||||||
// newQueue creates a new download queue for scheduling block retrieval.
|
// newQueue creates a new download queue for scheduling block retrieval.
|
||||||
func newQueue() *queue {
|
func newQueue() *queue {
|
||||||
return &queue{
|
return &queue{
|
||||||
hashPool: make(map[common.Hash]int),
|
hashPool: make(map[common.Hash]int),
|
||||||
hashQueue: prque.New(),
|
hashQueue: prque.New(),
|
||||||
pendPool: make(map[string]*fetchRequest),
|
pendPool: make(map[string]*fetchRequest),
|
||||||
blockPool: make(map[common.Hash]int),
|
blockPool: make(map[common.Hash]int),
|
||||||
|
blockCache: make([]*Block, blockCacheLimit),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,7 +71,7 @@ func (q *queue) Reset() {
|
|||||||
|
|
||||||
q.blockPool = make(map[common.Hash]int)
|
q.blockPool = make(map[common.Hash]int)
|
||||||
q.blockOffset = 0
|
q.blockOffset = 0
|
||||||
q.blockCache = nil
|
q.blockCache = make([]*Block, blockCacheLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size retrieves the number of hashes in the queue, returning separately for
|
// Size retrieves the number of hashes in the queue, returning separately for
|
||||||
@ -208,7 +209,7 @@ func (q *queue) TakeBlocks() []*Block {
|
|||||||
|
|
||||||
// Reserve reserves a set of hashes for the given peer, skipping any previously
|
// Reserve reserves a set of hashes for the given peer, skipping any previously
|
||||||
// failed download.
|
// failed download.
|
||||||
func (q *queue) Reserve(p *peer) *fetchRequest {
|
func (q *queue) Reserve(p *peer, count int) *fetchRequest {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
@ -345,20 +346,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Alloc ensures that the block cache is the correct size, given a starting
|
// Prepare configures the block cache offset to allow accepting inbound blocks.
|
||||||
// offset, and a memory cap.
|
func (q *queue) Prepare(offset int) {
|
||||||
func (q *queue) Alloc(offset int) {
|
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
if q.blockOffset < offset {
|
if q.blockOffset < offset {
|
||||||
q.blockOffset = offset
|
q.blockOffset = offset
|
||||||
}
|
}
|
||||||
size := len(q.hashPool)
|
|
||||||
if size > blockCacheLimit {
|
|
||||||
size = blockCacheLimit
|
|
||||||
}
|
|
||||||
if len(q.blockCache) < size {
|
|
||||||
q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user