245 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			245 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2016 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package storage
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/logger"
 | |
| 	"github.com/ethereum/go-ethereum/logger/glog"
 | |
| )
 | |
| 
 | |
| /*
 | |
| DPA provides the client API entrypoints Store and Retrieve to store and retrieve
 | |
| It can store anything that has a byte slice representation, so files or serialised objects etc.
 | |
| 
 | |
| Storage: DPA calls the Chunker to segment the input datastream of any size to a merkle hashed tree of chunks. The key of the root block is returned to the client.
 | |
| 
 | |
| Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read.
 | |
| 
 | |
| As the chunker produces chunks, DPA dispatches them to its own chunk store
 | |
| implementation for storage or retrieval.
 | |
| */
 | |
| 
 | |
| const (
 | |
| 	storeChanCapacity           = 100
 | |
| 	retrieveChanCapacity        = 100
 | |
| 	singletonSwarmDbCapacity    = 50000
 | |
| 	singletonSwarmCacheCapacity = 500
 | |
| 	maxStoreProcesses           = 8
 | |
| 	maxRetrieveProcesses        = 8
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	notFound = errors.New("not found")
 | |
| )
 | |
| 
 | |
| type DPA struct {
 | |
| 	ChunkStore
 | |
| 	storeC    chan *Chunk
 | |
| 	retrieveC chan *Chunk
 | |
| 	Chunker   Chunker
 | |
| 
 | |
| 	lock    sync.Mutex
 | |
| 	running bool
 | |
| 	wg      *sync.WaitGroup
 | |
| 	quitC   chan bool
 | |
| }
 | |
| 
 | |
| // for testing locally
 | |
| func NewLocalDPA(datadir string) (*DPA, error) {
 | |
| 
 | |
| 	hash := MakeHashFunc("SHA256")
 | |
| 
 | |
| 	dbStore, err := NewDbStore(datadir, hash, singletonSwarmDbCapacity, 0)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return NewDPA(&LocalStore{
 | |
| 		NewMemStore(dbStore, singletonSwarmCacheCapacity),
 | |
| 		dbStore,
 | |
| 	}, NewChunkerParams()), nil
 | |
| }
 | |
| 
 | |
| func NewDPA(store ChunkStore, params *ChunkerParams) *DPA {
 | |
| 	chunker := NewTreeChunker(params)
 | |
| 	return &DPA{
 | |
| 		Chunker:    chunker,
 | |
| 		ChunkStore: store,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Public API. Main entry point for document retrieval directly. Used by the
 | |
| // FS-aware API and httpaccess
 | |
| // Chunk retrieval blocks on netStore requests with a timeout so reader will
 | |
| // report error if retrieval of chunks within requested range time out.
 | |
| func (self *DPA) Retrieve(key Key) LazySectionReader {
 | |
| 	return self.Chunker.Join(key, self.retrieveC)
 | |
| }
 | |
| 
 | |
| // Public API. Main entry point for document storage directly. Used by the
 | |
| // FS-aware API and httpaccess
 | |
| func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) {
 | |
| 	return self.Chunker.Split(data, size, self.storeC, swg, wwg)
 | |
| }
 | |
| 
 | |
| func (self *DPA) Start() {
 | |
| 	self.lock.Lock()
 | |
| 	defer self.lock.Unlock()
 | |
| 	if self.running {
 | |
| 		return
 | |
| 	}
 | |
| 	self.running = true
 | |
| 	self.retrieveC = make(chan *Chunk, retrieveChanCapacity)
 | |
| 	self.storeC = make(chan *Chunk, storeChanCapacity)
 | |
| 	self.quitC = make(chan bool)
 | |
| 	self.storeLoop()
 | |
| 	self.retrieveLoop()
 | |
| }
 | |
| 
 | |
| func (self *DPA) Stop() {
 | |
| 	self.lock.Lock()
 | |
| 	defer self.lock.Unlock()
 | |
| 	if !self.running {
 | |
| 		return
 | |
| 	}
 | |
| 	self.running = false
 | |
| 	close(self.quitC)
 | |
| }
 | |
| 
 | |
| // retrieveLoop dispatches the parallel chunk retrieval requests received on the
 | |
| // retrieve channel to its ChunkStore  (NetStore or LocalStore)
 | |
| func (self *DPA) retrieveLoop() {
 | |
| 	for i := 0; i < maxRetrieveProcesses; i++ {
 | |
| 		go self.retrieveWorker()
 | |
| 	}
 | |
| 	glog.V(logger.Detail).Infof("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses)
 | |
| }
 | |
| 
 | |
| func (self *DPA) retrieveWorker() {
 | |
| 	for chunk := range self.retrieveC {
 | |
| 		glog.V(logger.Detail).Infof("dpa: retrieve loop : chunk %v", chunk.Key.Log())
 | |
| 		storedChunk, err := self.Get(chunk.Key)
 | |
| 		if err == notFound {
 | |
| 			glog.V(logger.Detail).Infof("chunk %v not found", chunk.Key.Log())
 | |
| 		} else if err != nil {
 | |
| 			glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err)
 | |
| 		} else {
 | |
| 			chunk.SData = storedChunk.SData
 | |
| 			chunk.Size = storedChunk.Size
 | |
| 		}
 | |
| 		close(chunk.C)
 | |
| 
 | |
| 		select {
 | |
| 		case <-self.quitC:
 | |
| 			return
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // storeLoop dispatches the parallel chunk store request processors
 | |
| // received on the store channel to its ChunkStore (NetStore or LocalStore)
 | |
| func (self *DPA) storeLoop() {
 | |
| 	for i := 0; i < maxStoreProcesses; i++ {
 | |
| 		go self.storeWorker()
 | |
| 	}
 | |
| 	glog.V(logger.Detail).Infof("dpa: store spawning %v workers", maxStoreProcesses)
 | |
| }
 | |
| 
 | |
| func (self *DPA) storeWorker() {
 | |
| 
 | |
| 	for chunk := range self.storeC {
 | |
| 		self.Put(chunk)
 | |
| 		if chunk.wg != nil {
 | |
| 			glog.V(logger.Detail).Infof("dpa: store processor %v", chunk.Key.Log())
 | |
| 			chunk.wg.Done()
 | |
| 
 | |
| 		}
 | |
| 		select {
 | |
| 		case <-self.quitC:
 | |
| 			return
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // DpaChunkStore implements the ChunkStore interface,
 | |
| // this chunk access layer assumed 2 chunk stores
 | |
| // local storage eg. LocalStore and network storage eg., NetStore
 | |
| // access by calling network is blocking with a timeout
 | |
| 
 | |
| type dpaChunkStore struct {
 | |
| 	n          int
 | |
| 	localStore ChunkStore
 | |
| 	netStore   ChunkStore
 | |
| }
 | |
| 
 | |
| func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore {
 | |
| 	return &dpaChunkStore{0, localStore, netStore}
 | |
| }
 | |
| 
 | |
| // Get is the entrypoint for local retrieve requests
 | |
| // waits for response or times out
 | |
| func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
 | |
| 	chunk, err = self.netStore.Get(key)
 | |
| 	// timeout := time.Now().Add(searchTimeout)
 | |
| 	if chunk.SData != nil {
 | |
| 		glog.V(logger.Detail).Infof("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData))
 | |
| 		return
 | |
| 	}
 | |
| 	// TODO: use self.timer time.Timer and reset with defer disableTimer
 | |
| 	timer := time.After(searchTimeout)
 | |
| 	select {
 | |
| 	case <-timer:
 | |
| 		glog.V(logger.Detail).Infof("DPA.Get: %v request time out ", key.Log())
 | |
| 		err = notFound
 | |
| 	case <-chunk.Req.C:
 | |
| 		glog.V(logger.Detail).Infof("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk)
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Put is the entrypoint for local store requests coming from storeLoop
 | |
| func (self *dpaChunkStore) Put(entry *Chunk) {
 | |
| 	chunk, err := self.localStore.Get(entry.Key)
 | |
| 	if err != nil {
 | |
| 		glog.V(logger.Detail).Infof("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log())
 | |
| 		chunk = entry
 | |
| 	} else if chunk.SData == nil {
 | |
| 		glog.V(logger.Detail).Infof("DPA.Put: %v request entry found", entry.Key.Log())
 | |
| 		chunk.SData = entry.SData
 | |
| 		chunk.Size = entry.Size
 | |
| 	} else {
 | |
| 		glog.V(logger.Detail).Infof("DPA.Put: %v chunk already known", entry.Key.Log())
 | |
| 		return
 | |
| 	}
 | |
| 	// from this point on the storage logic is the same with network storage requests
 | |
| 	glog.V(logger.Detail).Infof("DPA.Put %v: %v", self.n, chunk.Key.Log())
 | |
| 	self.n++
 | |
| 	self.netStore.Put(chunk)
 | |
| }
 | |
| 
 | |
| // Close chunk store
 | |
| func (self *dpaChunkStore) Close() {
 | |
| 	return
 | |
| }
 |