eth/downloader: implement beacon sync (#23982)

* eth/downloader: implement beacon sync

* eth/downloader: fix a crash if the beacon chain is reduced in length

* eth/downloader: fix beacon sync start/stop thrashing data race

* eth/downloader: use a non-nil pivot even in degenerate sync requests

* eth/downloader: don't touch internal state on beacon Head retrieval

* eth/downloader: fix spelling mistakes

* eth/downloader: fix some typos

* eth: integrate legacy/beacon sync switchover and UX

* eth: handle UX wise being stuck on post-merge TTD

* core, eth: integrate the beacon client with the beacon sync

* eth/catalyst: make some warning messages nicer

* eth/downloader: remove Ethereum 1&2 notions in favor of merge

* core/beacon, eth: clean up engine API returns a bit

* eth/downloader: add skeleton extension tests

* eth/catalyst: keep non-kiln spec, handle mining on ttd

* eth/downloader: add beacon header retrieval tests

* eth: fixed spelling, commented failing tests out

* eth/downloader: review fixes

* eth/downloader: drop peers failing to deliver beacon headers

* core/rawdb: track beacon sync data in db inspect

* eth: fix review concerns

* internal/web3ext: nit

Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de>
This commit is contained in:
Péter Szilágyi
2022-03-11 14:14:45 +02:00
committed by GitHub
parent 1b58e42802
commit 8f66ea3786
22 changed files with 2918 additions and 303 deletions

View File

@ -0,0 +1,289 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// beaconBackfiller is the chain and state backfilling that can be commenced once
// the skeleton syncer has successfully reverse downloaded all the headers up to
// the genesis block or an existing header in the database. Its operation is fully
// directed by the skeleton sync's head/tail events.
type beaconBackfiller struct {
downloader *Downloader // Downloader to direct via this callback implementation
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
started chan struct{} // Notification channel whether the downloader inited
lock sync.Mutex // Mutex protecting the sync lock
}
// newBeaconBackfiller is a helper method to create the backfiller.
func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
return &beaconBackfiller{
downloader: dl,
success: success,
}
}
// suspend cancels any background downloader threads.
func (b *beaconBackfiller) suspend() {
// If no filling is running, don't waste cycles
b.lock.Lock()
filling := b.filling
started := b.started
b.lock.Unlock()
if !filling {
return
}
// A previous filling should be running, though it may happen that it hasn't
// yet started (being done on a new goroutine). Many concurrent beacon head
// announcements can lead to sync start/stop thrashing. In that case we need
// to wait for initialization before we can safely cancel it. It is safe to
// read this channel multiple times, it gets closed on startup.
<-started
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()
}
// resume starts the downloader threads for backfilling state and chain data.
func (b *beaconBackfiller) resume() {
b.lock.Lock()
if b.filling {
// If a previous filling cycle is still running, just ignore this start
// request. // TODO(karalabe): We should make this channel driven
b.lock.Unlock()
return
}
b.filling = true
b.started = make(chan struct{})
mode := b.syncMode
b.lock.Unlock()
// Start the backfilling on its own thread since the downloader does not have
// its own lifecycle runloop.
go func() {
// Set the backfiller to non-filling when download completes
defer func() {
b.lock.Lock()
b.filling = false
b.lock.Unlock()
}()
// If the downloader fails, report an error as in beacon chain mode there
// should be no errors as long as the chain we're syncing to is valid.
if err := b.downloader.synchronise("", common.Hash{}, nil, nil, mode, true, b.started); err != nil {
log.Error("Beacon backfilling failed", "err", err)
return
}
// Synchronization succeeded. Since this happens async, notify the outer
// context to disable snap syncing and enable transaction propagation.
if b.success != nil {
b.success()
}
}()
}
// setMode updates the sync mode from the current one to the requested one. If
// there's an active sync in progress, it will be cancelled and restarted.
func (b *beaconBackfiller) setMode(mode SyncMode) {
// Update the old sync mode and track if it was changed
b.lock.Lock()
updated := b.syncMode != mode
filling := b.filling
b.syncMode = mode
b.lock.Unlock()
// If the sync mode was changed mid-sync, restart. This should never ever
// really happen, we just handle it to detect programming errors.
if !updated || !filling {
return
}
log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String())
b.suspend()
b.resume()
}
// BeaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
return d.beaconSync(mode, head, true)
}
// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
// to extend the current beacon chain with a new header, but in case of a mismatch,
// the old sync will not be terminated and reorged, rather the new head is dropped.
//
// This is useful if a beacon client is feeding us large chunks of payloads to run,
// but is not setting the head after each.
func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
return d.beaconSync(mode, head, false)
}
// beaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error {
// When the downloader starts a sync cycle, it needs to be aware of the sync
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
// mode into the backfiller directly.
//
// Super crazy dangerous type cast. Should be fine (TM), we're only using a
// different backfiller implementation for skeleton tests.
d.skeleton.filler.(*beaconBackfiller).setMode(mode)
// Signal the skeleton sync to switch to a new head, however it wants
if err := d.skeleton.Sync(head, force); err != nil {
return err
}
return nil
}
// findBeaconAncestor tries to locate the common ancestor link of the local chain
// and the beacon chain just requested. In the general case when our node was in
// sync and on the correct chain, checking the top N links should already get us
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
// none of the head links match), we do a binary search to find the ancestor.
func (d *Downloader) findBeaconAncestor() uint64 {
// Figure out the current local head position
var chainHead *types.Header
switch d.getMode() {
case FullSync:
chainHead = d.blockchain.CurrentBlock().Header()
case SnapSync:
chainHead = d.blockchain.CurrentFastBlock().Header()
default:
chainHead = d.lightchain.CurrentHeader()
}
number := chainHead.Number.Uint64()
// If the head is present in the skeleton chain, return that
if chainHead.Hash() == d.skeleton.Header(number).Hash() {
return number
}
// Head header not present, binary search to find the ancestor
start, end := uint64(0), number
beaconHead, err := d.skeleton.Head()
if err != nil {
panic(fmt.Sprintf("failed to read skeleton head: %v", err)) // can't reach this method without a head
}
if number := beaconHead.Number.Uint64(); end > number {
// This shouldn't really happen in a healty network, but if the consensus
// clients feeds us a shorter chain as the canonical, we should not attempt
// to access non-existent skeleton items.
log.Warn("Beacon head lower than local chain", "beacon", number, "local", end)
end = number
}
for start+1 < end {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
h := d.skeleton.Header(check)
n := h.Number.Uint64()
var known bool
switch d.getMode() {
case FullSync:
known = d.blockchain.HasBlock(h.Hash(), n)
case SnapSync:
known = d.blockchain.HasFastBlock(h.Hash(), n)
default:
known = d.lightchain.HasHeader(h.Hash(), n)
}
if !known {
end = check
continue
}
start = check
}
return start
}
// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
// until sync errors or is finished.
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
head, err := d.skeleton.Head()
if err != nil {
return err
}
for {
// Retrieve a batch of headers and feed it to the header processor
var (
headers = make([]*types.Header, 0, maxHeadersProcess)
hashes = make([]common.Hash, 0, maxHeadersProcess)
)
for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ {
headers = append(headers, d.skeleton.Header(from))
hashes = append(hashes, headers[i].Hash())
from++
}
if len(headers) > 0 {
log.Trace("Scheduling new beacon headers", "count", len(headers), "from", from-uint64(len(headers)))
select {
case d.headerProcCh <- &headerTask{
headers: headers,
hashes: hashes,
}:
case <-d.cancelCh:
return errCanceled
}
}
// If we still have headers to import, loop and keep pushing them
if from <= head.Number.Uint64() {
continue
}
// If the pivot block is committed, signal header sync termination
if atomic.LoadInt32(&d.committed) == 1 {
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCanceled
}
}
// State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...")
select {
case <-time.After(fsHeaderContCheck):
case <-d.cancelCh:
return errCanceled
}
head, err = d.skeleton.Head()
if err != nil {
return err
}
}
}

