| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | // Copyright 2016 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package storage | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2018-07-09 14:11:49 +02:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	"encoding/binary" | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	"io" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"io/ioutil" | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	ch "github.com/ethereum/go-ethereum/swarm/chunk" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/swarm/log" | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /* | 
					
						
							|  |  |  |    The main idea of a pyramid chunker is to process the input data without knowing the entire size a priori. | 
					
						
							|  |  |  |    For this to be achieved, the chunker tree is built from the ground up until the data is exhausted. | 
					
						
							| 
									
										
										
										
											2017-10-30 01:23:23 +01:00
										 |  |  |    This opens up new avenues such as easy append and other sorts of modifications to the tree, thereby avoiding | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  |    duplication of data chunks. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    Below is an example of a two level chunks tree. The leaf chunks are called data chunks and all the above | 
					
						
							|  |  |  |    chunks are called tree chunks. The tree chunk above data chunks is level 0 and so on until it reaches | 
					
						
							|  |  |  |    the root tree chunk. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                                             T10                                        <- Tree chunk lvl1 | 
					
						
							|  |  |  |                                             | | 
					
						
							|  |  |  |                   __________________________|_____________________________ | 
					
						
							|  |  |  |                  /                  |                   |                \ | 
					
						
							|  |  |  |                 /                   |                   \                 \ | 
					
						
							|  |  |  |             __T00__             ___T01__           ___T02__           ___T03__         <- Tree chunks lvl 0 | 
					
						
							|  |  |  |            / /     \           / /      \         / /      \         / /      \ | 
					
						
							|  |  |  |           / /       \         / /        \       / /       \        / /        \ | 
					
						
							|  |  |  |          D1 D2 ... D128	     D1 D2 ... D128     D1 D2 ... D128     D1 D2 ... D128      <-  Data Chunks | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The split function continuously reads the data, creates data chunks and sends them to storage. | 
					
						
							|  |  |  |     When a certain number of data chunks are created (defaultBranches), a signal is sent to create a tree | 
					
						
							|  |  |  |     entry. When the level 0 tree entries reach a certain threshold (defaultBranches), another signal | 
					
						
							|  |  |  |     is sent to a tree entry one level up, and so on, until the data is exhausted AND only one | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  |     tree entry is present at a certain level. The key of that tree entry is given out as the rootAddress of the file. | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							|  |  |  | 	errLoadingTreeRootChunk = errors.New("LoadTree Error: Could not load root chunk") | 
					
						
							|  |  |  | 	errLoadingTreeChunk     = errors.New("LoadTree Error: Could not load chunk") | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | const ( | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	ChunkProcessors = 8 | 
					
						
							|  |  |  | 	splitTimeout    = time.Minute * 5 | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	DataChunk = 0 | 
					
						
							|  |  |  | 	TreeChunk = 1 | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | type PyramidSplitterParams struct { | 
					
						
							|  |  |  | 	SplitterParams | 
					
						
							|  |  |  | 	getter Getter | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, getter Getter, chunkSize int64) *PyramidSplitterParams { | 
					
						
							|  |  |  | 	hashSize := putter.RefSize() | 
					
						
							|  |  |  | 	return &PyramidSplitterParams{ | 
					
						
							|  |  |  | 		SplitterParams: SplitterParams{ | 
					
						
							|  |  |  | 			ChunkerParams: ChunkerParams{ | 
					
						
							|  |  |  | 				chunkSize: chunkSize, | 
					
						
							|  |  |  | 				hashSize:  hashSize, | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 			reader: reader, | 
					
						
							|  |  |  | 			putter: putter, | 
					
						
							|  |  |  | 			addr:   addr, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 		getter: getter, | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | /* | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes. | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	New chunks to store are store using the putter which the caller provides. | 
					
						
							|  |  |  | */ | 
					
						
							| 
									
										
										
										
											2018-07-09 14:11:49 +02:00
										 |  |  | func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, ch.DefaultSize)).Split(ctx) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-09 14:11:49 +02:00
										 |  |  | func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, ch.DefaultSize)).Append(ctx) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | // Entry to create a tree node | 
					
						
							|  |  |  | type TreeEntry struct { | 
					
						
							|  |  |  | 	level         int | 
					
						
							|  |  |  | 	branchCount   int64 | 
					
						
							|  |  |  | 	subtreeSize   uint64 | 
					
						
							|  |  |  | 	chunk         []byte | 
					
						
							|  |  |  | 	key           []byte | 
					
						
							|  |  |  | 	index         int  // used in append to indicate the index of existing tree entry | 
					
						
							|  |  |  | 	updatePending bool // indicates if the entry is loaded from existing tree | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry { | 
					
						
							|  |  |  | 	return &TreeEntry{ | 
					
						
							|  |  |  | 		level:         0, | 
					
						
							|  |  |  | 		branchCount:   0, | 
					
						
							|  |  |  | 		subtreeSize:   0, | 
					
						
							|  |  |  | 		chunk:         make([]byte, pyramid.chunkSize+8), | 
					
						
							|  |  |  | 		key:           make([]byte, pyramid.hashSize), | 
					
						
							|  |  |  | 		index:         0, | 
					
						
							|  |  |  | 		updatePending: false, | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | // Used by the hash processor to create a data/tree chunk and send to storage | 
					
						
							|  |  |  | type chunkJob struct { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	key      Address | 
					
						
							|  |  |  | 	chunk    []byte | 
					
						
							|  |  |  | 	parentWg *sync.WaitGroup | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type PyramidChunker struct { | 
					
						
							|  |  |  | 	chunkSize   int64 | 
					
						
							|  |  |  | 	hashSize    int64 | 
					
						
							|  |  |  | 	branches    int64 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	reader      io.Reader | 
					
						
							|  |  |  | 	putter      Putter | 
					
						
							|  |  |  | 	getter      Getter | 
					
						
							|  |  |  | 	key         Address | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	workerCount int64 | 
					
						
							| 
									
										
										
										
											2017-11-08 11:45:52 +01:00
										 |  |  | 	workerLock  sync.RWMutex | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	jobC        chan *chunkJob | 
					
						
							|  |  |  | 	wg          *sync.WaitGroup | 
					
						
							|  |  |  | 	errC        chan error | 
					
						
							|  |  |  | 	quitC       chan bool | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	rootAddress []byte | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	chunkLevel  [][]*TreeEntry | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { | 
					
						
							|  |  |  | 	pc = &PyramidChunker{} | 
					
						
							|  |  |  | 	pc.reader = params.reader | 
					
						
							|  |  |  | 	pc.hashSize = params.hashSize | 
					
						
							|  |  |  | 	pc.branches = params.chunkSize / pc.hashSize | 
					
						
							|  |  |  | 	pc.chunkSize = pc.hashSize * pc.branches | 
					
						
							|  |  |  | 	pc.putter = params.putter | 
					
						
							|  |  |  | 	pc.getter = params.getter | 
					
						
							|  |  |  | 	pc.key = params.addr | 
					
						
							|  |  |  | 	pc.workerCount = 0 | 
					
						
							|  |  |  | 	pc.jobC = make(chan *chunkJob, 2*ChunkProcessors) | 
					
						
							|  |  |  | 	pc.wg = &sync.WaitGroup{} | 
					
						
							|  |  |  | 	pc.errC = make(chan error) | 
					
						
							|  |  |  | 	pc.quitC = make(chan bool) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	pc.rootAddress = make([]byte, pc.hashSize) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pc.chunkLevel = make([][]*TreeEntry, pc.branches) | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	return | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	return &LazyChunkReader{ | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		addr:      addr, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		depth:     depth, | 
					
						
							|  |  |  | 		chunkSize: pc.chunkSize, | 
					
						
							|  |  |  | 		branches:  pc.branches, | 
					
						
							|  |  |  | 		hashSize:  pc.hashSize, | 
					
						
							|  |  |  | 		getter:    getter, | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) incrementWorkerCount() { | 
					
						
							|  |  |  | 	pc.workerLock.Lock() | 
					
						
							|  |  |  | 	defer pc.workerLock.Unlock() | 
					
						
							|  |  |  | 	pc.workerCount += 1 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) getWorkerCount() int64 { | 
					
						
							|  |  |  | 	pc.workerLock.Lock() | 
					
						
							|  |  |  | 	defer pc.workerLock.Unlock() | 
					
						
							|  |  |  | 	return pc.workerCount | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) decrementWorkerCount() { | 
					
						
							|  |  |  | 	pc.workerLock.Lock() | 
					
						
							|  |  |  | 	defer pc.workerLock.Unlock() | 
					
						
							|  |  |  | 	pc.workerCount -= 1 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-09 14:11:49 +02:00
										 |  |  | func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Debug("pyramid.chunker: Split()") | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pc.wg.Add(1) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	pc.prepareChunks(ctx, false) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	// closes internal error channel if all subprocesses in the workgroup finished | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// waiting for all chunks to finish | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		pc.wg.Wait() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 		//We close errC here because this is passed down to 8 parallel routines underneath. | 
					
						
							|  |  |  | 		// if a error happens in one of them.. that particular routine raises error... | 
					
						
							|  |  |  | 		// once they all complete successfully, the control comes back and we can safely close this here. | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		close(pc.errC) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	defer close(pc.quitC) | 
					
						
							|  |  |  | 	defer pc.putter.Close() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	select { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	case err := <-pc.errC: | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			return nil, nil, err | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		_ = pc.putter.Wait(ctx) //??? | 
					
						
							|  |  |  | 		return nil, nil, ctx.Err() | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	return pc.rootAddress, pc.putter.Wait, nil | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-09 14:11:49 +02:00
										 |  |  | func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Debug("pyramid.chunker: Append()") | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	// Load the right most unfinished tree chunks in every level | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	pc.loadTree(ctx) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pc.wg.Add(1) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	pc.prepareChunks(ctx, true) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	// closes internal error channel if all subprocesses in the workgroup finished | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// waiting for all chunks to finish | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		pc.wg.Wait() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		close(pc.errC) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	defer close(pc.quitC) | 
					
						
							|  |  |  | 	defer pc.putter.Close() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	select { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	case err := <-pc.errC: | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			return nil, nil, err | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							|  |  |  | 	case <-time.NewTimer(splitTimeout).C: | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	return pc.rootAddress, pc.putter.Wait, nil | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func (pc *PyramidChunker) processor(ctx context.Context, id int64) { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	defer pc.decrementWorkerCount() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	for { | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 		select { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		case job, ok := <-pc.jobC: | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			if !ok { | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			pc.processChunk(ctx, id, job) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		case <-pc.quitC: | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			return | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Debug("pyramid.chunker: processChunk()", "id", id) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	ref, err := pc.putter.Put(ctx, job.chunk) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		select { | 
					
						
							|  |  |  | 		case pc.errC <- err: | 
					
						
							|  |  |  | 		case <-pc.quitC: | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	copy(job.key, ref) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	// send off new chunk to storage | 
					
						
							|  |  |  | 	job.parentWg.Done() | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func (pc *PyramidChunker) loadTree(ctx context.Context) error { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Debug("pyramid.chunker: loadTree()") | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	// Get the root chunk to get the total size | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	chunkData, err := pc.getter.Get(ctx, Reference(pc.key)) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		return errLoadingTreeRootChunk | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	chunkSize := int64(chunkData.Size()) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize) | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	//if data size is less than a chunk... add a parent with update as pending | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	if chunkSize <= pc.chunkSize { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		newEntry := &TreeEntry{ | 
					
						
							|  |  |  | 			level:         0, | 
					
						
							|  |  |  | 			branchCount:   1, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			subtreeSize:   uint64(chunkSize), | 
					
						
							|  |  |  | 			chunk:         make([]byte, pc.chunkSize+8), | 
					
						
							|  |  |  | 			key:           make([]byte, pc.hashSize), | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			index:         0, | 
					
						
							|  |  |  | 			updatePending: true, | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		copy(newEntry.chunk[8:], pc.key) | 
					
						
							|  |  |  | 		pc.chunkLevel[0] = append(pc.chunkLevel[0], newEntry) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var treeSize int64 | 
					
						
							|  |  |  | 	var depth int | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	treeSize = pc.chunkSize | 
					
						
							|  |  |  | 	for ; treeSize < chunkSize; treeSize *= pc.branches { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		depth++ | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Trace("pyramid.chunker", "depth", depth) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Add the root chunk entry | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	branchCount := int64(len(chunkData)-8) / pc.hashSize | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	newEntry := &TreeEntry{ | 
					
						
							| 
									
										
										
										
											2017-11-10 18:06:45 +01:00
										 |  |  | 		level:         depth - 1, | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		branchCount:   branchCount, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		subtreeSize:   uint64(chunkSize), | 
					
						
							|  |  |  | 		chunk:         chunkData, | 
					
						
							|  |  |  | 		key:           pc.key, | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		index:         0, | 
					
						
							|  |  |  | 		updatePending: true, | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pc.chunkLevel[depth-1] = append(pc.chunkLevel[depth-1], newEntry) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Add the rest of the tree | 
					
						
							| 
									
										
										
										
											2018-01-03 15:14:47 +03:00
										 |  |  | 	for lvl := depth - 1; lvl >= 1; lvl-- { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 		//TODO(jmozah): instead of loading finished branches and then trim in the end, | 
					
						
							|  |  |  | 		//avoid loading them in the first place | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		for _, ent := range pc.chunkLevel[lvl] { | 
					
						
							|  |  |  | 			branchCount = int64(len(ent.chunk)-8) / pc.hashSize | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			for i := int64(0); i < branchCount; i++ { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 				newChunkData, err := pc.getter.Get(ctx, Reference(key)) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				if err != nil { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					return errLoadingTreeChunk | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				newChunkSize := newChunkData.Size() | 
					
						
							|  |  |  | 				bewBranchCount := int64(len(newChunkData)-8) / pc.hashSize | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 				newEntry := &TreeEntry{ | 
					
						
							| 
									
										
										
										
											2017-11-10 18:06:45 +01:00
										 |  |  | 					level:         lvl - 1, | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					branchCount:   bewBranchCount, | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 					subtreeSize:   newChunkSize, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					chunk:         newChunkData, | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					key:           key, | 
					
						
							|  |  |  | 					index:         0, | 
					
						
							|  |  |  | 					updatePending: true, | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				pc.chunkLevel[lvl-1] = append(pc.chunkLevel[lvl-1], newEntry) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 			// We need to get only the right most unfinished branch.. so trim all finished branches | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			if int64(len(pc.chunkLevel[lvl-1])) >= pc.branches { | 
					
						
							|  |  |  | 				pc.chunkLevel[lvl-1] = nil | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend) | 
					
						
							|  |  |  | 	defer pc.wg.Done() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	chunkWG := &sync.WaitGroup{} | 
					
						
							| 
									
										
										
										
											2017-08-08 20:34:35 +03:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pc.incrementWorkerCount() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	go pc.processor(ctx, pc.workerCount) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	parent := NewTreeEntry(pc) | 
					
						
							|  |  |  | 	var unfinishedChunkData ChunkData | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	var unfinishedChunkSize uint64 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	if isAppend && len(pc.chunkLevel[0]) != 0 { | 
					
						
							|  |  |  | 		lastIndex := len(pc.chunkLevel[0]) - 1 | 
					
						
							|  |  |  | 		ent := pc.chunkLevel[0][lastIndex] | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		if ent.branchCount < pc.branches { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			parent = &TreeEntry{ | 
					
						
							|  |  |  | 				level:         0, | 
					
						
							|  |  |  | 				branchCount:   ent.branchCount, | 
					
						
							|  |  |  | 				subtreeSize:   ent.subtreeSize, | 
					
						
							|  |  |  | 				chunk:         ent.chunk, | 
					
						
							|  |  |  | 				key:           ent.key, | 
					
						
							|  |  |  | 				index:         lastIndex, | 
					
						
							|  |  |  | 				updatePending: true, | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 			lastBranch := parent.branchCount - 1 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			lastAddress := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			var err error | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			unfinishedChunkData, err = pc.getter.Get(ctx, lastAddress) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				pc.errC <- err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			unfinishedChunkSize = unfinishedChunkData.Size() | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			if unfinishedChunkSize < uint64(pc.chunkSize) { | 
					
						
							|  |  |  | 				parent.subtreeSize = parent.subtreeSize - unfinishedChunkSize | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 				parent.branchCount = parent.branchCount - 1 | 
					
						
							|  |  |  | 			} else { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				unfinishedChunkData = nil | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	for index := 0; ; index++ { | 
					
						
							|  |  |  | 		var err error | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		chunkData := make([]byte, pc.chunkSize+8) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		var readBytes int | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if unfinishedChunkData != nil { | 
					
						
							|  |  |  | 			copy(chunkData, unfinishedChunkData) | 
					
						
							|  |  |  | 			readBytes += int(unfinishedChunkSize) | 
					
						
							|  |  |  | 			unfinishedChunkData = nil | 
					
						
							|  |  |  | 			log.Trace("pyramid.chunker: found unfinished chunk", "readBytes", readBytes) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		var res []byte | 
					
						
							|  |  |  | 		res, err = ioutil.ReadAll(io.LimitReader(pc.reader, int64(len(chunkData)-(8+readBytes)))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// hack for ioutil.ReadAll: | 
					
						
							|  |  |  | 		// a successful call to ioutil.ReadAll returns err == nil, not err == EOF, whereas we | 
					
						
							|  |  |  | 		// want to propagate the io.EOF error | 
					
						
							|  |  |  | 		if len(res) == 0 && err == nil { | 
					
						
							|  |  |  | 			err = io.EOF | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		copy(chunkData[8+readBytes:], res) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		readBytes += len(res) | 
					
						
							|  |  |  | 		log.Trace("pyramid.chunker: copied all data", "readBytes", readBytes) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			if err == io.EOF || err == io.ErrUnexpectedEOF { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 				pc.cleanChunkLevels() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				// Check if we are appending or the chunk is the only one. | 
					
						
							|  |  |  | 				if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					// Data is exactly one chunk.. pick the last chunk key as root | 
					
						
							|  |  |  | 					chunkWG.Wait() | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 					lastChunksAddress := parent.chunk[8 : 8+pc.hashSize] | 
					
						
							|  |  |  | 					copy(pc.rootAddress, lastChunksAddress) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					break | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} else { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				close(pc.quitC) | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 				break | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-30 01:23:23 +01:00
										 |  |  | 		// Data ended in chunk boundary.. just signal to start bulding tree | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		if readBytes == 0 { | 
					
						
							|  |  |  | 			pc.buildTree(isAppend, parent, chunkWG, true, nil) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			break | 
					
						
							|  |  |  | 		} else { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			pkey := pc.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 			// update tree related parent data structures | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			parent.subtreeSize += uint64(readBytes) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			parent.branchCount++ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			// Data got exhausted... signal to send any parent tree related chunks | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			if int64(readBytes) < pc.chunkSize { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				pc.cleanChunkLevels() | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 				// only one data chunk .. so dont add any parent chunk | 
					
						
							|  |  |  | 				if parent.branchCount <= 1 { | 
					
						
							|  |  |  | 					chunkWG.Wait() | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 					if isAppend || pc.depth() == 0 { | 
					
						
							|  |  |  | 						// No need to build the tree if the depth is 0 | 
					
						
							|  |  |  | 						// or we are appending. | 
					
						
							|  |  |  | 						// Just use the last key. | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 						copy(pc.rootAddress, pkey) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					} else { | 
					
						
							|  |  |  | 						// We need to build the tree and and provide the lonely | 
					
						
							|  |  |  | 						// chunk key to replace the last tree chunk key. | 
					
						
							|  |  |  | 						pc.buildTree(isAppend, parent, chunkWG, true, pkey) | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					break | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				pc.buildTree(isAppend, parent, chunkWG, true, nil) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 				break | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			if parent.branchCount == pc.branches { | 
					
						
							|  |  |  | 				pc.buildTree(isAppend, parent, chunkWG, false, nil) | 
					
						
							|  |  |  | 				parent = NewTreeEntry(pc) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		workers := pc.getWorkerCount() | 
					
						
							|  |  |  | 		if int64(len(pc.jobC)) > workers && workers < ChunkProcessors { | 
					
						
							|  |  |  | 			pc.incrementWorkerCount() | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			go pc.processor(ctx, pc.workerCount) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync.WaitGroup, last bool, lonelyChunkKey []byte) { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	chunkWG.Wait() | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pc.enqueueTreeChunk(ent, chunkWG, last) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	compress := false | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	endLvl := pc.branches | 
					
						
							|  |  |  | 	for lvl := int64(0); lvl < pc.branches; lvl++ { | 
					
						
							|  |  |  | 		lvlCount := int64(len(pc.chunkLevel[lvl])) | 
					
						
							|  |  |  | 		if lvlCount >= pc.branches { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			endLvl = lvl + 1 | 
					
						
							|  |  |  | 			compress = true | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 	if !compress && !last { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Wait for all the keys to be processed before compressing the tree | 
					
						
							|  |  |  | 	chunkWG.Wait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for lvl := int64(ent.level); lvl < endLvl; lvl++ { | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		lvlCount := int64(len(pc.chunkLevel[lvl])) | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 		if lvlCount == 1 && last { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			copy(pc.rootAddress, pc.chunkLevel[lvl][0].key) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		for startCount := int64(0); startCount < lvlCount; startCount += pc.branches { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			endCount := startCount + pc.branches | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			if endCount > lvlCount { | 
					
						
							|  |  |  | 				endCount = lvlCount | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			var nextLvlCount int64 | 
					
						
							|  |  |  | 			var tempEntry *TreeEntry | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			if len(pc.chunkLevel[lvl+1]) > 0 { | 
					
						
							|  |  |  | 				nextLvlCount = int64(len(pc.chunkLevel[lvl+1]) - 1) | 
					
						
							|  |  |  | 				tempEntry = pc.chunkLevel[lvl+1][nextLvlCount] | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 			if isAppend && tempEntry != nil && tempEntry.updatePending { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 				updateEntry := &TreeEntry{ | 
					
						
							|  |  |  | 					level:         int(lvl + 1), | 
					
						
							|  |  |  | 					branchCount:   0, | 
					
						
							|  |  |  | 					subtreeSize:   0, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					chunk:         make([]byte, pc.chunkSize+8), | 
					
						
							|  |  |  | 					key:           make([]byte, pc.hashSize), | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					index:         int(nextLvlCount), | 
					
						
							|  |  |  | 					updatePending: true, | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				for index := int64(0); index < lvlCount; index++ { | 
					
						
							|  |  |  | 					updateEntry.branchCount++ | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					updateEntry.subtreeSize += pc.chunkLevel[lvl][index].subtreeSize | 
					
						
							|  |  |  | 					copy(updateEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], pc.chunkLevel[lvl][index].key[:pc.hashSize]) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				pc.enqueueTreeChunk(updateEntry, chunkWG, last) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				noOfBranches := endCount - startCount | 
					
						
							|  |  |  | 				newEntry := &TreeEntry{ | 
					
						
							|  |  |  | 					level:         int(lvl + 1), | 
					
						
							|  |  |  | 					branchCount:   noOfBranches, | 
					
						
							|  |  |  | 					subtreeSize:   0, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					chunk:         make([]byte, (noOfBranches*pc.hashSize)+8), | 
					
						
							|  |  |  | 					key:           make([]byte, pc.hashSize), | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					index:         int(nextLvlCount), | 
					
						
							|  |  |  | 					updatePending: false, | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				index := int64(0) | 
					
						
							|  |  |  | 				for i := startCount; i < endCount; i++ { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					entry := pc.chunkLevel[lvl][i] | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					newEntry.subtreeSize += entry.subtreeSize | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 					copy(newEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], entry.key[:pc.hashSize]) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 					index++ | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				// Lonely chunk key is the key of the last chunk that is the only one on the last branch. | 
					
						
							|  |  |  | 				// In this case, ignore its tree chunk key and replace it with the lonely chunk key. | 
					
						
							|  |  |  | 				if lonelyChunkKey != nil { | 
					
						
							|  |  |  | 					// Overwrite the last tree chunk key with the lonely data chunk key. | 
					
						
							|  |  |  | 					copy(newEntry.chunk[int64(len(newEntry.chunk))-pc.hashSize:], lonelyChunkKey[:pc.hashSize]) | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				pc.enqueueTreeChunk(newEntry, chunkWG, last) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 		if !isAppend { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			chunkWG.Wait() | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 			if compress { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				pc.chunkLevel[lvl] = nil | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) enqueueTreeChunk(ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) { | 
					
						
							|  |  |  | 	if ent != nil && ent.branchCount > 0 { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 		// wait for data chunks to get over before processing the tree chunk | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 		if last { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			chunkWG.Wait() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		ent.key = make([]byte, pc.hashSize) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		chunkWG.Add(1) | 
					
						
							|  |  |  | 		select { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		case pc.jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*pc.hashSize+8], chunkWG}: | 
					
						
							|  |  |  | 		case <-pc.quitC: | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Update or append based on weather it is a new entry or being reused | 
					
						
							| 
									
										
										
										
											2017-12-12 18:05:47 +00:00
										 |  |  | 		if ent.updatePending { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 			chunkWG.Wait() | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			pc.chunkLevel[ent.level][ent.index] = ent | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} else { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			pc.chunkLevel[ent.level] = append(pc.chunkLevel[ent.level], ent) | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | func (pc *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup) Address { | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	binary.LittleEndian.PutUint64(chunkData[:8], size) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	pkey := parent.chunk[8+parent.branchCount*pc.hashSize : 8+(parent.branchCount+1)*pc.hashSize] | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 	chunkWG.Add(1) | 
					
						
							|  |  |  | 	select { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	case pc.jobC <- &chunkJob{pkey, chunkData[:size+8], chunkWG}: | 
					
						
							|  |  |  | 	case <-pc.quitC: | 
					
						
							| 
									
										
										
										
											2017-09-22 01:52:51 +05:30
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return pkey | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-08 11:45:52 +01:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | // depth returns the number of chunk levels. | 
					
						
							|  |  |  | // It is used to detect if there is only one data chunk | 
					
						
							|  |  |  | // left for the last branch. | 
					
						
							|  |  |  | func (pc *PyramidChunker) depth() (d int) { | 
					
						
							|  |  |  | 	for _, l := range pc.chunkLevel { | 
					
						
							|  |  |  | 		if l == nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		d++ | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // cleanChunkLevels removes gaps (nil levels) between chunk levels | 
					
						
							|  |  |  | // that are not nil. | 
					
						
							|  |  |  | func (pc *PyramidChunker) cleanChunkLevels() { | 
					
						
							|  |  |  | 	for i, l := range pc.chunkLevel { | 
					
						
							|  |  |  | 		if l == nil { | 
					
						
							|  |  |  | 			pc.chunkLevel = append(pc.chunkLevel[:i], append(pc.chunkLevel[i+1:], nil)...) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |