474 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			474 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | // Copyright 2016 The go-ethereum Authors | ||
|  | // This file is part of the go-ethereum library. | ||
|  | // | ||
|  | // The go-ethereum library is free software: you can redistribute it and/or modify | ||
|  | // it under the terms of the GNU Lesser General Public License as published by | ||
|  | // the Free Software Foundation, either version 3 of the License, or | ||
|  | // (at your option) any later version. | ||
|  | // | ||
|  | // The go-ethereum library is distributed in the hope that it will be useful, | ||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
|  | // GNU Lesser General Public License for more details. | ||
|  | // | ||
|  | // You should have received a copy of the GNU Lesser General Public License | ||
|  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|  | 
 | ||
|  | // disk storage layer for the package bzz | ||
|  | // DbStore implements the ChunkStore interface and is used by the DPA as | ||
|  | // persistent storage of chunks | ||
|  | // it implements purging based on access count allowing for external control of | ||
|  | // max capacity | ||
|  | 
 | ||
|  | package storage | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"bytes" | ||
|  | 	"encoding/binary" | ||
|  | 	"fmt" | ||
|  | 	"sync" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/logger" | ||
|  | 	"github.com/ethereum/go-ethereum/logger/glog" | ||
|  | 	"github.com/ethereum/go-ethereum/rlp" | ||
|  | 	"github.com/syndtr/goleveldb/leveldb" | ||
|  | 	"github.com/syndtr/goleveldb/leveldb/iterator" | ||
|  | ) | ||
|  | 
 | ||
|  | const ( | ||
|  | 	defaultDbCapacity = 5000000 | ||
|  | 	defaultRadius     = 0 // not yet used | ||
|  | 
 | ||
|  | 	gcArraySize      = 10000 | ||
|  | 	gcArrayFreeRatio = 0.1 | ||
|  | 
 | ||
|  | 	// key prefixes for leveldb storage | ||
|  | 	kpIndex = 0 | ||
|  | 	kpData  = 1 | ||
|  | ) | ||
|  | 
 | ||
|  | var ( | ||
|  | 	keyAccessCnt = []byte{2} | ||
|  | 	keyEntryCnt  = []byte{3} | ||
|  | 	keyDataIdx   = []byte{4} | ||
|  | 	keyGCPos     = []byte{5} | ||
|  | ) | ||
|  | 
 | ||
|  | type gcItem struct { | ||
|  | 	idx    uint64 | ||
|  | 	value  uint64 | ||
|  | 	idxKey []byte | ||
|  | } | ||
|  | 
 | ||
|  | type DbStore struct { | ||
|  | 	db *LDBDatabase | ||
|  | 
 | ||
|  | 	// this should be stored in db, accessed transactionally | ||
|  | 	entryCnt, accessCnt, dataIdx, capacity uint64 | ||
|  | 
 | ||
|  | 	gcPos, gcStartPos []byte | ||
|  | 	gcArray           []*gcItem | ||
|  | 
 | ||
|  | 	hashfunc Hasher | ||
|  | 
 | ||
|  | 	lock sync.Mutex | ||
|  | } | ||
|  | 
 | ||
|  | func NewDbStore(path string, hash Hasher, capacity uint64, radius int) (s *DbStore, err error) { | ||
|  | 	s = new(DbStore) | ||
|  | 
 | ||
|  | 	s.hashfunc = hash | ||
|  | 
 | ||
|  | 	s.db, err = NewLDBDatabase(path) | ||
|  | 	if err != nil { | ||
|  | 		return | ||
|  | 	} | ||
|  | 
 | ||
|  | 	s.setCapacity(capacity) | ||
|  | 
 | ||
|  | 	s.gcStartPos = make([]byte, 1) | ||
|  | 	s.gcStartPos[0] = kpIndex | ||
|  | 	s.gcArray = make([]*gcItem, gcArraySize) | ||
|  | 
 | ||
|  | 	data, _ := s.db.Get(keyEntryCnt) | ||
|  | 	s.entryCnt = BytesToU64(data) | ||
|  | 	data, _ = s.db.Get(keyAccessCnt) | ||
|  | 	s.accessCnt = BytesToU64(data) | ||
|  | 	data, _ = s.db.Get(keyDataIdx) | ||
|  | 	s.dataIdx = BytesToU64(data) | ||
|  | 	s.gcPos, _ = s.db.Get(keyGCPos) | ||
|  | 	if s.gcPos == nil { | ||
|  | 		s.gcPos = s.gcStartPos | ||
|  | 	} | ||
|  | 	return | ||
|  | } | ||
|  | 
 | ||
