trie: make fullnode children hash calculation concurrently (#15131)
* trie: make fullnode children hash calculation concurrently * trie: thread out only on topmost fullnode * trie: clean up full node children hash calculation * trie: minor code fixups
This commit is contained in:
		
				
					committed by
					
						 Péter Szilágyi
						Péter Szilágyi
					
				
			
			
				
	
			
			
			
						parent
						
							e4c9fd29a3
						
					
				
				
					commit
					0f7fbb85d6
				
			
							
								
								
									
										111
									
								
								trie/hasher.go
									
									
									
									
									
								
							
							
						
						
									
										111
									
								
								trie/hasher.go
									
									
									
									
									
								
							| @@ -26,27 +26,46 @@ import ( | ||||
| 	"github.com/ethereum/go-ethereum/rlp" | ||||
| ) | ||||
|  | ||||
| type hasher struct { | ||||
| 	tmp                  *bytes.Buffer | ||||
| 	sha                  hash.Hash | ||||
| 	cachegen, cachelimit uint16 | ||||
| // calculator is a utility used by the hasher to calculate the hash value of the tree node. | ||||
| type calculator struct { | ||||
| 	sha    hash.Hash | ||||
| 	buffer *bytes.Buffer | ||||
| } | ||||
|  | ||||
| // hashers live in a global pool. | ||||
| var hasherPool = sync.Pool{ | ||||
| // calculatorPool is a set of temporary calculators that may be individually saved and retrieved. | ||||
| var calculatorPool = sync.Pool{ | ||||
| 	New: func() interface{} { | ||||
| 		return &hasher{tmp: new(bytes.Buffer), sha: sha3.NewKeccak256()} | ||||
| 		return &calculator{buffer: new(bytes.Buffer), sha: sha3.NewKeccak256()} | ||||
| 	}, | ||||
| } | ||||
|  | ||||
| // hasher hasher is used to calculate the hash value of the whole tree. | ||||
| type hasher struct { | ||||
| 	cachegen   uint16 | ||||
| 	cachelimit uint16 | ||||
| 	threaded   bool | ||||
| 	mu         sync.Mutex | ||||
| } | ||||
|  | ||||
| func newHasher(cachegen, cachelimit uint16) *hasher { | ||||
| 	h := hasherPool.Get().(*hasher) | ||||
| 	h.cachegen, h.cachelimit = cachegen, cachelimit | ||||
| 	h := &hasher{ | ||||
| 		cachegen:   cachegen, | ||||
| 		cachelimit: cachelimit, | ||||
| 	} | ||||
| 	return h | ||||
| } | ||||
|  | ||||
| func returnHasherToPool(h *hasher) { | ||||
| 	hasherPool.Put(h) | ||||
| // newCalculator retrieves a cleaned calculator from calculator pool. | ||||
| func (h *hasher) newCalculator() *calculator { | ||||
| 	calculator := calculatorPool.Get().(*calculator) | ||||
| 	calculator.buffer.Reset() | ||||
| 	calculator.sha.Reset() | ||||
| 	return calculator | ||||
| } | ||||
|  | ||||
| // returnCalculator returns a no longer used calculator to the pool. | ||||
| func (h *hasher) returnCalculator(calculator *calculator) { | ||||
| 	calculatorPool.Put(calculator) | ||||
| } | ||||
|  | ||||
| // hash collapses a node down into a hash node, also returning a copy of the | ||||
| @@ -123,15 +142,48 @@ func (h *hasher) hashChildren(original node, db DatabaseWriter) (node, node, err | ||||
| 		// Hash the full node's children, caching the newly hashed subtrees | ||||
| 		collapsed, cached := n.copy(), n.copy() | ||||
|  | ||||
| 		for i := 0; i < 16; i++ { | ||||
| 			if n.Children[i] != nil { | ||||
| 				collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false) | ||||
| 				if err != nil { | ||||
| 					return original, original, err | ||||
| 				} | ||||
| 			} else { | ||||
| 				collapsed.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings. | ||||
| 		// hashChild is a helper to hash a single child, which is called either on the | ||||
| 		// same thread as the caller or in a goroutine for the toplevel branching. | ||||
| 		hashChild := func(index int, wg *sync.WaitGroup) { | ||||
| 			if wg != nil { | ||||
| 				defer wg.Done() | ||||
| 			} | ||||
| 			// Ensure that nil children are encoded as empty strings. | ||||
| 			if collapsed.Children[index] == nil { | ||||
| 				collapsed.Children[index] = valueNode(nil) | ||||
| 				return | ||||
| 			} | ||||
| 			// Hash all other children properly | ||||
| 			var herr error | ||||
| 			collapsed.Children[index], cached.Children[index], herr = h.hash(n.Children[index], db, false) | ||||
| 			if herr != nil { | ||||
| 				h.mu.Lock() // rarely if ever locked, no congenstion | ||||
| 				err = herr | ||||
| 				h.mu.Unlock() | ||||
| 			} | ||||
| 		} | ||||
| 		// If we're not running in threaded mode yet, span a goroutine for each child | ||||
| 		if !h.threaded { | ||||
| 			// Disable further threading | ||||
| 			h.threaded = true | ||||
|  | ||||
| 			// Hash all the children concurrently | ||||
| 			var wg sync.WaitGroup | ||||
| 			for i := 0; i < 16; i++ { | ||||
| 				wg.Add(1) | ||||
| 				go hashChild(i, &wg) | ||||
| 			} | ||||
| 			wg.Wait() | ||||
|  | ||||
| 			// Reenable threading for subsequent hash calls | ||||
| 			h.threaded = false | ||||
| 		} else { | ||||
| 			for i := 0; i < 16; i++ { | ||||
| 				hashChild(i, nil) | ||||
| 			} | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return original, original, err | ||||
| 		} | ||||
| 		cached.Children[16] = n.Children[16] | ||||
| 		if collapsed.Children[16] == nil { | ||||
| @@ -150,24 +202,29 @@ func (h *hasher) store(n node, db DatabaseWriter, force bool) (node, error) { | ||||
| 	if _, isHash := n.(hashNode); n == nil || isHash { | ||||
| 		return n, nil | ||||
| 	} | ||||
| 	calculator := h.newCalculator() | ||||
| 	defer h.returnCalculator(calculator) | ||||
|  | ||||
| 	// Generate the RLP encoding of the node | ||||
| 	h.tmp.Reset() | ||||
| 	if err := rlp.Encode(h.tmp, n); err != nil { | ||||
| 	if err := rlp.Encode(calculator.buffer, n); err != nil { | ||||
| 		panic("encode error: " + err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	if h.tmp.Len() < 32 && !force { | ||||
| 	if calculator.buffer.Len() < 32 && !force { | ||||
| 		return n, nil // Nodes smaller than 32 bytes are stored inside their parent | ||||
| 	} | ||||
| 	// Larger nodes are replaced by their hash and stored in the database. | ||||
| 	hash, _ := n.cache() | ||||
| 	if hash == nil { | ||||
| 		h.sha.Reset() | ||||
| 		h.sha.Write(h.tmp.Bytes()) | ||||
| 		hash = hashNode(h.sha.Sum(nil)) | ||||
| 		calculator.sha.Write(calculator.buffer.Bytes()) | ||||
| 		hash = hashNode(calculator.sha.Sum(nil)) | ||||
| 	} | ||||
| 	if db != nil { | ||||
| 		return hash, db.Put(hash, h.tmp.Bytes()) | ||||
| 		// db might be a leveldb batch, which is not safe for concurrent writes | ||||
| 		h.mu.Lock() | ||||
| 		err := db.Put(hash, calculator.buffer.Bytes()) | ||||
| 		h.mu.Unlock() | ||||
|  | ||||
| 		return hash, err | ||||
| 	} | ||||
| 	return hash, nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user