vendor: pull in latest changes for goleveldb (#15090)

This commit is contained in:
Péter Szilágyi
2017-09-05 16:04:32 +03:00
committed by Felix Lange
parent c91f7beb53
commit cd6c861dc5
8 changed files with 68 additions and 35 deletions

View File

@ -844,7 +844,7 @@ func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
// Has returns true if the DB does contains the given key.
//
// It is safe to modify the contents of the argument after Get returns.
// It is safe to modify the contents of the argument after Has returns.
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
err = db.ok()
if err != nil {

View File

@ -289,7 +289,7 @@ func (db *DB) memCompaction() {
close(resumeC)
resumeC = nil
case <-db.closeC:
return
db.compactionExitTransact()
}
var (
@ -338,7 +338,7 @@ func (db *DB) memCompaction() {
case <-resumeC:
close(resumeC)
case <-db.closeC:
return
db.compactionExitTransact()
}
}

View File

@ -7,6 +7,7 @@
package leveldb
import (
"errors"
"sync/atomic"
"time"
@ -15,6 +16,10 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage"
)
var (
errHasFrozenMem = errors.New("has frozen mem")
)
type memDB struct {
db *DB
*memdb.DB
@ -126,7 +131,7 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
defer db.memMu.Unlock()
if db.frozenMem != nil {
panic("still has frozen mem")
return nil, errHasFrozenMem
}
if db.journal == nil {

View File

@ -32,15 +32,24 @@ func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
}
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
retryLimit := 3
retry:
// Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC)
if err != nil {
return
}
retryLimit--
// Create new memdb and journal.
mem, err = db.newMem(n)
if err != nil {
if err == errHasFrozenMem {
if retryLimit <= 0 {
panic("BUG: still has frozen memdb")
}
goto retry
}
return
}

View File

@ -8,6 +8,8 @@
//
// Create or open a database:
//
// // The returned DB instance is safe for concurrent use. Which mean that all
// // DB's methods may be called concurrently from multiple goroutine.
// db, err := leveldb.OpenFile("path/to/db", nil)
// ...
// defer db.Close()

View File

@ -234,14 +234,30 @@ func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
return
}
_, err = fmt.Fprintln(w, fsGenName(fd))
// Close the file first.
if cerr := w.Close(); cerr != nil {
fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, cerr))
if err != nil {
fs.log(fmt.Sprintf("write CURRENT.%d: %v", fd.Num, err))
return
}
if err = w.Sync(); err != nil {
fs.log(fmt.Sprintf("flush CURRENT.%d: %v", fd.Num, err))
return
}
if err = w.Close(); err != nil {
fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, err))
return
}
if err != nil {
return
}
return rename(path, filepath.Join(fs.path, "CURRENT"))
if err = rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
return
}
// Sync root directory.
if err = syncDir(fs.path); err != nil {
fs.log(fmt.Sprintf("syncDir: %v", err))
}
return
}
func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {

View File

@ -581,6 +581,7 @@ func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, erro
case blockTypeSnappyCompression:
decLen, err := snappy.DecodedLen(data[:bh.length])
if err != nil {
r.bpool.Put(data)
return nil, r.newErrCorruptedBH(bh, err.Error())
}
decData := r.bpool.Get(decLen)