eth/downloader: make fast sync resilient to critical section fails
This commit is contained in:
@ -73,6 +73,7 @@ var (
|
||||
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
|
||||
fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
|
||||
fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
|
||||
fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing
|
||||
)
|
||||
|
||||
var (
|
||||
@ -103,13 +104,15 @@ var (
|
||||
)
|
||||
|
||||
type Downloader struct {
|
||||
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
|
||||
noFast bool // Flag to disable fast syncing in case of a security error
|
||||
mux *event.TypeMux // Event multiplexer to announce sync operation events
|
||||
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
|
||||
mux *event.TypeMux // Event multiplexer to announce sync operation events
|
||||
|
||||
queue *queue // Scheduler for selecting the hashes to download
|
||||
peers *peerSet // Set of active peers from which download can proceed
|
||||
|
||||
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
|
||||
fsPivotFails int // Number of fast sync failures in the critical section
|
||||
|
||||
interrupt int32 // Atomic boolean to signal termination
|
||||
|
||||
// Statistics
|
||||
@ -314,6 +317,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
|
||||
default:
|
||||
}
|
||||
}
|
||||
for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
|
||||
for empty := false; !empty; {
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
empty = true
|
||||
}
|
||||
}
|
||||
}
|
||||
for empty := false; !empty; {
|
||||
select {
|
||||
case <-d.headerProcCh:
|
||||
@ -330,7 +342,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
|
||||
|
||||
// Set the requested sync mode, unless it's forbidden
|
||||
d.mode = mode
|
||||
if d.mode == FastSync && d.noFast {
|
||||
if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
|
||||
d.mode = FullSync
|
||||
}
|
||||
// Retrieve the origin peer and initiate the downloading process
|
||||
@ -413,12 +425,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
|
||||
pivot = height
|
||||
case FastSync:
|
||||
// Calculate the new fast/slow sync pivot point
|
||||
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
|
||||
}
|
||||
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
|
||||
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
|
||||
if d.fsPivotLock == nil {
|
||||
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
|
||||
}
|
||||
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
|
||||
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
|
||||
}
|
||||
} else {
|
||||
// Pivot point locked in, use this and do not pick a new one!
|
||||
pivot = d.fsPivotLock.Number.Uint64()
|
||||
}
|
||||
// If the point is below the origin, move origin back to ensure state download
|
||||
if pivot < origin {
|
||||
@ -1218,8 +1235,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
||||
// If no more headers are inbound, notify the content fetchers and return
|
||||
if packet.Items() == 0 {
|
||||
glog.V(logger.Debug).Infof("%v: no available headers", p)
|
||||
d.headerProcCh <- nil
|
||||
return nil
|
||||
select {
|
||||
case d.headerProcCh <- nil:
|
||||
return nil
|
||||
case <-d.cancelCh:
|
||||
return errCancelHeaderFetch
|
||||
}
|
||||
}
|
||||
headers := packet.(*headerPack).headers
|
||||
|
||||
@ -1611,9 +1632,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
||||
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
|
||||
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
|
||||
|
||||
// If we're already past the pivot point, this could be an attack, disable fast sync
|
||||
// If we're already past the pivot point, this could be an attack, thread carefully
|
||||
if rollback[len(rollback)-1].Number.Uint64() > pivot {
|
||||
d.noFast = true
|
||||
// If we didn't ever fail, lock in te pivot header (must! not! change!)
|
||||
if d.fsPivotFails == 0 {
|
||||
for _, header := range rollback {
|
||||
if header.Number.Uint64() == pivot {
|
||||
glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
|
||||
d.fsPivotLock = header
|
||||
}
|
||||
}
|
||||
}
|
||||
d.fsPivotFails++
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -1712,6 +1742,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
||||
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
|
||||
}
|
||||
}
|
||||
// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
|
||||
if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
|
||||
if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
|
||||
glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
|
||||
return errInvalidChain
|
||||
}
|
||||
}
|
||||
// Unless we're doing light chains, schedule the headers for associated content retrieval
|
||||
if d.mode == FullSync || d.mode == FastSync {
|
||||
// If we've reached the allowed number of pending headers, stall a bit
|
||||
|
Reference in New Issue
Block a user