all: clean up and proerly abstract database access
This commit is contained in:
12
ethdb/.gitignore
vendored
12
ethdb/.gitignore
vendored
@ -1,12 +0,0 @@
|
||||
# See http://help.github.com/ignore-files/ for more about ignoring files.
|
||||
#
|
||||
# If you find yourself ignoring temporary files generated by your text editor
|
||||
# or operating system, you probably want to add a global ignore instead:
|
||||
# git config --global core.excludesfile ~/.gitignore_global
|
||||
|
||||
/tmp
|
||||
*/**/*un~
|
||||
*un~
|
||||
.DS_Store
|
||||
*/**/.DS_Store
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// Copyright 2018 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
@ -16,37 +16,29 @@
|
||||
|
||||
package ethdb
|
||||
|
||||
// Code using batches should try to add this much data to the batch.
|
||||
// The value was determined empirically.
|
||||
// IdealBatchSize defines the size of the data batches should ideally add in one
|
||||
// write.
|
||||
const IdealBatchSize = 100 * 1024
|
||||
|
||||
// Putter wraps the database write operation supported by both batches and regular databases.
|
||||
type Putter interface {
|
||||
Put(key []byte, value []byte) error
|
||||
}
|
||||
|
||||
// Deleter wraps the database delete operation supported by both batches and regular databases.
|
||||
type Deleter interface {
|
||||
Delete(key []byte) error
|
||||
}
|
||||
|
||||
// Database wraps all database operations. All methods are safe for concurrent use.
|
||||
type Database interface {
|
||||
Putter
|
||||
Deleter
|
||||
Get(key []byte) ([]byte, error)
|
||||
Has(key []byte) (bool, error)
|
||||
Close()
|
||||
NewBatch() Batch
|
||||
}
|
||||
|
||||
// Batch is a write-only database that commits changes to its host database
|
||||
// when Write is called. Batch cannot be used concurrently.
|
||||
// when Write is called. A batch cannot be used concurrently.
|
||||
type Batch interface {
|
||||
Putter
|
||||
Writer
|
||||
Deleter
|
||||
ValueSize() int // amount of data in the batch
|
||||
|
||||
// ValueSize retrieves the amount of data queued up for writing.
|
||||
ValueSize() int
|
||||
|
||||
// Write flushes any accumulated data to disk.
|
||||
Write() error
|
||||
|
||||
// Reset resets the batch for reuse
|
||||
Reset()
|
||||
}
|
||||
|
||||
// Batcher wraps the NewBatch method of a backing data store.
|
||||
type Batcher interface {
|
||||
// NewBatch creates a write-only database that buffers changes to its host db
|
||||
// until a final write is called.
|
||||
NewBatch() Batch
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// Copyright 2018 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
@ -14,372 +14,72 @@
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build !js
|
||||
|
||||
// Package database defines the interfaces for an Ethereum data store.
|
||||
package ethdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
import "io"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
// Reader wraps the Has and Get method of a backing data store.
|
||||
type Reader interface {
|
||||
// Has retrieves if a key is present in the key-value data store.
|
||||
Has(key []byte) (bool, error)
|
||||
|
||||
const (
|
||||
writePauseWarningThrottler = 1 * time.Minute
|
||||
)
|
||||
|
||||
var OpenFileLimit = 64
|
||||
|
||||
type LDBDatabase struct {
|
||||
fn string // filename for reporting
|
||||
db *leveldb.DB // LevelDB instance
|
||||
|
||||
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
|
||||
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
|
||||
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
|
||||
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
|
||||
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
|
||||
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
|
||||
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
|
||||
|
||||
quitLock sync.Mutex // Mutex protecting the quit channel access
|
||||
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
|
||||
|
||||
log log.Logger // Contextual logger tracking the database path
|
||||
// Get retrieves the given key if it's present in the key-value data store.
|
||||
Get(key []byte) ([]byte, error)
|
||||
}
|
||||
|
||||
// NewLDBDatabase returns a LevelDB wrapped object.
|
||||
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
|
||||
logger := log.New("database", file)
|
||||
|
||||
// Ensure we have some minimal caching and file guarantees
|
||||
if cache < 16 {
|
||||
cache = 16
|
||||
}
|
||||
if handles < 16 {
|
||||
handles = 16
|
||||
}
|
||||
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
|
||||
|
||||
// Open the db and recover any potential corruptions
|
||||
db, err := leveldb.OpenFile(file, &opt.Options{
|
||||
OpenFilesCacheCapacity: handles,
|
||||
BlockCacheCapacity: cache / 2 * opt.MiB,
|
||||
WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally
|
||||
Filter: filter.NewBloomFilter(10),
|
||||
})
|
||||
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
|
||||
db, err = leveldb.RecoverFile(file, nil)
|
||||
}
|
||||
// (Re)check for errors and abort if opening of the db failed
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &LDBDatabase{
|
||||
fn: file,
|
||||
db: db,
|
||||
log: logger,
|
||||
}, nil
|
||||
// Writer wraps the Put method of a backing data store.
|
||||
type Writer interface {
|
||||
// Put inserts the given value into the key-value data store.
|
||||
Put(key []byte, value []byte) error
|
||||
}
|
||||
|
||||
// Path returns the path to the database directory.
|
||||
func (db *LDBDatabase) Path() string {
|
||||
return db.fn
|
||||
// Deleter wraps the Delete method of a backing data store.
|
||||
type Deleter interface {
|
||||
// Delete removes the key from the key-value data store.
|
||||
Delete(key []byte) error
|
||||
}
|
||||
|
||||
// Put puts the given key / value to the queue
|
||||
func (db *LDBDatabase) Put(key []byte, value []byte) error {
|
||||
return db.db.Put(key, value, nil)
|
||||
// Stater wraps the Stat method of a backing data store.
|
||||
type Stater interface {
|
||||
// Stat returns a particular internal stat of the database.
|
||||
Stat(property string) (string, error)
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) Has(key []byte) (bool, error) {
|
||||
return db.db.Has(key, nil)
|
||||
// Compacter wraps the Compact method of a backing data store.
|
||||
type Compacter interface {
|
||||
// Compact flattens the underlying data store for the given key range. In essence,
|
||||
// deleted and overwritten versions are discarded, and the data is rearranged to
|
||||
// reduce the cost of operations needed to access them.
|
||||
//
|
||||
// A nil start is treated as a key before all keys in the data store; a nil limit
|
||||
// is treated as a key after all keys in the data store. If both is nil then it
|
||||
// will compact entire data store.
|
||||
Compact(start []byte, limit []byte) error
|
||||
}
|
||||
|
||||
// Get returns the given key if it's present.
|
||||
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
|
||||
dat, err := db.db.Get(key, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dat, nil
|
||||
// KeyValueStore contains all the methods required to allow handling different
|
||||
// key-value data stores backing the high level database.
|
||||
type KeyValueStore interface {
|
||||
Reader
|
||||
Writer
|
||||
Deleter
|
||||
Batcher
|
||||
Iteratee
|
||||
Stater
|
||||
Compacter
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// Delete deletes the key from the queue and database
|
||||
func (db *LDBDatabase) Delete(key []byte) error {
|
||||
return db.db.Delete(key, nil)
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) NewIterator() iterator.Iterator {
|
||||
return db.db.NewIterator(nil, nil)
|
||||
}
|
||||
|
||||
// NewIteratorWithPrefix returns a iterator to iterate over subset of database content with a particular prefix.
|
||||
func (db *LDBDatabase) NewIteratorWithPrefix(prefix []byte) iterator.Iterator {
|
||||
return db.db.NewIterator(util.BytesPrefix(prefix), nil)
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) Close() {
|
||||
// Stop the metrics collection to avoid internal database races
|
||||
db.quitLock.Lock()
|
||||
defer db.quitLock.Unlock()
|
||||
|
||||
if db.quitChan != nil {
|
||||
errc := make(chan error)
|
||||
db.quitChan <- errc
|
||||
if err := <-errc; err != nil {
|
||||
db.log.Error("Metrics collection failed", "err", err)
|
||||
}
|
||||
db.quitChan = nil
|
||||
}
|
||||
err := db.db.Close()
|
||||
if err == nil {
|
||||
db.log.Info("Database closed")
|
||||
} else {
|
||||
db.log.Error("Failed to close database", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) LDB() *leveldb.DB {
|
||||
return db.db
|
||||
}
|
||||
|
||||
// Meter configures the database metrics collectors and
|
||||
func (db *LDBDatabase) Meter(prefix string) {
|
||||
// Initialize all the metrics collector at the requested prefix
|
||||
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
|
||||
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
|
||||
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
|
||||
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
|
||||
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
|
||||
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
|
||||
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
|
||||
|
||||
// Create a quit channel for the periodic collector and run it
|
||||
db.quitLock.Lock()
|
||||
db.quitChan = make(chan chan error)
|
||||
db.quitLock.Unlock()
|
||||
|
||||
go db.meter(3 * time.Second)
|
||||
}
|
||||
|
||||
// meter periodically retrieves internal leveldb counters and reports them to
|
||||
// the metrics subsystem.
|
||||
//
|
||||
// This is how a stats table look like (currently):
|
||||
// Compactions
|
||||
// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
|
||||
// -------+------------+---------------+---------------+---------------+---------------
|
||||
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
|
||||
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
|
||||
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
|
||||
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
|
||||
//
|
||||
// This is how the write delay look like (currently):
|
||||
// DelayN:5 Delay:406.604657ms Paused: false
|
||||
//
|
||||
// This is how the iostats look like (currently):
|
||||
// Read(MB):3895.04860 Write(MB):3654.64712
|
||||
func (db *LDBDatabase) meter(refresh time.Duration) {
|
||||
// Create the counters to store current and previous compaction values
|
||||
compactions := make([][]float64, 2)
|
||||
for i := 0; i < 2; i++ {
|
||||
compactions[i] = make([]float64, 3)
|
||||
}
|
||||
// Create storage for iostats.
|
||||
var iostats [2]float64
|
||||
|
||||
// Create storage and warning log tracer for write delay.
|
||||
var (
|
||||
delaystats [2]int64
|
||||
lastWritePaused time.Time
|
||||
)
|
||||
|
||||
var (
|
||||
errc chan error
|
||||
merr error
|
||||
)
|
||||
|
||||
// Iterate ad infinitum and collect the stats
|
||||
for i := 1; errc == nil && merr == nil; i++ {
|
||||
// Retrieve the database stats
|
||||
stats, err := db.db.GetProperty("leveldb.stats")
|
||||
if err != nil {
|
||||
db.log.Error("Failed to read database stats", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
// Find the compaction table, skip the header
|
||||
lines := strings.Split(stats, "\n")
|
||||
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
|
||||
lines = lines[1:]
|
||||
}
|
||||
if len(lines) <= 3 {
|
||||
db.log.Error("Compaction table not found")
|
||||
merr = errors.New("compaction table not found")
|
||||
continue
|
||||
}
|
||||
lines = lines[3:]
|
||||
|
||||
// Iterate over all the table rows, and accumulate the entries
|
||||
for j := 0; j < len(compactions[i%2]); j++ {
|
||||
compactions[i%2][j] = 0
|
||||
}
|
||||
for _, line := range lines {
|
||||
parts := strings.Split(line, "|")
|
||||
if len(parts) != 6 {
|
||||
break
|
||||
}
|
||||
for idx, counter := range parts[3:] {
|
||||
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
|
||||
if err != nil {
|
||||
db.log.Error("Compaction entry parsing failed", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
compactions[i%2][idx] += value
|
||||
}
|
||||
}
|
||||
// Update all the requested meters
|
||||
if db.compTimeMeter != nil {
|
||||
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
|
||||
}
|
||||
if db.compReadMeter != nil {
|
||||
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
|
||||
}
|
||||
if db.compWriteMeter != nil {
|
||||
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
|
||||
}
|
||||
|
||||
// Retrieve the write delay statistic
|
||||
writedelay, err := db.db.GetProperty("leveldb.writedelay")
|
||||
if err != nil {
|
||||
db.log.Error("Failed to read database write delay statistic", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
var (
|
||||
delayN int64
|
||||
delayDuration string
|
||||
duration time.Duration
|
||||
paused bool
|
||||
)
|
||||
if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
|
||||
db.log.Error("Write delay statistic not found")
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
duration, err = time.ParseDuration(delayDuration)
|
||||
if err != nil {
|
||||
db.log.Error("Failed to parse delay duration", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
if db.writeDelayNMeter != nil {
|
||||
db.writeDelayNMeter.Mark(delayN - delaystats[0])
|
||||
}
|
||||
if db.writeDelayMeter != nil {
|
||||
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
|
||||
}
|
||||
// If a warning that db is performing compaction has been displayed, any subsequent
|
||||
// warnings will be withheld for one minute not to overwhelm the user.
|
||||
if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
|
||||
time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) {
|
||||
db.log.Warn("Database compacting, degraded performance")
|
||||
lastWritePaused = time.Now()
|
||||
}
|
||||
delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()
|
||||
|
||||
// Retrieve the database iostats.
|
||||
ioStats, err := db.db.GetProperty("leveldb.iostats")
|
||||
if err != nil {
|
||||
db.log.Error("Failed to read database iostats", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
var nRead, nWrite float64
|
||||
parts := strings.Split(ioStats, " ")
|
||||
if len(parts) < 2 {
|
||||
db.log.Error("Bad syntax of ioStats", "ioStats", ioStats)
|
||||
merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
|
||||
continue
|
||||
}
|
||||
if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
|
||||
db.log.Error("Bad syntax of read entry", "entry", parts[0])
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
|
||||
db.log.Error("Bad syntax of write entry", "entry", parts[1])
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
if db.diskReadMeter != nil {
|
||||
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
|
||||
}
|
||||
if db.diskWriteMeter != nil {
|
||||
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
|
||||
}
|
||||
iostats[0], iostats[1] = nRead, nWrite
|
||||
|
||||
// Sleep a bit, then repeat the stats collection
|
||||
select {
|
||||
case errc = <-db.quitChan:
|
||||
// Quit requesting, stop hammering the database
|
||||
case <-time.After(refresh):
|
||||
// Timeout, gather a new set of stats
|
||||
}
|
||||
}
|
||||
|
||||
if errc == nil {
|
||||
errc = <-db.quitChan
|
||||
}
|
||||
errc <- merr
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) NewBatch() Batch {
|
||||
return &ldbBatch{db: db.db, b: new(leveldb.Batch)}
|
||||
}
|
||||
|
||||
type ldbBatch struct {
|
||||
db *leveldb.DB
|
||||
b *leveldb.Batch
|
||||
size int
|
||||
}
|
||||
|
||||
func (b *ldbBatch) Put(key, value []byte) error {
|
||||
b.b.Put(key, value)
|
||||
b.size += len(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *ldbBatch) Delete(key []byte) error {
|
||||
b.b.Delete(key)
|
||||
b.size += 1
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *ldbBatch) Write() error {
|
||||
return b.db.Write(b.b, nil)
|
||||
}
|
||||
|
||||
func (b *ldbBatch) ValueSize() int {
|
||||
return b.size
|
||||
}
|
||||
|
||||
func (b *ldbBatch) Reset() {
|
||||
b.b.Reset()
|
||||
b.size = 0
|
||||
// Database contains all the methods required by the high level database to not
|
||||
// only access the key-value data store but also the chain freezer.
|
||||
type Database interface {
|
||||
Reader
|
||||
Writer
|
||||
Deleter
|
||||
Batcher
|
||||
Iteratee
|
||||
Stater
|
||||
Compacter
|
||||
io.Closer
|
||||
}
|
||||
|
@ -1,68 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build js
|
||||
|
||||
package ethdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var errNotSupported = errors.New("ethdb: not supported")
|
||||
|
||||
type LDBDatabase struct {
|
||||
}
|
||||
|
||||
// NewLDBDatabase returns a LevelDB wrapped object.
|
||||
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
|
||||
return nil, errNotSupported
|
||||
}
|
||||
|
||||
// Path returns the path to the database directory.
|
||||
func (db *LDBDatabase) Path() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Put puts the given key / value to the queue
|
||||
func (db *LDBDatabase) Put(key []byte, value []byte) error {
|
||||
return errNotSupported
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) Has(key []byte) (bool, error) {
|
||||
return false, errNotSupported
|
||||
}
|
||||
|
||||
// Get returns the given key if it's present.
|
||||
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
|
||||
return nil, errNotSupported
|
||||
}
|
||||
|
||||
// Delete deletes the key from the queue and database
|
||||
func (db *LDBDatabase) Delete(key []byte) error {
|
||||
return errNotSupported
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) Close() {
|
||||
}
|
||||
|
||||
// Meter configures the database metrics collectors and
|
||||
func (db *LDBDatabase) Meter(prefix string) {
|
||||
}
|
||||
|
||||
func (db *LDBDatabase) NewBatch() Batch {
|
||||
return nil
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build js
|
||||
|
||||
package ethdb_test
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
)
|
||||
|
||||
var _ ethdb.Database = ðdb.LDBDatabase{}
|
@ -1,214 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build !js
|
||||
|
||||
package ethdb_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
)
|
||||
|
||||
func newTestLDB() (*ethdb.LDBDatabase, func()) {
|
||||
dirname, err := ioutil.TempDir(os.TempDir(), "ethdb_test_")
|
||||
if err != nil {
|
||||
panic("failed to create test file: " + err.Error())
|
||||
}
|
||||
db, err := ethdb.NewLDBDatabase(dirname, 0, 0)
|
||||
if err != nil {
|
||||
panic("failed to create test database: " + err.Error())
|
||||
}
|
||||
|
||||
return db, func() {
|
||||
db.Close()
|
||||
os.RemoveAll(dirname)
|
||||
}
|
||||
}
|
||||
|
||||
var test_values = []string{"", "a", "1251", "\x00123\x00"}
|
||||
|
||||
func TestLDB_PutGet(t *testing.T) {
|
||||
db, remove := newTestLDB()
|
||||
defer remove()
|
||||
testPutGet(db, t)
|
||||
}
|
||||
|
||||
func TestMemoryDB_PutGet(t *testing.T) {
|
||||
testPutGet(ethdb.NewMemDatabase(), t)
|
||||
}
|
||||
|
||||
func testPutGet(db ethdb.Database, t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
for _, k := range test_values {
|
||||
err := db.Put([]byte(k), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("put failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, k := range test_values {
|
||||
data, err := db.Get([]byte(k))
|
||||
if err != nil {
|
||||
t.Fatalf("get failed: %v", err)
|
||||
}
|
||||
if len(data) != 0 {
|
||||
t.Fatalf("get returned wrong result, got %q expected nil", string(data))
|
||||
}
|
||||
}
|
||||
|
||||
_, err := db.Get([]byte("non-exist-key"))
|
||||
if err == nil {
|
||||
t.Fatalf("expect to return a not found error")
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
err := db.Put([]byte(v), []byte(v))
|
||||
if err != nil {
|
||||
t.Fatalf("put failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
data, err := db.Get([]byte(v))
|
||||
if err != nil {
|
||||
t.Fatalf("get failed: %v", err)
|
||||
}
|
||||
if !bytes.Equal(data, []byte(v)) {
|
||||
t.Fatalf("get returned wrong result, got %q expected %q", string(data), v)
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
err := db.Put([]byte(v), []byte("?"))
|
||||
if err != nil {
|
||||
t.Fatalf("put override failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
data, err := db.Get([]byte(v))
|
||||
if err != nil {
|
||||
t.Fatalf("get failed: %v", err)
|
||||
}
|
||||
if !bytes.Equal(data, []byte("?")) {
|
||||
t.Fatalf("get returned wrong result, got %q expected ?", string(data))
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
orig, err := db.Get([]byte(v))
|
||||
if err != nil {
|
||||
t.Fatalf("get failed: %v", err)
|
||||
}
|
||||
orig[0] = byte(0xff)
|
||||
data, err := db.Get([]byte(v))
|
||||
if err != nil {
|
||||
t.Fatalf("get failed: %v", err)
|
||||
}
|
||||
if !bytes.Equal(data, []byte("?")) {
|
||||
t.Fatalf("get returned wrong result, got %q expected ?", string(data))
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
err := db.Delete([]byte(v))
|
||||
if err != nil {
|
||||
t.Fatalf("delete %q failed: %v", v, err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range test_values {
|
||||
_, err := db.Get([]byte(v))
|
||||
if err == nil {
|
||||
t.Fatalf("got deleted value %q", v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLDB_ParallelPutGet(t *testing.T) {
|
||||
db, remove := newTestLDB()
|
||||
defer remove()
|
||||
testParallelPutGet(db, t)
|
||||
}
|
||||
|
||||
func TestMemoryDB_ParallelPutGet(t *testing.T) {
|
||||
testParallelPutGet(ethdb.NewMemDatabase(), t)
|
||||
}
|
||||
|
||||
func testParallelPutGet(db ethdb.Database, t *testing.T) {
|
||||
const n = 8
|
||||
var pending sync.WaitGroup
|
||||
|
||||
pending.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func(key string) {
|
||||
defer pending.Done()
|
||||
err := db.Put([]byte(key), []byte("v"+key))
|
||||
if err != nil {
|
||||
panic("put failed: " + err.Error())
|
||||
}
|
||||
}(strconv.Itoa(i))
|
||||
}
|
||||
pending.Wait()
|
||||
|
||||
pending.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func(key string) {
|
||||
defer pending.Done()
|
||||
data, err := db.Get([]byte(key))
|
||||
if err != nil {
|
||||
panic("get failed: " + err.Error())
|
||||
}
|
||||
if !bytes.Equal(data, []byte("v"+key)) {
|
||||
panic(fmt.Sprintf("get failed, got %q expected %q", []byte(data), []byte("v"+key)))
|
||||
}
|
||||
}(strconv.Itoa(i))
|
||||
}
|
||||
pending.Wait()
|
||||
|
||||
pending.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func(key string) {
|
||||
defer pending.Done()
|
||||
err := db.Delete([]byte(key))
|
||||
if err != nil {
|
||||
panic("delete failed: " + err.Error())
|
||||
}
|
||||
}(strconv.Itoa(i))
|
||||
}
|
||||
pending.Wait()
|
||||
|
||||
pending.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func(key string) {
|
||||
defer pending.Done()
|
||||
_, err := db.Get([]byte(key))
|
||||
if err == nil {
|
||||
panic("get succeeded")
|
||||
}
|
||||
}(strconv.Itoa(i))
|
||||
}
|
||||
pending.Wait()
|
||||
}
|
61
ethdb/iterator.go
Normal file
61
ethdb/iterator.go
Normal file
@ -0,0 +1,61 @@
|
||||
// Copyright 2018 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package ethdb
|
||||
|
||||
// Iterator iterates over a database's key/value pairs in ascending key order.
|
||||
//
|
||||
// When it encounters an error any seek will return false and will yield no key/
|
||||
// value pairs. The error can be queried by calling the Error method. Calling
|
||||
// Release is still necessary.
|
||||
//
|
||||
// An iterator must be released after use, but it is not necessary to read an
|
||||
// iterator until exhaustion. An iterator is not safe for concurrent use, but it
|
||||
// is safe to use multiple iterators concurrently.
|
||||
type Iterator interface {
|
||||
// Next moves the iterator to the next key/value pair. It returns whether the
|
||||
// iterator is exhausted.
|
||||
Next() bool
|
||||
|
||||
// Error returns any accumulated error. Exhausting all the key/value pairs
|
||||
// is not considered to be an error.
|
||||
Error() error
|
||||
|
||||
// Key returns the key of the current key/value pair, or nil if done. The caller
|
||||
// should not modify the contents of the returned slice, and its contents may
|
||||
// change on the next call to Next.
|
||||
Key() []byte
|
||||
|
||||
// Value returns the value of the current key/value pair, or nil if done. The
|
||||
// caller should not modify the contents of the returned slice, and its contents
|
||||
// may change on the next call to Next.
|
||||
Value() []byte
|
||||
|
||||
// Release releases associated resources. Release should always succeed and can
|
||||
// be called multiple times without causing error.
|
||||
Release()
|
||||
}
|
||||
|
||||
// Iteratee wraps the NewIterator methods of a backing data store.
|
||||
type Iteratee interface {
|
||||
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
|
||||
// contained within the key-value database.
|
||||
NewIterator() Iterator
|
||||
|
||||
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
|
||||
// of database content with a particular key prefix.
|
||||
NewIteratorWithPrefix(prefix []byte) Iterator
|
||||
}
|
418
ethdb/leveldb/leveldb.go
Normal file
418
ethdb/leveldb/leveldb.go
Normal file
@ -0,0 +1,418 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build !js
|
||||
|
||||
// Package leveldb implements the key-value database layer based on LevelDB.
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
const (
|
||||
// leveldbDegradationWarnInterval specifies how often warning should be printed
|
||||
// if the leveldb database cannot keep up with requested writes.
|
||||
leveldbDegradationWarnInterval = time.Minute
|
||||
|
||||
// leveldbMinCache is the minimum amount of memory in megabytes to allocate to
|
||||
// leveldb read and write caching, split half and half.
|
||||
leveldbMinCache = 16
|
||||
|
||||
// leveldbMinHandles is the minimum number of files handles to allocate to the
|
||||
// open database files.
|
||||
leveldbMinHandles = 16
|
||||
|
||||
// metricsGatheringInterval specifies the interval to retrieve leveldb database
|
||||
// compaction, io and pause stats to report to the user.
|
||||
metricsGatheringInterval = 3 * time.Second
|
||||
)
|
||||
|
||||
// LevelDBDatabase is a persistent key-value store. Apart from basic data storage
|
||||
// functionality it also supports batch writes and iterating over the keyspace in
|
||||
// binary-alphabetical order.
|
||||
type LevelDBDatabase struct {
|
||||
fn string // filename for reporting
|
||||
db *leveldb.DB // LevelDB instance
|
||||
|
||||
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
|
||||
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
|
||||
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
|
||||
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
|
||||
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
|
||||
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
|
||||
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
|
||||
|
||||
quitLock sync.Mutex // Mutex protecting the quit channel access
|
||||
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
|
||||
|
||||
log log.Logger // Contextual logger tracking the database path
|
||||
}
|
||||
|
||||
// New returns a wrapped LevelDB object. The namespace is the prefix that the
|
||||
// metrics reporting should use for surfacing internal stats.
|
||||
func New(file string, cache int, handles int, namespace string) (*LevelDBDatabase, error) {
|
||||
// Ensure we have some minimal caching and file guarantees
|
||||
if cache < leveldbMinCache {
|
||||
cache = leveldbMinCache
|
||||
}
|
||||
if handles < leveldbMinHandles {
|
||||
handles = leveldbMinHandles
|
||||
}
|
||||
logger := log.New("database", file)
|
||||
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
|
||||
|
||||
// Open the db and recover any potential corruptions
|
||||
db, err := leveldb.OpenFile(file, &opt.Options{
|
||||
OpenFilesCacheCapacity: handles,
|
||||
BlockCacheCapacity: cache / 2 * opt.MiB,
|
||||
WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally
|
||||
Filter: filter.NewBloomFilter(10),
|
||||
})
|
||||
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
|
||||
db, err = leveldb.RecoverFile(file, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Assemble the wrapper with all the registered metrics
|
||||
ldb := &LevelDBDatabase{
|
||||
fn: file,
|
||||
db: db,
|
||||
log: logger,
|
||||
quitChan: make(chan chan error),
|
||||
}
|
||||
ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
|
||||
ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
|
||||
ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
|
||||
ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
|
||||
ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
|
||||
ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
|
||||
ldb.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil)
|
||||
|
||||
// Start up the metrics gathering and return
|
||||
go ldb.meter(metricsGatheringInterval)
|
||||
return ldb, nil
|
||||
}
|
||||
|
||||
// Close stops the metrics collection, flushes any pending data to disk and closes
|
||||
// all io accesses to the underlying key-value store.
|
||||
func (db *LevelDBDatabase) Close() error {
|
||||
db.quitLock.Lock()
|
||||
defer db.quitLock.Unlock()
|
||||
|
||||
if db.quitChan != nil {
|
||||
errc := make(chan error)
|
||||
db.quitChan <- errc
|
||||
if err := <-errc; err != nil {
|
||||
db.log.Error("Metrics collection failed", "err", err)
|
||||
}
|
||||
db.quitChan = nil
|
||||
}
|
||||
return db.db.Close()
|
||||
}
|
||||
|
||||
// Has retrieves if a key is present in the key-value store.
|
||||
func (db *LevelDBDatabase) Has(key []byte) (bool, error) {
|
||||
return db.db.Has(key, nil)
|
||||
}
|
||||
|
||||
// Get retrieves the given key if it's present in the key-value store.
|
||||
func (db *LevelDBDatabase) Get(key []byte) ([]byte, error) {
|
||||
dat, err := db.db.Get(key, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dat, nil
|
||||
}
|
||||
|
||||
// Put inserts the given value into the key-value store.
|
||||
func (db *LevelDBDatabase) Put(key []byte, value []byte) error {
|
||||
return db.db.Put(key, value, nil)
|
||||
}
|
||||
|
||||
// Delete removes the key from the key-value store.
|
||||
func (db *LevelDBDatabase) Delete(key []byte) error {
|
||||
return db.db.Delete(key, nil)
|
||||
}
|
||||
|
||||
// NewBatch creates a write-only key-value store that buffers changes to its host
|
||||
// database until a final write is called.
|
||||
func (db *LevelDBDatabase) NewBatch() ethdb.Batch {
|
||||
return &levelDBBatch{
|
||||
db: db.db,
|
||||
b: new(leveldb.Batch),
|
||||
}
|
||||
}
|
||||
|
||||
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
|
||||
// contained within the leveldb database.
|
||||
func (db *LevelDBDatabase) NewIterator() ethdb.Iterator {
|
||||
return db.NewIteratorWithPrefix(nil)
|
||||
}
|
||||
|
||||
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
|
||||
// of database content with a particular key prefix.
|
||||
func (db *LevelDBDatabase) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
|
||||
return db.db.NewIterator(util.BytesPrefix(prefix), nil)
|
||||
}
|
||||
|
||||
// Stat returns a particular internal stat of the database.
|
||||
func (db *LevelDBDatabase) Stat(property string) (string, error) {
|
||||
return db.db.GetProperty(property)
|
||||
}
|
||||
|
||||
// Compact flattens the underlying data store for the given key range. In essence,
|
||||
// deleted and overwritten versions are discarded, and the data is rearranged to
|
||||
// reduce the cost of operations needed to access them.
|
||||
//
|
||||
// A nil start is treated as a key before all keys in the data store; a nil limit
|
||||
// is treated as a key after all keys in the data store. If both is nil then it
|
||||
// will compact entire data store.
|
||||
func (db *LevelDBDatabase) Compact(start []byte, limit []byte) error {
|
||||
return db.db.CompactRange(util.Range{Start: start, Limit: limit})
|
||||
}
|
||||
|
||||
// Path returns the path to the database directory.
|
||||
func (db *LevelDBDatabase) Path() string {
|
||||
return db.fn
|
||||
}
|
||||
|
||||
// meter periodically retrieves internal leveldb counters and reports them to
|
||||
// the metrics subsystem.
|
||||
//
|
||||
// This is how a LevelDB stats table looks like (currently):
|
||||
// Compactions
|
||||
// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
|
||||
// -------+------------+---------------+---------------+---------------+---------------
|
||||
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
|
||||
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
|
||||
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
|
||||
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
|
||||
//
|
||||
// This is how the write delay look like (currently):
|
||||
// DelayN:5 Delay:406.604657ms Paused: false
|
||||
//
|
||||
// This is how the iostats look like (currently):
|
||||
// Read(MB):3895.04860 Write(MB):3654.64712
|
||||
func (db *LevelDBDatabase) meter(refresh time.Duration) {
|
||||
// Create the counters to store current and previous compaction values
|
||||
compactions := make([][]float64, 2)
|
||||
for i := 0; i < 2; i++ {
|
||||
compactions[i] = make([]float64, 3)
|
||||
}
|
||||
// Create storage for iostats.
|
||||
var iostats [2]float64
|
||||
|
||||
// Create storage and warning log tracer for write delay.
|
||||
var (
|
||||
delaystats [2]int64
|
||||
lastWritePaused time.Time
|
||||
)
|
||||
|
||||
var (
|
||||
errc chan error
|
||||
merr error
|
||||
)
|
||||
|
||||
// Iterate ad infinitum and collect the stats
|
||||
for i := 1; errc == nil && merr == nil; i++ {
|
||||
// Retrieve the database stats
|
||||
stats, err := db.db.GetProperty("leveldb.stats")
|
||||
if err != nil {
|
||||
db.log.Error("Failed to read database stats", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
// Find the compaction table, skip the header
|
||||
lines := strings.Split(stats, "\n")
|
||||
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
|
||||
lines = lines[1:]
|
||||
}
|
||||
if len(lines) <= 3 {
|
||||
db.log.Error("Compaction leveldbTable not found")
|
||||
merr = errors.New("compaction leveldbTable not found")
|
||||
continue
|
||||
}
|
||||
lines = lines[3:]
|
||||
|
||||
// Iterate over all the leveldbTable rows, and accumulate the entries
|
||||
for j := 0; j < len(compactions[i%2]); j++ {
|
||||
compactions[i%2][j] = 0
|
||||
}
|
||||
for _, line := range lines {
|
||||
parts := strings.Split(line, "|")
|
||||
if len(parts) != 6 {
|
||||
break
|
||||
}
|
||||
for idx, counter := range parts[3:] {
|
||||
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
|
||||
if err != nil {
|
||||
db.log.Error("Compaction entry parsing failed", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
compactions[i%2][idx] += value
|
||||
}
|
||||
}
|
||||
// Update all the requested meters
|
||||
if db.compTimeMeter != nil {
|
||||
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
|
||||
}
|
||||
if db.compReadMeter != nil {
|
||||
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
|
||||
}
|
||||
if db.compWriteMeter != nil {
|
||||
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
|
||||
}
|
||||
|
||||
// Retrieve the write delay statistic
|
||||
writedelay, err := db.db.GetProperty("leveldb.writedelay")
|
||||
if err != nil {
|
||||
db.log.Error("Failed to read database write delay statistic", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
var (
|
||||
delayN int64
|
||||
delayDuration string
|
||||
duration time.Duration
|
||||
paused bool
|
||||
)
|
||||
if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
|
||||
db.log.Error("Write delay statistic not found")
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
duration, err = time.ParseDuration(delayDuration)
|
||||
if err != nil {
|
||||
db.log.Error("Failed to parse delay duration", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
if db.writeDelayNMeter != nil {
|
||||
db.writeDelayNMeter.Mark(delayN - delaystats[0])
|
||||
}
|
||||
if db.writeDelayMeter != nil {
|
||||
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
|
||||
}
|
||||
// If a warning that db is performing compaction has been displayed, any subsequent
|
||||
// warnings will be withheld for one minute not to overwhelm the user.
|
||||
if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
|
||||
time.Now().After(lastWritePaused.Add(leveldbDegradationWarnInterval)) {
|
||||
db.log.Warn("Database compacting, degraded performance")
|
||||
lastWritePaused = time.Now()
|
||||
}
|
||||
delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()
|
||||
|
||||
// Retrieve the database iostats.
|
||||
ioStats, err := db.db.GetProperty("leveldb.iostats")
|
||||
if err != nil {
|
||||
db.log.Error("Failed to read database iostats", "err", err)
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
var nRead, nWrite float64
|
||||
parts := strings.Split(ioStats, " ")
|
||||
if len(parts) < 2 {
|
||||
db.log.Error("Bad syntax of ioStats", "ioStats", ioStats)
|
||||
merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
|
||||
continue
|
||||
}
|
||||
if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
|
||||
db.log.Error("Bad syntax of read entry", "entry", parts[0])
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
|
||||
db.log.Error("Bad syntax of write entry", "entry", parts[1])
|
||||
merr = err
|
||||
continue
|
||||
}
|
||||
if db.diskReadMeter != nil {
|
||||
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
|
||||
}
|
||||
if db.diskWriteMeter != nil {
|
||||
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
|
||||
}
|
||||
iostats[0], iostats[1] = nRead, nWrite
|
||||
|
||||
// Sleep a bit, then repeat the stats collection
|
||||
select {
|
||||
case errc = <-db.quitChan:
|
||||
// Quit requesting, stop hammering the database
|
||||
case <-time.After(refresh):
|
||||
// Timeout, gather a new set of stats
|
||||
}
|
||||
}
|
||||
|
||||
if errc == nil {
|
||||
errc = <-db.quitChan
|
||||
}
|
||||
errc <- merr
|
||||
}
|
||||
|
||||
// levelDBBatch is a write-only leveldb batch that commits changes to its host
|
||||
// database when Write is called. A batch cannot be used concurrently.
|
||||
type levelDBBatch struct {
|
||||
db *leveldb.DB
|
||||
b *leveldb.Batch
|
||||
size int
|
||||
}
|
||||
|
||||
// Put inserts the given value into the batch for later committing.
|
||||
func (b *levelDBBatch) Put(key, value []byte) error {
|
||||
b.b.Put(key, value)
|
||||
b.size += len(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete inserts the a key removal into the batch for later committing.
|
||||
func (b *levelDBBatch) Delete(key []byte) error {
|
||||
b.b.Delete(key)
|
||||
b.size += 1
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValueSize retrieves the amount of data queued up for writing.
|
||||
func (b *levelDBBatch) ValueSize() int {
|
||||
return b.size
|
||||
}
|
||||
|
||||
// Write flushes any accumulated data to disk.
|
||||
func (b *levelDBBatch) Write() error {
|
||||
return b.db.Write(b.b, nil)
|
||||
}
|
||||
|
||||
// Reset resets the batch for reuse.
|
||||
func (b *levelDBBatch) Reset() {
|
||||
b.b.Reset()
|
||||
b.size = 0
|
||||
}
|
@ -1,143 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package ethdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
)
|
||||
|
||||
/*
|
||||
* This is a test memory database. Do not use for any production it does not get persisted
|
||||
*/
|
||||
type MemDatabase struct {
|
||||
db map[string][]byte
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMemDatabase() *MemDatabase {
|
||||
return &MemDatabase{
|
||||
db: make(map[string][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
func NewMemDatabaseWithCap(size int) *MemDatabase {
|
||||
return &MemDatabase{
|
||||
db: make(map[string][]byte, size),
|
||||
}
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Put(key []byte, value []byte) error {
|
||||
db.lock.Lock()
|
||||
defer db.lock.Unlock()
|
||||
|
||||
db.db[string(key)] = common.CopyBytes(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Has(key []byte) (bool, error) {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
_, ok := db.db[string(key)]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Get(key []byte) ([]byte, error) {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
if entry, ok := db.db[string(key)]; ok {
|
||||
return common.CopyBytes(entry), nil
|
||||
}
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Keys() [][]byte {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
keys := [][]byte{}
|
||||
for key := range db.db {
|
||||
keys = append(keys, []byte(key))
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Delete(key []byte) error {
|
||||
db.lock.Lock()
|
||||
defer db.lock.Unlock()
|
||||
|
||||
delete(db.db, string(key))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Close() {}
|
||||
|
||||
func (db *MemDatabase) NewBatch() Batch {
|
||||
return &memBatch{db: db}
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Len() int { return len(db.db) }
|
||||
|
||||
type kv struct {
|
||||
k, v []byte
|
||||
del bool
|
||||
}
|
||||
|
||||
type memBatch struct {
|
||||
db *MemDatabase
|
||||
writes []kv
|
||||
size int
|
||||
}
|
||||
|
||||
func (b *memBatch) Put(key, value []byte) error {
|
||||
b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value), false})
|
||||
b.size += len(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *memBatch) Delete(key []byte) error {
|
||||
b.writes = append(b.writes, kv{common.CopyBytes(key), nil, true})
|
||||
b.size += 1
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *memBatch) Write() error {
|
||||
b.db.lock.Lock()
|
||||
defer b.db.lock.Unlock()
|
||||
|
||||
for _, kv := range b.writes {
|
||||
if kv.del {
|
||||
delete(b.db.db, string(kv.k))
|
||||
continue
|
||||
}
|
||||
b.db.db[string(kv.k)] = kv.v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *memBatch) ValueSize() int {
|
||||
return b.size
|
||||
}
|
||||
|
||||
func (b *memBatch) Reset() {
|
||||
b.writes = b.writes[:0]
|
||||
b.size = 0
|
||||
}
|
298
ethdb/memorydb/memorydb.go
Normal file
298
ethdb/memorydb/memorydb.go
Normal file
@ -0,0 +1,298 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package memorydb implements the key-value database layer based on memory maps.
|
||||
package memorydb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
)
|
||||
|
||||
var (
|
||||
// errMemorydbClosed is returned if a memory database was already closed at the
|
||||
// invocation of a data access operation.
|
||||
errMemorydbClosed = errors.New("database closed")
|
||||
|
||||
// errMemorydbNotFound is returned if a key is requested that is not found in
|
||||
// the provided memory database.
|
||||
errMemorydbNotFound = errors.New("not found")
|
||||
)
|
||||
|
||||
// MemoryDatabase is an ephemeral key-value store. Apart from basic data storage
|
||||
// functionality it also supports batch writes and iterating over the keyspace in
|
||||
// binary-alphabetical order.
|
||||
type MemoryDatabase struct {
|
||||
db map[string][]byte
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// New returns a wrapped map with all the required database interface methods
|
||||
// implemented.
|
||||
func New() *MemoryDatabase {
|
||||
return &MemoryDatabase{
|
||||
db: make(map[string][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
// NewWithCap returns a wrapped map pre-allocated to the provided capcity with
|
||||
// all the required database interface methods implemented.
|
||||
func NewWithCap(size int) *MemoryDatabase {
|
||||
return &MemoryDatabase{
|
||||
db: make(map[string][]byte, size),
|
||||
}
|
||||
}
|
||||
|
||||
// Close deallocates the internal map and ensures any consecutive data access op
|
||||
// failes with an error.
|
||||
func (db *MemoryDatabase) Close() error {
|
||||
db.lock.Lock()
|
||||
defer db.lock.Unlock()
|
||||
|
||||
db.db = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Has retrieves if a key is present in the key-value store.
|
||||
func (db *MemoryDatabase) Has(key []byte) (bool, error) {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
if db.db == nil {
|
||||
return false, errMemorydbClosed
|
||||
}
|
||||
_, ok := db.db[string(key)]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// Get retrieves the given key if it's present in the key-value store.
|
||||
func (db *MemoryDatabase) Get(key []byte) ([]byte, error) {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
if db.db == nil {
|
||||
return nil, errMemorydbClosed
|
||||
}
|
||||
if entry, ok := db.db[string(key)]; ok {
|
||||
return common.CopyBytes(entry), nil
|
||||
}
|
||||
return nil, errMemorydbNotFound
|
||||
}
|
||||
|
||||
// Put inserts the given value into the key-value store.
|
||||
func (db *MemoryDatabase) Put(key []byte, value []byte) error {
|
||||
db.lock.Lock()
|
||||
defer db.lock.Unlock()
|
||||
|
||||
if db.db == nil {
|
||||
return errMemorydbClosed
|
||||
}
|
||||
db.db[string(key)] = common.CopyBytes(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes the key from the key-value store.
|
||||
func (db *MemoryDatabase) Delete(key []byte) error {
|
||||
db.lock.Lock()
|
||||
defer db.lock.Unlock()
|
||||
|
||||
if db.db == nil {
|
||||
return errMemorydbClosed
|
||||
}
|
||||
delete(db.db, string(key))
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewBatch creates a write-only key-value store that buffers changes to its host
|
||||
// database until a final write is called.
|
||||
func (db *MemoryDatabase) NewBatch() ethdb.Batch {
|
||||
return &memoryBatch{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
|
||||
// contained within the memory database.
|
||||
func (db *MemoryDatabase) NewIterator() ethdb.Iterator {
|
||||
return db.NewIteratorWithPrefix(nil)
|
||||
}
|
||||
|
||||
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
|
||||
// of database content with a particular key prefix.
|
||||
func (db *MemoryDatabase) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
var (
|
||||
pr = string(prefix)
|
||||
keys = make([]string, 0, len(db.db))
|
||||
values = make([][]byte, 0, len(db.db))
|
||||
)
|
||||
// Collect the keys from the memory database corresponding to the given prefix
|
||||
for key := range db.db {
|
||||
if strings.HasPrefix(key, pr) {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
}
|
||||
// Sort the items and retrieve the associated values
|
||||
sort.Strings(keys)
|
||||
for _, key := range keys {
|
||||
values = append(values, db.db[key])
|
||||
}
|
||||
return &memoryIterator{
|
||||
keys: keys,
|
||||
values: values,
|
||||
}
|
||||
}
|
||||
|
||||
// Stat returns a particular internal stat of the database.
|
||||
func (db *MemoryDatabase) Stat(property string) (string, error) {
|
||||
return "", errors.New("unknown property")
|
||||
}
|
||||
|
||||
// Compact is not supported on a memory database.
|
||||
func (db *MemoryDatabase) Compact(start []byte, limit []byte) error {
|
||||
return errors.New("unsupported operation")
|
||||
}
|
||||
|
||||
// Len returns the number of entries currently present in the memory database.
|
||||
//
|
||||
// Note, this method is only used for testing (i.e. not public in general) and
|
||||
// does not have explicit checks for closed-ness to allow simpler testing code.
|
||||
func (db *MemoryDatabase) Len() int {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
return len(db.db)
|
||||
}
|
||||
|
||||
// keyvalue is a key-value tuple tagged with a deletion field to allow creating
|
||||
// memory-database write batches.
|
||||
type keyvalue struct {
|
||||
key []byte
|
||||
value []byte
|
||||
delete bool
|
||||
}
|
||||
|
||||
// memoryBatch is a write-only memory batch that commits changes to its host
|
||||
// database when Write is called. A batch cannot be used concurrently.
|
||||
type memoryBatch struct {
|
||||
db *MemoryDatabase
|
||||
writes []keyvalue
|
||||
size int
|
||||
}
|
||||
|
||||
// Put inserts the given value into the batch for later committing.
|
||||
func (b *memoryBatch) Put(key, value []byte) error {
|
||||
b.writes = append(b.writes, keyvalue{common.CopyBytes(key), common.CopyBytes(value), false})
|
||||
b.size += len(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete inserts the a key removal into the batch for later committing.
|
||||
func (b *memoryBatch) Delete(key []byte) error {
|
||||
b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true})
|
||||
b.size += 1
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValueSize retrieves the amount of data queued up for writing.
|
||||
func (b *memoryBatch) ValueSize() int {
|
||||
return b.size
|
||||
}
|
||||
|
||||
// Write flushes any accumulated data to the memory database.
|
||||
func (b *memoryBatch) Write() error {
|
||||
b.db.lock.Lock()
|
||||
defer b.db.lock.Unlock()
|
||||
|
||||
for _, keyvalue := range b.writes {
|
||||
if keyvalue.delete {
|
||||
delete(b.db.db, string(keyvalue.key))
|
||||
continue
|
||||
}
|
||||
b.db.db[string(keyvalue.key)] = keyvalue.value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reset resets the batch for reuse.
|
||||
func (b *memoryBatch) Reset() {
|
||||
b.writes = b.writes[:0]
|
||||
b.size = 0
|
||||
}
|
||||
|
||||
// memoryIterator can walk over the (potentially partial) keyspace of a memory
|
||||
// key value store. Internally it is a deep copy of the entire iterated state,
|
||||
// sorted by keys.
|
||||
type memoryIterator struct {
|
||||
inited bool
|
||||
keys []string
|
||||
values [][]byte
|
||||
}
|
||||
|
||||
// Next moves the iterator to the next key/value pair. It returns whether the
|
||||
// iterator is exhausted.
|
||||
func (it *memoryIterator) Next() bool {
|
||||
// If the iterator was not yet initialized, do it now
|
||||
if !it.inited {
|
||||
it.inited = true
|
||||
return len(it.keys) > 0
|
||||
}
|
||||
// Iterator already initialize, advance it
|
||||
if len(it.keys) > 0 {
|
||||
it.keys = it.keys[1:]
|
||||
it.values = it.values[1:]
|
||||
}
|
||||
return len(it.keys) > 0
|
||||
}
|
||||
|
||||
// Error returns any accumulated error. Exhausting all the key/value pairs
|
||||
// is not considered to be an error. A memory iterator cannot encounter errors.
|
||||
func (it *memoryIterator) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Key returns the key of the current key/value pair, or nil if done. The caller
|
||||
// should not modify the contents of the returned slice, and its contents may
|
||||
// change on the next call to Next.
|
||||
func (it *memoryIterator) Key() []byte {
|
||||
if len(it.keys) > 0 {
|
||||
return []byte(it.keys[0])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value returns the value of the current key/value pair, or nil if done. The
|
||||
// caller should not modify the contents of the returned slice, and its contents
|
||||
// may change on the next call to Next.
|
||||
func (it *memoryIterator) Value() []byte {
|
||||
if len(it.values) > 0 {
|
||||
return it.values[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release releases associated resources. Release should always succeed and can
|
||||
// be called multiple times without causing error.
|
||||
func (it *memoryIterator) Release() {
|
||||
it.keys, it.values = nil, nil
|
||||
}
|
100
ethdb/memorydb/memorydb_test.go
Normal file
100
ethdb/memorydb/memorydb_test.go
Normal file
@ -0,0 +1,100 @@
|
||||
// Copyright 2019 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package memorydb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Tests that key-value iteration on top of a memory database works.
|
||||
func TestMemoryDBIterator(t *testing.T) {
|
||||
tests := []struct {
|
||||
content map[string]string
|
||||
prefix string
|
||||
order []string
|
||||
}{
|
||||
// Empty databases should be iterable
|
||||
{map[string]string{}, "", nil},
|
||||
{map[string]string{}, "non-existent-prefix", nil},
|
||||
|
||||
// Single-item databases should be iterable
|
||||
{map[string]string{"key": "val"}, "", []string{"key"}},
|
||||
{map[string]string{"key": "val"}, "k", []string{"key"}},
|
||||
{map[string]string{"key": "val"}, "l", nil},
|
||||
|
||||
// Multi-item databases should be fully iterable
|
||||
{
|
||||
map[string]string{"k1": "v1", "k5": "v5", "k2": "v2", "k4": "v4", "k3": "v3"},
|
||||
"",
|
||||
[]string{"k1", "k2", "k3", "k4", "k5"},
|
||||
},
|
||||
{
|
||||
map[string]string{"k1": "v1", "k5": "v5", "k2": "v2", "k4": "v4", "k3": "v3"},
|
||||
"k",
|
||||
[]string{"k1", "k2", "k3", "k4", "k5"},
|
||||
},
|
||||
{
|
||||
map[string]string{"k1": "v1", "k5": "v5", "k2": "v2", "k4": "v4", "k3": "v3"},
|
||||
"l",
|
||||
nil,
|
||||
},
|
||||
// Multi-item databases should be prefix-iterable
|
||||
{
|
||||
map[string]string{
|
||||
"ka1": "va1", "ka5": "va5", "ka2": "va2", "ka4": "va4", "ka3": "va3",
|
||||
"kb1": "vb1", "kb5": "vb5", "kb2": "vb2", "kb4": "vb4", "kb3": "vb3",
|
||||
},
|
||||
"ka",
|
||||
[]string{"ka1", "ka2", "ka3", "ka4", "ka5"},
|
||||
},
|
||||
{
|
||||
map[string]string{
|
||||
"ka1": "va1", "ka5": "va5", "ka2": "va2", "ka4": "va4", "ka3": "va3",
|
||||
"kb1": "vb1", "kb5": "vb5", "kb2": "vb2", "kb4": "vb4", "kb3": "vb3",
|
||||
},
|
||||
"kc",
|
||||
nil,
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
// Create the key-value data store
|
||||
db := New()
|
||||
for key, val := range tt.content {
|
||||
if err := db.Put([]byte(key), []byte(val)); err != nil {
|
||||
t.Fatalf("test %d: failed to insert item %s:%s into database: %v", i, key, val, err)
|
||||
}
|
||||
}
|
||||
// Iterate over the database with the given configs and verify the results
|
||||
it, idx := db.NewIteratorWithPrefix([]byte(tt.prefix)), 0
|
||||
for it.Next() {
|
||||
if !bytes.Equal(it.Key(), []byte(tt.order[idx])) {
|
||||
t.Errorf("test %d: item %d: key mismatch: have %s, want %s", i, idx, string(it.Key()), tt.order[idx])
|
||||
}
|
||||
if !bytes.Equal(it.Value(), []byte(tt.content[tt.order[idx]])) {
|
||||
t.Errorf("test %d: item %d: value mismatch: have %s, want %s", i, idx, string(it.Value()), tt.content[tt.order[idx]])
|
||||
}
|
||||
idx++
|
||||
}
|
||||
if err := it.Error(); err != nil {
|
||||
t.Errorf("test %d: iteration failed: %v", i, err)
|
||||
}
|
||||
if idx != len(tt.order) {
|
||||
t.Errorf("test %d: iteration terminated prematurely: have %d, want %d", i, idx, len(tt.order))
|
||||
}
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package ethdb
|
||||
|
||||
type table struct {
|
||||
db Database
|
||||
prefix string
|
||||
}
|
||||
|
||||
// NewTable returns a Database object that prefixes all keys with a given
|
||||
// string.
|
||||
func NewTable(db Database, prefix string) Database {
|
||||
return &table{
|
||||
db: db,
|
||||
prefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
func (dt *table) Put(key []byte, value []byte) error {
|
||||
return dt.db.Put(append([]byte(dt.prefix), key...), value)
|
||||
}
|
||||
|
||||
func (dt *table) Has(key []byte) (bool, error) {
|
||||
return dt.db.Has(append([]byte(dt.prefix), key...))
|
||||
}
|
||||
|
||||
func (dt *table) Get(key []byte) ([]byte, error) {
|
||||
return dt.db.Get(append([]byte(dt.prefix), key...))
|
||||
}
|
||||
|
||||
func (dt *table) Delete(key []byte) error {
|
||||
return dt.db.Delete(append([]byte(dt.prefix), key...))
|
||||
}
|
||||
|
||||
func (dt *table) Close() {
|
||||
// Do nothing; don't close the underlying DB.
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package ethdb
|
||||
|
||||
type tableBatch struct {
|
||||
batch Batch
|
||||
prefix string
|
||||
}
|
||||
|
||||
// NewTableBatch returns a Batch object which prefixes all keys with a given string.
|
||||
func NewTableBatch(db Database, prefix string) Batch {
|
||||
return &tableBatch{db.NewBatch(), prefix}
|
||||
}
|
||||
|
||||
func (dt *table) NewBatch() Batch {
|
||||
return &tableBatch{dt.db.NewBatch(), dt.prefix}
|
||||
}
|
||||
|
||||
func (tb *tableBatch) Put(key, value []byte) error {
|
||||
return tb.batch.Put(append([]byte(tb.prefix), key...), value)
|
||||
}
|
||||
|
||||
func (tb *tableBatch) Delete(key []byte) error {
|
||||
return tb.batch.Delete(append([]byte(tb.prefix), key...))
|
||||
}
|
||||
|
||||
func (tb *tableBatch) Write() error {
|
||||
return tb.batch.Write()
|
||||
}
|
||||
|
||||
func (tb *tableBatch) ValueSize() int {
|
||||
return tb.batch.ValueSize()
|
||||
}
|
||||
|
||||
func (tb *tableBatch) Reset() {
|
||||
tb.batch.Reset()
|
||||
}
|
Reference in New Issue
Block a user