[release/1.4.6] eth/downloader: stream partial skeleton filling to processor
(cherry picked from commit e86619e75d)
			
			
This commit is contained in:
		| @@ -87,6 +87,7 @@ type queue struct { | ||||
| 	headerPendPool  map[string]*fetchRequest       // [eth/62] Currently pending header retrieval operations | ||||
| 	headerDonePool  map[uint64]struct{}            // [eth/62] Set of the completed header fetches | ||||
| 	headerResults   []*types.Header                // [eth/62] Result cache accumulating the completed headers | ||||
| 	headerProced    int                            // [eth/62] Number of headers already processed from the results | ||||
| 	headerOffset    uint64                         // [eth/62] Number of the first header in the result cache | ||||
| 	headerContCh    chan bool                      // [eth/62] Channel to notify when header download finishes | ||||
|  | ||||
| @@ -365,6 +366,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { | ||||
| 	q.headerTaskQueue = prque.New() | ||||
| 	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains | ||||
| 	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) | ||||
| 	q.headerProced = 0 | ||||
| 	q.headerOffset = from | ||||
| 	q.headerContCh = make(chan bool, 1) | ||||
|  | ||||
| @@ -378,14 +380,14 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { | ||||
|  | ||||
| // RetrieveHeaders retrieves the header chain assemble based on the scheduled | ||||
| // skeleton. | ||||
| func (q *queue) RetrieveHeaders() []*types.Header { | ||||
| func (q *queue) RetrieveHeaders() ([]*types.Header, int) { | ||||
| 	q.lock.Lock() | ||||
| 	defer q.lock.Unlock() | ||||
|  | ||||
| 	headers := q.headerResults | ||||
| 	q.headerResults = nil | ||||
| 	headers, proced := q.headerResults, q.headerProced | ||||
| 	q.headerResults, q.headerProced = nil, 0 | ||||
|  | ||||
| 	return headers | ||||
| 	return headers, proced | ||||
| } | ||||
|  | ||||
| // Schedule adds a set of headers for the download queue for scheduling, returning | ||||
| @@ -976,7 +978,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { | ||||
| // DeliverHeaders injects a header retrieval response into the header results | ||||
| // cache. This method either accepts all headers it received, or none of them | ||||
| // if they do not map correctly to the skeleton. | ||||
| func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error) { | ||||
| // | ||||
| // If the headers are accepted, the method makes an attempt to deliver the set | ||||
| // of ready headers to the processor to keep the pipeline full. However it will | ||||
| // not block to prevent stalling other pending deliveries. | ||||
| func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { | ||||
| 	q.lock.Lock() | ||||
| 	defer q.lock.Unlock() | ||||
|  | ||||
| @@ -1030,10 +1036,27 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error) | ||||
| 		q.headerTaskQueue.Push(request.From, -float32(request.From)) | ||||
| 		return 0, errors.New("delivery not accepted") | ||||
| 	} | ||||
| 	// Clean up a successful fetch, check for termination and return | ||||
| 	// Clean up a successful fetch and try to deliver any sub-results | ||||
| 	copy(q.headerResults[request.From-q.headerOffset:], headers) | ||||
| 	delete(q.headerTaskPool, request.From) | ||||
|  | ||||
| 	ready := 0 | ||||
| 	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil { | ||||
| 		ready += MaxHeaderFetch | ||||
| 	} | ||||
| 	if ready > 0 { | ||||
| 		// Headers are ready for delivery, gather them and push forward (non blocking) | ||||
| 		process := make([]*types.Header, ready) | ||||
| 		copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) | ||||
|  | ||||
| 		select { | ||||
| 		case headerProcCh <- process: | ||||
| 			glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number) | ||||
| 			q.headerProced += len(process) | ||||
| 		default: | ||||
| 		} | ||||
| 	} | ||||
| 	// Check for termination and return | ||||
| 	if len(q.headerTaskPool) == 0 { | ||||
| 		q.headerContCh <- false | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user