View File

@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -79,6 +78,7 @@ var (
errCanceled = errors.New("syncing canceled (requested)")
errTooOld = errors.New("peer's protocol version too old")
errNoAncestorFound = errors.New("no common ancestor found")
ErrMergeTransition = errors.New("legacy sync reached the merge")
)
// peerDropFn is a callback type for dropping a peer detected as malicious.
@ -123,6 +123,9 @@ type Downloader struct {
// Channels
headerProcCh chan *headerTask // Channel to feed the header processor new tasks
// Skeleton sync
skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode)
// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates
@ -201,7 +204,7 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader {
if lightchain == nil {
lightchain = chain
}
@ -219,6 +222,8 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
}
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
go dl.stateFetcher()
return dl
}
@ -318,10 +323,10 @@ func (d *Downloader) UnregisterPeer(id string) error {
return nil
}
// Synchronise tries to sync up our local block chain with a remote peer, both
// LegacySync tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)
func (d *Downloader) LegacySync(id string, head common.Hash, td, ttd *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, ttd, mode, false, nil)
switch err {
case nil, errBusy, errCanceled:
@ -340,6 +345,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
}
return err
}
if errors.Is(err, ErrMergeTransition) {
return err // This is an expected fault, don't keep printing it in a spin-loop
}
log.Warn("Synchronisation failed, retrying", "err", err)
return err
}
@ -347,7 +355,21 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, mode SyncMode, beaconMode bool, beaconPing chan struct{}) error {
// The beacon header syncer is async. It will start this synchronization and
// will continue doing other tasks. However, if synchornization needs to be
// cancelled, the syncer needs to know if we reached the startup point (and
// inited the cancel cannel) or not yet. Make sure that we'll signal even in
// case of a failure.
if beaconPing != nil {
defer func() {
select {
case <-beaconPing: // already notified
default:
close(beaconPing) // weird exit condition, notify that it's safe to cancel (the nothing)
}
}()
}
// Mock out the synchronisation if testing
if d.synchroniseMock != nil {
return d.synchroniseMock(id, hash)
@ -362,9 +384,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
// If snap sync was requested, create the snap scheduler and switch to snap
// sync mode. Long term we could drop snap sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
if mode == SnapSync {
// Snap sync uses the snapshot namespace to store potentially flakey data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean-
@ -402,11 +421,17 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
atomic.StoreUint32(&d.mode, uint32(mode))
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
return errUnknownPeer
var p *peerConnection
if !beaconMode { // Beacon mode doesn't need a peer to sync from
p = d.peers.Peer(id)
if p == nil {
return errUnknownPeer
}
}
return d.syncWithPeer(p, hash, td)
if beaconPing != nil {
close(beaconPing)
}
return d.syncWithPeer(p, hash, td, ttd, beaconMode)
}
func (d *Downloader) getMode() SyncMode {
@ -415,7 +440,7 @@ func (d *Downloader) getMode() SyncMode {
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *big.Int, beaconMode bool) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
@ -426,33 +451,54 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.mux.Post(DoneEvent{latest})
}
}()
if p.version < eth.ETH66 {
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66)
}
mode := d.getMode()
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
if !beaconMode {
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
} else {
log.Debug("Backfilling with the network", "mode", mode)
}
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now())
// Look up the sync boundaries: the common ancestor and the target block
latest, pivot, err := d.fetchHead(p)
if err != nil {
return err
var latest, pivot *types.Header
if !beaconMode {
// In legacy mode, use the master peer to retrieve the headers from
latest, pivot, err = d.fetchHead(p)
if err != nil {
return err
}
} else {
// In beacon mode, user the skeleton chain to retrieve the headers from
latest, err = d.skeleton.Head()
if err != nil {
return err
}
if latest.Number.Uint64() > uint64(fsMinFullBlocks) {
pivot = d.skeleton.Header(latest.Number.Uint64() - uint64(fsMinFullBlocks))
}
}
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chain). In that case we won't really snap sync
// anyway, but still need a valid pivot block to avoid some code hitting
// nil panics on access.
if mode == SnapSync && pivot == nil {
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chain). In that case we won't really snap sync
// anyway, but still need a valid pivot block to avoid some code hitting
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()
origin, err := d.findAncestor(p, latest)
if err != nil {
return err
var origin uint64
if !beaconMode {
// In legacy mode, reach out to the network and find the ancestor
origin, err = d.findAncestor(p, latest)
if err != nil {
return err
}
} else {
// In beacon mode, use the skeleton chain for the ancestor lookup
origin = d.findBeaconAncestor()
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
@ -523,11 +569,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
var headerFetcher func() error
if !beaconMode {
// In legacy mode, headers are retrieved from the network
headerFetcher = func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }
} else {
// In beacon mode, headers are served by the skeleton syncer
headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) }
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin+1, td) },
headerFetcher, // Headers are always retrieved
func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin+1, td, ttd, beaconMode) },
}
if mode == SnapSync {
d.pivotLock.Lock()
@ -536,7 +590,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
fetchers = append(fetchers, func() error { return d.processSnapSyncContent() })
} else if mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) })
}
return d.spawnSync(fetchers)
}
@ -602,6 +656,9 @@ func (d *Downloader) Terminate() {
case <-d.quitCh:
default:
close(d.quitCh)
// Terminate the internal beacon syncer
d.skeleton.Terminate()
}
d.quitLock.Unlock()
@ -1127,7 +1184,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)
err := d.concurrentFetch((*headerQueue)(d))
err := d.concurrentFetch((*headerQueue)(d), false)
if err != nil {
log.Debug("Skeleton fill failed", "err", err)
}
@ -1141,9 +1198,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error {
log.Debug("Downloading block bodies", "origin", from)
err := d.concurrentFetch((*bodyQueue)(d))
err := d.concurrentFetch((*bodyQueue)(d), beaconMode)
log.Debug("Block body download terminated", "err", err)
return err
@ -1152,9 +1209,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error {
log.Debug("Downloading receipts", "origin", from)
err := d.concurrentFetch((*receiptQueue)(d))
err := d.concurrentFetch((*receiptQueue)(d), beaconMode)
log.Debug("Receipt download terminated", "err", err)
return err
@ -1163,7 +1220,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
@ -1211,35 +1268,40 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
case <-d.cancelCh:
}
}
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
return errStallingPeer
// If we're in legacy sync mode, we need to check total difficulty
// violations from malicious peers. That is not needed in beacon
// mode and we can skip to terminating sync.
if !beaconMode {
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
return errStallingPeer
}
}
}
// If snap or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if mode == SnapSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
// If snap or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if mode == SnapSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
}
// Disable any rollback and return
@ -1281,6 +1343,37 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if chunkHeaders[len(chunkHeaders)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
// Although the received headers might be all valid, a legacy
// PoW/PoA sync must not accept post-merge headers. Make sure
// that any transition is rejected at this point.
var (
rejected []*types.Header
td *big.Int
)
if !beaconMode && ttd != nil {
td = d.blockchain.GetTd(chunkHeaders[0].ParentHash, chunkHeaders[0].Number.Uint64()-1)
if td == nil {
// This should never really happen, but handle gracefully for now
log.Error("Failed to retrieve parent header TD", "number", chunkHeaders[0].Number.Uint64()-1, "hash", chunkHeaders[0].ParentHash)
return fmt.Errorf("%w: parent TD missing", errInvalidChain)
}
for i, header := range chunkHeaders {
td = new(big.Int).Add(td, header.Difficulty)
if td.Cmp(ttd) >= 0 {
// Terminal total difficulty reached, allow the last header in
if new(big.Int).Sub(td, header.Difficulty).Cmp(ttd) < 0 {
chunkHeaders, rejected = chunkHeaders[:i+1], chunkHeaders[i+1:]
if len(rejected) > 0 {
// Make a nicer user log as to the first TD truly rejected
td = new(big.Int).Add(td, rejected[0].Difficulty)
}
} else {
chunkHeaders, rejected = chunkHeaders[:i], chunkHeaders[i:]
}
break
}
}
}
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders, frequency); err != nil {
rollbackErr = err
@ -1300,6 +1393,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollback = 1
}
}
if len(rejected) != 0 {
// Merge threshold reached, stop importing, but don't roll back
rollback = 0
log.Info("Legacy sync reached merge threshold", "number", rejected[0].Number, "hash", rejected[0].Hash(), "td", td, "ttd", ttd)
return ErrMergeTransition
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == SnapSync {
@ -1342,7 +1442,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
func (d *Downloader) processFullSyncContent(ttd *big.Int, beaconMode bool) error {
for {
results := d.queue.Results(true)
if len(results) == 0 {
@ -1351,9 +1451,44 @@ func (d *Downloader) processFullSyncContent() error {
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
// Although the received blocks might be all valid, a legacy PoW/PoA sync
// must not accept post-merge blocks. Make sure that pre-merge blocks are
// imported, but post-merge ones are rejected.
var (
rejected []*fetchResult
td *big.Int
)
if !beaconMode && ttd != nil {
td = d.blockchain.GetTd(results[0].Header.ParentHash, results[0].Header.Number.Uint64()-1)
if td == nil {
// This should never really happen, but handle gracefully for now
log.Error("Failed to retrieve parent block TD", "number", results[0].Header.Number.Uint64()-1, "hash", results[0].Header.ParentHash)
return fmt.Errorf("%w: parent TD missing", errInvalidChain)
}
for i, result := range results {
td = new(big.Int).Add(td, result.Header.Difficulty)
if td.Cmp(ttd) >= 0 {
// Terminal total difficulty reached, allow the last block in
if new(big.Int).Sub(td, result.Header.Difficulty).Cmp(ttd) < 0 {
results, rejected = results[:i+1], results[i+1:]
if len(rejected) > 0 {
// Make a nicer user log as to the first TD truly rejected
td = new(big.Int).Add(td, rejected[0].Header.Difficulty)
}
} else {
results, rejected = results[:i], results[i:]
}
break
}
}
}
if err := d.importBlockResults(results); err != nil {
return err
}
if len(rejected) != 0 {
log.Info("Legacy sync reached merge threshold", "number", rejected[0].Header.Number, "hash", rejected[0].Header.Hash(), "td", td, "ttd", ttd)
return ErrMergeTransition
}
}
}

View File

@ -75,7 +75,7 @@ func newTester() *downloadTester {
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer)
tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, nil)
return tester
}
@ -96,7 +96,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64())
}
// Synchronise with the chosen peer and ensure proper cleanup afterwards
err := dl.downloader.synchronise(id, head.Hash(), td, mode)
err := dl.downloader.synchronise(id, head.Hash(), td, nil, mode, false, nil)
select {
case <-dl.downloader.cancelCh:
// Ok, downloader fully cancelled after sync cycle
@ -971,7 +971,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) {
// Simulate a synchronisation and check the required result
tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
tester.downloader.Synchronise(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync)
tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), nil, FullSync)
if _, ok := tester.peers[id]; !ok != tt.drop {
t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
}

