les: implement new les fetcher (#20692)

* cmd, consensus, eth, les: implement light fetcher

* les: address comment

* les: address comment

* les: address comments

* les: check td after delivery

* les: add linearExpiredValue for error counter

* les: fix import

* les: fix dead lock

* les: order announces by td

* les: encapsulate invalid counter

* les: address comment

* les: add more checks during the delivery

* les: fix log

* eth, les: fix lint

* eth/fetcher: address comment
This commit is contained in:
gary rong
2020-07-28 23:02:35 +08:00
committed by GitHub
parent 93da0cf8a1
commit 28c5a8a54b
22 changed files with 1366 additions and 1062 deletions

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package fetcher contains the announcement based blocks or transaction synchronisation.
// Package fetcher contains the announcement based header, blocks or transaction synchronisation.
package fetcher
import (
@ -31,6 +31,7 @@ import (
)
const (
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
@ -39,7 +40,7 @@ const (
const (
maxUncleDist = 7 // Maximum allowed backward distance from the chain head
maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
hashLimit = 256 // Maximum number of unique blocks a peer may have announced
hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced
blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
)
@ -63,9 +64,10 @@ var (
bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
)
var (
errTerminated = errors.New("terminated")
)
var errTerminated = errors.New("terminated")
// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
@ -85,6 +87,9 @@ type blockBroadcasterFn func(block *types.Block, propagate bool)
// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64
// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)
@ -121,18 +126,38 @@ type bodyFilterTask struct {
time time.Time // Arrival time of the blocks' contents
}
// blockInject represents a schedules import operation.
type blockInject struct {
// blockOrHeaderInject represents a schedules import operation.
type blockOrHeaderInject struct {
origin string
block *types.Block
header *types.Header // Used for light mode fetcher which only cares about header.
block *types.Block // Used for normal mode fetcher which imports full block.
}
// number returns the block number of the injected object.
func (inject *blockOrHeaderInject) number() uint64 {
if inject.header != nil {
return inject.header.Number.Uint64()
}
return inject.block.NumberU64()
}
// number returns the block hash of the injected object.
func (inject *blockOrHeaderInject) hash() common.Hash {
if inject.header != nil {
return inject.header.Hash()
}
return inject.block.Hash()
}
// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type BlockFetcher struct {
light bool // The indicator whether it's a light fetcher or normal one.
// Various event channels
notify chan *blockAnnounce
inject chan *blockInject
inject chan *blockOrHeaderInject
headerFilter chan chan *headerFilterTask
bodyFilter chan chan *bodyFilterTask
@ -148,31 +173,34 @@ type BlockFetcher struct {
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
// Block cache
queue *prque.Prque // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockInject // Set of already queued blocks (to dedupe imports)
queue *prque.Prque // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
}
// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockInject),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
@ -184,11 +212,13 @@ func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, b
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockInject),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
@ -228,7 +258,7 @@ func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time
// Enqueue tries to fill gaps the fetcher's future import queue.
func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
op := &blockInject{
op := &blockOrHeaderInject{
origin: peer,
block: block,
}
@ -315,13 +345,13 @@ func (f *BlockFetcher) loop() {
// Import any queued blocks that could potentially fit
height := f.chainHeight()
for !f.queue.Empty() {
op := f.queue.PopItem().(*blockInject)
hash := op.block.Hash()
op := f.queue.PopItem().(*blockOrHeaderInject)
hash := op.hash()
if f.queueChangeHook != nil {
f.queueChangeHook(hash, false)
}
// If too high up the chain or phase, continue later
number := op.block.NumberU64()
number := op.number()
if number > height+1 {
f.queue.Push(op, -int64(number))
if f.queueChangeHook != nil {
@ -330,11 +360,15 @@ func (f *BlockFetcher) loop() {
break
}
// Otherwise if fresh and still unknown, try and import
if number+maxUncleDist < height || f.getBlock(hash) != nil {
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
f.forgetBlock(hash)
continue
}
f.insert(op.origin, op.block)
if f.light {
f.importHeaders(op.origin, op.header)
} else {
f.importBlocks(op.origin, op.block)
}
}
// Wait for an outside event to occur
select {
@ -379,7 +413,13 @@ func (f *BlockFetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
blockBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
// Now only direct block injection is allowed, drop the header injection
// here silently if we receive.
if f.light {
continue
}
f.enqueue(op.origin, nil, op.block)
case hash := <-f.done:
// A pending import finished, remove all traces of the notification
@ -391,13 +431,19 @@ func (f *BlockFetcher) loop() {
request := make(map[string][]common.Hash)
for hash, announces := range f.announced {
if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
// In current LES protocol(les2/les3), only header announce is
// available, no need to wait too much time for header broadcast.
timeout := arriveTimeout - gatherSlack
if f.light {
timeout = 0
}
if time.Since(announces[0].time) > timeout {
// Pick a random peer to retrieve from, reset all others
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)
// If the block still didn't arrive, queue for fetching
if f.getBlock(hash) == nil {
if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
request[announce.origin] = append(request[announce.origin], hash)
f.fetching[hash] = announce
}
@ -465,7 +511,7 @@ func (f *BlockFetcher) loop() {
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
unknown, incomplete, complete := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}
unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
for _, header := range task.headers {
hash := header.Hash()
@ -478,6 +524,16 @@ func (f *BlockFetcher) loop() {
f.forgetHash(hash)
continue
}
// Collect all headers only if we are running in light
// mode and the headers are not imported by other means.
if f.light {
if f.getHeader(hash) == nil {
announce.header = header
lightHeaders = append(lightHeaders, announce)
}
f.forgetHash(hash)
continue
}
// Only keep if not imported by other means
if f.getBlock(hash) == nil {
announce.header = header
@ -522,10 +578,14 @@ func (f *BlockFetcher) loop() {
f.rescheduleComplete(completeTimer)
}
}
// Schedule the header for light fetcher import
for _, announce := range lightHeaders {
f.enqueue(announce.origin, announce.header, nil)
}
// Schedule the header-only blocks for import
for _, block := range complete {
if announce := f.completing[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
f.enqueue(announce.origin, nil, block)
}
}
@ -592,7 +652,7 @@ func (f *BlockFetcher) loop() {
// Schedule the retrieved blocks for ordered import
for _, block := range blocks {
if announce := f.completing[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
f.enqueue(announce.origin, nil, block)
}
}
}
@ -605,6 +665,12 @@ func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
if len(f.announced) == 0 {
return
}
// Schedule announcement retrieval quickly for light mode
// since server won't send any headers to client.
if f.light {
fetch.Reset(lightTimeout)
return
}
// Otherwise find the earliest expiring announcement
earliest := time.Now()
for _, announces := range f.announced {
@ -631,46 +697,88 @@ func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
complete.Reset(gatherSlack - time.Since(earliest))
}
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
func (f *BlockFetcher) enqueue(peer string, block *types.Block) {
hash := block.Hash()
// enqueue schedules a new header or block import operation, if the component
// to be imported has not yet been seen.
func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
var (
hash common.Hash
number uint64
)
if header != nil {
hash, number = header.Hash(), header.Number.Uint64()
} else {
hash, number = block.Hash(), block.NumberU64()
}
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if count > blockLimit {
log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
blockBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
blockBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
// Schedule the block for future importing
if _, ok := f.queued[hash]; !ok {
op := &blockInject{
origin: peer,
block: block,
op := &blockOrHeaderInject{origin: peer}
if header != nil {
op.header = header
} else {
op.block = block
}
f.queues[peer] = count
f.queued[hash] = op
f.queue.Push(op, -int64(block.NumberU64()))
f.queue.Push(op, -int64(number))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
f.queueChangeHook(hash, true)
}
log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
}
}
// insert spawns a new goroutine to run a block insertion into the chain. If the
// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
hash := header.Hash()
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
return
}
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
return
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
}
}()
}
// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *BlockFetcher) insert(peer string, block *types.Block) {
func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
hash := block.Hash()
// Run the import on a new thread
@ -711,7 +819,7 @@ func (f *BlockFetcher) insert(peer string, block *types.Block) {
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(block)
f.importedHook(nil, block)
}
}()
}

View File

@ -78,26 +78,36 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common
type fetcherTester struct {
fetcher *BlockFetcher
hashes []common.Hash // Hash chain belonging to the tester
blocks map[common.Hash]*types.Block // Blocks belonging to the tester
drops map[string]bool // Map of peers dropped by the fetcher
hashes []common.Hash // Hash chain belonging to the tester
headers map[common.Hash]*types.Header // Headers belonging to the tester
blocks map[common.Hash]*types.Block // Blocks belonging to the tester
drops map[string]bool // Map of peers dropped by the fetcher
lock sync.RWMutex
}
// newTester creates a new fetcher test mocker.
func newTester() *fetcherTester {
func newTester(light bool) *fetcherTester {
tester := &fetcherTester{
hashes: []common.Hash{genesis.Hash()},
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
hashes: []common.Hash{genesis.Hash()},
headers: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher.Start()
return tester
}
// getHeader retrieves a header from the tester's block chain.
func (f *fetcherTester) getHeader(hash common.Hash) *types.Header {
f.lock.RLock()
defer f.lock.RUnlock()
return f.headers[hash]
}
// getBlock retrieves a block from the tester's block chain.
func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
f.lock.RLock()
@ -120,9 +130,33 @@ func (f *fetcherTester) chainHeight() uint64 {
f.lock.RLock()
defer f.lock.RUnlock()
if f.fetcher.light {
return f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64()
}
return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
}
// insertChain injects a new headers into the simulated chain.
func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
f.lock.Lock()
defer f.lock.Unlock()
for i, header := range headers {
// Make sure the parent in known
if _, ok := f.headers[header.ParentHash]; !ok {
return i, errors.New("unknown parent")
}
// Discard any new blocks if the same height already exists
if header.Number.Uint64() <= f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() {
return i, nil
}
// Otherwise build our current chain
f.hashes = append(f.hashes, header.Hash())
f.headers[header.Hash()] = header
}
return 0, nil
}
// insertChain injects a new blocks into the simulated chain.
func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
f.lock.Lock()
@ -233,7 +267,7 @@ func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive b
}
// verifyImportEvent verifies that one single event arrive on an import channel.
func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) {
if arrive {
select {
case <-imported:
@ -251,7 +285,7 @@ func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
// verifyImportCount verifies that exactly count number of events arrive on an
// import hook channel.
func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
func verifyImportCount(t *testing.T, imported chan interface{}, count int) {
for i := 0; i < count; i++ {
select {
case <-imported:
@ -263,7 +297,7 @@ func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
}
// verifyImportDone verifies that no more events are arriving on an import channel.
func verifyImportDone(t *testing.T, imported chan *types.Block) {
func verifyImportDone(t *testing.T, imported chan interface{}) {
select {
case <-imported:
t.Fatalf("extra block imported")
@ -271,45 +305,62 @@ func verifyImportDone(t *testing.T, imported chan *types.Block) {
}
}
// Tests that a fetcher accepts block announcements and initiates retrievals for
// them, successfully importing into the local chain.
func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
// verifyChainHeight verifies the chain height is as expected.
func verifyChainHeight(t *testing.T, fetcher *fetcherTester, height uint64) {
if fetcher.chainHeight() != height {
t.Fatalf("chain height mismatch, got %d, want %d", fetcher.chainHeight(), height)
}
}
func testSequentialAnnouncements(t *testing.T, protocol int) {
// Tests that a fetcher accepts block/header announcements and initiates retrievals
// for them, successfully importing into the local chain.
func TestFullSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, false) }
func TestLightSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, true) }
func testSequentialAnnouncements(t *testing.T, light bool) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester()
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks until all are imported
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that if blocks are announced by multiple peers (or even the same buggy
// peer), they will only get downloaded at most once.
func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
func TestFullConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, false) }
func TestLightConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, true) }
func testConcurrentAnnouncements(t *testing.T, protocol int) {
func testConcurrentAnnouncements(t *testing.T, light bool) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
// Assemble a tester with a built in counter for the requests
tester := newTester()
tester := newTester(light)
firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
@ -325,9 +376,20 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
return secondHeaderFetcher(hash)
}
// Iteratively announce blocks until all are imported
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
@ -340,30 +402,42 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
if int(counter) != targetBlocks {
t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
}
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that announcements arriving while a previous is being fetched still
// results in a valid import.
func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
func TestFullOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, false) }
func TestLightOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, true) }
func testOverlappingAnnouncements(t *testing.T, protocol int) {
func testOverlappingAnnouncements(t *testing.T, light bool) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester()
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, but overlap them continuously
overlap := 16
imported := make(chan *types.Block, len(hashes)-1)
imported := make(chan interface{}, len(hashes)-1)
for i := 0; i < overlap; i++ {
imported <- nil
}
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
@ -375,19 +449,19 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
}
// Wait for all the imports to complete and check count
verifyImportCount(t, imported, overlap)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that announces already being retrieved will not be duplicated.
func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
func TestFullPendingDeduplication(t *testing.T) { testPendingDeduplication(t, false) }
func TestLightPendingDeduplication(t *testing.T) { testPendingDeduplication(t, true) }
func testPendingDeduplication(t *testing.T, protocol int) {
func testPendingDeduplication(t *testing.T, light bool) {
// Create a hash and corresponding block
hashes, blocks := makeChain(1, 0, genesis)
// Assemble a tester with a built in counter and delayed fetcher
tester := newTester()
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
@ -403,42 +477,58 @@ func testPendingDeduplication(t *testing.T, protocol int) {
}()
return nil
}
checkNonExist := func() bool {
return tester.getBlock(hashes[0]) == nil
}
if light {
checkNonExist = func() bool {
return tester.getHeader(hashes[0]) == nil
}
}
// Announce the same block many times until it's fetched (wait for any pending ops)
for tester.getBlock(hashes[0]) == nil {
for checkNonExist() {
tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
time.Sleep(time.Millisecond)
}
time.Sleep(delay)
// Check that all blocks were imported and none fetched twice
if imported := len(tester.blocks); imported != 2 {
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2)
}
if int(counter) != 1 {
t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
}
verifyChainHeight(t, tester, 1)
}
// Tests that announcements retrieved in a random order are cached and eventually
// imported when all the gaps are filled in.
func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
func TestFullRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, false) }
func TestLightRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, true) }
func testRandomArrivalImport(t *testing.T, protocol int) {
func testRandomArrivalImport(t *testing.T, light bool) {
// Create a chain of blocks to import, and choose one to delay
targetBlocks := maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
skip := targetBlocks / 2
tester := newTester()
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan *types.Block, len(hashes)-1)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported := make(chan interface{}, len(hashes)-1)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 1; i >= 0; i-- {
if i != skip {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
@ -448,27 +538,24 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
// Finally announce the skipped entry and check full import
tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
verifyImportCount(t, imported, len(hashes)-1)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that direct block enqueues (due to block propagation vs. hash announce)
// are correctly schedule, filling and import queue gaps.
func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
func testQueueGapFill(t *testing.T, protocol int) {
func TestQueueGapFill(t *testing.T) {
// Create a chain of blocks to import, and choose one to not announce at all
targetBlocks := maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
skip := targetBlocks / 2
tester := newTester()
tester := newTester(false)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan *types.Block, len(hashes)-1)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported := make(chan interface{}, len(hashes)-1)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
for i := len(hashes) - 1; i >= 0; i-- {
if i != skip {
@ -479,20 +566,17 @@ func testQueueGapFill(t *testing.T, protocol int) {
// Fill the missing block directly as if propagated
tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
verifyImportCount(t, imported, len(hashes)-1)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that blocks arriving from various sources (multiple propagations, hash
// announces, etc) do not get scheduled for import multiple times.
func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
func testImportDeduplication(t *testing.T, protocol int) {
func TestImportDeduplication(t *testing.T) {
// Create two blocks to import (one for duplication, the other for stalling)
hashes, blocks := makeChain(2, 0, genesis)
// Create the tester and wrap the importer with a counter
tester := newTester()
tester := newTester(false)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
@ -503,9 +587,9 @@ func testImportDeduplication(t *testing.T, protocol int) {
}
// Instrument the fetching and imported events
fetching := make(chan []common.Hash)
imported := make(chan *types.Block, len(hashes)-1)
imported := make(chan interface{}, len(hashes)-1)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
// Announce the duplicating block, wait for retrieval, and also propagate directly
tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
@ -534,7 +618,7 @@ func TestDistantPropagationDiscarding(t *testing.T) {
low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester()
tester := newTester(false)
tester.lock.Lock()
tester.hashes = []common.Hash{head}
@ -558,11 +642,10 @@ func TestDistantPropagationDiscarding(t *testing.T) {
// Tests that announcements with numbers much lower or higher than out current
// head get discarded to prevent wasting resources on useless blocks from faulty
// peers.
func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
func TestFullDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, false) }
func TestLightDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, true) }
func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
// Create a long chain to import and define the discard boundaries
hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
head := hashes[len(hashes)/2]
@ -570,10 +653,11 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester()
tester := newTester(light)
tester.lock.Lock()
tester.hashes = []common.Hash{head}
tester.headers = map[common.Hash]*types.Header{head: blocks[head].Header()}
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
tester.lock.Unlock()
@ -601,21 +685,31 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
// Tests that peers announcing blocks with invalid numbers (i.e. not matching
// the headers provided afterwards) get dropped as malicious.
func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
func TestLightInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, true) }
func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
func testInvalidNumberAnnouncement(t *testing.T, light bool) {
// Create a single block to import and check numbers against
hashes, blocks := makeChain(1, 0, genesis)
tester := newTester()
tester := newTester(light)
badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
// Announce a block with a bad number, check for immediate drop
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
verifyImportEvent(t, imported, false)
@ -646,15 +740,11 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
// Tests that if a block is empty (i.e. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
func TestEmptyBlockShortCircuit(t *testing.T) {
// Create a chain of blocks to import
hashes, blocks := makeChain(32, 0, genesis)
tester := newTester()
tester := newTester(false)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
@ -665,9 +755,13 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
completing := make(chan []common.Hash)
tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
// Iteratively announce blocks until all are imported
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
@ -687,16 +781,12 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
// Tests that a peer is unable to use unbounded memory with sending infinite
// block announcements to a node, but that even in the face of such an attack,
// the fetcher remains operational.
func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
func TestHashMemoryExhaustionAttack(t *testing.T) {
// Create a tester with instrumented import hooks
tester := newTester()
tester := newTester(false)
imported, announces := make(chan *types.Block), int32(0)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported, announces := make(chan interface{}), int32(0)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
if added {
atomic.AddInt32(&announces, 1)
@ -740,10 +830,10 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
// system memory.
func TestBlockMemoryExhaustionAttack(t *testing.T) {
// Create a tester with instrumented import hooks
tester := newTester()
tester := newTester(false)
imported, enqueued := make(chan *types.Block), int32(0)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
imported, enqueued := make(chan interface{}), int32(0)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
if added {
atomic.AddInt32(&enqueued, 1)