vendor, ethdb: resume write operation asap (#17144)

* vendor: update leveldb

* ethdb: remove useless warning log
This commit is contained in:
gary rong
2018-07-12 16:07:51 +08:00
committed by Péter Szilágyi
parent a9835c1816
commit e8824f6e74
6 changed files with 276 additions and 173 deletions

View File

@@ -640,6 +640,16 @@ func (db *DB) tableNeedCompaction() bool {
return v.needCompaction()
}
// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
func (db *DB) resumeWrite() bool {
v := db.s.version()
defer v.release()
if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
return true
}
return false
}
func (db *DB) pauseCompaction(ch chan<- struct{}) {
select {
case ch <- struct{}{}:
@@ -653,6 +663,7 @@ type cCmd interface {
}
type cAuto struct {
// Note for table compaction, an empty ackC represents it's a compaction waiting command.
ackC chan<- error
}
@@ -765,8 +776,10 @@ func (db *DB) mCompaction() {
}
func (db *DB) tCompaction() {
var x cCmd
var ackQ []cCmd
var (
x cCmd
ackQ, waitQ []cCmd
)
defer func() {
if x := recover(); x != nil {
@@ -778,6 +791,10 @@ func (db *DB) tCompaction() {
ackQ[i].ack(ErrClosed)
ackQ[i] = nil
}
for i := range waitQ {
waitQ[i].ack(ErrClosed)
waitQ[i] = nil
}
if x != nil {
x.ack(ErrClosed)
}
@@ -795,12 +812,25 @@ func (db *DB) tCompaction() {
return
default:
}
// Resume write operation as soon as possible.
if len(waitQ) > 0 && db.resumeWrite() {
for i := range waitQ {
waitQ[i].ack(nil)
waitQ[i] = nil
}
waitQ = waitQ[:0]
}
} else {
for i := range ackQ {
ackQ[i].ack(nil)
ackQ[i] = nil
}
ackQ = ackQ[:0]
for i := range waitQ {
waitQ[i].ack(nil)
waitQ[i] = nil
}
waitQ = waitQ[:0]
select {
case x = <-db.tcompCmdC:
case ch := <-db.tcompPauseC:
@@ -813,7 +843,11 @@ func (db *DB) tCompaction() {
if x != nil {
switch cmd := x.(type) {
case cAuto:
ackQ = append(ackQ, x)
if cmd.ackC != nil {
waitQ = append(waitQ, x)
} else {
ackQ = append(ackQ, x)
}
case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default: