core, eth: split eth package, implement snap protocol (#21482)

This commit splits the eth package, separating the handling of eth and snap protocols. It also includes the capability to run snap sync (https://github.com/ethereum/devp2p/blob/master/caps/snap.md) , but does not enable it by default. 

Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de>
Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
Péter Szilágyi
2020-12-14 11:27:15 +02:00
committed by GitHub
parent 00d10e610f
commit 017831dd5b
74 changed files with 8246 additions and 3411 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@ -38,7 +39,6 @@ import (
)
var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
@ -89,7 +89,7 @@ var (
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 63)")
errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 64)")
)
type Downloader struct {
@ -131,20 +131,22 @@ type Downloader struct {
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
// Channels
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
headerCh chan dataPack // Channel receiving inbound block headers
bodyCh chan dataPack // Channel receiving inbound block bodies
receiptCh chan dataPack // Channel receiving inbound receipts
bodyWakeCh chan bool // Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // Channel to signal the receipt fetcher of new tasks
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks
// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates
snapSync bool // Whether to run state sync over the snap protocol
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
stateSyncStart chan *stateSync
trackStateReq chan *stateReq
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
stateCh chan dataPack // Channel receiving inbound node state data
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
@ -237,6 +239,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
SnapSyncer: snap.NewSyncer(stateDb, stateBloom),
stateSyncStart: make(chan *stateSync),
syncStatsState: stateSyncStats{
processed: rawdb.ReadFastTrieProgress(stateDb),
@ -286,19 +289,16 @@ func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
}
// SyncBloomContains tests if the syncbloom filter contains the given hash:
// - false: the bloom definitely does not contain hash
// - true: the bloom maybe contains hash
//
// While the bloom is being initialized (or is closed), all queries will return true.
func (d *Downloader) SyncBloomContains(hash []byte) bool {
return d.stateBloom == nil || d.stateBloom.Contains(hash)
}
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
logger := log.New("peer", id)
func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
var logger log.Logger
if len(id) < 16 {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
}
logger.Trace("Registering sync peer")
if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
logger.Error("Failed to register sync peer", "err", err)
@ -310,7 +310,7 @@ func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
}
// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
func (d *Downloader) RegisterLightPeer(id string, version uint, peer LightPeer) error {
return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
}
@ -319,7 +319,13 @@ func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) e
// the queue.
func (d *Downloader) UnregisterPeer(id string) error {
// Unregister the peer from the active peer set and revoke any fetch tasks
logger := log.New("peer", id)
var logger log.Logger
if len(id) < 16 {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
}
logger.Trace("Unregistering sync peer")
if err := d.peers.Unregister(id); err != nil {
logger.Error("Failed to unregister sync peer", "err", err)
@ -381,6 +387,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
// If snap sync was requested, create the snap scheduler and switch to fast
// sync mode. Long term we could drop fast sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
if mode == SnapSync {
if !d.snapSync {
log.Warn("Enabling snapshot sync prototype")
d.snapSync = true
}
mode = FastSync
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
d.peers.Reset()
@ -443,8 +459,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.mux.Post(DoneEvent{latest})
}
}()
if p.version < 63 {
return errTooOld
if p.version < 64 {
return fmt.Errorf("%w, peer version: %d", errTooOld, p.version)
}
mode := d.getMode()
@ -1910,27 +1926,53 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error {
return d.deliver(d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}
// DeliverNodeData injects a new batch of node state data received from a remote node.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
func (d *Downloader) DeliverNodeData(id string, data [][]byte) error {
return d.deliver(d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
}
// DeliverSnapPacket is invoked from a peer's message handler when it transmits a
// data packet for the local node to consume.
func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error {
switch packet := packet.(type) {
case *snap.AccountRangePacket:
hashes, accounts, err := packet.Unpack()
if err != nil {
return err
}
return d.SnapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof)
case *snap.StorageRangesPacket:
hashset, slotset := packet.Unpack()
return d.SnapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof)
case *snap.ByteCodesPacket:
return d.SnapSyncer.OnByteCodes(peer, packet.ID, packet.Codes)
case *snap.TrieNodesPacket:
return d.SnapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes)
default:
return fmt.Errorf("unexpected snap packet type: %T", packet)
}
}
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
inMeter.Mark(int64(packet.Items()))
defer func() {

View File

@ -390,7 +390,7 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) {
}
// newPeer registers a new block download source into the downloader.
func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error {
func (dl *downloadTester) newPeer(id string, version uint, chain *testChain) error {
dl.lock.Lock()
defer dl.lock.Unlock()
@ -518,8 +518,6 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
// Tests that simple synchronization against a canonical chain works correctly.
// In this test common ancestor lookup should be short circuited and not require
// binary searching.
func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) }
func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) }
func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) }
func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) }
func TestCanonicalSynchronisation65Full(t *testing.T) { testCanonicalSynchronisation(t, 65, FullSync) }
@ -528,7 +526,7 @@ func TestCanonicalSynchronisation65Light(t *testing.T) {
testCanonicalSynchronisation(t, 65, LightSync)
}
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
func testCanonicalSynchronisation(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -547,14 +545,12 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved.
func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func TestThrottling65Full(t *testing.T) { testThrottling(t, 65, FullSync) }
func TestThrottling65Fast(t *testing.T) { testThrottling(t, 65, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -632,15 +628,13 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
// Tests that simple synchronization against a forked chain works correctly. In
// this test common ancestor lookup should *not* be short circuited, and a full
// binary search should be executed.
func TestForkedSync63Full(t *testing.T) { testForkedSync(t, 63, FullSync) }
func TestForkedSync63Fast(t *testing.T) { testForkedSync(t, 63, FastSync) }
func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) }
func TestForkedSync64Fast(t *testing.T) { testForkedSync(t, 64, FastSync) }
func TestForkedSync65Full(t *testing.T) { testForkedSync(t, 65, FullSync) }
func TestForkedSync65Fast(t *testing.T) { testForkedSync(t, 65, FastSync) }
func TestForkedSync65Light(t *testing.T) { testForkedSync(t, 65, LightSync) }
func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
func testForkedSync(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -665,15 +659,13 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that synchronising against a much shorter but much heavyer fork works
// corrently and is not dropped.
func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) }
func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) }
func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) }
func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) }
func TestHeavyForkedSync65Full(t *testing.T) { testHeavyForkedSync(t, 65, FullSync) }
func TestHeavyForkedSync65Fast(t *testing.T) { testHeavyForkedSync(t, 65, FastSync) }
func TestHeavyForkedSync65Light(t *testing.T) { testHeavyForkedSync(t, 65, LightSync) }
func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -700,15 +692,13 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that chain forks are contained within a certain interval of the current
// chain head, ensuring that malicious peers cannot waste resources by feeding
// long dead chains.
func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) }
func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) }
func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) }
func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) }
func TestBoundedForkedSync65Full(t *testing.T) { testBoundedForkedSync(t, 65, FullSync) }
func TestBoundedForkedSync65Fast(t *testing.T) { testBoundedForkedSync(t, 65, FastSync) }
func TestBoundedForkedSync65Light(t *testing.T) { testBoundedForkedSync(t, 65, LightSync) }
func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -734,15 +724,13 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that chain forks are contained within a certain interval of the current
// chain head for short but heavy forks too. These are a bit special because they
// take different ancestor lookup paths.
func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) }
func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) }
func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) }
func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) }
func TestBoundedHeavyForkedSync65Full(t *testing.T) { testBoundedHeavyForkedSync(t, 65, FullSync) }
func TestBoundedHeavyForkedSync65Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 65, FastSync) }
func TestBoundedHeavyForkedSync65Light(t *testing.T) { testBoundedHeavyForkedSync(t, 65, LightSync) }
func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -786,15 +774,13 @@ func TestInactiveDownloader63(t *testing.T) {
}
// Tests that a canceled download wipes all previously accumulated state.
func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) }
func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) }
func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) }
func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
func TestCancel65Full(t *testing.T) { testCancel(t, 65, FullSync) }
func TestCancel65Fast(t *testing.T) { testCancel(t, 65, FastSync) }
func TestCancel65Light(t *testing.T) { testCancel(t, 65, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
func testCancel(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -819,15 +805,13 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
}
// Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) }
func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) }
func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) }
func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) }
func TestMultiSynchronisation65Full(t *testing.T) { testMultiSynchronisation(t, 65, FullSync) }
func TestMultiSynchronisation65Fast(t *testing.T) { testMultiSynchronisation(t, 65, FastSync) }
func TestMultiSynchronisation65Light(t *testing.T) { testMultiSynchronisation(t, 65, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -849,15 +833,13 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that synchronisations behave well in multi-version protocol environments
// and not wreak havoc on other nodes in the network.
func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) }
func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) }
func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) }
func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) }
func TestMultiProtoSynchronisation65Full(t *testing.T) { testMultiProtoSync(t, 65, FullSync) }
func TestMultiProtoSynchronisation65Fast(t *testing.T) { testMultiProtoSync(t, 65, FastSync) }
func TestMultiProtoSynchronisation65Light(t *testing.T) { testMultiProtoSync(t, 65, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -888,15 +870,13 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that if a block is empty (e.g. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) }
func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) }
func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) }
func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) }
func TestEmptyShortCircuit65Full(t *testing.T) { testEmptyShortCircuit(t, 65, FullSync) }
func TestEmptyShortCircuit65Fast(t *testing.T) { testEmptyShortCircuit(t, 65, FastSync) }
func TestEmptyShortCircuit65Light(t *testing.T) { testEmptyShortCircuit(t, 65, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -942,15 +922,13 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
// Tests that headers are enqueued continuously, preventing malicious nodes from
// stalling the downloader by feeding gapped header chains.
func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) }
func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) }
func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) }
func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) }
func TestMissingHeaderAttack65Full(t *testing.T) { testMissingHeaderAttack(t, 65, FullSync) }
func TestMissingHeaderAttack65Fast(t *testing.T) { testMissingHeaderAttack(t, 65, FastSync) }
func TestMissingHeaderAttack65Light(t *testing.T) { testMissingHeaderAttack(t, 65, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -974,15 +952,13 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Tests that if requested headers are shifted (i.e. first is missing), the queue
// detects the invalid numbering.
func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) }
func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) }
func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) }
func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) }
func TestShiftedHeaderAttack65Full(t *testing.T) { testShiftedHeaderAttack(t, 65, FullSync) }
func TestShiftedHeaderAttack65Fast(t *testing.T) { testShiftedHeaderAttack(t, 65, FastSync) }
func TestShiftedHeaderAttack65Light(t *testing.T) { testShiftedHeaderAttack(t, 65, LightSync) }
func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -1011,11 +987,10 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Tests that upon detecting an invalid header, the recent ones are rolled back
// for various failure scenarios. Afterwards a full sync is attempted to make
// sure no state was corrupted.
func TestInvalidHeaderRollback63Fast(t *testing.T) { testInvalidHeaderRollback(t, 63, FastSync) }
func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(t, 64, FastSync) }
func TestInvalidHeaderRollback65Fast(t *testing.T) { testInvalidHeaderRollback(t, 65, FastSync) }
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -1103,15 +1078,13 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
// Tests that a peer advertising a high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes.
func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) }
func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) }
func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) }
func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) }
func TestHighTDStarvationAttack65Full(t *testing.T) { testHighTDStarvationAttack(t, 65, FullSync) }
func TestHighTDStarvationAttack65Fast(t *testing.T) { testHighTDStarvationAttack(t, 65, FastSync) }
func TestHighTDStarvationAttack65Light(t *testing.T) { testHighTDStarvationAttack(t, 65, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -1125,11 +1098,10 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
}
// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
func TestBlockHeaderAttackerDropping65(t *testing.T) { testBlockHeaderAttackerDropping(t, 65) }
func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) {
t.Parallel()
// Define the disconnection requirement for individual hash fetch errors
@ -1179,15 +1151,13 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
// Tests that synchronisation progress (origin block number, current block number
// and highest block number) is tracked and updated correctly.
func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) }
func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) }
func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) }
func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress65Full(t *testing.T) { testSyncProgress(t, 65, FullSync) }
func TestSyncProgress65Fast(t *testing.T) { testSyncProgress(t, 65, FastSync) }
func TestSyncProgress65Light(t *testing.T) { testSyncProgress(t, 65, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -1263,21 +1233,19 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync
// Tests that synchronisation progress (origin block number and highest block
// number) is tracked and updated correctly in case of a fork (or manual head
// revertal).
func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) }
func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) }
func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) }
func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) }
func TestForkedSyncProgress65Full(t *testing.T) { testForkedSyncProgress(t, 65, FullSync) }
func TestForkedSyncProgress65Fast(t *testing.T) { testForkedSyncProgress(t, 65, FastSync) }
func TestForkedSyncProgress65Light(t *testing.T) { testForkedSyncProgress(t, 65, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch)
chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch)
chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHeaderFetch)
chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHeaderFetch)
// Set a sync init hook to catch progress changes
starting := make(chan struct{})
@ -1339,15 +1307,13 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// Tests that if synchronisation is aborted due to some failure, then the progress
// origin is not updated in the next sync cycle, as it should be considered the
// continuation of the previous sync and not a new instance.
func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) }
func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) }
func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) }
func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) }
func TestFailedSyncProgress65Full(t *testing.T) { testFailedSyncProgress(t, 65, FullSync) }
func TestFailedSyncProgress65Fast(t *testing.T) { testFailedSyncProgress(t, 65, FastSync) }
func TestFailedSyncProgress65Light(t *testing.T) { testFailedSyncProgress(t, 65, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -1412,15 +1378,13 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// Tests that if an attacker fakes a chain height, after the attack is detected,
// the progress height is successfully reduced at the next sync invocation.
func TestFakedSyncProgress63Full(t *testing.T) { testFakedSyncProgress(t, 63, FullSync) }
func TestFakedSyncProgress63Fast(t *testing.T) { testFakedSyncProgress(t, 63, FastSync) }
func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, FullSync) }
func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, FastSync) }
func TestFakedSyncProgress65Full(t *testing.T) { testFakedSyncProgress(t, 65, FullSync) }
func TestFakedSyncProgress65Fast(t *testing.T) { testFakedSyncProgress(t, 65, FastSync) }
func TestFakedSyncProgress65Light(t *testing.T) { testFakedSyncProgress(t, 65, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
tester := newTester()
@ -1489,31 +1453,15 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// This test reproduces an issue where unexpected deliveries would
// block indefinitely if they arrived at the right time.
func TestDeliverHeadersHang(t *testing.T) {
func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
func TestDeliverHeadersHang65Full(t *testing.T) { testDeliverHeadersHang(t, 65, FullSync) }
func TestDeliverHeadersHang65Fast(t *testing.T) { testDeliverHeadersHang(t, 65, FastSync) }
func TestDeliverHeadersHang65Light(t *testing.T) { testDeliverHeadersHang(t, 65, LightSync) }
func testDeliverHeadersHang(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
testCases := []struct {
protocol int
syncMode SyncMode
}{
{63, FullSync},
{63, FastSync},
{64, FullSync},
{64, FastSync},
{64, LightSync},
{65, FullSync},
{65, FastSync},
{65, LightSync},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
t.Parallel()
testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
})
}
}
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
master := newTester()
defer master.terminate()
chain := testChainBase.shorten(15)
@ -1664,15 +1612,13 @@ func TestRemoteHeaderRequestSpan(t *testing.T) {
// Tests that peers below a pre-configured checkpoint block are prevented from
// being fast-synced from, avoiding potential cheap eclipse attacks.
func TestCheckpointEnforcement63Full(t *testing.T) { testCheckpointEnforcement(t, 63, FullSync) }
func TestCheckpointEnforcement63Fast(t *testing.T) { testCheckpointEnforcement(t, 63, FastSync) }
func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) }
func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) }
func TestCheckpointEnforcement65Full(t *testing.T) { testCheckpointEnforcement(t, 65, FullSync) }
func TestCheckpointEnforcement65Fast(t *testing.T) { testCheckpointEnforcement(t, 65, FastSync) }
func TestCheckpointEnforcement65Light(t *testing.T) { testCheckpointEnforcement(t, 65, LightSync) }
func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()
// Create a new tester with a particular hard coded checkpoint block

