1081 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1081 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2016 The go-ethereum Authors
 | 
						|
// This file is part of the go-ethereum library.
 | 
						|
//
 | 
						|
// The go-ethereum library is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Lesser General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// The go-ethereum library is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
// GNU Lesser General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Lesser General Public License
 | 
						|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
// disk storage layer for the package bzz
 | 
						|
// DbStore implements the ChunkStore interface and is used by the FileStore as
 | 
						|
// persistent storage of chunks
 | 
						|
// it implements purging based on access count allowing for external control of
 | 
						|
// max capacity
 | 
						|
 | 
						|
package storage
 | 
						|
 | 
						|
import (
 | 
						|
	"archive/tar"
 | 
						|
	"bytes"
 | 
						|
	"context"
 | 
						|
	"encoding/binary"
 | 
						|
	"encoding/hex"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/metrics"
 | 
						|
	"github.com/ethereum/go-ethereum/rlp"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/log"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/storage/mock"
 | 
						|
	"github.com/syndtr/goleveldb/leveldb"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	defaultGCRatio    = 10
 | 
						|
	defaultMaxGCRound = 10000
 | 
						|
	defaultMaxGCBatch = 5000
 | 
						|
 | 
						|
	wEntryCnt  = 1 << 0
 | 
						|
	wIndexCnt  = 1 << 1
 | 
						|
	wAccessCnt = 1 << 2
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	keyIndex       = byte(0)
 | 
						|
	keyAccessCnt   = []byte{2}
 | 
						|
	keyEntryCnt    = []byte{3}
 | 
						|
	keyDataIdx     = []byte{4}
 | 
						|
	keyData        = byte(6)
 | 
						|
	keyDistanceCnt = byte(7)
 | 
						|
	keySchema      = []byte{8}
 | 
						|
	keyGCIdx       = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	ErrDBClosed = errors.New("LDBStore closed")
 | 
						|
)
 | 
						|
 | 
						|
type LDBStoreParams struct {
 | 
						|
	*StoreParams
 | 
						|
	Path string
 | 
						|
	Po   func(Address) uint8
 | 
						|
}
 | 
						|
 | 
						|
// NewLDBStoreParams constructs LDBStoreParams with the specified values.
 | 
						|
func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
 | 
						|
	return &LDBStoreParams{
 | 
						|
		StoreParams: storeparams,
 | 
						|
		Path:        path,
 | 
						|
		Po:          func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey, k[:])) },
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type garbage struct {
 | 
						|
	maxRound int           // maximum number of chunks to delete in one garbage collection round
 | 
						|
	maxBatch int           // maximum number of chunks to delete in one db request batch
 | 
						|
	ratio    int           // 1/x ratio to calculate the number of chunks to gc on a low capacity db
 | 
						|
	count    int           // number of chunks deleted in running round
 | 
						|
	target   int           // number of chunks to delete in running round
 | 
						|
	batch    *dbBatch      // the delete batch
 | 
						|
	runC     chan struct{} // struct in chan means gc is NOT running
 | 
						|
}
 | 
						|
 | 
						|
type LDBStore struct {
 | 
						|
	db *LDBDatabase
 | 
						|
 | 
						|
	// this should be stored in db, accessed transactionally
 | 
						|
	entryCnt  uint64 // number of items in the LevelDB
 | 
						|
	accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
 | 
						|
	dataIdx   uint64 // similar to entryCnt, but we only increment it
 | 
						|
	capacity  uint64
 | 
						|
	bucketCnt []uint64
 | 
						|
 | 
						|
	hashfunc SwarmHasher
 | 
						|
	po       func(Address) uint8
 | 
						|
 | 
						|
	batchesC chan struct{}
 | 
						|
	closed   bool
 | 
						|
	batch    *dbBatch
 | 
						|
	lock     sync.RWMutex
 | 
						|
	quit     chan struct{}
 | 
						|
	gc       *garbage
 | 
						|
 | 
						|
	// Functions encodeDataFunc is used to bypass
 | 
						|
	// the default functionality of DbStore with
 | 
						|
	// mock.NodeStore for testing purposes.
 | 
						|
	encodeDataFunc func(chunk Chunk) []byte
 | 
						|
	// If getDataFunc is defined, it will be used for
 | 
						|
	// retrieving the chunk data instead from the local
 | 
						|
	// LevelDB database.
 | 
						|
	getDataFunc func(key Address) (data []byte, err error)
 | 
						|
}
 | 
						|
 | 
						|
type dbBatch struct {
 | 
						|
	*leveldb.Batch
 | 
						|
	err error
 | 
						|
	c   chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
func newBatch() *dbBatch {
 | 
						|
	return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
 | 
						|
}
 | 
						|
 | 
						|
// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
 | 
						|
// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
 | 
						|
// a function different from the one that is actually used.
 | 
						|
func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
 | 
						|
	s = new(LDBStore)
 | 
						|
	s.hashfunc = params.Hash
 | 
						|
	s.quit = make(chan struct{})
 | 
						|
 | 
						|
	s.batchesC = make(chan struct{}, 1)
 | 
						|
	go s.writeBatches()
 | 
						|
	s.batch = newBatch()
 | 
						|
	// associate encodeData with default functionality
 | 
						|
	s.encodeDataFunc = encodeData
 | 
						|
 | 
						|
	s.db, err = NewLDBDatabase(params.Path)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	s.po = params.Po
 | 
						|
	s.setCapacity(params.DbCapacity)
 | 
						|
 | 
						|
	s.bucketCnt = make([]uint64, 0x100)
 | 
						|
	for i := 0; i < 0x100; i++ {
 | 
						|
		k := make([]byte, 2)
 | 
						|
		k[0] = keyDistanceCnt
 | 
						|
		k[1] = uint8(i)
 | 
						|
		cnt, _ := s.db.Get(k)
 | 
						|
		s.bucketCnt[i] = BytesToU64(cnt)
 | 
						|
	}
 | 
						|
	data, _ := s.db.Get(keyEntryCnt)
 | 
						|
	s.entryCnt = BytesToU64(data)
 | 
						|
	data, _ = s.db.Get(keyAccessCnt)
 | 
						|
	s.accessCnt = BytesToU64(data)
 | 
						|
	data, _ = s.db.Get(keyDataIdx)
 | 
						|
	s.dataIdx = BytesToU64(data)
 | 
						|
 | 
						|
	// set up garbage collection
 | 
						|
	s.gc = &garbage{
 | 
						|
		maxBatch: defaultMaxGCBatch,
 | 
						|
		maxRound: defaultMaxGCRound,
 | 
						|
		ratio:    defaultGCRatio,
 | 
						|
	}
 | 
						|
 | 
						|
	s.gc.runC = make(chan struct{}, 1)
 | 
						|
	s.gc.runC <- struct{}{}
 | 
						|
 | 
						|
	return s, nil
 | 
						|
}
 | 
						|
 | 
						|
// MarkAccessed increments the access counter as a best effort for a chunk, so
 | 
						|
// the chunk won't get garbage collected.
 | 
						|
func (s *LDBStore) MarkAccessed(addr Address) {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
 | 
						|
	if s.closed {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	proximity := s.po(addr)
 | 
						|
	s.tryAccessIdx(addr, proximity)
 | 
						|
}
 | 
						|
 | 
						|
// initialize and set values for processing of gc round
 | 
						|
func (s *LDBStore) startGC(c int) {
 | 
						|
 | 
						|
	s.gc.count = 0
 | 
						|
	// calculate the target number of deletions
 | 
						|
	if c >= s.gc.maxRound {
 | 
						|
		s.gc.target = s.gc.maxRound
 | 
						|
	} else {
 | 
						|
		s.gc.target = c / s.gc.ratio
 | 
						|
	}
 | 
						|
	s.gc.batch = newBatch()
 | 
						|
	log.Debug("startgc", "requested", c, "target", s.gc.target)
 | 
						|
}
 | 
						|
 | 
						|
// NewMockDbStore creates a new instance of DbStore with
 | 
						|
// mockStore set to a provided value. If mockStore argument is nil,
 | 
						|
// this function behaves exactly as NewDbStore.
 | 
						|
func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
 | 
						|
	s, err = NewLDBStore(params)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// replace put and get with mock store functionality
 | 
						|
	if mockStore != nil {
 | 
						|
		s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
 | 
						|
		s.getDataFunc = newMockGetDataFunc(mockStore)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
type dpaDBIndex struct {
 | 
						|
	Idx    uint64
 | 
						|
	Access uint64
 | 
						|
}
 | 
						|
 | 
						|
func BytesToU64(data []byte) uint64 {
 | 
						|
	if len(data) < 8 {
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	return binary.BigEndian.Uint64(data)
 | 
						|
}
 | 
						|
 | 
						|
func U64ToBytes(val uint64) []byte {
 | 
						|
	data := make([]byte, 8)
 | 
						|
	binary.BigEndian.PutUint64(data, val)
 | 
						|
	return data
 | 
						|
}
 | 
						|
 | 
						|
func getIndexKey(hash Address) []byte {
 | 
						|
	hashSize := len(hash)
 | 
						|
	key := make([]byte, hashSize+1)
 | 
						|
	key[0] = keyIndex
 | 
						|
	copy(key[1:], hash[:])
 | 
						|
	return key
 | 
						|
}
 | 
						|
 | 
						|
func getDataKey(idx uint64, po uint8) []byte {
 | 
						|
	key := make([]byte, 10)
 | 
						|
	key[0] = keyData
 | 
						|
	key[1] = po
 | 
						|
	binary.BigEndian.PutUint64(key[2:], idx)
 | 
						|
 | 
						|
	return key
 | 
						|
}
 | 
						|
 | 
						|
func getGCIdxKey(index *dpaDBIndex) []byte {
 | 
						|
	key := make([]byte, 9)
 | 
						|
	key[0] = keyGCIdx
 | 
						|
	binary.BigEndian.PutUint64(key[1:], index.Access)
 | 
						|
	return key
 | 
						|
}
 | 
						|
 | 
						|
func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
 | 
						|
	val := make([]byte, 41) // po = 1, index.Index = 8, Address = 32
 | 
						|
	val[0] = po
 | 
						|
	binary.BigEndian.PutUint64(val[1:], index.Idx)
 | 
						|
	copy(val[9:], addr)
 | 
						|
	return val
 | 
						|
}
 | 
						|
 | 
						|
func parseIdxKey(key []byte) (byte, []byte) {
 | 
						|
	return key[0], key[1:]
 | 
						|
}
 | 
						|
 | 
						|
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
 | 
						|
	index = &dpaDBIndex{
 | 
						|
		Idx:    binary.BigEndian.Uint64(val[1:]),
 | 
						|
		Access: binary.BigEndian.Uint64(accessCnt),
 | 
						|
	}
 | 
						|
	po = val[0]
 | 
						|
	addr = val[9:]
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func encodeIndex(index *dpaDBIndex) []byte {
 | 
						|
	data, _ := rlp.EncodeToBytes(index)
 | 
						|
	return data
 | 
						|
}
 | 
						|
 | 
						|
func encodeData(chunk Chunk) []byte {
 | 
						|
	// Always create a new underlying array for the returned byte slice.
 | 
						|
	// The chunk.Address array may be used in the returned slice which
 | 
						|
	// may be changed later in the code or by the LevelDB, resulting
 | 
						|
	// that the Address is changed as well.
 | 
						|
	return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
 | 
						|
}
 | 
						|
 | 
						|
func decodeIndex(data []byte, index *dpaDBIndex) error {
 | 
						|
	dec := rlp.NewStream(bytes.NewReader(data), 0)
 | 
						|
	return dec.Decode(index)
 | 
						|
}
 | 
						|
 | 
						|
func decodeData(addr Address, data []byte) (*chunk, error) {
 | 
						|
	return NewChunk(addr, data[32:]), nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *LDBStore) collectGarbage() error {
 | 
						|
 | 
						|
	// prevent duplicate gc from starting when one is already running
 | 
						|
	select {
 | 
						|
	case <-s.gc.runC:
 | 
						|
	default:
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	s.lock.Lock()
 | 
						|
	entryCnt := s.entryCnt
 | 
						|
	s.lock.Unlock()
 | 
						|
 | 
						|
	metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
 | 
						|
 | 
						|
	// calculate the amount of chunks to collect and reset counter
 | 
						|
	s.startGC(int(entryCnt))
 | 
						|
	log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt)
 | 
						|
 | 
						|
	var totalDeleted int
 | 
						|
	for s.gc.count < s.gc.target {
 | 
						|
		it := s.db.NewIterator()
 | 
						|
		ok := it.Seek([]byte{keyGCIdx})
 | 
						|
		var singleIterationCount int
 | 
						|
 | 
						|
		// every batch needs a lock so we avoid entries changing accessidx in the meantime
 | 
						|
		s.lock.Lock()
 | 
						|
		for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() {
 | 
						|
 | 
						|
			// quit if no more access index keys
 | 
						|
			itkey := it.Key()
 | 
						|
			if (itkey == nil) || (itkey[0] != keyGCIdx) {
 | 
						|
				break
 | 
						|
			}
 | 
						|
 | 
						|
			// get chunk data entry from access index
 | 
						|
			val := it.Value()
 | 
						|
			index, po, hash := parseGCIdxEntry(itkey[1:], val)
 | 
						|
			keyIdx := make([]byte, 33)
 | 
						|
			keyIdx[0] = keyIndex
 | 
						|
			copy(keyIdx[1:], hash)
 | 
						|
 | 
						|
			// add delete operation to batch
 | 
						|
			s.delete(s.gc.batch.Batch, index, keyIdx, po)
 | 
						|
			singleIterationCount++
 | 
						|
			s.gc.count++
 | 
						|
			log.Trace("garbage collect enqueued chunk for deletion", "key", hash)
 | 
						|
 | 
						|
			// break if target is not on max garbage batch boundary
 | 
						|
			if s.gc.count >= s.gc.target {
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		s.writeBatch(s.gc.batch, wEntryCnt)
 | 
						|
		s.lock.Unlock()
 | 
						|
		it.Release()
 | 
						|
		log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count)
 | 
						|
	}
 | 
						|
 | 
						|
	s.gc.runC <- struct{}{}
 | 
						|
	log.Debug("garbage collect done", "c", s.gc.count)
 | 
						|
 | 
						|
	metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(totalDeleted))
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Export writes all chunks from the store to a tar archive, returning the
 | 
						|
// number of chunks written.
 | 
						|
func (s *LDBStore) Export(out io.Writer) (int64, error) {
 | 
						|
	tw := tar.NewWriter(out)
 | 
						|
	defer tw.Close()
 | 
						|
 | 
						|
	it := s.db.NewIterator()
 | 
						|
	defer it.Release()
 | 
						|
	var count int64
 | 
						|
	for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
 | 
						|
		key := it.Key()
 | 
						|
		if (key == nil) || (key[0] != keyIndex) {
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		var index dpaDBIndex
 | 
						|
 | 
						|
		hash := key[1:]
 | 
						|
		decodeIndex(it.Value(), &index)
 | 
						|
		po := s.po(hash)
 | 
						|
		datakey := getDataKey(index.Idx, po)
 | 
						|
		log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
 | 
						|
		data, err := s.db.Get(datakey)
 | 
						|
		if err != nil {
 | 
						|
			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		hdr := &tar.Header{
 | 
						|
			Name: hex.EncodeToString(hash),
 | 
						|
			Mode: 0644,
 | 
						|
			Size: int64(len(data)),
 | 
						|
		}
 | 
						|
		if err := tw.WriteHeader(hdr); err != nil {
 | 
						|
			return count, err
 | 
						|
		}
 | 
						|
		if _, err := tw.Write(data); err != nil {
 | 
						|
			return count, err
 | 
						|
		}
 | 
						|
		count++
 | 
						|
	}
 | 
						|
 | 
						|
	return count, nil
 | 
						|
}
 | 
						|
 | 
						|
// of chunks read.
 | 
						|
func (s *LDBStore) Import(in io.Reader) (int64, error) {
 | 
						|
	tr := tar.NewReader(in)
 | 
						|
 | 
						|
	ctx, cancel := context.WithCancel(context.Background())
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	countC := make(chan int64)
 | 
						|
	errC := make(chan error)
 | 
						|
	var count int64
 | 
						|
	go func() {
 | 
						|
		for {
 | 
						|
			hdr, err := tr.Next()
 | 
						|
			if err == io.EOF {
 | 
						|
				break
 | 
						|
			} else if err != nil {
 | 
						|
				select {
 | 
						|
				case errC <- err:
 | 
						|
				case <-ctx.Done():
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			if len(hdr.Name) != 64 {
 | 
						|
				log.Warn("ignoring non-chunk file", "name", hdr.Name)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			keybytes, err := hex.DecodeString(hdr.Name)
 | 
						|
			if err != nil {
 | 
						|
				log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			data, err := ioutil.ReadAll(tr)
 | 
						|
			if err != nil {
 | 
						|
				select {
 | 
						|
				case errC <- err:
 | 
						|
				case <-ctx.Done():
 | 
						|
				}
 | 
						|
			}
 | 
						|
			key := Address(keybytes)
 | 
						|
			chunk := NewChunk(key, data[32:])
 | 
						|
 | 
						|
			go func() {
 | 
						|
				select {
 | 
						|
				case errC <- s.Put(ctx, chunk):
 | 
						|
				case <-ctx.Done():
 | 
						|
				}
 | 
						|
			}()
 | 
						|
 | 
						|
			count++
 | 
						|
		}
 | 
						|
		countC <- count
 | 
						|
	}()
 | 
						|
 | 
						|
	// wait for all chunks to be stored
 | 
						|
	i := int64(0)
 | 
						|
	var total int64
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case err := <-errC:
 | 
						|
			if err != nil {
 | 
						|
				return count, err
 | 
						|
			}
 | 
						|
			i++
 | 
						|
		case total = <-countC:
 | 
						|
		case <-ctx.Done():
 | 
						|
			return i, ctx.Err()
 | 
						|
		}
 | 
						|
		if total > 0 && i == total {
 | 
						|
			return total, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
 | 
						|
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
 | 
						|
	var errorsFound, removed, total int
 | 
						|
 | 
						|
	it := s.db.NewIterator()
 | 
						|
	defer it.Release()
 | 
						|
	for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
 | 
						|
		key := it.Key()
 | 
						|
		if (key == nil) || (key[0] != keyIndex) {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		total++
 | 
						|
		var index dpaDBIndex
 | 
						|
		err := decodeIndex(it.Value(), &index)
 | 
						|
		if err != nil {
 | 
						|
			log.Warn("Cannot decode")
 | 
						|
			errorsFound++
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		hash := key[1:]
 | 
						|
		po := s.po(hash)
 | 
						|
		datakey := getDataKey(index.Idx, po)
 | 
						|
		data, err := s.db.Get(datakey)
 | 
						|
		if err != nil {
 | 
						|
			found := false
 | 
						|
 | 
						|
			// highest possible proximity is 255
 | 
						|
			for po = 1; po <= 255; po++ {
 | 
						|
				datakey = getDataKey(index.Idx, po)
 | 
						|
				data, err = s.db.Get(datakey)
 | 
						|
				if err == nil {
 | 
						|
					found = true
 | 
						|
					break
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			if !found {
 | 
						|
				log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
 | 
						|
				errorsFound++
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		ck := data[:32]
 | 
						|
		c, err := decodeData(ck, data)
 | 
						|
		if err != nil {
 | 
						|
			log.Error("decodeData error", "err", err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		cs := int64(binary.LittleEndian.Uint64(c.sdata[:8]))
 | 
						|
		log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
 | 
						|
 | 
						|
		// if chunk is to be removed
 | 
						|
		if f(c) {
 | 
						|
			log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
 | 
						|
			s.deleteNow(&index, getIndexKey(key[1:]), po)
 | 
						|
			removed++
 | 
						|
			errorsFound++
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
 | 
						|
}
 | 
						|
 | 
						|
// CleanGCIndex rebuilds the garbage collector index from scratch, while
 | 
						|
// removing inconsistent elements, e.g., indices with missing data chunks.
 | 
						|
// WARN: it's a pretty heavy, long running function.
 | 
						|
func (s *LDBStore) CleanGCIndex() error {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
 | 
						|
	batch := leveldb.Batch{}
 | 
						|
 | 
						|
	var okEntryCount uint64
 | 
						|
	var totalEntryCount uint64
 | 
						|
 | 
						|
	// throw out all gc indices, we will rebuild from cleaned index
 | 
						|
	it := s.db.NewIterator()
 | 
						|
	it.Seek([]byte{keyGCIdx})
 | 
						|
	var gcDeletes int
 | 
						|
	for it.Valid() {
 | 
						|
		rowType, _ := parseIdxKey(it.Key())
 | 
						|
		if rowType != keyGCIdx {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		batch.Delete(it.Key())
 | 
						|
		gcDeletes++
 | 
						|
		it.Next()
 | 
						|
	}
 | 
						|
	log.Debug("gc", "deletes", gcDeletes)
 | 
						|
	if err := s.db.Write(&batch); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	batch.Reset()
 | 
						|
 | 
						|
	it.Release()
 | 
						|
 | 
						|
	// corrected po index pointer values
 | 
						|
	var poPtrs [256]uint64
 | 
						|
 | 
						|
	// set to true if chunk count not on 4096 iteration boundary
 | 
						|
	var doneIterating bool
 | 
						|
 | 
						|
	// last key index in previous iteration
 | 
						|
	lastIdxKey := []byte{keyIndex}
 | 
						|
 | 
						|
	// counter for debug output
 | 
						|
	var cleanBatchCount int
 | 
						|
 | 
						|
	// go through all key index entries
 | 
						|
	for !doneIterating {
 | 
						|
		cleanBatchCount++
 | 
						|
		var idxs []dpaDBIndex
 | 
						|
		var chunkHashes [][]byte
 | 
						|
		var pos []uint8
 | 
						|
		it := s.db.NewIterator()
 | 
						|
 | 
						|
		it.Seek(lastIdxKey)
 | 
						|
 | 
						|
		// 4096 is just a nice number, don't look for any hidden meaning here...
 | 
						|
		var i int
 | 
						|
		for i = 0; i < 4096; i++ {
 | 
						|
 | 
						|
			// this really shouldn't happen unless database is empty
 | 
						|
			// but let's keep it to be safe
 | 
						|
			if !it.Valid() {
 | 
						|
				doneIterating = true
 | 
						|
				break
 | 
						|
			}
 | 
						|
 | 
						|
			// if it's not keyindex anymore we're done iterating
 | 
						|
			rowType, chunkHash := parseIdxKey(it.Key())
 | 
						|
			if rowType != keyIndex {
 | 
						|
				doneIterating = true
 | 
						|
				break
 | 
						|
			}
 | 
						|
 | 
						|
			// decode the retrieved index
 | 
						|
			var idx dpaDBIndex
 | 
						|
			err := decodeIndex(it.Value(), &idx)
 | 
						|
			if err != nil {
 | 
						|
				return fmt.Errorf("corrupt index: %v", err)
 | 
						|
			}
 | 
						|
			po := s.po(chunkHash)
 | 
						|
			lastIdxKey = it.Key()
 | 
						|
 | 
						|
			// if we don't find the data key, remove the entry
 | 
						|
			// if we find it, add to the array of new gc indices to create
 | 
						|
			dataKey := getDataKey(idx.Idx, po)
 | 
						|
			_, err = s.db.Get(dataKey)
 | 
						|
			if err != nil {
 | 
						|
				log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
 | 
						|
				batch.Delete(it.Key())
 | 
						|
			} else {
 | 
						|
				idxs = append(idxs, idx)
 | 
						|
				chunkHashes = append(chunkHashes, chunkHash)
 | 
						|
				pos = append(pos, po)
 | 
						|
				okEntryCount++
 | 
						|
				if idx.Idx > poPtrs[po] {
 | 
						|
					poPtrs[po] = idx.Idx
 | 
						|
				}
 | 
						|
			}
 | 
						|
			totalEntryCount++
 | 
						|
			it.Next()
 | 
						|
		}
 | 
						|
		it.Release()
 | 
						|
 | 
						|
		// flush the key index corrections
 | 
						|
		err := s.db.Write(&batch)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		batch.Reset()
 | 
						|
 | 
						|
		// add correct gc indices
 | 
						|
		for i, okIdx := range idxs {
 | 
						|
			gcIdxKey := getGCIdxKey(&okIdx)
 | 
						|
			gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
 | 
						|
			batch.Put(gcIdxKey, gcIdxData)
 | 
						|
			log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
 | 
						|
		}
 | 
						|
 | 
						|
		// flush them
 | 
						|
		err = s.db.Write(&batch)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		batch.Reset()
 | 
						|
 | 
						|
		log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
 | 
						|
	}
 | 
						|
 | 
						|
	log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
 | 
						|
 | 
						|
	// lastly add updated entry count
 | 
						|
	var entryCount [8]byte
 | 
						|
	binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
 | 
						|
	batch.Put(keyEntryCnt, entryCount[:])
 | 
						|
 | 
						|
	// and add the new po index pointers
 | 
						|
	var poKey [2]byte
 | 
						|
	poKey[0] = keyDistanceCnt
 | 
						|
	for i, poPtr := range poPtrs {
 | 
						|
		poKey[1] = uint8(i)
 | 
						|
		if poPtr == 0 {
 | 
						|
			batch.Delete(poKey[:])
 | 
						|
		} else {
 | 
						|
			var idxCount [8]byte
 | 
						|
			binary.BigEndian.PutUint64(idxCount[:], poPtr)
 | 
						|
			batch.Put(poKey[:], idxCount[:])
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// if you made it this far your harddisk has survived. Congratulations
 | 
						|
	return s.db.Write(&batch)
 | 
						|
}
 | 
						|
 | 
						|
// Delete is removes a chunk and updates indices.
 | 
						|
// Is thread safe
 | 
						|
func (s *LDBStore) Delete(addr Address) error {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
 | 
						|
	ikey := getIndexKey(addr)
 | 
						|
 | 
						|
	idata, err := s.db.Get(ikey)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	var idx dpaDBIndex
 | 
						|
	decodeIndex(idata, &idx)
 | 
						|
	proximity := s.po(addr)
 | 
						|
	return s.deleteNow(&idx, ikey, proximity)
 | 
						|
}
 | 
						|
 | 
						|
// executes one delete operation immediately
 | 
						|
// see *LDBStore.delete
 | 
						|
func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error {
 | 
						|
	batch := new(leveldb.Batch)
 | 
						|
	s.delete(batch, idx, idxKey, po)
 | 
						|
	return s.db.Write(batch)
 | 
						|
}
 | 
						|
 | 
						|
// adds a delete chunk operation to the provided batch
 | 
						|
// if called directly, decrements entrycount regardless if the chunk exists upon deletion. Risk of wrap to max uint64
 | 
						|
func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) {
 | 
						|
	metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
 | 
						|
 | 
						|
	gcIdxKey := getGCIdxKey(idx)
 | 
						|
	batch.Delete(gcIdxKey)
 | 
						|
	dataKey := getDataKey(idx.Idx, po)
 | 
						|
	batch.Delete(dataKey)
 | 
						|
	batch.Delete(idxKey)
 | 
						|
	s.entryCnt--
 | 
						|
	dbEntryCount.Dec(1)
 | 
						|
	cntKey := make([]byte, 2)
 | 
						|
	cntKey[0] = keyDistanceCnt
 | 
						|
	cntKey[1] = po
 | 
						|
	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
 | 
						|
	batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
 | 
						|
}
 | 
						|
 | 
						|
func (s *LDBStore) BinIndex(po uint8) uint64 {
 | 
						|
	s.lock.RLock()
 | 
						|
	defer s.lock.RUnlock()
 | 
						|
	return s.bucketCnt[po]
 | 
						|
}
 | 
						|
 | 
						|
// Put adds a chunk to the database, adding indices and incrementing global counters.
 | 
						|
// If it already exists, it merely increments the access count of the existing entry.
 | 
						|
// Is thread safe
 | 
						|
func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
 | 
						|
	metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
 | 
						|
	log.Trace("ldbstore.put", "key", chunk.Address())
 | 
						|
 | 
						|
	ikey := getIndexKey(chunk.Address())
 | 
						|
	var index dpaDBIndex
 | 
						|
 | 
						|
	po := s.po(chunk.Address())
 | 
						|
 | 
						|
	s.lock.Lock()
 | 
						|
 | 
						|
	if s.closed {
 | 
						|
		s.lock.Unlock()
 | 
						|
		return ErrDBClosed
 | 
						|
	}
 | 
						|
	batch := s.batch
 | 
						|
 | 
						|
	log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
 | 
						|
	_, err := s.db.Get(ikey)
 | 
						|
	if err != nil {
 | 
						|
		s.doPut(chunk, &index, po)
 | 
						|
	}
 | 
						|
	idata := encodeIndex(&index)
 | 
						|
	s.batch.Put(ikey, idata)
 | 
						|
 | 
						|
	// add the access-chunkindex index for garbage collection
 | 
						|
	gcIdxKey := getGCIdxKey(&index)
 | 
						|
	gcIdxData := getGCIdxValue(&index, po, chunk.Address())
 | 
						|
	s.batch.Put(gcIdxKey, gcIdxData)
 | 
						|
	s.lock.Unlock()
 | 
						|
 | 
						|
	select {
 | 
						|
	case s.batchesC <- struct{}{}:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-batch.c:
 | 
						|
		return batch.err
 | 
						|
	case <-ctx.Done():
 | 
						|
		return ctx.Err()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// force putting into db, does not check or update necessary indices
 | 
						|
func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
 | 
						|
	data := s.encodeDataFunc(chunk)
 | 
						|
	dkey := getDataKey(s.dataIdx, po)
 | 
						|
	s.batch.Put(dkey, data)
 | 
						|
	index.Idx = s.dataIdx
 | 
						|
	s.bucketCnt[po] = s.dataIdx
 | 
						|
	s.entryCnt++
 | 
						|
	dbEntryCount.Inc(1)
 | 
						|
	s.dataIdx++
 | 
						|
	index.Access = s.accessCnt
 | 
						|
	s.accessCnt++
 | 
						|
	cntKey := make([]byte, 2)
 | 
						|
	cntKey[0] = keyDistanceCnt
 | 
						|
	cntKey[1] = po
 | 
						|
	s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
 | 
						|
}
 | 
						|
 | 
						|
func (s *LDBStore) writeBatches() {
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-s.quit:
 | 
						|
			log.Debug("DbStore: quit batch write loop")
 | 
						|
			return
 | 
						|
		case <-s.batchesC:
 | 
						|
			err := s.writeCurrentBatch()
 | 
						|
			if err != nil {
 | 
						|
				log.Debug("DbStore: quit batch write loop", "err", err.Error())
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (s *LDBStore) writeCurrentBatch() error {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
	b := s.batch
 | 
						|
	l := b.Len()
 | 
						|
	if l == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	s.batch = newBatch()
 | 
						|
	b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt)
 | 
						|
	close(b.c)
 | 
						|
	if s.entryCnt >= s.capacity {
 | 
						|
		go s.collectGarbage()
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// must be called non concurrently
 | 
						|
func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error {
 | 
						|
	if wFlag&wEntryCnt > 0 {
 | 
						|
		b.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
 | 
						|
	}
 | 
						|
	if wFlag&wIndexCnt > 0 {
 | 
						|
		b.Put(keyDataIdx, U64ToBytes(s.dataIdx))
 | 
						|
	}
 | 
						|
	if wFlag&wAccessCnt > 0 {
 | 
						|
		b.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
 | 
						|
	}
 | 
						|
	l := b.Len()
 | 
						|
	if err := s.db.Write(b.Batch); err != nil {
 | 
						|
		return fmt.Errorf("unable to write batch: %v", err)
 | 
						|
	}
 | 
						|
	log.Trace(fmt.Sprintf("batch write (%d entries)", l))
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// newMockEncodeDataFunc returns a function that stores the chunk data
 | 
						|
// to a mock store to bypass the default functionality encodeData.
 | 
						|
// The constructed function always returns the nil data, as DbStore does
 | 
						|
// not need to store the data, but still need to create the index.
 | 
						|
func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
 | 
						|
	return func(chunk Chunk) []byte {
 | 
						|
		if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
 | 
						|
			log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
 | 
						|
		}
 | 
						|
		return chunk.Address()[:]
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// tryAccessIdx tries to find index entry. If found then increments the access
 | 
						|
// count for garbage collection and returns the index entry and true for found,
 | 
						|
// otherwise returns nil and false.
 | 
						|
func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
 | 
						|
	ikey := getIndexKey(addr)
 | 
						|
	idata, err := s.db.Get(ikey)
 | 
						|
	if err != nil {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
 | 
						|
	index := new(dpaDBIndex)
 | 
						|
	decodeIndex(idata, index)
 | 
						|
	oldGCIdxKey := getGCIdxKey(index)
 | 
						|
	s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
 | 
						|
	index.Access = s.accessCnt
 | 
						|
	idata = encodeIndex(index)
 | 
						|
	s.accessCnt++
 | 
						|
	s.batch.Put(ikey, idata)
 | 
						|
	newGCIdxKey := getGCIdxKey(index)
 | 
						|
	newGCIdxData := getGCIdxValue(index, po, ikey[1:])
 | 
						|
	s.batch.Delete(oldGCIdxKey)
 | 
						|
	s.batch.Put(newGCIdxKey, newGCIdxData)
 | 
						|
	select {
 | 
						|
	case s.batchesC <- struct{}{}:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
	return index, true
 | 
						|
}
 | 
						|
 | 
						|
// GetSchema is returning the current named schema of the datastore as read from LevelDB
 | 
						|
func (s *LDBStore) GetSchema() (string, error) {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
 | 
						|
	data, err := s.db.Get(keySchema)
 | 
						|
	if err != nil {
 | 
						|
		if err == leveldb.ErrNotFound {
 | 
						|
			return DbSchemaNone, nil
 | 
						|
		}
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	return string(data), nil
 | 
						|
}
 | 
						|
 | 
						|
// PutSchema is saving a named schema to the LevelDB datastore
 | 
						|
func (s *LDBStore) PutSchema(schema string) error {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
 | 
						|
	return s.db.Put(keySchema, []byte(schema))
 | 
						|
}
 | 
						|
 | 
						|
// Get retrieves the chunk matching the provided key from the database.
 | 
						|
// If the chunk entry does not exist, it returns an error
 | 
						|
// Updates access count and is thread safe
 | 
						|
func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
 | 
						|
	metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
 | 
						|
	log.Trace("ldbstore.get", "key", addr)
 | 
						|
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
	return s.get(addr)
 | 
						|
}
 | 
						|
 | 
						|
// Has queries the underlying DB if a chunk with the given address is stored
 | 
						|
// Returns true if the chunk is found, false if not
 | 
						|
func (s *LDBStore) Has(_ context.Context, addr Address) bool {
 | 
						|
	s.lock.RLock()
 | 
						|
	defer s.lock.RUnlock()
 | 
						|
 | 
						|
	ikey := getIndexKey(addr)
 | 
						|
	_, err := s.db.Get(ikey)
 | 
						|
 | 
						|
	return err == nil
 | 
						|
}
 | 
						|
 | 
						|
// TODO: To conform with other private methods of this object indices should not be updated
 | 
						|
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
 | 
						|
	if s.closed {
 | 
						|
		return nil, ErrDBClosed
 | 
						|
	}
 | 
						|
	proximity := s.po(addr)
 | 
						|
	index, found := s.tryAccessIdx(addr, proximity)
 | 
						|
	if found {
 | 
						|
		var data []byte
 | 
						|
		if s.getDataFunc != nil {
 | 
						|
			// if getDataFunc is defined, use it to retrieve the chunk data
 | 
						|
			log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
 | 
						|
			data, err = s.getDataFunc(addr)
 | 
						|
			if err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			// default DbStore functionality to retrieve chunk data
 | 
						|
			datakey := getDataKey(index.Idx, proximity)
 | 
						|
			data, err = s.db.Get(datakey)
 | 
						|
			log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
 | 
						|
			if err != nil {
 | 
						|
				log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
 | 
						|
				s.deleteNow(index, getIndexKey(addr), s.po(addr))
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		return decodeData(addr, data)
 | 
						|
	} else {
 | 
						|
		err = ErrChunkNotFound
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// newMockGetFunc returns a function that reads chunk data from
 | 
						|
// the mock database, which is used as the value for DbStore.getFunc
 | 
						|
// to bypass the default functionality of DbStore with a mock store.
 | 
						|
func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
 | 
						|
	return func(addr Address) (data []byte, err error) {
 | 
						|
		data, err = mockStore.Get(addr)
 | 
						|
		if err == mock.ErrNotFound {
 | 
						|
			// preserve ErrChunkNotFound error
 | 
						|
			err = ErrChunkNotFound
 | 
						|
		}
 | 
						|
		return data, err
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *LDBStore) setCapacity(c uint64) {
 | 
						|
	s.lock.Lock()
 | 
						|
	defer s.lock.Unlock()
 | 
						|
 | 
						|
	s.capacity = c
 | 
						|
 | 
						|
	for s.entryCnt > c {
 | 
						|
		s.collectGarbage()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *LDBStore) Close() {
 | 
						|
	close(s.quit)
 | 
						|
	s.lock.Lock()
 | 
						|
	s.closed = true
 | 
						|
	s.lock.Unlock()
 | 
						|
	// force writing out current batch
 | 
						|
	s.writeCurrentBatch()
 | 
						|
	close(s.batchesC)
 | 
						|
	s.db.Close()
 | 
						|
}
 | 
						|
 | 
						|
// SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
 | 
						|
func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
 | 
						|
	metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
 | 
						|
 | 
						|
	sincekey := getDataKey(since, po)
 | 
						|
	untilkey := getDataKey(until, po)
 | 
						|
	it := s.db.NewIterator()
 | 
						|
	defer it.Release()
 | 
						|
 | 
						|
	for ok := it.Seek(sincekey); ok; ok = it.Next() {
 | 
						|
		metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
 | 
						|
 | 
						|
		dbkey := it.Key()
 | 
						|
		if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		key := make([]byte, 32)
 | 
						|
		val := it.Value()
 | 
						|
		copy(key, val[:32])
 | 
						|
		if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return it.Error()
 | 
						|
}
 |