Merge pull request #2647 from karalabe/fastsync-critical-resilience
eth/downloader: make fast sync resilient to critical section fails
This commit is contained in:
		@@ -73,6 +73,7 @@ var (
 | 
				
			|||||||
	fsHeaderForceVerify    = 24   // Number of headers to verify before and after the pivot to accept it
 | 
						fsHeaderForceVerify    = 24   // Number of headers to verify before and after the pivot to accept it
 | 
				
			||||||
	fsPivotInterval        = 512  // Number of headers out of which to randomize the pivot point
 | 
						fsPivotInterval        = 512  // Number of headers out of which to randomize the pivot point
 | 
				
			||||||
	fsMinFullBlocks        = 1024 // Number of blocks to retrieve fully even in fast sync
 | 
						fsMinFullBlocks        = 1024 // Number of blocks to retrieve fully even in fast sync
 | 
				
			||||||
 | 
						fsCriticalTrials       = 10   // Number of times to retry in the critical section before bailing
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
@@ -103,13 +104,15 @@ var (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Downloader struct {
 | 
					type Downloader struct {
 | 
				
			||||||
	mode   SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
 | 
						mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
 | 
				
			||||||
	noFast bool           // Flag to disable fast syncing in case of a security error
 | 
						mux  *event.TypeMux // Event multiplexer to announce sync operation events
 | 
				
			||||||
	mux    *event.TypeMux // Event multiplexer to announce sync operation events
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	queue *queue   // Scheduler for selecting the hashes to download
 | 
						queue *queue   // Scheduler for selecting the hashes to download
 | 
				
			||||||
	peers *peerSet // Set of active peers from which download can proceed
 | 
						peers *peerSet // Set of active peers from which download can proceed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
 | 
				
			||||||
 | 
						fsPivotFails int           // Number of fast sync failures in the critical section
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	interrupt int32 // Atomic boolean to signal termination
 | 
						interrupt int32 // Atomic boolean to signal termination
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Statistics
 | 
						// Statistics
 | 
				
			||||||
@@ -314,6 +317,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 | 
				
			|||||||
		default:
 | 
							default:
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
 | 
				
			||||||
 | 
							for empty := false; !empty; {
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case <-ch:
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									empty = true
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	for empty := false; !empty; {
 | 
						for empty := false; !empty; {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-d.headerProcCh:
 | 
							case <-d.headerProcCh:
 | 
				
			||||||
@@ -330,7 +342,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Set the requested sync mode, unless it's forbidden
 | 
						// Set the requested sync mode, unless it's forbidden
 | 
				
			||||||
	d.mode = mode
 | 
						d.mode = mode
 | 
				
			||||||
	if d.mode == FastSync && d.noFast {
 | 
						if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
 | 
				
			||||||
		d.mode = FullSync
 | 
							d.mode = FullSync
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Retrieve the origin peer and initiate the downloading process
 | 
						// Retrieve the origin peer and initiate the downloading process
 | 
				
			||||||
@@ -413,12 +425,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 | 
				
			|||||||
			pivot = height
 | 
								pivot = height
 | 
				
			||||||
		case FastSync:
 | 
							case FastSync:
 | 
				
			||||||
			// Calculate the new fast/slow sync pivot point
 | 
								// Calculate the new fast/slow sync pivot point
 | 
				
			||||||
			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
 | 
								if d.fsPivotLock == nil {
 | 
				
			||||||
			if err != nil {
 | 
									pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
 | 
				
			||||||
				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
 | 
									if err != nil {
 | 
				
			||||||
			}
 | 
										panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
 | 
				
			||||||
			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
 | 
									}
 | 
				
			||||||
				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
 | 
									if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
 | 
				
			||||||
 | 
										pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									// Pivot point locked in, use this and do not pick a new one!
 | 
				
			||||||
 | 
									pivot = d.fsPivotLock.Number.Uint64()
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// If the point is below the origin, move origin back to ensure state download
 | 
								// If the point is below the origin, move origin back to ensure state download
 | 
				
			||||||
			if pivot < origin {
 | 
								if pivot < origin {
 | 
				
			||||||
@@ -1218,8 +1235,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
 | 
				
			|||||||
			// If no more headers are inbound, notify the content fetchers and return
 | 
								// If no more headers are inbound, notify the content fetchers and return
 | 
				
			||||||
			if packet.Items() == 0 {
 | 
								if packet.Items() == 0 {
 | 
				
			||||||
				glog.V(logger.Debug).Infof("%v: no available headers", p)
 | 
									glog.V(logger.Debug).Infof("%v: no available headers", p)
 | 
				
			||||||
				d.headerProcCh <- nil
 | 
									select {
 | 
				
			||||||
				return nil
 | 
									case d.headerProcCh <- nil:
 | 
				
			||||||
 | 
										return nil
 | 
				
			||||||
 | 
									case <-d.cancelCh:
 | 
				
			||||||
 | 
										return errCancelHeaderFetch
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			headers := packet.(*headerPack).headers
 | 
								headers := packet.(*headerPack).headers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1611,9 +1632,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 | 
				
			|||||||
			glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
 | 
								glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
 | 
				
			||||||
				len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
 | 
									len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// If we're already past the pivot point, this could be an attack, disable fast sync
 | 
								// If we're already past the pivot point, this could be an attack, tread carefully
 | 
				
			||||||
			if rollback[len(rollback)-1].Number.Uint64() > pivot {
 | 
								if rollback[len(rollback)-1].Number.Uint64() > pivot {
 | 
				
			||||||
				d.noFast = true
 | 
									// If we didn't ever fail, lock in the pivot header (must! not! change!)
 | 
				
			||||||
 | 
									if d.fsPivotFails == 0 {
 | 
				
			||||||
 | 
										for _, header := range rollback {
 | 
				
			||||||
 | 
											if header.Number.Uint64() == pivot {
 | 
				
			||||||
 | 
												glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
 | 
				
			||||||
 | 
												d.fsPivotLock = header
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									d.fsPivotFails++
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
@@ -1712,6 +1742,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 | 
				
			|||||||
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
 | 
											rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
									// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
 | 
				
			||||||
 | 
									if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
 | 
				
			||||||
 | 
										if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
 | 
				
			||||||
 | 
											glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
 | 
				
			||||||
 | 
											return errInvalidChain
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
				// Unless we're doing light chains, schedule the headers for associated content retrieval
 | 
									// Unless we're doing light chains, schedule the headers for associated content retrieval
 | 
				
			||||||
				if d.mode == FullSync || d.mode == FastSync {
 | 
									if d.mode == FullSync || d.mode == FastSync {
 | 
				
			||||||
					// If we've reached the allowed number of pending headers, stall a bit
 | 
										// If we've reached the allowed number of pending headers, stall a bit
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -149,22 +149,25 @@ type downloadTester struct {
 | 
				
			|||||||
	peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
 | 
						peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
 | 
				
			||||||
	peerChainTds map[string]map[common.Hash]*big.Int       // Total difficulties of the blocks in the peer chains
 | 
						peerChainTds map[string]map[common.Hash]*big.Int       // Total difficulties of the blocks in the peer chains
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	lock sync.RWMutex
 | 
						lock sync.RWMutex
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// newTester creates a new downloader test mocker.
 | 
					// newTester creates a new downloader test mocker.
 | 
				
			||||||
func newTester() *downloadTester {
 | 
					func newTester() *downloadTester {
 | 
				
			||||||
	tester := &downloadTester{
 | 
						tester := &downloadTester{
 | 
				
			||||||
		ownHashes:    []common.Hash{genesis.Hash()},
 | 
							ownHashes:         []common.Hash{genesis.Hash()},
 | 
				
			||||||
		ownHeaders:   map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
 | 
							ownHeaders:        map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
 | 
				
			||||||
		ownBlocks:    map[common.Hash]*types.Block{genesis.Hash(): genesis},
 | 
							ownBlocks:         map[common.Hash]*types.Block{genesis.Hash(): genesis},
 | 
				
			||||||
		ownReceipts:  map[common.Hash]types.Receipts{genesis.Hash(): nil},
 | 
							ownReceipts:       map[common.Hash]types.Receipts{genesis.Hash(): nil},
 | 
				
			||||||
		ownChainTd:   map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
 | 
							ownChainTd:        map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
 | 
				
			||||||
		peerHashes:   make(map[string][]common.Hash),
 | 
							peerHashes:        make(map[string][]common.Hash),
 | 
				
			||||||
		peerHeaders:  make(map[string]map[common.Hash]*types.Header),
 | 
							peerHeaders:       make(map[string]map[common.Hash]*types.Header),
 | 
				
			||||||
		peerBlocks:   make(map[string]map[common.Hash]*types.Block),
 | 
							peerBlocks:        make(map[string]map[common.Hash]*types.Block),
 | 
				
			||||||
		peerReceipts: make(map[string]map[common.Hash]types.Receipts),
 | 
							peerReceipts:      make(map[string]map[common.Hash]types.Receipts),
 | 
				
			||||||
		peerChainTds: make(map[string]map[common.Hash]*big.Int),
 | 
							peerChainTds:      make(map[string]map[common.Hash]*big.Int),
 | 
				
			||||||
 | 
							peerMissingStates: make(map[string]map[common.Hash]bool),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	tester.stateDb, _ = ethdb.NewMemDatabase()
 | 
						tester.stateDb, _ = ethdb.NewMemDatabase()
 | 
				
			||||||
	tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
 | 
						tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
 | 
				
			||||||
@@ -408,6 +411,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
 | 
				
			|||||||
		dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
 | 
							dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
 | 
				
			||||||
		dl.peerReceipts[id] = make(map[common.Hash]types.Receipts)
 | 
							dl.peerReceipts[id] = make(map[common.Hash]types.Receipts)
 | 
				
			||||||
		dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
 | 
							dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
 | 
				
			||||||
 | 
							dl.peerMissingStates[id] = make(map[common.Hash]bool)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		genesis := hashes[len(hashes)-1]
 | 
							genesis := hashes[len(hashes)-1]
 | 
				
			||||||
		if header := headers[genesis]; header != nil {
 | 
							if header := headers[genesis]; header != nil {
 | 
				
			||||||
@@ -648,7 +652,9 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func
 | 
				
			|||||||
		results := make([][]byte, 0, len(hashes))
 | 
							results := make([][]byte, 0, len(hashes))
 | 
				
			||||||
		for _, hash := range hashes {
 | 
							for _, hash := range hashes {
 | 
				
			||||||
			if data, err := testdb.Get(hash.Bytes()); err == nil {
 | 
								if data, err := testdb.Get(hash.Bytes()); err == nil {
 | 
				
			||||||
				results = append(results, data)
 | 
									if !dl.peerMissingStates[id][hash] {
 | 
				
			||||||
 | 
										results = append(results, data)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		go dl.downloader.DeliverNodeData(id, results)
 | 
							go dl.downloader.DeliverNodeData(id, results)
 | 
				
			||||||
@@ -1288,7 +1294,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
 | 
				
			|||||||
	tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts)
 | 
						tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts)
 | 
				
			||||||
	missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
 | 
						missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	tester.downloader.noFast = false
 | 
						tester.downloader.fsPivotFails = 0
 | 
				
			||||||
	tester.downloader.syncInitHook = func(uint64, uint64) {
 | 
						tester.downloader.syncInitHook = func(uint64, uint64) {
 | 
				
			||||||
		for i := missing; i <= len(hashes); i++ {
 | 
							for i := missing; i <= len(hashes); i++ {
 | 
				
			||||||
			delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i])
 | 
								delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i])
 | 
				
			||||||
@@ -1307,6 +1313,8 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
 | 
				
			|||||||
			t.Errorf("fast sync pivot block #%d not rolled back", head)
 | 
								t.Errorf("fast sync pivot block #%d not rolled back", head)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						tester.downloader.fsPivotFails = fsCriticalTrials
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Synchronise with the valid peer and make sure sync succeeds. Since the last
 | 
						// Synchronise with the valid peer and make sure sync succeeds. Since the last
 | 
				
			||||||
	// rollback should also disable fast syncing for this process, verify that we
 | 
						// rollback should also disable fast syncing for this process, verify that we
 | 
				
			||||||
	// did a fresh full sync. Note, we can't assert anything about the receipts
 | 
						// did a fresh full sync. Note, we can't assert anything about the receipts
 | 
				
			||||||
@@ -1749,3 +1757,41 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Tests that if fast sync aborts in the critical section, it can restart a few
 | 
				
			||||||
 | 
					// times before giving up.
 | 
				
			||||||
 | 
					func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) }
 | 
				
			||||||
 | 
					func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testFastCriticalRestarts(t *testing.T, protocol int) {
 | 
				
			||||||
 | 
						t.Parallel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create a large enough blockchain to actually fast sync on
 | 
				
			||||||
 | 
						targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
 | 
				
			||||||
 | 
						hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create a tester peer with the critical section state roots missing (force failures)
 | 
				
			||||||
 | 
						tester := newTester()
 | 
				
			||||||
 | 
						tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i := 0; i < fsPivotInterval; i++ {
 | 
				
			||||||
 | 
							tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Synchronise with the peer a few times and make sure they fail until the retry limit
 | 
				
			||||||
 | 
						for i := 0; i < fsCriticalTrials; i++ {
 | 
				
			||||||
 | 
							// Attempt a sync and ensure it fails properly
 | 
				
			||||||
 | 
							if err := tester.sync("peer", nil, FastSync); err == nil {
 | 
				
			||||||
 | 
								t.Fatalf("failing fast sync succeeded: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
 | 
				
			||||||
 | 
							if i == 0 {
 | 
				
			||||||
 | 
								tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Retry limit exhausted, downloader will switch to full sync, should succeed
 | 
				
			||||||
 | 
						if err := tester.sync("peer", nil, FastSync); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						assertOwnChain(t, tester, targetBlocks+1)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user