|  | type dpaDBIndex struct { | ||
|  | 	Idx    uint64 | ||
|  | 	Access uint64 | ||
|  | } | ||
|  | 
 | ||
|  | func BytesToU64(data []byte) uint64 { | ||
|  | 	if len(data) < 8 { | ||
|  | 		return 0 | ||
|  | 	} | ||
|  | 	return binary.LittleEndian.Uint64(data) | ||
|  | } | ||
|  | 
 | ||
|  | func U64ToBytes(val uint64) []byte { | ||
|  | 	data := make([]byte, 8) | ||
|  | 	binary.LittleEndian.PutUint64(data, val) | ||
|  | 	return data | ||
|  | } | ||
|  | 
 | ||
|  | func getIndexGCValue(index *dpaDBIndex) uint64 { | ||
|  | 	return index.Access | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) updateIndexAccess(index *dpaDBIndex) { | ||
|  | 	index.Access = s.accessCnt | ||
|  | } | ||
|  | 
 | ||
|  | func getIndexKey(hash Key) []byte { | ||
|  | 	HashSize := len(hash) | ||
|  | 	key := make([]byte, HashSize+1) | ||
|  | 	key[0] = 0 | ||
|  | 	copy(key[1:], hash[:]) | ||
|  | 	return key | ||
|  | } | ||
|  | 
 | ||
|  | func getDataKey(idx uint64) []byte { | ||
|  | 	key := make([]byte, 9) | ||
|  | 	key[0] = 1 | ||
|  | 	binary.BigEndian.PutUint64(key[1:9], idx) | ||
|  | 
 | ||
|  | 	return key | ||
|  | } | ||
|  | 
 | ||
|  | func encodeIndex(index *dpaDBIndex) []byte { | ||
|  | 	data, _ := rlp.EncodeToBytes(index) | ||
|  | 	return data | ||
|  | } | ||
|  | 
 | ||
|  | func encodeData(chunk *Chunk) []byte { | ||
|  | 	return chunk.SData | ||
|  | } | ||
|  | 
 | ||
|  | func decodeIndex(data []byte, index *dpaDBIndex) { | ||
|  | 	dec := rlp.NewStream(bytes.NewReader(data), 0) | ||
|  | 	dec.Decode(index) | ||
|  | } | ||
|  | 
 | ||
|  | func decodeData(data []byte, chunk *Chunk) { | ||
|  | 	chunk.SData = data | ||
|  | 	chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8])) | ||
|  | } | ||
|  | 
 | ||
|  | func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int { | ||
|  | 	pivotValue := list[pivotIndex].value | ||
|  | 	dd := list[pivotIndex] | ||
|  | 	list[pivotIndex] = list[right] | ||
|  | 	list[right] = dd | ||
|  | 	storeIndex := left | ||
|  | 	for i := left; i < right; i++ { | ||
|  | 		if list[i].value < pivotValue { | ||
|  | 			dd = list[storeIndex] | ||
|  | 			list[storeIndex] = list[i] | ||
|  | 			list[i] = dd | ||
|  | 			storeIndex++ | ||
|  | 		} | ||
|  | 	} | ||
|  | 	dd = list[storeIndex] | ||
|  | 	list[storeIndex] = list[right] | ||
|  | 	list[right] = dd | ||
|  | 	return storeIndex | ||
|  | } | ||
|  | 
 | ||
