eth/downloader: match capabilities when querying idle peers
This commit is contained in:
		@@ -816,7 +816,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			// Send a download request to all idle peers, until throttled
 | 
								// Send a download request to all idle peers, until throttled
 | 
				
			||||||
			throttled := false
 | 
								throttled := false
 | 
				
			||||||
			for _, peer := range d.peers.IdlePeers() {
 | 
								for _, peer := range d.peers.IdlePeers(eth61) {
 | 
				
			||||||
				// Short circuit if throttling activated
 | 
									// Short circuit if throttling activated
 | 
				
			||||||
				if d.queue.Throttle() {
 | 
									if d.queue.Throttle() {
 | 
				
			||||||
					throttled = true
 | 
										throttled = true
 | 
				
			||||||
@@ -1255,7 +1255,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			// Send a download request to all idle peers, until throttled
 | 
								// Send a download request to all idle peers, until throttled
 | 
				
			||||||
			queuedEmptyBlocks, throttled := false, false
 | 
								queuedEmptyBlocks, throttled := false, false
 | 
				
			||||||
			for _, peer := range d.peers.IdlePeers() {
 | 
								for _, peer := range d.peers.IdlePeers(eth62) {
 | 
				
			||||||
				// Short circuit if throttling activated
 | 
									// Short circuit if throttling activated
 | 
				
			||||||
				if d.queue.Throttle() {
 | 
									if d.queue.Throttle() {
 | 
				
			||||||
					throttled = true
 | 
										throttled = true
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -205,9 +205,17 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
 | 
				
			|||||||
	dl.lock.Lock()
 | 
						dl.lock.Lock()
 | 
				
			||||||
	defer dl.lock.Unlock()
 | 
						defer dl.lock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err := dl.downloader.RegisterPeer(id, version, hashes[0],
 | 
						var err error
 | 
				
			||||||
		dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay),
 | 
						switch version {
 | 
				
			||||||
		dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
 | 
						case 61:
 | 
				
			||||||
 | 
							err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil)
 | 
				
			||||||
 | 
						case 62:
 | 
				
			||||||
 | 
							err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
 | 
				
			||||||
 | 
						case 63:
 | 
				
			||||||
 | 
							err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
 | 
				
			||||||
 | 
						case 64:
 | 
				
			||||||
 | 
							err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
		// Assign the owned hashes and blocks to the peer (deep copy)
 | 
							// Assign the owned hashes and blocks to the peer (deep copy)
 | 
				
			||||||
		dl.peerHashes[id] = make([]common.Hash, len(hashes))
 | 
							dl.peerHashes[id] = make([]common.Hash, len(hashes))
 | 
				
			||||||
@@ -618,6 +626,41 @@ func testMultiSynchronisation(t *testing.T, protocol int) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Tests that synchronisations behave well in multi-version protocol environments
 | 
				
			||||||
 | 
					// and not wreak havok on other nodes in the network.
 | 
				
			||||||
 | 
					func TestMultiProtocolSynchronisation61(t *testing.T) { testMultiProtocolSynchronisation(t, 61) }
 | 
				
			||||||
 | 
					func TestMultiProtocolSynchronisation62(t *testing.T) { testMultiProtocolSynchronisation(t, 62) }
 | 
				
			||||||
 | 
					func TestMultiProtocolSynchronisation63(t *testing.T) { testMultiProtocolSynchronisation(t, 63) }
 | 
				
			||||||
 | 
					func TestMultiProtocolSynchronisation64(t *testing.T) { testMultiProtocolSynchronisation(t, 64) }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testMultiProtocolSynchronisation(t *testing.T, protocol int) {
 | 
				
			||||||
 | 
						// Create a small enough block chain to download
 | 
				
			||||||
 | 
						targetBlocks := blockCacheLimit - 15
 | 
				
			||||||
 | 
						hashes, blocks := makeChain(targetBlocks, 0, genesis)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create peers of every type
 | 
				
			||||||
 | 
						tester := newTester()
 | 
				
			||||||
 | 
						tester.newPeer("peer 61", 61, hashes, blocks)
 | 
				
			||||||
 | 
						tester.newPeer("peer 62", 62, hashes, blocks)
 | 
				
			||||||
 | 
						tester.newPeer("peer 63", 63, hashes, blocks)
 | 
				
			||||||
 | 
						tester.newPeer("peer 64", 64, hashes, blocks)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Synchronise with the requestd peer and make sure all blocks were retrieved
 | 
				
			||||||
 | 
						if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to synchronise blocks: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
 | 
				
			||||||
 | 
							t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Check that no peers have been dropped off
 | 
				
			||||||
 | 
						for _, version := range []int{61, 62, 63, 64} {
 | 
				
			||||||
 | 
							peer := fmt.Sprintf("peer %d", version)
 | 
				
			||||||
 | 
							if _, ok := tester.peerHashes[peer]; !ok {
 | 
				
			||||||
 | 
								t.Errorf("%s dropped", peer)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Tests that if a block is empty (i.e. header only), no body request should be
 | 
					// Tests that if a block is empty (i.e. header only), no body request should be
 | 
				
			||||||
// made, and instead the header should be assembled into a whole block in itself.
 | 
					// made, and instead the header should be assembled into a whole block in itself.
 | 
				
			||||||
func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
 | 
					func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -312,16 +312,18 @@ func (ps *peerSet) AllPeers() []*peer {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// IdlePeers retrieves a flat list of all the currently idle peers within the
 | 
					// IdlePeers retrieves a flat list of all the currently idle peers within the
 | 
				
			||||||
// active peer set, ordered by their reputation.
 | 
					// active peer set, ordered by their reputation.
 | 
				
			||||||
func (ps *peerSet) IdlePeers() []*peer {
 | 
					func (ps *peerSet) IdlePeers(version int) []*peer {
 | 
				
			||||||
	ps.lock.RLock()
 | 
						ps.lock.RLock()
 | 
				
			||||||
	defer ps.lock.RUnlock()
 | 
						defer ps.lock.RUnlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	list := make([]*peer, 0, len(ps.peers))
 | 
						list := make([]*peer, 0, len(ps.peers))
 | 
				
			||||||
	for _, p := range ps.peers {
 | 
						for _, p := range ps.peers {
 | 
				
			||||||
 | 
							if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) {
 | 
				
			||||||
			if atomic.LoadInt32(&p.idle) == 0 {
 | 
								if atomic.LoadInt32(&p.idle) == 0 {
 | 
				
			||||||
				list = append(list, p)
 | 
									list = append(list, p)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	for i := 0; i < len(list); i++ {
 | 
						for i := 0; i < len(list); i++ {
 | 
				
			||||||
		for j := i + 1; j < len(list); j++ {
 | 
							for j := i + 1; j < len(list); j++ {
 | 
				
			||||||
			if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
 | 
								if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user