Merge pull request #1843 from karalabe/cleanup-downloader-channel
eth/downloader: always send termination wakes, clean leftover
This commit is contained in:
		@@ -154,7 +154,7 @@ type Downloader struct {
 | 
			
		||||
	blockCh   chan blockPack  // [eth/61] Channel receiving inbound blocks
 | 
			
		||||
	headerCh  chan headerPack // [eth/62] Channel receiving inbound block headers
 | 
			
		||||
	bodyCh    chan bodyPack   // [eth/62] Channel receiving inbound block bodies
 | 
			
		||||
	processCh chan bool       // Channel to signal the block fetcher of new or finished work
 | 
			
		||||
	wakeCh    chan bool       // Channel to signal the block/body fetcher of new tasks
 | 
			
		||||
 | 
			
		||||
	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
 | 
			
		||||
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
 | 
			
		||||
@@ -188,7 +188,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
 | 
			
		||||
		blockCh:     make(chan blockPack, 1),
 | 
			
		||||
		headerCh:    make(chan headerPack, 1),
 | 
			
		||||
		bodyCh:      make(chan bodyPack, 1),
 | 
			
		||||
		processCh:   make(chan bool, 1),
 | 
			
		||||
		wakeCh:      make(chan bool, 1),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -282,6 +282,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
 | 
			
		||||
	d.queue.Reset()
 | 
			
		||||
	d.peers.Reset()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-d.wakeCh:
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
	// Create cancel channel for aborting mid-flight
 | 
			
		||||
	d.cancelLock.Lock()
 | 
			
		||||
	d.cancelCh = make(chan struct{})
 | 
			
		||||
@@ -633,7 +637,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 | 
			
		||||
				glog.V(logger.Debug).Infof("%v: no available hashes", p)
 | 
			
		||||
 | 
			
		||||
				select {
 | 
			
		||||
				case d.processCh <- false:
 | 
			
		||||
				case d.wakeCh <- false:
 | 
			
		||||
				case <-d.cancelCh:
 | 
			
		||||
				}
 | 
			
		||||
				// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
 | 
			
		||||
@@ -664,12 +668,18 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 | 
			
		||||
				return errBadPeer
 | 
			
		||||
			}
 | 
			
		||||
			// Notify the block fetcher of new hashes, but stop if queue is full
 | 
			
		||||
			cont := d.queue.Pending() < maxQueuedHashes
 | 
			
		||||
			if d.queue.Pending() < maxQueuedHashes {
 | 
			
		||||
				// We still have hashes to fetch, send continuation wake signal (potential)
 | 
			
		||||
				select {
 | 
			
		||||
			case d.processCh <- cont:
 | 
			
		||||
				case d.wakeCh <- true:
 | 
			
		||||
				default:
 | 
			
		||||
				}
 | 
			
		||||
			if !cont {
 | 
			
		||||
			} else {
 | 
			
		||||
				// Hash limit reached, send a termination wake signal (enforced)
 | 
			
		||||
				select {
 | 
			
		||||
				case d.wakeCh <- false:
 | 
			
		||||
				case <-d.cancelCh:
 | 
			
		||||
				}
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
			// Queue not yet full, fetch the next batch
 | 
			
		||||
@@ -766,7 +776,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 | 
			
		||||
			default:
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		case cont := <-d.processCh:
 | 
			
		||||
		case cont := <-d.wakeCh:
 | 
			
		||||
			// The hash fetcher sent a continuation flag, check if it's done
 | 
			
		||||
			if !cont {
 | 
			
		||||
				finished = true
 | 
			
		||||
@@ -1053,7 +1063,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 | 
			
		||||
				glog.V(logger.Debug).Infof("%v: no available headers", p)
 | 
			
		||||
 | 
			
		||||
				select {
 | 
			
		||||
				case d.processCh <- false:
 | 
			
		||||
				case d.wakeCh <- false:
 | 
			
		||||
				case <-d.cancelCh:
 | 
			
		||||
				}
 | 
			
		||||
				// If no headers were retrieved at all, the peer violated it's TD promise that it had a
 | 
			
		||||
@@ -1084,12 +1094,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 | 
			
		||||
				return errBadPeer
 | 
			
		||||
			}
 | 
			
		||||
			// Notify the block fetcher of new headers, but stop if queue is full
 | 
			
		||||
			cont := d.queue.Pending() < maxQueuedHeaders
 | 
			
		||||
			if d.queue.Pending() < maxQueuedHeaders {
 | 
			
		||||
				// We still have headers to fetch, send continuation wake signal (potential)
 | 
			
		||||
				select {
 | 
			
		||||
			case d.processCh <- cont:
 | 
			
		||||
				case d.wakeCh <- true:
 | 
			
		||||
				default:
 | 
			
		||||
				}
 | 
			
		||||
			if !cont {
 | 
			
		||||
			} else {
 | 
			
		||||
				// Header limit reached, send a termination wake signal (enforced)
 | 
			
		||||
				select {
 | 
			
		||||
				case d.wakeCh <- false:
 | 
			
		||||
				case <-d.cancelCh:
 | 
			
		||||
				}
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
			// Queue not yet full, fetch the next batch
 | 
			
		||||
@@ -1104,8 +1120,8 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 | 
			
		||||
 | 
			
		||||
			// Finish the sync gracefully instead of dumping the gathered data though
 | 
			
		||||
			select {
 | 
			
		||||
			case d.processCh <- false:
 | 
			
		||||
			default:
 | 
			
		||||
			case d.wakeCh <- false:
 | 
			
		||||
			case <-d.cancelCh:
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
@@ -1199,7 +1215,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
 | 
			
		||||
			default:
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		case cont := <-d.processCh:
 | 
			
		||||
		case cont := <-d.wakeCh:
 | 
			
		||||
			// The header fetcher sent a continuation flag, check if it's done
 | 
			
		||||
			if !cont {
 | 
			
		||||
				finished = true
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user