core/rawdb, cmd, ethdb, eth: implement freezer tail deletion (#23954)

* core/rawdb, cmd, ethdb, eth: implement freezer tail deletion

* core/rawdb: address comments from martin and sina

* core/rawdb: fixes cornercase in tail deletion

* core/rawdb: separate metadata into a standalone file

* core/rawdb: remove unused code

* core/rawdb: add random test

* core/rawdb: polish code

* core/rawdb: fsync meta file before manipulating the index

* core/rawdb: fix typo

* core/rawdb: address comments
This commit is contained in:
rjl493456442
2022-03-10 16:37:23 +08:00
committed by GitHub
parent 8c8a9e5ca1
commit 538a868384
14 changed files with 1102 additions and 132 deletions

View File

@ -47,20 +47,19 @@ var (
)
// indexEntry contains the number/id of the file that the data resides in, aswell as the
// offset within the file to the end of the data
// offset within the file to the end of the data.
// In serialized form, the filenum is stored as uint16.
type indexEntry struct {
filenum uint32 // stored as uint16 ( 2 bytes)
offset uint32 // stored as uint32 ( 4 bytes)
filenum uint32 // stored as uint16 ( 2 bytes )
offset uint32 // stored as uint32 ( 4 bytes )
}
const indexEntrySize = 6
// unmarshalBinary deserializes binary b into the rawIndex entry.
func (i *indexEntry) unmarshalBinary(b []byte) error {
func (i *indexEntry) unmarshalBinary(b []byte) {
i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
i.offset = binary.BigEndian.Uint32(b[2:6])
return nil
}
// append adds the encoded entry to the end of b.
@ -75,14 +74,14 @@ func (i *indexEntry) append(b []byte) []byte {
// bounds returns the start- and end- offsets, and the file number of where to
// read there data item marked by the two index entries. The two entries are
// assumed to be sequential.
func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
if start.filenum != end.filenum {
func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
if i.filenum != end.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, end.offset, end.filenum
}
return start.offset, end.offset, end.filenum
return i.offset, end.offset, end.filenum
}
// freezerTable represents a single chained data table within the freezer (e.g. blocks).
@ -92,7 +91,15 @@ type freezerTable struct {
// WARNING: The `items` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
items uint64 // Number of items stored in the table (including items removed from tail)
items uint64 // Number of items stored in the table (including items removed from tail)
itemOffset uint64 // Number of items removed from the table
// itemHidden is the number of items marked as deleted. Tail deletion is
// only supported at file level which means the actual deletion will be
// delayed until the entire data file is marked as deleted. Before that
// these items will be hidden to prevent being visited again. The value
// should never be lower than itemOffset.
itemHidden uint64
noCompression bool // if true, disables snappy compression. Note: does not work retroactively
readonly bool
@ -101,14 +108,11 @@ type freezerTable struct {
path string
head *os.File // File descriptor for the data head of the table
index *os.File // File descriptor for the indexEntry file of the table
meta *os.File // File descriptor for metadata of the table
files map[uint32]*os.File // open files
headId uint32 // number of the currently active head file
tailId uint32 // number of the earliest file
index *os.File // File descriptor for the indexEntry file of the table
// In the case that old items are deleted (from the tail), we use itemOffset
// to count how many historic items have gone missing.
itemOffset uint32 // Offset (number of discarded items)
headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
@ -124,46 +128,8 @@ func NewFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerT
return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly)
}
// openFreezerFileForAppend opens a freezer table file and seeks to the end
func openFreezerFileForAppend(filename string) (*os.File, error) {
// Open the file without the O_APPEND flag
// because it has differing behaviour during Truncate operations
// on different OS's
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
// Seek to end for append
if _, err = file.Seek(0, io.SeekEnd); err != nil {
return nil, err
}
return file, nil
}
// openFreezerFileForReadOnly opens a freezer table file for read only access
func openFreezerFileForReadOnly(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_RDONLY, 0644)
}
// openFreezerFileTruncated opens a freezer table making sure it is truncated
func openFreezerFileTruncated(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
}
// truncateFreezerFile resizes a freezer table file and seeks to the end
func truncateFreezerFile(file *os.File, size int64) error {
if err := file.Truncate(size); err != nil {
return err
}
// Seek to end for append
if _, err := file.Seek(0, io.SeekEnd); err != nil {
return err
}
return nil
}
// newTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
@ -172,28 +138,40 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
}
var idxName string
if noCompression {
// Raw idx
idxName = fmt.Sprintf("%s.ridx", name)
idxName = fmt.Sprintf("%s.ridx", name) // raw index file
} else {
// Compressed idx
idxName = fmt.Sprintf("%s.cidx", name)
idxName = fmt.Sprintf("%s.cidx", name) // compressed index file
}
var (
err error
offsets *os.File
err error
index *os.File
meta *os.File
)
if readonly {
// Will fail if table doesn't exist
offsets, err = openFreezerFileForReadOnly(filepath.Join(path, idxName))
index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName))
if err != nil {
return nil, err
}
// Will fail if the table is legacy(no metadata)
meta, err = openFreezerFileForReadOnly(filepath.Join(path, fmt.Sprintf("%s.meta", name)))
if err != nil {
return nil, err
}
} else {
offsets, err = openFreezerFileForAppend(filepath.Join(path, idxName))
}
if err != nil {
return nil, err
index, err = openFreezerFileForAppend(filepath.Join(path, idxName))
if err != nil {
return nil, err
}
meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name)))
if err != nil {
return nil, err
}
}
// Create the table and repair any past inconsistency
tab := &freezerTable{
index: offsets,
index: index,
meta: meta,
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
@ -220,7 +198,7 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
return tab, nil
}
// repair cross checks the head and the index file and truncates them to
// repair cross-checks the head and the index file and truncates them to
// be in sync with each other after a potential crash / data loss.
func (t *freezerTable) repair() error {
// Create a temporary offset buffer to init files with and read indexEntry into
@ -258,11 +236,27 @@ func (t *freezerTable) repair() error {
t.index.ReadAt(buffer, 0)
firstIndex.unmarshalBinary(buffer)
// Assign the tail fields with the first stored index.
// The total removed items is represented with an uint32,
// which is not enough in theory but enough in practice.
// TODO: use uint64 to represent total removed items.
t.tailId = firstIndex.filenum
t.itemOffset = firstIndex.offset
t.itemOffset = uint64(firstIndex.offset)
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
// Load metadata from the file
meta, err := loadMetadata(t.meta, t.itemOffset)
if err != nil {
return err
}
t.itemHidden = meta.VirtualTail
// Read the last index, use the default value in case the freezer is empty
if offsetsSize == indexEntrySize {
lastIndex = indexEntry{filenum: t.tailId, offset: 0}
} else {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
}
if t.readonly {
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
} else {
@ -278,7 +272,6 @@ func (t *freezerTable) repair() error {
// Keep truncating both files until they come in sync
contentExp = int64(lastIndex.offset)
for contentExp != contentSize {
// Truncate the head file to the last offset pointer
if contentExp < contentSize {
@ -295,9 +288,16 @@ func (t *freezerTable) repair() error {
return err
}
offsetsSize -= indexEntrySize
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
// Read the new head index, use the default value in case
// the freezer is already empty.
var newLastIndex indexEntry
newLastIndex.unmarshalBinary(buffer)
if offsetsSize == indexEntrySize {
newLastIndex = indexEntry{filenum: t.tailId, offset: 0}
} else {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
newLastIndex.unmarshalBinary(buffer)
}
// We might have slipped back into an earlier head-file here
if newLastIndex.filenum != lastIndex.filenum {
// Release earlier opened file
@ -325,12 +325,21 @@ func (t *freezerTable) repair() error {
if err := t.head.Sync(); err != nil {
return err
}
if err := t.meta.Sync(); err != nil {
return err
}
}
// Update the item and byte counters and return
t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
t.items = t.itemOffset + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
t.headBytes = contentSize
t.headId = lastIndex.filenum
// Delete the leftover files because of head deletion
t.releaseFilesAfter(t.headId, true)
// Delete the leftover files because of tail deletion
t.releaseFilesBefore(t.tailId, true)
// Close opened files and preopen all files
if err := t.preopen(); err != nil {
return err
@ -346,6 +355,7 @@ func (t *freezerTable) repair() error {
func (t *freezerTable) preopen() (err error) {
// The repair might have already opened (some) files
t.releaseFilesAfter(0, false)
// Open all except head in RDONLY
for i := t.tailId; i < t.headId; i++ {
if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil {
@ -361,16 +371,19 @@ func (t *freezerTable) preopen() (err error) {
return err
}
// truncate discards any recent data above the provided threshold number.
func (t *freezerTable) truncate(items uint64) error {
// truncateHead discards any recent data above the provided threshold number.
func (t *freezerTable) truncateHead(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// If our item count is correct, don't do anything
// Ensure the given truncate target falls in the correct range
existing := atomic.LoadUint64(&t.items)
if existing <= items {
return nil
}
if items < atomic.LoadUint64(&t.itemHidden) {
return errors.New("truncation below tail")
}
// We need to truncate, save the old size for metrics tracking
oldSize, err := t.sizeNolock()
if err != nil {
@ -382,17 +395,24 @@ func (t *freezerTable) truncate(items uint64) error {
log = t.logger.Warn // Only loud warn if we delete multiple items
}
log("Truncating freezer table", "items", existing, "limit", items)
if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
// Truncate the index file first, the tail position is also considered
// when calculating the new freezer table length.
length := items - atomic.LoadUint64(&t.itemOffset)
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
return err
}
// Calculate the new expected size of the data file and truncate it
buffer := make([]byte, indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil {
return err
}
var expected indexEntry
expected.unmarshalBinary(buffer)
if length == 0 {
expected = indexEntry{filenum: t.tailId, offset: 0}
} else {
buffer := make([]byte, indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil {
return err
}
expected.unmarshalBinary(buffer)
}
// We might need to truncate back to older files
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
@ -421,7 +441,110 @@ func (t *freezerTable) truncate(items uint64) error {
return err
}
t.sizeGauge.Dec(int64(oldSize - newSize))
return nil
}
// truncateTail discards any recent data before the provided threshold number.
func (t *freezerTable) truncateTail(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// Ensure the given truncate target falls in the correct range
if atomic.LoadUint64(&t.itemHidden) >= items {
return nil
}
if atomic.LoadUint64(&t.items) < items {
return errors.New("truncation above head")
}
// Load the new tail index by the given new tail position
var (
newTailId uint32
buffer = make([]byte, indexEntrySize)
)
if atomic.LoadUint64(&t.items) == items {
newTailId = t.headId
} else {
offset := items - atomic.LoadUint64(&t.itemOffset)
if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil {
return err
}
var newTail indexEntry
newTail.unmarshalBinary(buffer)
newTailId = newTail.filenum
}
// Update the virtual tail marker and hidden these entries in table.
atomic.StoreUint64(&t.itemHidden, items)
if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
return err
}
// Hidden items still fall in the current tail file, no data file
// can be dropped.
if t.tailId == newTailId {
return nil
}
// Hidden items fall in the incorrect range, returns the error.
if t.tailId > newTailId {
return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId)
}
// Hidden items exceed the current tail file, drop the relevant
// data files. We need to truncate, save the old size for metrics
// tracking.
oldSize, err := t.sizeNolock()
if err != nil {
return err
}
// Count how many items can be deleted from the file.
var (
newDeleted = items
deleted = atomic.LoadUint64(&t.itemOffset)
)
for current := items - 1; current >= deleted; current -= 1 {
if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil {
return err
}
var pre indexEntry
pre.unmarshalBinary(buffer)
if pre.filenum != newTailId {
break
}
newDeleted = current
}
// Commit the changes of metadata file first before manipulating
// the indexes file.
if err := t.meta.Sync(); err != nil {
return err
}
// Truncate the deleted index entries from the index file.
err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error {
tailIndex := indexEntry{
filenum: newTailId,
offset: uint32(newDeleted),
}
_, err := f.Write(tailIndex.append(nil))
return err
})
if err != nil {
return err
}
// Reopen the modified index file to load the changes
if err := t.index.Close(); err != nil {
return err
}
t.index, err = openFreezerFileForAppend(t.index.Name())
if err != nil {
return err
}
// Release any files before the current tail
t.tailId = newTailId
atomic.StoreUint64(&t.itemOffset, newDeleted)
t.releaseFilesBefore(t.tailId, true)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
if err != nil {
return err
}
t.sizeGauge.Dec(int64(oldSize - newSize))
return nil
}
@ -436,6 +559,11 @@ func (t *freezerTable) Close() error {
}
t.index = nil
if err := t.meta.Close(); err != nil {
errs = append(errs, err)
}
t.meta = nil
for _, f := range t.files {
if err := f.Close(); err != nil {
errs = append(errs, err)
@ -490,6 +618,19 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
}
}
// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files
func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) {
for fnum, f := range t.files {
if fnum < num {
delete(t.files, fnum)
f.Close()
if remove {
os.Remove(f.Name())
}
}
}
}
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
@ -498,7 +639,7 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
// it will return error.
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
// Apply the table-offset
from = from - uint64(t.itemOffset)
from = from - t.itemOffset
// For reading N items, we need N+1 indices.
buffer := make([]byte, (count+1)*indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
@ -583,18 +724,21 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
t.lock.RLock()
defer t.lock.RUnlock()
// Ensure the table and the item is accessible
// Ensure the table and the item are accessible
if t.index == nil || t.head == nil {
return nil, nil, errClosed
}
itemCount := atomic.LoadUint64(&t.items) // max number
var (
items = atomic.LoadUint64(&t.items) // the total items(head + 1)
hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items
)
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something
if itemCount <= start || uint64(t.itemOffset) > start || count == 0 {
if items <= start || hidden > start || count == 0 {
return nil, nil, errOutOfBounds
}
if start+count > itemCount {
count = itemCount - start
if start+count > items {
count = items - start
}
var (
output = make([]byte, maxBytes) // Buffer to read data into
@ -670,10 +814,10 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
return output[:outputSize], sizes, nil
}
// has returns an indicator whether the specified number data
// exists in the freezer table.
// has returns an indicator whether the specified number data is still accessible
// in the freezer table.
func (t *freezerTable) has(number uint64) bool {
return atomic.LoadUint64(&t.items) > number
return atomic.LoadUint64(&t.items) > number && atomic.LoadUint64(&t.itemHidden) <= number
}
// size returns the total data size in the freezer table.
@ -727,6 +871,9 @@ func (t *freezerTable) Sync() error {
if err := t.index.Sync(); err != nil {
return err
}
if err := t.meta.Sync(); err != nil {
return err
}
return t.head.Sync()
}
@ -744,13 +891,20 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string {
}
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
meta, err := readMetadata(t.meta)
if err != nil {
fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
return
}
fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.Version, atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden))
buf := make([]byte, indexEntrySize)
fmt.Fprintf(w, "| number | fileno | offset |\n")
fmt.Fprintf(w, "|--------|--------|--------|\n")
for i := uint64(start); ; i++ {
if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil {
break
}
var entry indexEntry