core/rawdb: support starting offset for future deletion

This commit is contained in:
Martin Holst Swende
2019-04-14 21:25:32 +02:00
committed by Péter Szilágyi
parent 80469bea0c
commit 331de17e4d
2 changed files with 175 additions and 30 deletions

View File

@ -81,9 +81,14 @@ type freezerTable struct {
head *os.File // File descriptor for the data head of the table
files map[uint32]*os.File // open files
headId uint32 // number of the currently active head file
tailId uint32 // number of the earliest file
index *os.File // File descriptor for the indexEntry file of the table
items uint64 // Number of items stored in the table
// In the case that old items are deleted (from the tail), we use itemOffset
// to count how many historic items have gone missing.
items uint64 // Number of items stored in the table (including items removed from tail)
itemOffset uint32 // Offset (number of discarded items)
headBytes uint32 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
@ -164,10 +169,19 @@ func (t *freezerTable) repair() error {
// Open the head file
var (
firstIndex indexEntry
lastIndex indexEntry
contentSize int64
contentExp int64
)
// Read index zero, determine what file is the earliest
// and what item offset to use
t.index.ReadAt(buffer, 0)
firstIndex.unmarshalBinary(buffer)
t.tailId = firstIndex.offset
t.itemOffset = firstIndex.filenum
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
@ -225,7 +239,7 @@ func (t *freezerTable) repair() error {
return err
}
// Update the item and byte counters and return
t.items = uint64(offsetsSize/indexEntrySize - 1) // last indexEntry points to the end of the data file
t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
t.headBytes = uint32(contentSize)
t.headId = lastIndex.filenum
@ -245,7 +259,7 @@ func (t *freezerTable) preopen() (err error) {
// The repair might have already opened (some) files
t.releaseFilesAfter(0, false)
// Open all except head in RDONLY
for i := uint32(0); i < t.headId; i++ {
for i := uint32(t.tailId); i < t.headId; i++ {
if _, err = t.openFile(i, os.O_RDONLY); err != nil {
return err
}
@ -259,7 +273,8 @@ func (t *freezerTable) preopen() (err error) {
func (t *freezerTable) truncate(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// If out item count is corrent, don't do anything
// If our item count is correct, don't do anything
if atomic.LoadUint64(&t.items) <= items {
return nil
}
@ -275,6 +290,7 @@ func (t *freezerTable) truncate(items uint64) error {
}
var expected indexEntry
expected.unmarshalBinary(buffer)
// We might need to truncate back to older files
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
@ -290,7 +306,6 @@ func (t *freezerTable) truncate(items uint64) error {
t.head = newHead
atomic.StoreUint32(&t.headId, expected.filenum)
}
if err := t.head.Truncate(int64(expected.offset)); err != nil {
return err
}
@ -330,9 +345,9 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
if f, exist = t.files[num]; !exist {
var name string
if t.noCompression {
name = fmt.Sprintf("%s.%d.rdat", t.name, num)
name = fmt.Sprintf("%s.%04d.rdat", t.name, num)
} else {
name = fmt.Sprintf("%s.%d.cdat", t.name, num)
name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
}
f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644)
if err != nil {
@ -376,11 +391,13 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
t.lock.RLock()
// Ensure the table is still accessible
if t.index == nil || t.head == nil {
t.lock.RUnlock()
return errClosed
}
// Ensure only the next item can be written, nothing else
if atomic.LoadUint64(&t.items) != item {
panic(fmt.Sprintf("appending unexpected item: want %d, have %d", t.items, item))
t.lock.RUnlock()
return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
}
// Encode the blob and write it into the data file
if !t.noCompression {
@ -461,13 +478,20 @@ func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
if atomic.LoadUint64(&t.items) <= item {
return nil, errOutOfBounds
}
// Ensure the item was not deleted from the tail either
offset := atomic.LoadUint32(&t.itemOffset)
if uint64(offset) > item {
return nil, errOutOfBounds
}
t.lock.RLock()
startOffset, endOffset, filenum, err := t.getBounds(item)
startOffset, endOffset, filenum, err := t.getBounds(item - uint64(offset))
if err != nil {
t.lock.RUnlock()
return nil, err
}
dataFile, exist := t.files[filenum]
if !exist {
t.lock.RUnlock()
return nil, fmt.Errorf("missing data file %d", filenum)
}
// Retrieve the data itself, decompress and return
@ -499,3 +523,26 @@ func (t *freezerTable) Sync() error {
}
return t.head.Sync()
}
// printIndex is a debug print utility function for testing
func (t *freezerTable) printIndex() {
buf := make([]byte, indexEntrySize)
fmt.Printf("|-----------------|\n")
fmt.Printf("| fileno | offset |\n")
fmt.Printf("|--------+--------|\n")
for i := uint64(0); ; i++ {
if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
break
}
var entry indexEntry
entry.unmarshalBinary(buf)
fmt.Printf("| %03d | %03d | \n", entry.filenum, entry.offset)
if i > 100 {
fmt.Printf(" ... \n")
break
}
}
fmt.Printf("|-----------------|\n")
}