|  | func gcListSelect(list []*gcItem, left int, right int, n int) int { | ||
|  | 	if left == right { | ||
|  | 		return left | ||
|  | 	} | ||
|  | 	pivotIndex := (left + right) / 2 | ||
|  | 	pivotIndex = gcListPartition(list, left, right, pivotIndex) | ||
|  | 	if n == pivotIndex { | ||
|  | 		return n | ||
|  | 	} else { | ||
|  | 		if n < pivotIndex { | ||
|  | 			return gcListSelect(list, left, pivotIndex-1, n) | ||
|  | 		} else { | ||
|  | 			return gcListSelect(list, pivotIndex+1, right, n) | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) collectGarbage(ratio float32) { | ||
|  | 	it := s.db.NewIterator() | ||
|  | 	it.Seek(s.gcPos) | ||
|  | 	if it.Valid() { | ||
|  | 		s.gcPos = it.Key() | ||
|  | 	} else { | ||
|  | 		s.gcPos = nil | ||
|  | 	} | ||
|  | 	gcnt := 0 | ||
|  | 
 | ||
|  | 	for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) { | ||
|  | 
 | ||
|  | 		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) { | ||
|  | 			it.Seek(s.gcStartPos) | ||
|  | 			if it.Valid() { | ||
|  | 				s.gcPos = it.Key() | ||
|  | 			} else { | ||
|  | 				s.gcPos = nil | ||
|  | 			} | ||
|  | 		} | ||
|  | 
 | ||
|  | 		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) { | ||
|  | 			break | ||
|  | 		} | ||
|  | 
 | ||
|  | 		gci := new(gcItem) | ||
|  | 		gci.idxKey = s.gcPos | ||
|  | 		var index dpaDBIndex | ||
|  | 		decodeIndex(it.Value(), &index) | ||
|  | 		gci.idx = index.Idx | ||
|  | 		// the smaller, the more likely to be gc'd | ||
|  | 		gci.value = getIndexGCValue(&index) | ||
|  | 		s.gcArray[gcnt] = gci | ||
|  | 		gcnt++ | ||
|  | 		it.Next() | ||
|  | 		if it.Valid() { | ||
|  | 			s.gcPos = it.Key() | ||
|  | 		} else { | ||
|  | 			s.gcPos = nil | ||
|  | 		} | ||
|  | 	} | ||
|  | 	it.Release() | ||
|  | 
 | ||
|  | 	cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio)) | ||
|  | 	cutval := s.gcArray[cutidx].value | ||
|  | 
 | ||
|  | 	// fmt.Print(gcnt, " ", s.entryCnt, " ") | ||
|  | 
 | ||
|  | 	// actual gc | ||
|  | 	for i := 0; i < gcnt; i++ { | ||
|  | 		if s.gcArray[i].value <= cutval { | ||
|  | 			batch := new(leveldb.Batch) | ||
|  | 			batch.Delete(s.gcArray[i].idxKey) | ||
|  | 			batch.Delete(getDataKey(s.gcArray[i].idx)) | ||
|  | 			s.entryCnt-- | ||
|  | 			batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) | ||
|  | 			s.db.Write(batch) | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// fmt.Println(s.entryCnt) | ||
|  | 
 | ||
|  | 	s.db.Put(keyGCPos, s.gcPos) | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) Counter() uint64 { | ||
|  | 	s.lock.Lock() | ||
|  | 	defer s.lock.Unlock() | ||
|  | 	return s.dataIdx | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) Put(chunk *Chunk) { | ||
|  | 	s.lock.Lock() | ||
|  | 	defer s.lock.Unlock() | ||
|  | 
 | ||
|  | 	ikey := getIndexKey(chunk.Key) | ||
|  | 	var index dpaDBIndex | ||
|  | 
 | ||
|  | 	if s.tryAccessIdx(ikey, &index) { | ||
|  | 		if chunk.dbStored != nil { | ||
|  | 			close(chunk.dbStored) | ||
|  | 		} | ||
|  | 		return // already exists, only update access | ||
|  | 	} | ||
|  | 
 | ||
|  | 	data := encodeData(chunk) | ||
|  | 	//data := ethutil.Encode([]interface{}{entry}) | ||
|  | 
 | ||
|  | 	if s.entryCnt >= s.capacity { | ||
|  | 		s.collectGarbage(gcArrayFreeRatio) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	batch := new(leveldb.Batch) | ||
|  | 
 | ||
|  | 	batch.Put(getDataKey(s.dataIdx), data) | ||
|  | 
 | ||
|  | 	index.Idx = s.dataIdx | ||
|  | 	s.updateIndexAccess(&index) | ||
|  | 
 | ||
|  | 	idata := encodeIndex(&index) | ||
|  | 	batch.Put(ikey, idata) | ||
|  | 
 | ||
|  | 	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) | ||
|  | 	s.entryCnt++ | ||
|  | 	batch.Put(keyDataIdx, U64ToBytes(s.dataIdx)) | ||
|  | 	s.dataIdx++ | ||
|  | 	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) | ||
|  | 	s.accessCnt++ | ||
|  | 
 | ||
|  | 	s.db.Write(batch) | ||
|  | 	if chunk.dbStored != nil { | ||
|  | 		close(chunk.dbStored) | ||
|  | 	} | ||
|  | 	glog.V(logger.Detail).Infof("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx) | ||
|  | } | ||
|  | 
 | ||
|  | // try to find index; if found, update access cnt and return true | ||
|  | func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { | ||
|  | 	idata, err := s.db.Get(ikey) | ||
|  | 	if err != nil { | ||
|  | 		return false | ||
|  | 	} | ||
|  | 	decodeIndex(idata, index) | ||
|  | 
 | ||
|  | 	batch := new(leveldb.Batch) | ||
|  | 
 | ||
|  | 	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) | ||
|  | 	s.accessCnt++ | ||
|  | 	s.updateIndexAccess(index) | ||
|  | 	idata = encodeIndex(index) | ||
|  | 	batch.Put(ikey, idata) | ||
|  | 
 | ||
|  | 	s.db.Write(batch) | ||
|  | 
 | ||
|  | 	return true | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) Get(key Key) (chunk *Chunk, err error) { | ||
|  | 	s.lock.Lock() | ||
|  | 	defer s.lock.Unlock() | ||
|  | 
 | ||
|  | 	var index dpaDBIndex | ||
|  | 
 | ||
|  | 	if s.tryAccessIdx(getIndexKey(key), &index) { | ||
|  | 		var data []byte | ||
|  | 		data, err = s.db.Get(getDataKey(index.Idx)) | ||
|  | 		if err != nil { | ||
|  | 			return | ||
|  | 		} | ||
|  | 
 | ||
|  | 		hasher := s.hashfunc() | ||
|  | 		hasher.Write(data) | ||
|  | 		hash := hasher.Sum(nil) | ||
|  | 		if bytes.Compare(hash, key) != 0 { | ||
|  | 			s.db.Delete(getDataKey(index.Idx)) | ||
|  | 			err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:]) | ||
|  | 			return | ||
|  | 		} | ||
|  | 
 | ||
|  | 		chunk = &Chunk{ | ||
|  | 			Key: key, | ||
|  | 		} | ||
|  | 		decodeData(data, chunk) | ||
|  | 	} else { | ||
|  | 		err = notFound | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) updateAccessCnt(key Key) { | ||
|  | 
 | ||
|  | 	s.lock.Lock() | ||
|  | 	defer s.lock.Unlock() | ||
|  | 
 | ||
|  | 	var index dpaDBIndex | ||
|  | 	s.tryAccessIdx(getIndexKey(key), &index) // result_chn == nil, only update access cnt | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) setCapacity(c uint64) { | ||
|  | 
 | ||
|  | 	s.lock.Lock() | ||
|  | 	defer s.lock.Unlock() | ||
|  | 
 | ||
|  | 	s.capacity = c | ||
|  | 
 | ||
|  | 	if s.entryCnt > c { | ||
|  | 		var ratio float32 | ||
|  | 		ratio = float32(1.01) - float32(c)/float32(s.entryCnt) | ||
|  | 		if ratio < gcArrayFreeRatio { | ||
|  | 			ratio = gcArrayFreeRatio | ||
|  | 		} | ||
|  | 		if ratio > 1 { | ||
|  | 			ratio = 1 | ||
|  | 		} | ||
|  | 		for s.entryCnt > c { | ||
|  | 			s.collectGarbage(ratio) | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) getEntryCnt() uint64 { | ||
|  | 	return s.entryCnt | ||
|  | } | ||
|  | 
 | ||
|  | func (s *DbStore) close() { | ||
|  | 	s.db.Close() | ||
|  | } | ||
|  | 
 | ||
|  | //  describes a section of the DbStore representing the unsynced | ||
|  | // domain relevant to a peer | ||
|  | // Start - Stop designate a continuous area Keys in an address space | ||
|  | // typically the addresses closer to us than to the peer but not closer | ||
|  | // another closer peer in between | ||
|  | // From - To designates a time interval typically from the last disconnect | ||
|  | // till the latest connection (real time traffic is relayed) | ||
|  | type DbSyncState struct { | ||
|  | 	Start, Stop Key | ||
|  | 	First, Last uint64 | ||
|  | } | ||
|  | 
 | ||
|  | // implements the syncer iterator interface | ||
|  | // iterates by storage index (~ time of storage = first entry to db) | ||
|  | type dbSyncIterator struct { | ||
|  | 	it iterator.Iterator | ||
|  | 	DbSyncState | ||
|  | } | ||
|  | 
 | ||
|  | // initialises a sync iterator from a syncToken (passed in with the handshake) | ||
|  | func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) { | ||
|  | 	if state.First > state.Last { | ||
|  | 		return nil, fmt.Errorf("no entries found") | ||
|  | 	} | ||
|  | 	si = &dbSyncIterator{ | ||
|  | 		it:          self.db.NewIterator(), | ||
|  | 		DbSyncState: state, | ||
|  | 	} | ||
|  | 	si.it.Seek(getIndexKey(state.Start)) | ||
|  | 	return si, nil | ||
|  | } | ||
|  | 
 | ||
|  | // walk the area from Start to Stop and returns items within time interval | ||
|  | // First to Last | ||
|  | func (self *dbSyncIterator) Next() (key Key) { | ||
|  | 	for self.it.Valid() { | ||
|  | 		dbkey := self.it.Key() | ||
|  | 		if dbkey[0] != 0 { | ||
|  | 			break | ||
|  | 		} | ||
|  | 		key = Key(make([]byte, len(dbkey)-1)) | ||
|  | 		copy(key[:], dbkey[1:]) | ||
|  | 		if bytes.Compare(key[:], self.Start) <= 0 { | ||
|  | 			self.it.Next() | ||
|  | 			continue | ||
|  | 		} | ||
|  | 		if bytes.Compare(key[:], self.Stop) > 0 { | ||
|  | 			break | ||
|  | 		} | ||
|  | 		var index dpaDBIndex | ||
|  | 		decodeIndex(self.it.Value(), &index) | ||
|  | 		self.it.Next() | ||
|  | 		if (index.Idx >= self.First) && (index.Idx < self.Last) { | ||
|  | 			return | ||
|  | 		} | ||
|  | 	} | ||
|  | 	self.it.Release() | ||
|  | 	return nil | ||
|  | } |