eth/downloader: circumvent hash reordering attacks
This commit is contained in:
		| @@ -33,7 +33,7 @@ var ( | |||||||
| 	errEmptyHashSet     = errors.New("empty hash set by peer") | 	errEmptyHashSet     = errors.New("empty hash set by peer") | ||||||
| 	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") | 	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") | ||||||
| 	errAlreadyInPool    = errors.New("hash already in pool") | 	errAlreadyInPool    = errors.New("hash already in pool") | ||||||
| 	errBlockNumberOverflow = errors.New("received block which overflows") | 	ErrInvalidChain     = errors.New("retrieved hash chain is invalid") | ||||||
| 	errCancelHashFetch  = errors.New("hash fetching cancelled (requested)") | 	errCancelHashFetch  = errors.New("hash fetching cancelled (requested)") | ||||||
| 	errCancelBlockFetch = errors.New("block downloading cancelled (requested)") | 	errCancelBlockFetch = errors.New("block downloading cancelled (requested)") | ||||||
| 	errNoSyncActive     = errors.New("no sync active") | 	errNoSyncActive     = errors.New("no sync active") | ||||||
| @@ -334,8 +334,14 @@ out: | |||||||
| 			// If the peer was previously banned and failed to deliver it's pack | 			// If the peer was previously banned and failed to deliver it's pack | ||||||
| 			// in a reasonable time frame, ignore it's message. | 			// in a reasonable time frame, ignore it's message. | ||||||
| 			if peer := d.peers.Peer(blockPack.peerId); peer != nil { | 			if peer := d.peers.Peer(blockPack.peerId); peer != nil { | ||||||
| 				// Deliver the received chunk of blocks, but drop the peer if invalid | 				// Deliver the received chunk of blocks | ||||||
| 				if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { | 				if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { | ||||||
|  | 					if err == ErrInvalidChain { | ||||||
|  | 						// The hash chain is invalid (blocks are not ordered properly), abort | ||||||
|  | 						d.queue.Reset() | ||||||
|  | 						return err | ||||||
|  | 					} | ||||||
|  | 					// Peer did deliver, but some blocks were off, penalize | ||||||
| 					glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) | 					glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) | ||||||
| 					peer.Demote() | 					peer.Demote() | ||||||
| 					break | 					break | ||||||
|   | |||||||
| @@ -75,9 +75,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types | |||||||
| 	return tester | 	return tester | ||||||
| } | } | ||||||
|  |  | ||||||
| func (dl *downloadTester) sync(peerId string, hash common.Hash) error { | // sync is a simple wrapper around the downloader to start synchronisation and | ||||||
|  | // block until it returns | ||||||
|  | func (dl *downloadTester) sync(peerId string, head common.Hash) error { | ||||||
| 	dl.activePeerId = peerId | 	dl.activePeerId = peerId | ||||||
| 	return dl.downloader.Synchronise(peerId, hash) | 	return dl.downloader.Synchronise(peerId, head) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // syncTake is starts synchronising with a remote peer, but concurrently it also | ||||||
|  | // starts fetching blocks that the downloader retrieved. IT blocks until both go | ||||||
|  | // routines terminate. | ||||||
|  | func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) { | ||||||
|  | 	// Start a block collector to take blocks as they become available | ||||||
|  | 	done := make(chan struct{}) | ||||||
|  | 	took := []*types.Block{} | ||||||
|  | 	go func() { | ||||||
|  | 		for running := true; running; { | ||||||
|  | 			select { | ||||||
|  | 			case <-done: | ||||||
|  | 				running = false | ||||||
|  | 			default: | ||||||
|  | 				time.Sleep(time.Millisecond) | ||||||
|  | 			} | ||||||
|  | 			// Take a batch of blocks and accumulate | ||||||
|  | 			took = append(took, dl.downloader.TakeBlocks()...) | ||||||
|  | 		} | ||||||
|  | 		done <- struct{}{} | ||||||
|  | 	}() | ||||||
|  | 	// Start the downloading, sync the taker and return | ||||||
|  | 	err := dl.sync(peerId, head) | ||||||
|  |  | ||||||
|  | 	done <- struct{}{} | ||||||
|  | 	<-done | ||||||
|  |  | ||||||
|  | 	return took, err | ||||||
| } | } | ||||||
|  |  | ||||||
| func (dl *downloadTester) insertBlocks(blocks types.Blocks) { | func (dl *downloadTester) insertBlocks(blocks types.Blocks) { | ||||||
| @@ -264,32 +295,7 @@ func TestThrottling(t *testing.T) { | |||||||
| 	tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) | 	tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) | ||||||
|  |  | ||||||
| 	// Concurrently download and take the blocks | 	// Concurrently download and take the blocks | ||||||
| 	errc := make(chan error, 1) | 	took, err := tester.syncTake("peer1", hashes[0]) | ||||||
| 	go func() { |  | ||||||
| 		errc <- tester.sync("peer1", hashes[0]) |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	done := make(chan struct{}) |  | ||||||
| 	took := []*types.Block{} |  | ||||||
| 	go func() { |  | ||||||
| 		for running := true; running; { |  | ||||||
| 			select { |  | ||||||
| 			case <-done: |  | ||||||
| 				running = false |  | ||||||
| 			default: |  | ||||||
| 				time.Sleep(time.Millisecond) |  | ||||||
| 			} |  | ||||||
| 			// Take a batch of blocks and accumulate |  | ||||||
| 			took = append(took, tester.downloader.TakeBlocks()...) |  | ||||||
| 		} |  | ||||||
| 		done <- struct{}{} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	// Synchronise the two threads and verify |  | ||||||
| 	err := <-errc |  | ||||||
| 	done <- struct{}{} |  | ||||||
| 	<-done |  | ||||||
|  |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("failed to synchronise blocks: %v", err) | 		t.Fatalf("failed to synchronise blocks: %v", err) | ||||||
| 	} | 	} | ||||||
| @@ -395,3 +401,31 @@ func TestNonExistingBlockAttack(t *testing.T) { | |||||||
| 		t.Fatalf("failed to synchronise blocks: %v", err) | 		t.Fatalf("failed to synchronise blocks: %v", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Tests that if a malicious peer is returning hashes in a weird order, that the | ||||||
|  | // sync throttler doesn't choke on them waiting for the valid blocks. | ||||||
|  | func TestInvalidHashOrderAttack(t *testing.T) { | ||||||
|  | 	// Create a valid long chain, but reverse some hashes within | ||||||
|  | 	hashes := createHashes(0, 4*blockCacheLimit) | ||||||
|  | 	blocks := createBlocksFromHashes(hashes) | ||||||
|  |  | ||||||
|  | 	reverse := make([]common.Hash, len(hashes)) | ||||||
|  | 	copy(reverse, hashes) | ||||||
|  |  | ||||||
|  | 	for i := len(hashes) / 4; i < 2*len(hashes)/4; i++ { | ||||||
|  | 		reverse[i], reverse[len(hashes)-i-1] = reverse[len(hashes)-i-1], reverse[i] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Try and sync with the malicious node and check that it fails | ||||||
|  | 	tester := newTester(t, reverse, blocks) | ||||||
|  | 	tester.newPeer("attack", big.NewInt(10000), reverse[0]) | ||||||
|  | 	if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain { | ||||||
|  | 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) | ||||||
|  | 	} | ||||||
|  | 	// Ensure that a valid chain can still pass sync | ||||||
|  | 	tester.hashes = hashes | ||||||
|  | 	tester.newPeer("valid", big.NewInt(20000), hashes[0]) | ||||||
|  | 	if _, err := tester.syncTake("valid", hashes[0]); err != nil { | ||||||
|  | 		t.Fatalf("failed to synchronise blocks: %v", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -298,18 +298,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { | |||||||
| 	// Iterate over the downloaded blocks and add each of them | 	// Iterate over the downloaded blocks and add each of them | ||||||
| 	errs := make([]error, 0) | 	errs := make([]error, 0) | ||||||
| 	for _, block := range blocks { | 	for _, block := range blocks { | ||||||
| 		// Skip any blocks that fall outside the cache range |  | ||||||
| 		index := int(block.NumberU64()) - q.blockOffset |  | ||||||
| 		if index >= len(q.blockCache) || index < 0 { |  | ||||||
| 			//fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache)) |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		// Skip any blocks that were not requested | 		// Skip any blocks that were not requested | ||||||
| 		hash := block.Hash() | 		hash := block.Hash() | ||||||
| 		if _, ok := request.Hashes[hash]; !ok { | 		if _, ok := request.Hashes[hash]; !ok { | ||||||
| 			errs = append(errs, fmt.Errorf("non-requested block %v", hash)) | 			errs = append(errs, fmt.Errorf("non-requested block %v", hash)) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		// If a requested block falls out of the range, the hash chain is invalid | ||||||
|  | 		index := int(block.NumberU64()) - q.blockOffset | ||||||
|  | 		if index >= len(q.blockCache) || index < 0 { | ||||||
|  | 			return ErrInvalidChain | ||||||
|  | 		} | ||||||
| 		// Otherwise merge the block and mark the hash block | 		// Otherwise merge the block and mark the hash block | ||||||
| 		q.blockCache[index] = block | 		q.blockCache[index] = block | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user