Merge pull request #1314 from karalabe/handle-fetcher-attacks-2
eth/fetcher: handle and test various DOS attacks
This commit is contained in:
		| @@ -20,6 +20,8 @@ const ( | ||||
| 	fetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block | ||||
| 	maxUncleDist  = 7                      // Maximum allowed backward distance from the chain head | ||||
| 	maxQueueDist  = 32                     // Maximum allowed distance from the chain head to queue | ||||
| 	hashLimit     = 256                    // Maximum number of unique blocks a peer may have announced | ||||
| 	blockLimit    = 64                     // Maximum number of unique blocks a per may have delivered | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| @@ -74,12 +76,14 @@ type Fetcher struct { | ||||
| 	quit   chan struct{} | ||||
|  | ||||
| 	// Announce states | ||||
| 	announces map[string]int              // Per peer announce counts to prevent memory exhaustion | ||||
| 	announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching | ||||
| 	fetching  map[common.Hash]*announce   // Announced blocks, currently fetching | ||||
|  | ||||
| 	// Block cache | ||||
| 	queue  *prque.Prque             // Queue containing the import operations (block number sorted) | ||||
| 	queued map[common.Hash]struct{} // Presence set of already queued blocks (to dedup imports) | ||||
| 	queue  *prque.Prque            // Queue containing the import operations (block number sorted) | ||||
| 	queues map[string]int          // Per peer block counts to prevent memory exhaustion | ||||
| 	queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports) | ||||
|  | ||||
| 	// Callbacks | ||||
| 	getBlock       blockRetrievalFn   // Retrieves a block from the local chain | ||||
| @@ -88,6 +92,10 @@ type Fetcher struct { | ||||
| 	chainHeight    chainHeightFn      // Retrieves the current chain's height | ||||
| 	insertChain    chainInsertFn      // Injects a batch of blocks into the chain | ||||
| 	dropPeer       peerDropFn         // Drops a peer for misbehaving | ||||
|  | ||||
| 	// Testing hooks | ||||
| 	fetchingHook func([]common.Hash) // Method to call upon starting a block fetch | ||||
| 	importedHook func(*types.Block)  // Method to call upon successful block import | ||||
| } | ||||
|  | ||||
| // New creates a block fetcher to retrieve blocks based on hash announcements. | ||||
| @@ -98,10 +106,12 @@ func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlo | ||||
| 		filter:         make(chan chan []*types.Block), | ||||
| 		done:           make(chan common.Hash), | ||||
| 		quit:           make(chan struct{}), | ||||
| 		announces:      make(map[string]int), | ||||
| 		announced:      make(map[common.Hash][]*announce), | ||||
| 		fetching:       make(map[common.Hash]*announce), | ||||
| 		queue:          prque.New(), | ||||
| 		queued:         make(map[common.Hash]struct{}), | ||||
| 		queues:         make(map[string]int), | ||||
| 		queued:         make(map[common.Hash]*inject), | ||||
| 		getBlock:       getBlock, | ||||
| 		validateBlock:  validateBlock, | ||||
| 		broadcastBlock: broadcastBlock, | ||||
| @@ -189,23 +199,24 @@ func (f *Fetcher) loop() { | ||||
| 		// Clean up any expired block fetches | ||||
| 		for hash, announce := range f.fetching { | ||||
| 			if time.Since(announce.time) > fetchTimeout { | ||||
| 				delete(f.announced, hash) | ||||
| 				delete(f.fetching, hash) | ||||
| 				f.forgetHash(hash) | ||||
| 			} | ||||
| 		} | ||||
| 		// Import any queued blocks that could potentially fit | ||||
| 		height := f.chainHeight() | ||||
| 		for !f.queue.Empty() { | ||||
| 			op := f.queue.PopItem().(*inject) | ||||
| 			number := op.block.NumberU64() | ||||
|  | ||||
| 			// If too high up the chain or phase, continue later | ||||
| 			number := op.block.NumberU64() | ||||
| 			if number > height+1 { | ||||
| 				f.queue.Push(op, -float32(op.block.NumberU64())) | ||||
| 				break | ||||
| 			} | ||||
| 			// Otherwise if fresh and still unknown, try and import | ||||
| 			if number+maxUncleDist < height || f.getBlock(op.block.Hash()) != nil { | ||||
| 			hash := op.block.Hash() | ||||
| 			if number+maxUncleDist < height || f.getBlock(hash) != nil { | ||||
| 				f.forgetBlock(hash) | ||||
| 				continue | ||||
| 			} | ||||
| 			f.insert(op.origin, op.block) | ||||
| @@ -217,10 +228,17 @@ func (f *Fetcher) loop() { | ||||
| 			return | ||||
|  | ||||
| 		case notification := <-f.notify: | ||||
| 			// A block was announced, schedule if it's not yet downloading | ||||
| 			// A block was announced, make sure the peer isn't DOSing us | ||||
| 			count := f.announces[notification.origin] + 1 | ||||
| 			if count > hashLimit { | ||||
| 				glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit) | ||||
| 				break | ||||
| 			} | ||||
| 			// All is well, schedule the announce if block's not yet downloading | ||||
| 			if _, ok := f.fetching[notification.hash]; ok { | ||||
| 				break | ||||
| 			} | ||||
| 			f.announces[notification.origin] = count | ||||
| 			f.announced[notification.hash] = append(f.announced[notification.hash], notification) | ||||
| 			if len(f.announced) == 1 { | ||||
| 				f.reschedule(fetch) | ||||
| @@ -232,9 +250,8 @@ func (f *Fetcher) loop() { | ||||
|  | ||||
| 		case hash := <-f.done: | ||||
| 			// A pending import finished, remove all traces of the notification | ||||
| 			delete(f.announced, hash) | ||||
| 			delete(f.fetching, hash) | ||||
| 			delete(f.queued, hash) | ||||
| 			f.forgetHash(hash) | ||||
| 			f.forgetBlock(hash) | ||||
|  | ||||
| 		case <-fetch.C: | ||||
| 			// At least one block's timer ran out, check for needing retrieval | ||||
| @@ -242,12 +259,15 @@ func (f *Fetcher) loop() { | ||||
|  | ||||
| 			for hash, announces := range f.announced { | ||||
| 				if time.Since(announces[0].time) > arriveTimeout-gatherSlack { | ||||
| 					// Pick a random peer to retrieve from, reset all others | ||||
| 					announce := announces[rand.Intn(len(announces))] | ||||
| 					f.forgetHash(hash) | ||||
|  | ||||
| 					// If the block still didn't arrive, queue for fetching | ||||
| 					if f.getBlock(hash) == nil { | ||||
| 						request[announce.origin] = append(request[announce.origin], hash) | ||||
| 						f.fetching[hash] = announce | ||||
| 					} | ||||
| 					delete(f.announced, hash) | ||||
| 				} | ||||
| 			} | ||||
| 			// Send out all block requests | ||||
| @@ -261,7 +281,14 @@ func (f *Fetcher) loop() { | ||||
|  | ||||
| 					glog.V(logger.Detail).Infof("Peer %s: fetching %s", peer, list) | ||||
| 				} | ||||
| 				go f.fetching[hashes[0]].fetch(hashes) | ||||
| 				// Create a closure of the fetch and schedule in on a new thread | ||||
| 				fetcher, hashes := f.fetching[hashes[0]].fetch, hashes | ||||
| 				go func() { | ||||
| 					if f.fetchingHook != nil { | ||||
| 						f.fetchingHook(hashes) | ||||
| 					} | ||||
| 					fetcher(hashes) | ||||
| 				}() | ||||
| 			} | ||||
| 			// Schedule the next fetch if blocks are still pending | ||||
| 			f.reschedule(fetch) | ||||
| @@ -285,7 +312,7 @@ func (f *Fetcher) loop() { | ||||
| 					if f.getBlock(hash) == nil { | ||||
| 						explicit = append(explicit, block) | ||||
| 					} else { | ||||
| 						delete(f.fetching, hash) | ||||
| 						f.forgetHash(hash) | ||||
| 					} | ||||
| 				} else { | ||||
| 					download = append(download, block) | ||||
| @@ -328,6 +355,12 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { | ||||
| func (f *Fetcher) enqueue(peer string, block *types.Block) { | ||||
| 	hash := block.Hash() | ||||
|  | ||||
| 	// Ensure the peer isn't DOSing us | ||||
| 	count := f.queues[peer] + 1 | ||||
| 	if count > blockLimit { | ||||
| 		glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit) | ||||
| 		return | ||||
| 	} | ||||
| 	// Discard any past or too distant blocks | ||||
| 	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { | ||||
| 		glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) | ||||
| @@ -335,8 +368,13 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { | ||||
| 	} | ||||
| 	// Schedule the block for future importing | ||||
| 	if _, ok := f.queued[hash]; !ok { | ||||
| 		f.queued[hash] = struct{}{} | ||||
| 		f.queue.Push(&inject{origin: peer, block: block}, -float32(block.NumberU64())) | ||||
| 		op := &inject{ | ||||
| 			origin: peer, | ||||
| 			block:  block, | ||||
| 		} | ||||
| 		f.queues[peer] = count | ||||
| 		f.queued[hash] = op | ||||
| 		f.queue.Push(op, -float32(block.NumberU64())) | ||||
|  | ||||
| 		if glog.V(logger.Debug) { | ||||
| 			glog.Infof("Peer %s: queued block #%d [%x], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size()) | ||||
| @@ -375,5 +413,44 @@ func (f *Fetcher) insert(peer string, block *types.Block) { | ||||
| 		} | ||||
| 		// If import succeeded, broadcast the block | ||||
| 		go f.broadcastBlock(block, false) | ||||
|  | ||||
| 		// Invoke the testing hook if needed | ||||
| 		if f.importedHook != nil { | ||||
| 			f.importedHook(block) | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| // forgetHash removes all traces of a block announcement from the fetcher's | ||||
| // internal state. | ||||
| func (f *Fetcher) forgetHash(hash common.Hash) { | ||||
| 	// Remove all pending announces and decrement DOS counters | ||||
| 	for _, announce := range f.announced[hash] { | ||||
| 		f.announces[announce.origin]-- | ||||
| 		if f.announces[announce.origin] == 0 { | ||||
| 			delete(f.announces, announce.origin) | ||||
| 		} | ||||
| 	} | ||||
| 	delete(f.announced, hash) | ||||
|  | ||||
| 	// Remove any pending fetches and decrement the DOS counters | ||||
| 	if announce := f.fetching[hash]; announce != nil { | ||||
| 		f.announces[announce.origin]-- | ||||
| 		if f.announces[announce.origin] == 0 { | ||||
| 			delete(f.announces, announce.origin) | ||||
| 		} | ||||
| 		delete(f.fetching, hash) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // forgetBlock removes all traces of a queued block frmo the fetcher's internal | ||||
| // state. | ||||
| func (f *Fetcher) forgetBlock(hash common.Hash) { | ||||
| 	if insert := f.queued[hash]; insert != nil { | ||||
| 		f.queues[insert.origin]-- | ||||
| 		if f.queues[insert.origin] == 0 { | ||||
| 			delete(f.queues, insert.origin) | ||||
| 		} | ||||
| 		delete(f.queued, hash) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -159,11 +159,42 @@ func (f *fetcherTester) makeFetcher(blocks map[common.Hash]*types.Block) blockRe | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // verifyImportEvent verifies that one single event arrive on an import channel. | ||||
| func verifyImportEvent(t *testing.T, imported chan *types.Block) { | ||||
| 	select { | ||||
| 	case <-imported: | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatalf("import timeout") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // verifyImportCount verifies that exactly count number of events arrive on an | ||||
| // import hook channel. | ||||
| func verifyImportCount(t *testing.T, imported chan *types.Block, count int) { | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		select { | ||||
| 		case <-imported: | ||||
| 		case <-time.After(time.Second): | ||||
| 			t.Fatalf("block %d: import timeout", i) | ||||
| 		} | ||||
| 	} | ||||
| 	verifyImportDone(t, imported) | ||||
| } | ||||
|  | ||||
| // verifyImportDone verifies that no more events are arriving on an import channel. | ||||
| func verifyImportDone(t *testing.T, imported chan *types.Block) { | ||||
| 	select { | ||||
| 	case <-imported: | ||||
| 		t.Fatalf("extra block imported") | ||||
| 	case <-time.After(50 * time.Millisecond): | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Tests that a fetcher accepts block announcements and initiates retrievals for | ||||
| // them, successfully importing into the local chain. | ||||
| func TestSequentialAnnouncements(t *testing.T) { | ||||
| 	// Create a chain of blocks to import | ||||
| 	targetBlocks := 24 | ||||
| 	targetBlocks := 4 * hashLimit | ||||
| 	hashes := createHashes(targetBlocks, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
|  | ||||
| @@ -171,20 +202,21 @@ func TestSequentialAnnouncements(t *testing.T) { | ||||
| 	fetcher := tester.makeFetcher(blocks) | ||||
|  | ||||
| 	// Iteratively announce blocks until all are imported | ||||
| 	for i := len(hashes) - 1; i >= 0; i-- { | ||||
| 	imported := make(chan *types.Block) | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	for i := len(hashes) - 2; i >= 0; i-- { | ||||
| 		tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher) | ||||
| 		time.Sleep(50 * time.Millisecond) | ||||
| 	} | ||||
| 	if imported := len(tester.blocks); imported != targetBlocks+1 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) | ||||
| 		verifyImportEvent(t, imported) | ||||
| 	} | ||||
| 	verifyImportDone(t, imported) | ||||
| } | ||||
|  | ||||
| // Tests that if blocks are announced by multiple peers (or even the same buggy | ||||
| // peer), they will only get downloaded at most once. | ||||
| func TestConcurrentAnnouncements(t *testing.T) { | ||||
| 	// Create a chain of blocks to import | ||||
| 	targetBlocks := 24 | ||||
| 	targetBlocks := 4 * hashLimit | ||||
| 	hashes := createHashes(targetBlocks, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
|  | ||||
| @@ -198,16 +230,18 @@ func TestConcurrentAnnouncements(t *testing.T) { | ||||
| 		return fetcher(hashes) | ||||
| 	} | ||||
| 	// Iteratively announce blocks until all are imported | ||||
| 	for i := len(hashes) - 1; i >= 0; i-- { | ||||
| 	imported := make(chan *types.Block) | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	for i := len(hashes) - 2; i >= 0; i-- { | ||||
| 		tester.fetcher.Notify("first", hashes[i], time.Now().Add(-arriveTimeout), wrapper) | ||||
| 		tester.fetcher.Notify("second", hashes[i], time.Now().Add(-arriveTimeout+time.Millisecond), wrapper) | ||||
| 		tester.fetcher.Notify("second", hashes[i], time.Now().Add(-arriveTimeout-time.Millisecond), wrapper) | ||||
|  | ||||
| 		time.Sleep(50 * time.Millisecond) | ||||
| 	} | ||||
| 	if imported := len(tester.blocks); imported != targetBlocks+1 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) | ||||
| 		verifyImportEvent(t, imported) | ||||
| 	} | ||||
| 	verifyImportDone(t, imported) | ||||
|  | ||||
| 	// Make sure no blocks were retrieved twice | ||||
| 	if int(counter) != targetBlocks { | ||||
| 		t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks) | ||||
| @@ -218,7 +252,7 @@ func TestConcurrentAnnouncements(t *testing.T) { | ||||
| // results in a valid import. | ||||
| func TestOverlappingAnnouncements(t *testing.T) { | ||||
| 	// Create a chain of blocks to import | ||||
| 	targetBlocks := 24 | ||||
| 	targetBlocks := 4 * hashLimit | ||||
| 	hashes := createHashes(targetBlocks, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
|  | ||||
| @@ -226,16 +260,21 @@ func TestOverlappingAnnouncements(t *testing.T) { | ||||
| 	fetcher := tester.makeFetcher(blocks) | ||||
|  | ||||
| 	// Iteratively announce blocks, but overlap them continuously | ||||
| 	delay, overlap := 50*time.Millisecond, time.Duration(5) | ||||
| 	for i := len(hashes) - 1; i >= 0; i-- { | ||||
| 		tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout+overlap*delay), fetcher) | ||||
| 		time.Sleep(delay) | ||||
| 	} | ||||
| 	time.Sleep(overlap * delay) | ||||
| 	fetching := make(chan []common.Hash) | ||||
| 	imported := make(chan *types.Block, len(hashes)-1) | ||||
| 	tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	if imported := len(tester.blocks); imported != targetBlocks+1 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) | ||||
| 	for i := len(hashes) - 2; i >= 0; i-- { | ||||
| 		tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher) | ||||
| 		select { | ||||
| 		case <-fetching: | ||||
| 		case <-time.After(time.Second): | ||||
| 			t.Fatalf("hash %d: announce timeout", len(hashes)-i) | ||||
| 		} | ||||
| 	} | ||||
| 	// Wait for all the imports to complete and check count | ||||
| 	verifyImportCount(t, imported, len(hashes)-1) | ||||
| } | ||||
|  | ||||
| // Tests that announces already being retrieved will not be duplicated. | ||||
| @@ -280,56 +319,52 @@ func TestPendingDeduplication(t *testing.T) { | ||||
| // imported when all the gaps are filled in. | ||||
| func TestRandomArrivalImport(t *testing.T) { | ||||
| 	// Create a chain of blocks to import, and choose one to delay | ||||
| 	targetBlocks := 24 | ||||
| 	hashes := createHashes(targetBlocks, knownHash) | ||||
| 	hashes := createHashes(maxQueueDist, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
| 	skip := targetBlocks / 2 | ||||
| 	skip := maxQueueDist / 2 | ||||
|  | ||||
| 	tester := newTester() | ||||
| 	fetcher := tester.makeFetcher(blocks) | ||||
|  | ||||
| 	// Iteratively announce blocks, skipping one entry | ||||
| 	imported := make(chan *types.Block, len(hashes)-1) | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	for i := len(hashes) - 1; i >= 0; i-- { | ||||
| 		if i != skip { | ||||
| 			tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher) | ||||
| 			time.Sleep(50 * time.Millisecond) | ||||
| 			time.Sleep(time.Millisecond) | ||||
| 		} | ||||
| 	} | ||||
| 	// Finally announce the skipped entry and check full import | ||||
| 	tester.fetcher.Notify("valid", hashes[skip], time.Now().Add(-arriveTimeout), fetcher) | ||||
| 	time.Sleep(50 * time.Millisecond) | ||||
|  | ||||
| 	if imported := len(tester.blocks); imported != targetBlocks+1 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) | ||||
| 	} | ||||
| 	verifyImportCount(t, imported, len(hashes)-1) | ||||
| } | ||||
|  | ||||
| // Tests that direct block enqueues (due to block propagation vs. hash announce) | ||||
| // are correctly schedule, filling and import queue gaps. | ||||
| func TestQueueGapFill(t *testing.T) { | ||||
| 	// Create a chain of blocks to import, and choose one to not announce at all | ||||
| 	targetBlocks := 24 | ||||
| 	hashes := createHashes(targetBlocks, knownHash) | ||||
| 	hashes := createHashes(maxQueueDist, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
| 	skip := targetBlocks / 2 | ||||
| 	skip := maxQueueDist / 2 | ||||
|  | ||||
| 	tester := newTester() | ||||
| 	fetcher := tester.makeFetcher(blocks) | ||||
|  | ||||
| 	// Iteratively announce blocks, skipping one entry | ||||
| 	imported := make(chan *types.Block, len(hashes)-1) | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	for i := len(hashes) - 1; i >= 0; i-- { | ||||
| 		if i != skip { | ||||
| 			tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher) | ||||
| 			time.Sleep(50 * time.Millisecond) | ||||
| 			time.Sleep(time.Millisecond) | ||||
| 		} | ||||
| 	} | ||||
| 	// Fill the missing block directly as if propagated | ||||
| 	tester.fetcher.Enqueue("valid", blocks[hashes[skip]]) | ||||
| 	time.Sleep(50 * time.Millisecond) | ||||
|  | ||||
| 	if imported := len(tester.blocks); imported != targetBlocks+1 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) | ||||
| 	} | ||||
| 	verifyImportCount(t, imported, len(hashes)-1) | ||||
| } | ||||
|  | ||||
| // Tests that blocks arriving from various sources (multiple propagations, hash | ||||
| @@ -348,9 +383,15 @@ func TestImportDeduplication(t *testing.T) { | ||||
| 		atomic.AddUint32(&counter, uint32(len(blocks))) | ||||
| 		return tester.insertChain(blocks) | ||||
| 	} | ||||
| 	// Instrument the fetching and imported events | ||||
| 	fetching := make(chan []common.Hash) | ||||
| 	imported := make(chan *types.Block, len(hashes)-1) | ||||
| 	tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	// Announce the duplicating block, wait for retrieval, and also propagate directly | ||||
| 	tester.fetcher.Notify("valid", hashes[0], time.Now().Add(-arriveTimeout), fetcher) | ||||
| 	time.Sleep(50 * time.Millisecond) | ||||
| 	<-fetching | ||||
|  | ||||
| 	tester.fetcher.Enqueue("valid", blocks[hashes[0]]) | ||||
| 	tester.fetcher.Enqueue("valid", blocks[hashes[0]]) | ||||
| @@ -358,11 +399,8 @@ func TestImportDeduplication(t *testing.T) { | ||||
|  | ||||
| 	// Fill the missing block directly as if propagated, and check import uniqueness | ||||
| 	tester.fetcher.Enqueue("valid", blocks[hashes[1]]) | ||||
| 	time.Sleep(50 * time.Millisecond) | ||||
| 	verifyImportCount(t, imported, 2) | ||||
|  | ||||
| 	if imported := len(tester.blocks); imported != 3 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 3) | ||||
| 	} | ||||
| 	if counter != 2 { | ||||
| 		t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2) | ||||
| 	} | ||||
| @@ -395,3 +433,92 @@ func TestDistantDiscarding(t *testing.T) { | ||||
| 		t.Fatalf("fetcher queued future block") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Tests that a peer is unable to use unbounded memory with sending infinite | ||||
| // block announcements to a node, but that even in the face of such an attack, | ||||
| // the fetcher remains operational. | ||||
| func TestHashMemoryExhaustionAttack(t *testing.T) { | ||||
| 	// Create a tester with instrumented import hooks | ||||
| 	tester := newTester() | ||||
|  | ||||
| 	imported := make(chan *types.Block) | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	// Create a valid chain and an infinite junk chain | ||||
| 	hashes := createHashes(hashLimit+2*maxQueueDist, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
| 	valid := tester.makeFetcher(blocks) | ||||
|  | ||||
| 	attack := createHashes(hashLimit+2*maxQueueDist, unknownHash) | ||||
| 	attacker := tester.makeFetcher(nil) | ||||
|  | ||||
| 	// Feed the tester a huge hashset from the attacker, and a limited from the valid peer | ||||
| 	for i := 0; i < len(attack); i++ { | ||||
| 		if i < maxQueueDist { | ||||
| 			tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], time.Now(), valid) | ||||
| 		} | ||||
| 		tester.fetcher.Notify("attacker", attack[i], time.Now(), attacker) | ||||
| 	} | ||||
| 	if len(tester.fetcher.announced) != hashLimit+maxQueueDist { | ||||
| 		t.Fatalf("queued announce count mismatch: have %d, want %d", len(tester.fetcher.announced), hashLimit+maxQueueDist) | ||||
| 	} | ||||
| 	// Wait for fetches to complete | ||||
| 	verifyImportCount(t, imported, maxQueueDist) | ||||
|  | ||||
| 	// Feed the remaining valid hashes to ensure DOS protection state remains clean | ||||
| 	for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- { | ||||
| 		tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), valid) | ||||
| 		verifyImportEvent(t, imported) | ||||
| 	} | ||||
| 	verifyImportDone(t, imported) | ||||
| } | ||||
|  | ||||
| // Tests that blocks sent to the fetcher (either through propagation or via hash | ||||
| // announces and retrievals) don't pile up indefinitely, exhausting available | ||||
| // system memory. | ||||
| func TestBlockMemoryExhaustionAttack(t *testing.T) { | ||||
| 	// Create a tester with instrumented import hooks | ||||
| 	tester := newTester() | ||||
|  | ||||
| 	imported := make(chan *types.Block) | ||||
| 	tester.fetcher.importedHook = func(block *types.Block) { imported <- block } | ||||
|  | ||||
| 	// Create a valid chain and a batch of dangling (but in range) blocks | ||||
| 	hashes := createHashes(blockLimit+2*maxQueueDist, knownHash) | ||||
| 	blocks := createBlocksFromHashes(hashes) | ||||
|  | ||||
| 	attack := make(map[common.Hash]*types.Block) | ||||
| 	for len(attack) < blockLimit+2*maxQueueDist { | ||||
| 		hashes := createHashes(maxQueueDist-1, unknownHash) | ||||
| 		blocks := createBlocksFromHashes(hashes) | ||||
| 		for _, hash := range hashes[:maxQueueDist-2] { | ||||
| 			attack[hash] = blocks[hash] | ||||
| 		} | ||||
| 	} | ||||
| 	// Try to feed all the attacker blocks make sure only a limited batch is accepted | ||||
| 	for _, block := range attack { | ||||
| 		tester.fetcher.Enqueue("attacker", block) | ||||
| 	} | ||||
| 	time.Sleep(100 * time.Millisecond) | ||||
| 	if queued := tester.fetcher.queue.Size(); queued != blockLimit { | ||||
| 		t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit) | ||||
| 	} | ||||
| 	// Queue up a batch of valid blocks, and check that a new peer is allowed to do so | ||||
| 	for i := 0; i < maxQueueDist-1; i++ { | ||||
| 		tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]]) | ||||
| 	} | ||||
| 	time.Sleep(100 * time.Millisecond) | ||||
| 	if queued := tester.fetcher.queue.Size(); queued != blockLimit+maxQueueDist-1 { | ||||
| 		t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1) | ||||
| 	} | ||||
| 	// Insert the missing piece (and sanity check the import) | ||||
| 	tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]]) | ||||
| 	verifyImportCount(t, imported, maxQueueDist) | ||||
|  | ||||
| 	// Insert the remaining blocks in chunks to ensure clean DOS protection | ||||
| 	for i := maxQueueDist; i < len(hashes)-1; i++ { | ||||
| 		tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]]) | ||||
| 		verifyImportEvent(t, imported) | ||||
| 	} | ||||
| 	verifyImportDone(t, imported) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user