View File

@ -76,7 +76,7 @@ type typedQueue interface {
// concurrentFetch iteratively downloads scheduled block parts, taking available
// peers, reserving a chunk of fetch requests for each and waiting for delivery
// or timeouts.
func (d *Downloader) concurrentFetch(queue typedQueue) error {
func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
// Create a delivery channel to accept responses from all peers
responses := make(chan *eth.Response)
@ -127,7 +127,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
finished := false
for {
// Short circuit if we lost all our peers
if d.peers.Len() == 0 {
if d.peers.Len() == 0 && !beaconMode {
return errNoPeers
}
// If there's nothing more to fetch, wait or terminate
@ -209,7 +209,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 {
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
return errPeersUnavailable
}
}

View File

@ -294,19 +294,19 @@ func (ps *peerSet) AllPeers() []*peerConnection {
// peerCapacitySort implements sort.Interface.
// It sorts peer connections by capacity (descending).
type peerCapacitySort struct {
p []*peerConnection
tp []int
peers []*peerConnection
caps []int
}
func (ps *peerCapacitySort) Len() int {
return len(ps.p)
return len(ps.peers)
}
func (ps *peerCapacitySort) Less(i, j int) bool {
return ps.tp[i] > ps.tp[j]
return ps.caps[i] > ps.caps[j]
}
func (ps *peerCapacitySort) Swap(i, j int) {
ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
ps.peers[i], ps.peers[j] = ps.peers[j], ps.peers[i]
ps.caps[i], ps.caps[j] = ps.caps[j], ps.caps[i]
}

1032
eth/downloader/skeleton.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,874 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"encoding/json"
"errors"
"fmt"
"math/big"
"os"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
)
// hookedBackfiller is a tester backfiller with all interface methods mocked and
// hooked so tests can implement only the things they need.
type hookedBackfiller struct {
// suspendHook is an optional hook to be called when the filler is requested
// to be suspended.
suspendHook func()
// resumeHook is an optional hook to be called when the filler is requested
// to be resumed.
resumeHook func()
}
// newHookedBackfiller creates a hooked backfiller with all callbacks disabled,
// essentially acting as a noop.
func newHookedBackfiller() backfiller {
return new(hookedBackfiller)
}
// suspend requests the backfiller to abort any running full or snap sync
// based on the skeleton chain as it might be invalid. The backfiller should
// gracefully handle multiple consecutive suspends without a resume, even
// on initial sartup.
func (hf *hookedBackfiller) suspend() {
if hf.suspendHook != nil {
hf.suspendHook()
}
}
// resume requests the backfiller to start running fill or snap sync based on
// the skeleton chain as it has successfully been linked. Appending new heads
// to the end of the chain will not result in suspend/resume cycles.
func (hf *hookedBackfiller) resume() {
if hf.resumeHook != nil {
hf.resumeHook()
}
}
// skeletonTestPeer is a mock peer that can only serve header requests from a
// pre-perated header chain (which may be arbitrarily wrong for testing).
//
// Requesting anything else from these peers will hard panic. Note, do *not*
// implement any other methods. We actually want to make sure that the skeleton
// syncer only depends on - and will only ever do so - on header requests.
type skeletonTestPeer struct {
id string // Unique identifier of the mock peer
headers []*types.Header // Headers to serve when requested
serve func(origin uint64) []*types.Header // Hook to allow custom responses
served uint64 // Number of headers served by this peer
dropped uint64 // Flag whether the peer was dropped (stop responding)
}
// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with.
func newSkeletonTestPeer(id string, headers []*types.Header) *skeletonTestPeer {
return &skeletonTestPeer{
id: id,
headers: headers,
}
}
// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with,
// and sets an optional serve hook that can return headers for delivery instead
// of the predefined chain. Useful for emulating malicious behavior that would
// otherwise require dedicated peer types.
func newSkeletonTestPeerWithHook(id string, headers []*types.Header, serve func(origin uint64) []*types.Header) *skeletonTestPeer {
return &skeletonTestPeer{
id: id,
headers: headers,
serve: serve,
}
}
// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
// origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer.
func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
// Since skeleton test peer are in-memory mocks, dropping the does not make
// them inaccepssible. As such, check a local `dropped` field to see if the
// peer has been dropped and should not respond any more.
if atomic.LoadUint64(&p.dropped) != 0 {
return nil, errors.New("peer already dropped")
}
// Skeleton sync retrieves batches of headers going backward without gaps.
// This ensures we can follow a clean parent progression without any reorg
// hiccups. There is no need for any other type of header retrieval, so do
// panic if there's such a request.
if !reverse || skip != 0 {
// Note, if other clients want to do these kinds of requests, it's their
// problem, it will still work. We just don't want *us* making complicated
// requests without a very strong reason to.
panic(fmt.Sprintf("invalid header retrieval: reverse %v, want true; skip %d, want 0", reverse, skip))
}
// If the skeleton syncer requests the genesis block, panic. Whilst it could
// be considered a valid request, our code specifically should not request it
// ever since we want to link up headers to an existing local chain, which at
// worse will be the genesis.
if int64(origin)-int64(amount) < 0 {
panic(fmt.Sprintf("headers requested before (or at) genesis: origin %d, amount %d", origin, amount))
}
// To make concurrency easier, the skeleton syncer always requests fixed size
// batches of headers. Panic if the peer is requested an amount other than the
// configured batch size (apart from the request leading to the genesis).
if amount > requestHeaders || (amount < requestHeaders && origin > uint64(amount)) {
panic(fmt.Sprintf("non-chunk size header batch requested: requested %d, want %d, origin %d", amount, requestHeaders, origin))
}
// Simple reverse header retrieval. Fill from the peer's chain and return.
// If the tester has a serve hook set, try to use that before falling back
// to the default behavior.
var headers []*types.Header
if p.serve != nil {
headers = p.serve(origin)
}
if headers == nil {
headers = make([]*types.Header, 0, amount)
if len(p.headers) > int(origin) { // Don't serve headers if we're missing the origin
for i := 0; i < amount; i++ {
// Consider nil headers as a form of attack and withhold them. Nil
// cannot be decoded from RLP, so it's not possible to produce an
// attack by sending/receiving those over eth.
header := p.headers[int(origin)-i]
if header == nil {
continue
}
headers = append(headers, header)
}
}
}
atomic.AddUint64(&p.served, uint64(len(headers)))
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader
req := &eth.Request{
Peer: p.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1,
Done: make(chan error),
}
go func() {
sink <- res
if err := <-res.Done; err != nil {
log.Warn("Skeleton test peer response rejected", "err", err)
atomic.AddUint64(&p.dropped, 1)
}
}()
return req, nil
}
func (p *skeletonTestPeer) Head() (common.Hash, *big.Int) {
panic("skeleton sync must not request the remote head")
}
func (p *skeletonTestPeer) RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request headers by hash")
}
func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request block bodies")
}
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request receipts")
}
// Tests various sync initialzations based on previous leftovers in the database
// and announced heads.
func TestSkeletonSyncInit(t *testing.T) {
// Create a few key headers
var (
genesis = &types.Header{Number: big.NewInt(0)}
block49 = &types.Header{Number: big.NewInt(49)}
block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")}
block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()}
)
tests := []struct {
headers []*types.Header // Database content (beside the genesis)
oldstate []*subchain // Old sync state with various interrupted subchains
head *types.Header // New head header to announce to reorg to
newstate []*subchain // Expected sync state after the reorg
}{
// Completely empty database with only the genesis set. The sync is expected
// to create a single subchain with the requested head.
{
head: block50,
newstate: []*subchain{{Head: 50, Tail: 50}},
},
// Empty database with only the genesis set with a leftover empty sync
// progess. This is a synthetic case, just for the sake of covering things.
{
oldstate: []*subchain{},
head: block50,
newstate: []*subchain{{Head: 50, Tail: 50}},
},
// A single leftover subchain is present, older than the new head. The
// old subchain should be left as is and a new one appended to the sync
// status.
{
oldstate: []*subchain{{Head: 10, Tail: 5}},
head: block50,
newstate: []*subchain{
{Head: 50, Tail: 50},
{Head: 10, Tail: 5},
},
},
// Multiple leftover subchains are present, older than the new head. The
// old subchains should be left as is and a new one appended to the sync
// status.
{
oldstate: []*subchain{
{Head: 20, Tail: 15},
{Head: 10, Tail: 5},
},
head: block50,
newstate: []*subchain{
{Head: 50, Tail: 50},
{Head: 20, Tail: 15},
{Head: 10, Tail: 5},
},
},
// A single leftover subchain is present, newer than the new head. The
// newer subchain should be deleted and a fresh one created for the head.
{
oldstate: []*subchain{{Head: 65, Tail: 60}},
head: block50,
newstate: []*subchain{{Head: 50, Tail: 50}},
},
// Multiple leftover subchain is present, newer than the new head. The
// newer subchains should be deleted and a fresh one created for the head.
{
oldstate: []*subchain{
{Head: 75, Tail: 70},
{Head: 65, Tail: 60},
},
head: block50,
newstate: []*subchain{{Head: 50, Tail: 50}},
},
// Two leftover subchains are present, one fully older and one fully
// newer than the announced head. The head should delete the newer one,
// keeping the older one.
{
oldstate: []*subchain{
{Head: 65, Tail: 60},
{Head: 10, Tail: 5},
},
head: block50,
newstate: []*subchain{
{Head: 50, Tail: 50},
{Head: 10, Tail: 5},
},
},
// Multiple leftover subchains are present, some fully older and some
// fully newer than the announced head. The head should delete the newer
// ones, keeping the older ones.
{
oldstate: []*subchain{
{Head: 75, Tail: 70},
{Head: 65, Tail: 60},
{Head: 20, Tail: 15},
{Head: 10, Tail: 5},
},
head: block50,
newstate: []*subchain{
{Head: 50, Tail: 50},
{Head: 20, Tail: 15},
{Head: 10, Tail: 5},
},
},
// A single leftover subchain is present and the new head is extending
// it with one more header. We expect the subchain head to be pushed
// forward.
{
headers: []*types.Header{block49},
oldstate: []*subchain{{Head: 49, Tail: 5}},
head: block50,
newstate: []*subchain{{Head: 50, Tail: 5}},
},
// A single leftover subchain is present and although the new head does
// extend it number wise, the hash chain does not link up. We expect a
// new subchain to be created for the dangling head.
{
headers: []*types.Header{block49B},
oldstate: []*subchain{{Head: 49, Tail: 5}},
head: block50,
newstate: []*subchain{
{Head: 50, Tail: 50},
{Head: 49, Tail: 5},
},
},
// A single leftover subchain is present. A new head is announced that
// links into the middle of it, correctly anchoring into an existing
// header. We expect the old subchain to be truncated and extended with
// the new head.
{
headers: []*types.Header{block49},
oldstate: []*subchain{{Head: 100, Tail: 5}},
head: block50,
newstate: []*subchain{{Head: 50, Tail: 5}},
},
// A single leftover subchain is present. A new head is announced that
// links into the middle of it, but does not anchor into an existing
// header. We expect the old subchain to be truncated and a new chain
// be created for the dangling head.
{
headers: []*types.Header{block49B},
oldstate: []*subchain{{Head: 100, Tail: 5}},
head: block50,
newstate: []*subchain{
{Head: 50, Tail: 50},
{Head: 49, Tail: 5},
},
},
}
for i, tt := range tests {
// Create a fresh database and initialize it with the starting state
db := rawdb.NewMemoryDatabase()
rawdb.WriteHeader(db, genesis)
for _, header := range tt.headers {
rawdb.WriteSkeletonHeader(db, header)
}
if tt.oldstate != nil {
blob, _ := json.Marshal(&skeletonProgress{Subchains: tt.oldstate})
rawdb.WriteSkeletonSyncStatus(db, blob)
}
// Create a skeleton sync and run a cycle
wait := make(chan struct{})
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, true)
<-wait
skeleton.Terminate()
// Ensure the correct resulting sync status
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) != len(tt.newstate) {
t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate))
continue
}
for j := 0; j < len(progress.Subchains); j++ {
if progress.Subchains[j].Head != tt.newstate[j].Head {
t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head)
}
if progress.Subchains[j].Tail != tt.newstate[j].Tail {
t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail)
}
}
}
}
// Tests that a running skeleton sync can be extended with properly linked up
// headers but not with side chains.
func TestSkeletonSyncExtend(t *testing.T) {
// Create a few key headers
var (
genesis = &types.Header{Number: big.NewInt(0)}
block49 = &types.Header{Number: big.NewInt(49)}
block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")}
block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()}
block51 = &types.Header{Number: big.NewInt(51), ParentHash: block50.Hash()}
)
tests := []struct {
head *types.Header // New head header to announce to reorg to
extend *types.Header // New head header to announce to extend with
newstate []*subchain // Expected sync state after the reorg
err error // Whether extension succeeds or not
}{
// Initialize a sync and try to extend it with a subsequent block.
{
head: block49,
extend: block50,
newstate: []*subchain{
{Head: 50, Tail: 49},
},
},
// Initialize a sync and try to extend it with the existing head block.
{
head: block49,
extend: block49,
newstate: []*subchain{
{Head: 49, Tail: 49},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a sibling block.
{
head: block49,
extend: block49B,
newstate: []*subchain{
{Head: 49, Tail: 49},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a number-wise sequential
// header, but a hash wise non-linking one.
{
head: block49B,
extend: block50,
newstate: []*subchain{
{Head: 49, Tail: 49},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a non-linking future block.
{
head: block49,
extend: block51,
newstate: []*subchain{
{Head: 49, Tail: 49},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a past canonical block.
{
head: block50,
extend: block49,
newstate: []*subchain{
{Head: 50, Tail: 50},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a past sidechain block.
{
head: block50,
extend: block49B,
newstate: []*subchain{
{Head: 50, Tail: 50},
},
err: errReorgDenied,
},
}
for i, tt := range tests {
// Create a fresh database and initialize it with the starting state
db := rawdb.NewMemoryDatabase()
rawdb.WriteHeader(db, genesis)
// Create a skeleton sync and run a cycle
wait := make(chan struct{})
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, true)
<-wait
if err := skeleton.Sync(tt.extend, false); err != tt.err {
t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err)
}
skeleton.Terminate()
// Ensure the correct resulting sync status
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) != len(tt.newstate) {
t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate))
continue
}
for j := 0; j < len(progress.Subchains); j++ {
if progress.Subchains[j].Head != tt.newstate[j].Head {
t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head)
}
if progress.Subchains[j].Tail != tt.newstate[j].Tail {
t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail)
}
}
}
}
// Tests that the skeleton sync correctly retrieves headers from one or more
// peers without duplicates or other strange side effects.
func TestSkeletonSyncRetrievals(t *testing.T) {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// Since skeleton headers don't need to be meaningful, beyond a parent hash
// progression, create a long fake chain to test with.
chain := []*types.Header{{Number: big.NewInt(0)}}
for i := 1; i < 10000; i++ {
chain = append(chain, &types.Header{
ParentHash: chain[i-1].Hash(),
Number: big.NewInt(int64(i)),
})
}
tests := []struct {
headers []*types.Header // Database content (beside the genesis)
oldstate []*subchain // Old sync state with various interrupted subchains
head *types.Header // New head header to announce to reorg to
peers []*skeletonTestPeer // Initial peer set to start the sync with
midstate []*subchain // Expected sync state after initial cycle
midserve uint64 // Expected number of header retrievals after initial cycle
middrop uint64 // Expectd number of peers dropped after initial cycle
newHead *types.Header // New header to annount on top of the old one
newPeer *skeletonTestPeer // New peer to join the skeleton syncer
endstate []*subchain // Expected sync state after the post-init event
endserve uint64 // Expected number of header retrievals after the post-init event
enddrop uint64 // Expectd number of peers dropped after the post-init event
}{
// Completely empty database with only the genesis set. The sync is expected
// to create a single subchain with the requested head. No peers however, so
// the sync should be stuck without any progression.
//
// When a new peer is added, it should detect the join and fill the headers
// to the genesis block.
{
head: chain[len(chain)-1],
midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: uint64(len(chain) - 1)}},
newPeer: newSkeletonTestPeer("test-peer", chain),
endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}},
endserve: uint64(len(chain) - 2), // len - head - genesis
},
// Completely empty database with only the genesis set. The sync is expected
// to create a single subchain with the requested head. With one valid peer,
// the sync is expected to complete already in the initial round.
//
// Adding a second peer should not have any effect.
{
head: chain[len(chain)-1],
peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-1", chain)},
midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}},
midserve: uint64(len(chain) - 2), // len - head - genesis
newPeer: newSkeletonTestPeer("test-peer-2", chain),
endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}},
endserve: uint64(len(chain) - 2), // len - head - genesis
},
// Completely empty database with only the genesis set. The sync is expected
// to create a single subchain with the requested head. With many valid peers,
// the sync is expected to complete already in the initial round.
//
// Adding a new peer should not have any effect.
{
head: chain[len(chain)-1],
peers: []*skeletonTestPeer{
newSkeletonTestPeer("test-peer-1", chain),
newSkeletonTestPeer("test-peer-2", chain),
newSkeletonTestPeer("test-peer-3", chain),
},
midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}},
midserve: uint64(len(chain) - 2), // len - head - genesis
newPeer: newSkeletonTestPeer("test-peer-4", chain),
endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}},
endserve: uint64(len(chain) - 2), // len - head - genesis
},
// This test checks if a peer tries to withhold a header - *on* the sync
// boundary - instead of sending the requested amount. The malicious short
// package should not be accepted.
//
// Joining with a new peer should however unblock the sync.
{
head: chain[requestHeaders+100],
peers: []*skeletonTestPeer{
newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:99]...), nil), chain[100:]...)),
},
midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}},
midserve: requestHeaders + 101 - 3, // len - head - genesis - missing
middrop: 1, // penalize shortened header deliveries
newPeer: newSkeletonTestPeer("good-peer", chain),
endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}},
endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test checks if a peer tries to withhold a header - *off* the sync
// boundary - instead of sending the requested amount. The malicious short
// package should not be accepted.
//
// Joining with a new peer should however unblock the sync.
{
head: chain[requestHeaders+100],
peers: []*skeletonTestPeer{
newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:50]...), nil), chain[51:]...)),
},
midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}},
midserve: requestHeaders + 101 - 3, // len - head - genesis - missing
middrop: 1, // penalize shortened header deliveries
newPeer: newSkeletonTestPeer("good-peer", chain),
endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}},
endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test checks if a peer tries to duplicate a header - *on* the sync
// boundary - instead of sending the correct sequence. The malicious duped
// package should not be accepted.
//
// Joining with a new peer should however unblock the sync.
{
head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary
peers: []*skeletonTestPeer{
newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:99]...), chain[98]), chain[100:]...)),
},
midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}},
midserve: requestHeaders + 101 - 2, // len - head - genesis
middrop: 1, // penalize invalid header sequences
newPeer: newSkeletonTestPeer("good-peer", chain),
endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}},
endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test checks if a peer tries to duplicate a header - *off* the sync
// boundary - instead of sending the correct sequence. The malicious duped
// package should not be accepted.
//
// Joining with a new peer should however unblock the sync.
{
head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary
peers: []*skeletonTestPeer{
newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:50]...), chain[49]), chain[51:]...)),
},
midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}},
midserve: requestHeaders + 101 - 2, // len - head - genesis
middrop: 1, // penalize invalid header sequences
newPeer: newSkeletonTestPeer("good-peer", chain),
endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}},
endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test checks if a peer tries to inject a different header - *on*
// the sync boundary - instead of sending the correct sequence. The bad
// package should not be accepted.
//
// Joining with a new peer should however unblock the sync.
{
head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary
peers: []*skeletonTestPeer{
newSkeletonTestPeer("header-changer",
append(
append(
append([]*types.Header{}, chain[:99]...),
&types.Header{
ParentHash: chain[98].Hash(),
Number: big.NewInt(int64(99)),
GasLimit: 1,
},
), chain[100:]...,
),
),
},
midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}},
midserve: requestHeaders + 101 - 2, // len - head - genesis
middrop: 1, // different set of headers, drop // TODO(karalabe): maybe just diff sync?
newPeer: newSkeletonTestPeer("good-peer", chain),
endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}},
endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test checks if a peer tries to inject a different header - *off*
// the sync boundary - instead of sending the correct sequence. The bad
// package should not be accepted.
//
// Joining with a new peer should however unblock the sync.
{
head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary
peers: []*skeletonTestPeer{
newSkeletonTestPeer("header-changer",
append(
append(
append([]*types.Header{}, chain[:50]...),
&types.Header{
ParentHash: chain[49].Hash(),
Number: big.NewInt(int64(50)),
GasLimit: 1,
},
), chain[51:]...,
),
),
},
midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}},
midserve: requestHeaders + 101 - 2, // len - head - genesis
middrop: 1, // different set of headers, drop
newPeer: newSkeletonTestPeer("good-peer", chain),
endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}},
endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test reproduces a bug caught during review (kudos to @holiman)
// where a subchain is merged with a previously interrupted one, causing
// pending data in the scratch space to become "invalid" (since we jump
// ahead during subchain merge). In that case it is expected to ignore
// the queued up data instead of trying to process on top of a shifted
// task set.
//
// The test is a bit convoluted since it needs to trigger a concurrency
// issue. First we sync up an initial chain of 2x512 items. Then announce
// 2x512+2 as head and delay delivering the head batch to fill the scratch
// space first. The delivery head should merge with the previous download
// and the scratch space must not be consumed further.
{
head: chain[2*requestHeaders],
peers: []*skeletonTestPeer{
newSkeletonTestPeerWithHook("peer-1", chain, func(origin uint64) []*types.Header {
if origin == chain[2*requestHeaders+2].Number.Uint64() {
time.Sleep(100 * time.Millisecond)
}
return nil // Fallback to default behavior, just delayed
}),
newSkeletonTestPeerWithHook("peer-2", chain, func(origin uint64) []*types.Header {
if origin == chain[2*requestHeaders+2].Number.Uint64() {
time.Sleep(100 * time.Millisecond)
}
return nil // Fallback to default behavior, just delayed
}),
},
midstate: []*subchain{{Head: 2 * requestHeaders, Tail: 1}},
midserve: 2*requestHeaders - 1, // len - head - genesis
newHead: chain[2*requestHeaders+2],
endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}},
endserve: 4 * requestHeaders,
},
}
for i, tt := range tests {
// Create a fresh database and initialize it with the starting state
db := rawdb.NewMemoryDatabase()
rawdb.WriteHeader(db, chain[0])
// Create a peer set to feed headers through
peerset := newPeerSet()
for _, peer := range tt.peers {
peerset.Register(newPeerConnection(peer.id, eth.ETH66, peer, log.New("id", peer.id)))
}
// Create a peer dropper to track malicious peers
dropped := make(map[string]int)
drop := func(peer string) {
if p := peerset.Peer(peer); p != nil {
atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1)
}
peerset.Unregister(peer)
dropped[peer]++
}
// Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller())
skeleton.Sync(tt.head, true)
// Wait a bit (bleah) for the initial sync loop to go to idle. This might
// be either a finish or a never-start hence why there's no event to hook.
time.Sleep(250 * time.Millisecond)
// Check the post-init mid state if it matches the required results
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) != len(tt.midstate) {
t.Errorf("test %d, mid state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.midstate))
continue
}
for j := 0; j < len(progress.Subchains); j++ {
if progress.Subchains[j].Head != tt.midstate[j].Head {
t.Errorf("test %d, mid state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.midstate[j].Head)
}
if progress.Subchains[j].Tail != tt.midstate[j].Tail {
t.Errorf("test %d, mid state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.midstate[j].Tail)
}
}
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
}
// Apply the post-init events if there's any
if tt.newHead != nil {
skeleton.Sync(tt.newHead, true)
}
if tt.newPeer != nil {
if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {
t.Errorf("test %d: failed to register new peer: %v", i, err)
}
}
// Wait a bit (bleah) for the second sync loop to go to idle. This might
// be either a finish or a never-start hence why there's no event to hook.
time.Sleep(250 * time.Millisecond)
// Check the post-init mid state if it matches the required results
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) != len(tt.endstate) {
t.Errorf("test %d, end state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.endstate))
continue
}
for j := 0; j < len(progress.Subchains); j++ {
if progress.Subchains[j].Head != tt.endstate[j].Head {
t.Errorf("test %d, end state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.endstate[j].Head)
}
if progress.Subchains[j].Tail != tt.endstate[j].Tail {
t.Errorf("test %d, end state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.endstate[j].Tail)
}
}
// Check that the peers served no more headers than we actually needed
served = 0
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops = 0
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
}
// Clean up any leftover skeleton sync resources
skeleton.Terminate()
}
}