212 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			212 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								// Copyright 2016 The go-ethereum Authors
							 | 
						||
| 
								 | 
							
								// This file is part of the go-ethereum library.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is free software: you can redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								// it under the terms of the GNU Lesser General Public License as published by
							 | 
						||
| 
								 | 
							
								// the Free Software Foundation, either version 3 of the License, or
							 | 
						||
| 
								 | 
							
								// (at your option) any later version.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is distributed in the hope that it will be useful,
							 | 
						||
| 
								 | 
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						||
| 
								 | 
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
							 | 
						||
| 
								 | 
							
								// GNU Lesser General Public License for more details.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// You should have received a copy of the GNU Lesser General Public License
							 | 
						||
| 
								 | 
							
								// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								package storage
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"encoding/binary"
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"io"
							 | 
						||
| 
								 | 
							
									"math"
							 | 
						||
| 
								 | 
							
									"strings"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/common"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// processors is the number of worker goroutines Split starts to hash
								// chunks; the task queue feeding them is buffered to twice this value.
								const processors = 8
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								type Tree struct {
							 | 
						||
| 
								 | 
							
									Chunks int64
							 | 
						||
| 
								 | 
							
									Levels []map[int64]*Node
							 | 
						||
| 
								 | 
							
									Lock   sync.RWMutex
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								type Node struct {
							 | 
						||
| 
								 | 
							
									Pending  int64
							 | 
						||
| 
								 | 
							
									Size     uint64
							 | 
						||
| 
								 | 
							
									Children []common.Hash
							 | 
						||
| 
								 | 
							
									Last     bool
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (self *Node) String() string {
							 | 
						||
| 
								 | 
							
									var children []string
							 | 
						||
| 
								 | 
							
									for _, node := range self.Children {
							 | 
						||
| 
								 | 
							
										children = append(children, node.Hex())
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return fmt.Sprintf("pending: %v, size: %v, last :%v, children: %v", self.Pending, self.Size, self.Last, strings.Join(children, ", "))
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Task describes one data chunk queued for hashing by a worker.
								type Task struct {
									// Index is the position of the chunk within the input.
									Index int64

									// Size is the number of payload bytes read for this chunk.
									Size uint64

									// Data is the binary blob of the chunk, payload included.
									Data []byte

									// Last marks the chunk at which reading of the input stopped.
									Last bool
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								type PyramidChunker struct {
							 | 
						||
| 
								 | 
							
									hashFunc    Hasher
							 | 
						||
| 
								 | 
							
									chunkSize   int64
							 | 
						||
| 
								 | 
							
									hashSize    int64
							 | 
						||
| 
								 | 
							
									branches    int64
							 | 
						||
| 
								 | 
							
									workerCount int
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
							 | 
						||
| 
								 | 
							
									self = &PyramidChunker{}
							 | 
						||
| 
								 | 
							
									self.hashFunc = MakeHashFunc(params.Hash)
							 | 
						||
| 
								 | 
							
									self.branches = params.Branches
							 | 
						||
| 
								 | 
							
									self.hashSize = int64(self.hashFunc().Size())
							 | 
						||
| 
								 | 
							
									self.chunkSize = self.hashSize * self.branches
							 | 
						||
| 
								 | 
							
									self.workerCount = 1
							 | 
						||
| 
								 | 
							
									return
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									chunks := (size + self.chunkSize - 1) / self.chunkSize
							 | 
						||
| 
								 | 
							
									depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
							 | 
						||
| 
								 | 
							
									// glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									results := Tree{
							 | 
						||
| 
								 | 
							
										Chunks: chunks,
							 | 
						||
| 
								 | 
							
										Levels: make([]map[int64]*Node, depth),
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									for i := 0; i < depth; i++ {
							 | 
						||
| 
								 | 
							
										results.Levels[i] = make(map[int64]*Node)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									// Create a pool of workers to crunch through the file
							 | 
						||
| 
								 | 
							
									tasks := make(chan *Task, 2*processors)
							 | 
						||
| 
								 | 
							
									pend := new(sync.WaitGroup)
							 | 
						||
| 
								 | 
							
									abortC := make(chan bool)
							 | 
						||
| 
								 | 
							
									for i := 0; i < processors; i++ {
							 | 
						||
| 
								 | 
							
										pend.Add(1)
							 | 
						||
| 
								 | 
							
										go self.processor(pend, swg, tasks, chunkC, &results)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									// Feed the chunks into the task pool
							 | 
						||
| 
								 | 
							
									for index := 0; ; index++ {
							 | 
						||
| 
								 | 
							
										buffer := make([]byte, self.chunkSize+8)
							 | 
						||
| 
								 | 
							
										n, err := data.Read(buffer[8:])
							 | 
						||
| 
								 | 
							
										last := err == io.ErrUnexpectedEOF || err == io.EOF
							 | 
						||
| 
								 | 
							
										// glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth)
							 | 
						||
| 
								 | 
							
										if err != nil && !last {
							 | 
						||
| 
								 | 
							
											// glog.V(logger.Info).Infof("error: %v", err)
							 | 
						||
| 
								 | 
							
											close(abortC)
							 | 
						||
| 
								 | 
							
											break
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
							 | 
						||
| 
								 | 
							
										pend.Add(1)
							 | 
						||
| 
								 | 
							
										// glog.V(logger.Info).Infof("-> task %v (%v)", index, n)
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
							 | 
						||
| 
								 | 
							
										case <-abortC:
							 | 
						||
| 
								 | 
							
											return nil, err
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if last {
							 | 
						||
| 
								 | 
							
											// glog.V(logger.Info).Infof("last task %v (%v)", index, n)
							 | 
						||
| 
								 | 
							
											break
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									// Wait for the workers and return
							 | 
						||
| 
								 | 
							
									close(tasks)
							 | 
						||
| 
								 | 
							
									pend.Wait()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// glog.V(logger.Info).Infof("len: %v", results.Levels[0][0])
							 | 
						||
| 
								 | 
							
									key := results.Levels[0][0].Children[0][:]
							 | 
						||
| 
								 | 
							
									return key, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
							 | 
						||
| 
								 | 
							
									defer pend.Done()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// glog.V(logger.Info).Infof("processor started")
							 | 
						||
| 
								 | 
							
									// Start processing leaf chunks ad infinitum
							 | 
						||
| 
								 | 
							
									hasher := self.hashFunc()
							 | 
						||
| 
								 | 
							
									for task := range tasks {
							 | 
						||
| 
								 | 
							
										depth, pow := len(results.Levels)-1, self.branches
							 | 
						||
| 
								 | 
							
										// glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last)
							 | 
						||
| 
								 | 
							
										size := task.Size
							 | 
						||
| 
								 | 
							
										data := task.Data
							 | 
						||
| 
								 | 
							
										var node *Node
							 | 
						||
| 
								 | 
							
										for depth >= 0 {
							 | 
						||
| 
								 | 
							
											// New chunk received, reset the hasher and start processing
							 | 
						||
| 
								 | 
							
											hasher.Reset()
							 | 
						||
| 
								 | 
							
											if node == nil { // Leaf node, hash the data chunk
							 | 
						||
| 
								 | 
							
												hasher.Write(task.Data)
							 | 
						||
| 
								 | 
							
											} else { // Internal node, hash the children
							 | 
						||
| 
								 | 
							
												size = node.Size
							 | 
						||
| 
								 | 
							
												data = make([]byte, hasher.Size()*len(node.Children)+8)
							 | 
						||
| 
								 | 
							
												binary.LittleEndian.PutUint64(data[:8], size)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												hasher.Write(data[:8])
							 | 
						||
| 
								 | 
							
												for i, hash := range node.Children {
							 | 
						||
| 
								 | 
							
													copy(data[i*hasher.Size()+8:], hash[:])
							 | 
						||
| 
								 | 
							
													hasher.Write(hash[:])
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											hash := hasher.Sum(nil)
							 | 
						||
| 
								 | 
							
											last := task.Last || (node != nil) && node.Last
							 | 
						||
| 
								 | 
							
											// Insert the subresult into the memoization tree
							 | 
						||
| 
								 | 
							
											results.Lock.Lock()
							 | 
						||
| 
								 | 
							
											if node = results.Levels[depth][task.Index/pow]; node == nil {
							 | 
						||
| 
								 | 
							
												// Figure out the pending tasks
							 | 
						||
| 
								 | 
							
												pending := self.branches
							 | 
						||
| 
								 | 
							
												if task.Index/pow == results.Chunks/pow {
							 | 
						||
| 
								 | 
							
													pending = (results.Chunks + pow/self.branches - 1) / (pow / self.branches) % self.branches
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												node = &Node{pending, 0, make([]common.Hash, pending), last}
							 | 
						||
| 
								 | 
							
												results.Levels[depth][task.Index/pow] = node
							 | 
						||
| 
								 | 
							
												// glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending)
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											node.Pending--
							 | 
						||
| 
								 | 
							
											// glog.V(logger.Info).Infof("pending now: %v", node.Pending)
							 | 
						||
| 
								 | 
							
											i := task.Index / (pow / self.branches) % self.branches
							 | 
						||
| 
								 | 
							
											if last {
							 | 
						||
| 
								 | 
							
												node.Last = true
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											copy(node.Children[i][:], hash)
							 | 
						||
| 
								 | 
							
											node.Size += size
							 | 
						||
| 
								 | 
							
											left := node.Pending
							 | 
						||
| 
								 | 
							
											// glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size)
							 | 
						||
| 
								 | 
							
											if chunkC != nil {
							 | 
						||
| 
								 | 
							
												if swg != nil {
							 | 
						||
| 
								 | 
							
													swg.Add(1)
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												select {
							 | 
						||
| 
								 | 
							
												case chunkC <- &Chunk{Key: hash, SData: data, wg: swg}:
							 | 
						||
| 
								 | 
							
													// case <- self.quitC
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											if depth+1 < len(results.Levels) {
							 | 
						||
| 
								 | 
							
												delete(results.Levels[depth+1], task.Index/(pow/self.branches))
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											results.Lock.Unlock()
							 | 
						||
| 
								 | 
							
											// If there's more work to be done, leave for others
							 | 
						||
| 
								 | 
							
											// glog.V(logger.Info).Infof("left %v", left)
							 | 
						||
| 
								 | 
							
											if left > 0 {
							 | 
						||
| 
								 | 
							
												break
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											// We're the last ones in this batch, merge the children together
							 | 
						||
| 
								 | 
							
											depth--
							 | 
						||
| 
								 | 
							
											pow *= self.branches
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										pend.Done()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 |