eth/downloader: stream partial skeleton filling to processor

This commit is contained in:
Péter Szilágyi
2016-04-19 12:27:37 +03:00
parent b40dc8a1da
commit e86619e75d
3 changed files with 55 additions and 19 deletions

View File

@ -87,6 +87,7 @@ type queue struct {
headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches
headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
headerProced int // [eth/62] Number of headers already processed from the results
headerOffset uint64 // [eth/62] Number of the first header in the result cache
headerContCh chan bool // [eth/62] Channel to notify when header download finishes
@ -365,6 +366,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
q.headerTaskQueue = prque.New()
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
q.headerOffset = from
q.headerContCh = make(chan bool, 1)
@ -378,14 +380,14 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
// RetrieveHeaders retrieves the header chain assemble based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() []*types.Header {
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
q.lock.Lock()
defer q.lock.Unlock()
headers := q.headerResults
q.headerResults = nil
headers, proced := q.headerResults, q.headerProced
q.headerResults, q.headerProced = nil, 0
return headers
return headers, proced
}
// Schedule adds a set of headers for the download queue for scheduling, returning
@ -976,7 +978,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error) {
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@ -1030,10 +1036,27 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error)
q.headerTaskQueue.Push(request.From, -float32(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch, check for termination and return
// Clean up a successful fetch and try to deliver any sub-results
copy(q.headerResults[request.From-q.headerOffset:], headers)
delete(q.headerTaskPool, request.From)
ready := 0
for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
ready += MaxHeaderFetch
}
if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking)
process := make([]*types.Header, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
select {
case headerProcCh <- process:
glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number)
q.headerProced += len(process)
default:
}
}
// Check for termination and return
if len(q.headerTaskPool) == 0 {
q.headerContCh <- false
}