eth, rpc: standardize the chain sync progress counters
This commit is contained in:
@ -130,10 +130,9 @@ type Downloader struct {
|
||||
interrupt int32 // Atomic boolean to signal termination
|
||||
|
||||
// Statistics
|
||||
importStart time.Time // Instance when the last blocks were taken from the cache
|
||||
importQueue []*Block // Previously taken blocks to check import progress
|
||||
importDone int // Number of taken blocks already imported from the last batch
|
||||
importLock sync.Mutex
|
||||
syncStatsOrigin uint64 // Origin block number where syncing started at
|
||||
syncStatsHeight uint64 // Highest block number known when syncing started
|
||||
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
|
||||
|
||||
// Callbacks
|
||||
hasBlock hashCheckFn // Checks if a block is present in the chain
|
||||
@ -161,6 +160,7 @@ type Downloader struct {
|
||||
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
|
||||
|
||||
// Testing hooks
|
||||
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
|
||||
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
|
||||
chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
|
||||
}
|
||||
@ -192,27 +192,14 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
|
||||
}
|
||||
}
|
||||
|
||||
// Stats retrieves the current status of the downloader.
|
||||
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
|
||||
// Fetch the download status
|
||||
pending, cached = d.queue.Size()
|
||||
// Boundaries retrieves the synchronisation boundaries, specifically the origin
|
||||
// block where synchronisation started at (may have failed/suspended) and the
|
||||
// latest known block which the synchonisation targets.
|
||||
func (d *Downloader) Boundaries() (uint64, uint64) {
|
||||
d.syncStatsLock.RLock()
|
||||
defer d.syncStatsLock.RUnlock()
|
||||
|
||||
// Figure out the import progress
|
||||
d.importLock.Lock()
|
||||
defer d.importLock.Unlock()
|
||||
|
||||
for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
|
||||
d.importQueue = d.importQueue[1:]
|
||||
d.importDone++
|
||||
}
|
||||
importing = len(d.importQueue)
|
||||
|
||||
// Make an estimate on the total sync
|
||||
estimate = 0
|
||||
if d.importDone > 0 {
|
||||
estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
|
||||
}
|
||||
return
|
||||
return d.syncStatsOrigin, d.syncStatsHeight
|
||||
}
|
||||
|
||||
// Synchronising returns whether the downloader is currently retrieving blocks.
|
||||
@ -333,14 +320,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
|
||||
|
||||
switch {
|
||||
case p.version == eth61:
|
||||
// Old eth/61, use forward, concurrent hash and block retrieval algorithm
|
||||
number, err := d.findAncestor61(p)
|
||||
// Look up the sync boundaries: the common ancestor and the target block
|
||||
latest, err := d.fetchHeight61(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origin, err := d.findAncestor61(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.syncStatsLock.Lock()
|
||||
if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
|
||||
d.syncStatsOrigin = origin
|
||||
}
|
||||
d.syncStatsHeight = latest
|
||||
d.syncStatsLock.Unlock()
|
||||
|
||||
// Initiate the sync using a concurrent hash and block retrieval algorithm
|
||||
if d.syncInitHook != nil {
|
||||
d.syncInitHook(origin, latest)
|
||||
}
|
||||
errc := make(chan error, 2)
|
||||
go func() { errc <- d.fetchHashes61(p, td, number+1) }()
|
||||
go func() { errc <- d.fetchBlocks61(number + 1) }()
|
||||
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
|
||||
go func() { errc <- d.fetchBlocks61(origin + 1) }()
|
||||
|
||||
// If any fetcher fails, cancel the other
|
||||
if err := <-errc; err != nil {
|
||||
@ -351,14 +353,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
|
||||
return <-errc
|
||||
|
||||
case p.version >= eth62:
|
||||
// New eth/62, use forward, concurrent header and block body retrieval algorithm
|
||||
number, err := d.findAncestor(p)
|
||||
// Look up the sync boundaries: the common ancestor and the target block
|
||||
latest, err := d.fetchHeight(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origin, err := d.findAncestor(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.syncStatsLock.Lock()
|
||||
if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
|
||||
d.syncStatsOrigin = origin
|
||||
}
|
||||
d.syncStatsHeight = latest
|
||||
d.syncStatsLock.Unlock()
|
||||
|
||||
// Initiate the sync using a concurrent hash and block retrieval algorithm
|
||||
if d.syncInitHook != nil {
|
||||
d.syncInitHook(origin, latest)
|
||||
}
|
||||
errc := make(chan error, 2)
|
||||
go func() { errc <- d.fetchHeaders(p, td, number+1) }()
|
||||
go func() { errc <- d.fetchBodies(number + 1) }()
|
||||
go func() { errc <- d.fetchHeaders(p, td, origin+1) }()
|
||||
go func() { errc <- d.fetchBodies(origin + 1) }()
|
||||
|
||||
// If any fetcher fails, cancel the other
|
||||
if err := <-errc; err != nil {
|
||||
@ -401,6 +418,50 @@ func (d *Downloader) Terminate() {
|
||||
d.cancel()
|
||||
}
|
||||
|
||||
// fetchHeight61 retrieves the head block of the remote peer to aid in estimating
|
||||
// the total time a pending synchronisation would take.
|
||||
func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
|
||||
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
|
||||
|
||||
// Request the advertised remote head block and wait for the response
|
||||
go p.getBlocks([]common.Hash{p.head})
|
||||
|
||||
timeout := time.After(blockSoftTTL)
|
||||
for {
|
||||
select {
|
||||
case <-d.cancelCh:
|
||||
return 0, errCancelBlockFetch
|
||||
|
||||
case <-d.headerCh:
|
||||
// Out of bounds eth/62 block headers received, ignore them
|
||||
|
||||
case <-d.bodyCh:
|
||||
// Out of bounds eth/62 block bodies received, ignore them
|
||||
|
||||
case <-d.hashCh:
|
||||
// Out of bounds hashes received, ignore them
|
||||
|
||||
case blockPack := <-d.blockCh:
|
||||
// Discard anything not from the origin peer
|
||||
if blockPack.peerId != p.id {
|
||||
glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId)
|
||||
break
|
||||
}
|
||||
// Make sure the peer actually gave something valid
|
||||
blocks := blockPack.blocks
|
||||
if len(blocks) != 1 {
|
||||
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
|
||||
return 0, errBadPeer
|
||||
}
|
||||
return blocks[0].NumberU64(), nil
|
||||
|
||||
case <-timeout:
|
||||
glog.V(logger.Debug).Infof("%v: head block timeout", p)
|
||||
return 0, errTimeout
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// findAncestor61 tries to locate the common ancestor block of the local chain and
|
||||
// a remote peers blockchain. In the general case when our node was in sync and
|
||||
// on the correct chain, checking the top N blocks should already get us a match.
|
||||
@ -776,6 +837,50 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
|
||||
}
|
||||
}
|
||||
|
||||
// fetchHeight retrieves the head header of the remote peer to aid in estimating
|
||||
// the total time a pending synchronisation would take.
|
||||
func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
|
||||
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
|
||||
|
||||
// Request the advertised remote head block and wait for the response
|
||||
go p.getRelHeaders(p.head, 1, 0, false)
|
||||
|
||||
timeout := time.After(headerTTL)
|
||||
for {
|
||||
select {
|
||||
case <-d.cancelCh:
|
||||
return 0, errCancelBlockFetch
|
||||
|
||||
case headerPack := <-d.headerCh:
|
||||
// Discard anything not from the origin peer
|
||||
if headerPack.peerId != p.id {
|
||||
glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
|
||||
break
|
||||
}
|
||||
// Make sure the peer actually gave something valid
|
||||
headers := headerPack.headers
|
||||
if len(headers) != 1 {
|
||||
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
|
||||
return 0, errBadPeer
|
||||
}
|
||||
return headers[0].Number.Uint64(), nil
|
||||
|
||||
case <-d.bodyCh:
|
||||
// Out of bounds block bodies received, ignore them
|
||||
|
||||
case <-d.hashCh:
|
||||
// Out of bounds eth/61 hashes received, ignore them
|
||||
|
||||
case <-d.blockCh:
|
||||
// Out of bounds eth/61 blocks received, ignore them
|
||||
|
||||
case <-timeout:
|
||||
glog.V(logger.Debug).Infof("%v: head header timeout", p)
|
||||
return 0, errTimeout
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// findAncestor tries to locate the common ancestor block of the local chain and
|
||||
// a remote peers blockchain. In the general case when our node was in sync and
|
||||
// on the correct chain, checking the top N blocks should already get us a match.
|
||||
@ -1203,16 +1308,10 @@ func (d *Downloader) process() {
|
||||
d.process()
|
||||
}
|
||||
}()
|
||||
// Release the lock upon exit (note, before checking for reentry!), and set
|
||||
// Release the lock upon exit (note, before checking for reentry!)
|
||||
// the import statistics to zero.
|
||||
defer func() {
|
||||
d.importLock.Lock()
|
||||
d.importQueue = nil
|
||||
d.importDone = 0
|
||||
d.importLock.Unlock()
|
||||
defer atomic.StoreInt32(&d.processing, 0)
|
||||
|
||||
atomic.StoreInt32(&d.processing, 0)
|
||||
}()
|
||||
// Repeat the processing as long as there are blocks to import
|
||||
for {
|
||||
// Fetch the next batch of blocks
|
||||
@ -1223,13 +1322,6 @@ func (d *Downloader) process() {
|
||||
if d.chainInsertHook != nil {
|
||||
d.chainInsertHook(blocks)
|
||||
}
|
||||
// Reset the import statistics
|
||||
d.importLock.Lock()
|
||||
d.importStart = time.Now()
|
||||
d.importQueue = blocks
|
||||
d.importDone = 0
|
||||
d.importLock.Unlock()
|
||||
|
||||
// Actually import the blocks
|
||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
|
||||
for len(blocks) != 0 {
|
||||
|
Reference in New Issue
Block a user