| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | // Copyright 2020 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package state | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	"github.com/ethereum/go-ethereum/common" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/log" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/metrics" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	// triePrefetchMetricsPrefix is the prefix under which to publis the metrics. | 
					
						
							|  |  |  | 	triePrefetchMetricsPrefix = "trie/prefetch/" | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // triePrefetcher is an active prefetcher, which receives accounts or storage | 
					
						
							|  |  |  | // items and does trie-loading of them. The goal is to get as much useful content | 
					
						
							|  |  |  | // into the caches as possible. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Note, the prefetcher's API is not thread safe. | 
					
						
							|  |  |  | type triePrefetcher struct { | 
					
						
							|  |  |  | 	db       Database                    // Database to fetch trie nodes through | 
					
						
							|  |  |  | 	root     common.Hash                 // Root hash of theaccount trie for metrics | 
					
						
							|  |  |  | 	fetches  map[common.Hash]Trie        // Partially or fully fetcher tries | 
					
						
							|  |  |  | 	fetchers map[common.Hash]*subfetcher // Subfetchers for each trie | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	deliveryMissMeter metrics.Meter | 
					
						
							|  |  |  | 	accountLoadMeter  metrics.Meter | 
					
						
							|  |  |  | 	accountDupMeter   metrics.Meter | 
					
						
							|  |  |  | 	accountSkipMeter  metrics.Meter | 
					
						
							|  |  |  | 	accountWasteMeter metrics.Meter | 
					
						
							|  |  |  | 	storageLoadMeter  metrics.Meter | 
					
						
							|  |  |  | 	storageDupMeter   metrics.Meter | 
					
						
							|  |  |  | 	storageSkipMeter  metrics.Meter | 
					
						
							|  |  |  | 	storageWasteMeter metrics.Meter | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // newTriePrefetcher | 
					
						
							|  |  |  | func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { | 
					
						
							|  |  |  | 	prefix := triePrefetchMetricsPrefix + namespace | 
					
						
							|  |  |  | 	p := &triePrefetcher{ | 
					
						
							|  |  |  | 		db:       db, | 
					
						
							|  |  |  | 		root:     root, | 
					
						
							|  |  |  | 		fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), | 
					
						
							|  |  |  | 		accountLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load", nil), | 
					
						
							|  |  |  | 		accountDupMeter:   metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), | 
					
						
							|  |  |  | 		accountSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), | 
					
						
							|  |  |  | 		accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), | 
					
						
							|  |  |  | 		storageLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), | 
					
						
							|  |  |  | 		storageDupMeter:   metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), | 
					
						
							|  |  |  | 		storageSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), | 
					
						
							|  |  |  | 		storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	return p | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // close iterates over all the subfetchers, aborts any that were left spinning | 
					
						
							|  |  |  | // and reports the stats to the metrics subsystem. | 
					
						
							|  |  |  | func (p *triePrefetcher) close() { | 
					
						
							|  |  |  | 	for _, fetcher := range p.fetchers { | 
					
						
							|  |  |  | 		fetcher.abort() // safe to do multiple times | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 		if metrics.Enabled { | 
					
						
							|  |  |  | 			if fetcher.root == p.root { | 
					
						
							|  |  |  | 				p.accountLoadMeter.Mark(int64(len(fetcher.seen))) | 
					
						
							|  |  |  | 				p.accountDupMeter.Mark(int64(fetcher.dups)) | 
					
						
							|  |  |  | 				p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 				for _, key := range fetcher.used { | 
					
						
							|  |  |  | 					delete(fetcher.seen, string(key)) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 				p.accountWasteMeter.Mark(int64(len(fetcher.seen))) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 			} else { | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 				p.storageLoadMeter.Mark(int64(len(fetcher.seen))) | 
					
						
							|  |  |  | 				p.storageDupMeter.Mark(int64(fetcher.dups)) | 
					
						
							|  |  |  | 				p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				for _, key := range fetcher.used { | 
					
						
							|  |  |  | 					delete(fetcher.seen, string(key)) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 				p.storageWasteMeter.Mark(int64(len(fetcher.seen))) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	// Clear out all fetchers (will crash on a second call, deliberate) | 
					
						
							|  |  |  | 	p.fetchers = nil | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data | 
					
						
							|  |  |  | // already loaded will be copied over, but no goroutines will be started. This | 
					
						
							|  |  |  | // is mostly used in the miner which creates a copy of it's actively mutated | 
					
						
							|  |  |  | // state to be sealed while it may further mutate the state. | 
					
						
							|  |  |  | func (p *triePrefetcher) copy() *triePrefetcher { | 
					
						
							|  |  |  | 	copy := &triePrefetcher{ | 
					
						
							|  |  |  | 		db:      p.db, | 
					
						
							|  |  |  | 		root:    p.root, | 
					
						
							|  |  |  | 		fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		deliveryMissMeter: p.deliveryMissMeter, | 
					
						
							|  |  |  | 		accountLoadMeter:  p.accountLoadMeter, | 
					
						
							|  |  |  | 		accountDupMeter:   p.accountDupMeter, | 
					
						
							|  |  |  | 		accountSkipMeter:  p.accountSkipMeter, | 
					
						
							|  |  |  | 		accountWasteMeter: p.accountWasteMeter, | 
					
						
							|  |  |  | 		storageLoadMeter:  p.storageLoadMeter, | 
					
						
							|  |  |  | 		storageDupMeter:   p.storageDupMeter, | 
					
						
							|  |  |  | 		storageSkipMeter:  p.storageSkipMeter, | 
					
						
							|  |  |  | 		storageWasteMeter: p.storageWasteMeter, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// If the prefetcher is already a copy, duplicate the data | 
					
						
							|  |  |  | 	if p.fetches != nil { | 
					
						
							|  |  |  | 		for root, fetch := range p.fetches { | 
					
						
							|  |  |  | 			copy.fetches[root] = p.db.CopyTrie(fetch) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return copy | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// Otherwise we're copying an active fetcher, retrieve the current states | 
					
						
							|  |  |  | 	for root, fetcher := range p.fetchers { | 
					
						
							|  |  |  | 		copy.fetches[root] = fetcher.peek() | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	return copy | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // prefetch schedules a batch of trie items to prefetch. | 
					
						
							|  |  |  | func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) { | 
					
						
							|  |  |  | 	// If the prefetcher is an inactive one, bail out | 
					
						
							|  |  |  | 	if p.fetches != nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// Active fetcher, schedule the retrievals | 
					
						
							|  |  |  | 	fetcher := p.fetchers[root] | 
					
						
							|  |  |  | 	if fetcher == nil { | 
					
						
							|  |  |  | 		fetcher = newSubfetcher(p.db, root) | 
					
						
							|  |  |  | 		p.fetchers[root] = fetcher | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	fetcher.schedule(keys) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // trie returns the trie matching the root hash, or nil if the prefetcher doesn't | 
					
						
							|  |  |  | // have it. | 
					
						
							|  |  |  | func (p *triePrefetcher) trie(root common.Hash) Trie { | 
					
						
							|  |  |  | 	// If the prefetcher is inactive, return from existing deep copies | 
					
						
							|  |  |  | 	if p.fetches != nil { | 
					
						
							|  |  |  | 		trie := p.fetches[root] | 
					
						
							|  |  |  | 		if trie == nil { | 
					
						
							|  |  |  | 			p.deliveryMissMeter.Mark(1) | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return p.db.CopyTrie(trie) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	// Otherwise the prefetcher is active, bail if no trie was prefetched for this root | 
					
						
							|  |  |  | 	fetcher := p.fetchers[root] | 
					
						
							|  |  |  | 	if fetcher == nil { | 
					
						
							|  |  |  | 		p.deliveryMissMeter.Mark(1) | 
					
						
							|  |  |  | 		return nil | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	// Interrupt the prefetcher if it's by any chance still running and return | 
					
						
							|  |  |  | 	// a copy of any pre-loaded trie. | 
					
						
							|  |  |  | 	fetcher.abort() // safe to do multiple times | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	trie := fetcher.peek() | 
					
						
							|  |  |  | 	if trie == nil { | 
					
						
							|  |  |  | 		p.deliveryMissMeter.Mark(1) | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return trie | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // used marks a batch of state items used to allow creating statistics as to | 
					
						
							|  |  |  | // how useful or wasteful the prefetcher is. | 
					
						
							|  |  |  | func (p *triePrefetcher) used(root common.Hash, used [][]byte) { | 
					
						
							|  |  |  | 	if fetcher := p.fetchers[root]; fetcher != nil { | 
					
						
							|  |  |  | 		fetcher.used = used | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // subfetcher is a trie fetcher goroutine responsible for pulling entries for a | 
					
						
							|  |  |  | // single trie. It is spawned when a new root is encountered and lives until the | 
					
						
							|  |  |  | // main prefetcher is paused and either all requested items are processed or if | 
					
						
							|  |  |  | // the trie being worked on is retrieved from the prefetcher. | 
					
						
							|  |  |  | type subfetcher struct { | 
					
						
							|  |  |  | 	db   Database    // Database to load trie nodes through | 
					
						
							|  |  |  | 	root common.Hash // Root hash of the trie to prefetch | 
					
						
							|  |  |  | 	trie Trie        // Trie being populated with nodes | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	tasks [][]byte   // Items queued up for retrieval | 
					
						
							|  |  |  | 	lock  sync.Mutex // Lock protecting the task queue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	wake chan struct{}  // Wake channel if a new task is scheduled | 
					
						
							|  |  |  | 	stop chan struct{}  // Channel to interrupt processing | 
					
						
							|  |  |  | 	term chan struct{}  // Channel to signal iterruption | 
					
						
							|  |  |  | 	copy chan chan Trie // Channel to request a copy of the current trie | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	seen map[string]struct{} // Tracks the entries already loaded | 
					
						
							|  |  |  | 	dups int                 // Number of duplicate preload tasks | 
					
						
							|  |  |  | 	used [][]byte            // Tracks the entries used in the end | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // newSubfetcher creates a goroutine to prefetch state items belonging to a | 
					
						
							|  |  |  | // particular root hash. | 
					
						
							|  |  |  | func newSubfetcher(db Database, root common.Hash) *subfetcher { | 
					
						
							|  |  |  | 	sf := &subfetcher{ | 
					
						
							|  |  |  | 		db:   db, | 
					
						
							|  |  |  | 		root: root, | 
					
						
							|  |  |  | 		wake: make(chan struct{}, 1), | 
					
						
							|  |  |  | 		stop: make(chan struct{}), | 
					
						
							|  |  |  | 		term: make(chan struct{}), | 
					
						
							|  |  |  | 		copy: make(chan chan Trie), | 
					
						
							|  |  |  | 		seen: make(map[string]struct{}), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	go sf.loop() | 
					
						
							|  |  |  | 	return sf | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // schedule adds a batch of trie keys to the queue to prefetch. | 
					
						
							|  |  |  | func (sf *subfetcher) schedule(keys [][]byte) { | 
					
						
							|  |  |  | 	// Append the tasks to the current queue | 
					
						
							|  |  |  | 	sf.lock.Lock() | 
					
						
							|  |  |  | 	sf.tasks = append(sf.tasks, keys...) | 
					
						
							|  |  |  | 	sf.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Notify the prefetcher, it's fine if it's already terminated | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	select { | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	case sf.wake <- struct{}{}: | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	default: | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it | 
					
						
							|  |  |  | // is currently. | 
					
						
							|  |  |  | func (sf *subfetcher) peek() Trie { | 
					
						
							|  |  |  | 	ch := make(chan Trie) | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case sf.copy <- ch: | 
					
						
							|  |  |  | 		// Subfetcher still alive, return copy from it | 
					
						
							|  |  |  | 		return <-ch | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case <-sf.term: | 
					
						
							|  |  |  | 		// Subfetcher already terminated, return a copy directly | 
					
						
							|  |  |  | 		if sf.trie == nil { | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return sf.db.CopyTrie(sf.trie) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // abort interrupts the subfetcher immediately. It is safe to call abort multiple | 
					
						
							|  |  |  | // times but it is not thread safe. | 
					
						
							|  |  |  | func (sf *subfetcher) abort() { | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	select { | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	case <-sf.stop: | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	default: | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 		close(sf.stop) | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | 	<-sf.term | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 15:01:49 +02:00
										 |  |  | // loop waits for new tasks to be scheduled and keeps loading them until it runs | 
					
						
							|  |  |  | // out of tasks or its underlying trie is retrieved for committing. | 
					
						
							|  |  |  | func (sf *subfetcher) loop() { | 
					
						
							|  |  |  | 	// No matter how the loop stops, signal anyone waiting that it's terminated | 
					
						
							|  |  |  | 	defer close(sf.term) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Start by opening the trie and stop processing if it fails | 
					
						
							|  |  |  | 	trie, err := sf.db.OpenTrie(sf.root) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	sf.trie = trie | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Trie opened successfully, keep prefetching items | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-sf.wake: | 
					
						
							|  |  |  | 			// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock | 
					
						
							|  |  |  | 			sf.lock.Lock() | 
					
						
							|  |  |  | 			tasks := sf.tasks | 
					
						
							|  |  |  | 			sf.tasks = nil | 
					
						
							|  |  |  | 			sf.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			// Prefetch any tasks until the loop is interrupted | 
					
						
							|  |  |  | 			for i, task := range tasks { | 
					
						
							|  |  |  | 				select { | 
					
						
							|  |  |  | 				case <-sf.stop: | 
					
						
							|  |  |  | 					// If termination is requested, add any leftover back and return | 
					
						
							|  |  |  | 					sf.lock.Lock() | 
					
						
							|  |  |  | 					sf.tasks = append(sf.tasks, tasks[i:]...) | 
					
						
							|  |  |  | 					sf.lock.Unlock() | 
					
						
							|  |  |  | 					return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				case ch := <-sf.copy: | 
					
						
							|  |  |  | 					// Somebody wants a copy of the current trie, grant them | 
					
						
							|  |  |  | 					ch <- sf.db.CopyTrie(sf.trie) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				default: | 
					
						
							|  |  |  | 					// No termination request yet, prefetch the next entry | 
					
						
							|  |  |  | 					taskid := string(task) | 
					
						
							|  |  |  | 					if _, ok := sf.seen[taskid]; ok { | 
					
						
							|  |  |  | 						sf.dups++ | 
					
						
							|  |  |  | 					} else { | 
					
						
							|  |  |  | 						sf.trie.TryGet(task) | 
					
						
							|  |  |  | 						sf.seen[taskid] = struct{}{} | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		case ch := <-sf.copy: | 
					
						
							|  |  |  | 			// Somebody wants a copy of the current trie, grant them | 
					
						
							|  |  |  | 			ch <- sf.db.CopyTrie(sf.trie) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		case <-sf.stop: | 
					
						
							|  |  |  | 			// Termination is requested, abort and leave remaining tasks | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-02-05 13:12:09 +01:00
										 |  |  | } |