View File

@ -24,7 +24,8 @@ type SyncMode uint32
const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
FastSync // Quickly download the headers, full sync only at the chain head
FastSync // Quickly download the headers, full sync only at the chain
SnapSync // Download the chain and the state via compact snashots
LightSync // Download only the headers and terminate afterwards
)
@ -39,6 +40,8 @@ func (mode SyncMode) String() string {
return "full"
case FastSync:
return "fast"
case SnapSync:
return "snap"
case LightSync:
return "light"
default:
@ -52,6 +55,8 @@ func (mode SyncMode) MarshalText() ([]byte, error) {
return []byte("full"), nil
case FastSync:
return []byte("fast"), nil
case SnapSync:
return []byte("snap"), nil
case LightSync:
return []byte("light"), nil
default:
@ -65,6 +70,8 @@ func (mode *SyncMode) UnmarshalText(text []byte) error {
*mode = FullSync
case "fast":
*mode = FastSync
case "snap":
*mode = SnapSync
case "light":
*mode = LightSync
default:

View File

@ -69,7 +69,7 @@ type peerConnection struct {
peer Peer
version int // Eth protocol version number to switch strategies
version uint // Eth protocol version number to switch strategies
log log.Logger // Contextual logger to add extra infos to peer logs
lock sync.RWMutex
}
@ -112,7 +112,7 @@ func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error {
}
// newPeerConnection creates a new downloader peer.
func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection {
func newPeerConnection(id string, version uint, peer Peer, logger log.Logger) *peerConnection {
return &peerConnection{
id: id,
lacking: make(map[common.Hash]struct{}),
@ -457,7 +457,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(63, 65, idle, throughput)
return ps.idlePeers(64, 65, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@ -471,7 +471,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(63, 65, idle, throughput)
return ps.idlePeers(64, 65, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@ -485,7 +485,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(63, 65, idle, throughput)
return ps.idlePeers(64, 65, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@ -499,13 +499,13 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(63, 65, idle, throughput)
return ps.idlePeers(64, 65, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
// The resulting set of peers are sorted by their measure throughput.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()

View File

@ -113,24 +113,24 @@ type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
headerProced int // [eth/62] Number of headers already processed from the results
headerOffset uint64 // [eth/62] Number of the first header in the result cache
headerContCh chan bool // [eth/62] Channel to notify when header download finishes
headerHead common.Hash // Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
headerResults []*types.Header // Result cache accumulating the completed headers
headerProced int // Number of headers already processed from the results
headerOffset uint64 // Number of the first header in the result cache
headerContCh chan bool // Channel to notify when header download finishes
// All data retrievals below are based on an already assembles header chain
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
resultCache *resultStore // Downloaded but not yet delivered fetch results
resultSize common.StorageSize // Approximate size of a block (exponential moving average)
@ -690,6 +690,13 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
q.lock.Lock()
defer q.lock.Unlock()
var logger log.Logger
if len(id) < 16 {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
}
// Short circuit if the data was never requested
request := q.headerPendPool[id]
if request == nil {
@ -704,10 +711,10 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), request.From)
logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From)
accepted = false
} else if headers[len(headers)-1].Hash() != target {
log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
accepted = false
}
}
@ -716,12 +723,12 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
for i, header := range headers[1:] {
hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want)
accepted = false
break
}
if parentHash != header.ParentHash {
log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
logger.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
accepted = false
break
}
@ -731,7 +738,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
// If the batch of headers wasn't accepted, mark as unavailable
if !accepted {
log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)
logger.Trace("Skeleton filling not accepted", "from", request.From)
miss := q.headerPeerMiss[id]
if miss == nil {
@ -758,7 +765,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
select {
case headerProcCh <- process:
log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
logger.Trace("Pre-scheduled new headers", "count", len(process), "from", process[0].Number)
q.headerProced += len(process)
default:
}

View File

@ -101,8 +101,16 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
// Run the state sync.
log.Trace("State sync starting", "root", s.root)
defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
}()
go s.run()
defer s.Cancel()
@ -252,8 +260,9 @@ func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
sched *trie.Sync // State trie sync scheduler defining the tasks
keccak hash.Hash // Keccak256 hasher to verify deliveries with
root common.Hash // State root currently being synced
sched *trie.Sync // State trie sync scheduler defining the tasks
keccak hash.Hash // Keccak256 hasher to verify deliveries with
trieTasks map[common.Hash]*trieTask // Set of trie node tasks currently queued for retrieval
codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval
@ -268,8 +277,6 @@ type stateSync struct {
cancelOnce sync.Once // Ensures cancel only ever gets called once
done chan struct{} // Channel to signal termination completion
err error // Any error hit during sync (set before completion)
root common.Hash
}
// trieTask represents a single trie node download task, containing a set of
@ -290,6 +297,7 @@ type codeTask struct {
func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
root: root,
sched: state.NewStateSync(root, d.stateDB, d.stateBloom),
keccak: sha3.NewLegacyKeccak256(),
trieTasks: make(map[common.Hash]*trieTask),
@ -298,7 +306,6 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
cancel: make(chan struct{}),
done: make(chan struct{}),
started: make(chan struct{}),
root: root,
}
}
@ -306,7 +313,12 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func (s *stateSync) run() {
s.err = s.loop()
close(s.started)
if s.d.snapSync {
s.err = s.d.SnapSyncer.Sync(s.root, s.cancel)
} else {
s.err = s.loop()
}
close(s.done)
}
@ -318,7 +330,9 @@ func (s *stateSync) Wait() error {
// Cancel cancels the sync and waits until it has shut down.
func (s *stateSync) Cancel() error {
s.cancelOnce.Do(func() { close(s.cancel) })
s.cancelOnce.Do(func() {
close(s.cancel)
})
return s.Wait()
}
@ -329,7 +343,6 @@ func (s *stateSync) Cancel() error {
// pushed here async. The reason is to decouple processing from data receipt
// and timeouts.
func (s *stateSync) loop() (err error) {
close(s.started)
// Listen for new peer events to assign tasks to them
newPeer := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer)