core, eth, trie: fix data races and merge/review issues
This commit is contained in:
@ -142,9 +142,11 @@ type Fetcher struct {
|
||||
dropPeer peerDropFn // Drops a peer for misbehaving
|
||||
|
||||
// Testing hooks
|
||||
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
|
||||
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
|
||||
importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
|
||||
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
|
||||
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
|
||||
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
|
||||
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
|
||||
importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
|
||||
}
|
||||
|
||||
// New creates a block fetcher to retrieve blocks based on hash announcements.
|
||||
@ -324,11 +326,16 @@ func (f *Fetcher) loop() {
|
||||
height := f.chainHeight()
|
||||
for !f.queue.Empty() {
|
||||
op := f.queue.PopItem().(*inject)
|
||||
|
||||
if f.queueChangeHook != nil {
|
||||
f.queueChangeHook(op.block.Hash(), false)
|
||||
}
|
||||
// If too high up the chain or phase, continue later
|
||||
number := op.block.NumberU64()
|
||||
if number > height+1 {
|
||||
f.queue.Push(op, -float32(op.block.NumberU64()))
|
||||
if f.queueChangeHook != nil {
|
||||
f.queueChangeHook(op.block.Hash(), true)
|
||||
}
|
||||
break
|
||||
}
|
||||
// Otherwise if fresh and still unknown, try and import
|
||||
@ -372,6 +379,9 @@ func (f *Fetcher) loop() {
|
||||
}
|
||||
f.announces[notification.origin] = count
|
||||
f.announced[notification.hash] = append(f.announced[notification.hash], notification)
|
||||
if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
|
||||
f.announceChangeHook(notification.hash, true)
|
||||
}
|
||||
if len(f.announced) == 1 {
|
||||
f.rescheduleFetch(fetchTimer)
|
||||
}
|
||||
@ -714,7 +724,9 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
|
||||
f.queues[peer] = count
|
||||
f.queued[hash] = op
|
||||
f.queue.Push(op, -float32(block.NumberU64()))
|
||||
|
||||
if f.queueChangeHook != nil {
|
||||
f.queueChangeHook(op.block.Hash(), true)
|
||||
}
|
||||
if glog.V(logger.Debug) {
|
||||
glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
|
||||
}
|
||||
@ -781,7 +793,9 @@ func (f *Fetcher) forgetHash(hash common.Hash) {
|
||||
}
|
||||
}
|
||||
delete(f.announced, hash)
|
||||
|
||||
if f.announceChangeHook != nil {
|
||||
f.announceChangeHook(hash, false)
|
||||
}
|
||||
// Remove any pending fetches and decrement the DOS counters
|
||||
if announce := f.fetching[hash]; announce != nil {
|
||||
f.announces[announce.origin]--
|
||||
|
@ -145,6 +145,9 @@ func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
|
||||
// dropPeer is an emulator for the peer removal, simply accumulating the various
|
||||
// peers dropped by the fetcher.
|
||||
func (f *fetcherTester) dropPeer(peer string) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
f.drops[peer] = true
|
||||
}
|
||||
|
||||
@ -608,8 +611,11 @@ func TestDistantPropagationDiscarding(t *testing.T) {
|
||||
|
||||
// Create a tester and simulate a head block being the middle of the above chain
|
||||
tester := newTester()
|
||||
|
||||
tester.lock.Lock()
|
||||
tester.hashes = []common.Hash{head}
|
||||
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
|
||||
tester.lock.Unlock()
|
||||
|
||||
// Ensure that a block with a lower number than the threshold is discarded
|
||||
tester.fetcher.Enqueue("lower", blocks[hashes[low]])
|
||||
@ -641,8 +647,11 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
|
||||
|
||||
// Create a tester and simulate a head block being the middle of the above chain
|
||||
tester := newTester()
|
||||
|
||||
tester.lock.Lock()
|
||||
tester.hashes = []common.Hash{head}
|
||||
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
|
||||
tester.lock.Unlock()
|
||||
|
||||
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
|
||||
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
|
||||
@ -687,14 +696,22 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
|
||||
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
|
||||
verifyImportEvent(t, imported, false)
|
||||
|
||||
if !tester.drops["bad"] {
|
||||
tester.lock.RLock()
|
||||
dropped := tester.drops["bad"]
|
||||
tester.lock.RUnlock()
|
||||
|
||||
if !dropped {
|
||||
t.Fatalf("peer with invalid numbered announcement not dropped")
|
||||
}
|
||||
// Make sure a good announcement passes without a drop
|
||||
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
|
||||
verifyImportEvent(t, imported, true)
|
||||
|
||||
if tester.drops["good"] {
|
||||
tester.lock.RLock()
|
||||
dropped = tester.drops["good"]
|
||||
tester.lock.RUnlock()
|
||||
|
||||
if dropped {
|
||||
t.Fatalf("peer with valid numbered announcement dropped")
|
||||
}
|
||||
verifyImportDone(t, imported)
|
||||
@ -752,9 +769,15 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
|
||||
// Create a tester with instrumented import hooks
|
||||
tester := newTester()
|
||||
|
||||
imported := make(chan *types.Block)
|
||||
imported, announces := make(chan *types.Block), int32(0)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
|
||||
tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
|
||||
if added {
|
||||
atomic.AddInt32(&announces, 1)
|
||||
} else {
|
||||
atomic.AddInt32(&announces, -1)
|
||||
}
|
||||
}
|
||||
// Create a valid chain and an infinite junk chain
|
||||
targetBlocks := hashLimit + 2*maxQueueDist
|
||||
hashes, blocks := makeChain(targetBlocks, 0, genesis)
|
||||
@ -782,8 +805,8 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
|
||||
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), nil, attackerHeaderFetcher, attackerBodyFetcher)
|
||||
}
|
||||
}
|
||||
if len(tester.fetcher.announced) != hashLimit+maxQueueDist {
|
||||
t.Fatalf("queued announce count mismatch: have %d, want %d", len(tester.fetcher.announced), hashLimit+maxQueueDist)
|
||||
if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
|
||||
t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
|
||||
}
|
||||
// Wait for fetches to complete
|
||||
verifyImportCount(t, imported, maxQueueDist)
|
||||
@ -807,9 +830,15 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
||||
// Create a tester with instrumented import hooks
|
||||
tester := newTester()
|
||||
|
||||
imported := make(chan *types.Block)
|
||||
imported, enqueued := make(chan *types.Block), int32(0)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
|
||||
tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
|
||||
if added {
|
||||
atomic.AddInt32(&enqueued, 1)
|
||||
} else {
|
||||
atomic.AddInt32(&enqueued, -1)
|
||||
}
|
||||
}
|
||||
// Create a valid chain and a batch of dangling (but in range) blocks
|
||||
targetBlocks := hashLimit + 2*maxQueueDist
|
||||
hashes, blocks := makeChain(targetBlocks, 0, genesis)
|
||||
@ -825,7 +854,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
||||
tester.fetcher.Enqueue("attacker", block)
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if queued := tester.fetcher.queue.Size(); queued != blockLimit {
|
||||
if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
|
||||
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
|
||||
}
|
||||
// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
|
||||
@ -833,7 +862,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
||||
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if queued := tester.fetcher.queue.Size(); queued != blockLimit+maxQueueDist-1 {
|
||||
if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
|
||||
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
|
||||
}
|
||||
// Insert the missing piece (and sanity check the import)
|
||||
|
Reference in New Issue
Block a user