all: add read-only option to database (#22407)

* all: add read-only option to database

* all: fixes tests

* cmd/geth: migrate flags

* cmd/geth: fix the compact

* cmd/geth: fix the format

* cmd/geth: fix log

* cmd: add chain-readonly

* core: add readonly notion to freezer

* core/rawdb: add log

* core/rawdb: fix freezer close

* cmd: fix

* cmd, core: construct db

* core: update tests
This commit is contained in:
gary rong
2021-03-23 02:06:30 +08:00
committed by GitHub
parent aab35600bc
commit 0c70b83e00
26 changed files with 267 additions and 192 deletions

View File

@ -157,7 +157,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
b.Fatalf("cannot create temporary directory: %v", err)
}
defer os.RemoveAll(dir)
db, err = rawdb.NewLevelDBDatabase(dir, 128, 128, "")
db, err = rawdb.NewLevelDBDatabase(dir, 128, 128, "", false)
if err != nil {
b.Fatalf("cannot create temporary database: %v", err)
}
@ -255,7 +255,7 @@ func benchWriteChain(b *testing.B, full bool, count uint64) {
if err != nil {
b.Fatalf("cannot create temporary directory: %v", err)
}
db, err := rawdb.NewLevelDBDatabase(dir, 128, 1024, "")
db, err := rawdb.NewLevelDBDatabase(dir, 128, 1024, "", false)
if err != nil {
b.Fatalf("error opening database at %v: %v", dir, err)
}
@ -272,7 +272,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
}
defer os.RemoveAll(dir)
db, err := rawdb.NewLevelDBDatabase(dir, 128, 1024, "")
db, err := rawdb.NewLevelDBDatabase(dir, 128, 1024, "", false)
if err != nil {
b.Fatalf("error opening database at %v: %v", dir, err)
}
@ -283,7 +283,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
db, err := rawdb.NewLevelDBDatabase(dir, 128, 1024, "")
db, err := rawdb.NewLevelDBDatabase(dir, 128, 1024, "", false)
if err != nil {
b.Fatalf("error opening database at %v: %v", dir, err)
}

View File

@ -1762,7 +1762,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
@ -1817,7 +1817,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
}
// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64)
Freeze(threshold uint64) error
Ancients() (uint64, error)
}
db.(freezer).Freeze(tt.freezeThreshold)
@ -1830,7 +1830,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
db.Close()
// Start a new blockchain back up and see where the repait leads us
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}

View File

@ -1961,7 +1961,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
@ -2023,7 +2023,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
}
// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64)
Freeze(threshold uint64) error
Ancients() (uint64, error)
}
db.(freezer).Freeze(tt.freezeThreshold)

View File

@ -65,7 +65,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
@ -261,7 +261,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) {
db.Close()
// Start a new blockchain back up and see where the repair leads us
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "")
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}

View File

@ -651,7 +651,7 @@ func TestFastVsFullChains(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -725,7 +725,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(dir)
db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "")
db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -1592,7 +1592,7 @@ func TestBlockchainRecovery(t *testing.T) {
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -1649,7 +1649,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -1848,7 +1848,7 @@ func testInsertKnownChainData(t *testing.T, typ string) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(dir)
chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "")
chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2128,7 +2128,7 @@ func TestTransactionIndices(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2156,7 +2156,7 @@ func TestTransactionIndices(t *testing.T) {
// Init block chain with external ancients, check all needed indices has been indexed.
limit := []uint64{0, 32, 64, 128}
for _, l := range limit {
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2176,7 +2176,7 @@ func TestTransactionIndices(t *testing.T) {
}
// Reconstruct a block chain which only reserves HEAD-64 tx indices
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2255,7 +2255,7 @@ func TestSkipStaleTxIndicesInFastSync(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}

View File

@ -825,3 +825,29 @@ func FindCommonAncestor(db ethdb.Reader, a, b *types.Header) *types.Header {
}
return a
}
// ReadHeadHeader returns the current canonical head header.
func ReadHeadHeader(db ethdb.Reader) *types.Header {
headHeaderHash := ReadHeadHeaderHash(db)
if headHeaderHash == (common.Hash{}) {
return nil
}
headHeaderNumber := ReadHeaderNumber(db, headHeaderHash)
if headHeaderNumber == nil {
return nil
}
return ReadHeader(db, headHeaderHash, *headHeaderNumber)
}
// ReadHeadHeader returns the current canonical head block.
func ReadHeadBlock(db ethdb.Reader) *types.Block {
headBlockHash := ReadHeadBlockHash(db)
if headBlockHash == (common.Hash{}) {
return nil
}
headBlockNumber := ReadHeaderNumber(db, headBlockHash)
if headBlockNumber == nil {
return nil
}
return ReadBlock(db, headBlockHash, *headBlockNumber)
}

View File

@ -440,7 +440,7 @@ func TestAncientStorage(t *testing.T) {
}
defer os.Remove(frdir)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "")
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}

View File

@ -57,7 +57,10 @@ func (frdb *freezerdb) Close() error {
// Freeze is a helper method used for external testing to trigger and block until
// a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run.
func (frdb *freezerdb) Freeze(threshold uint64) {
func (frdb *freezerdb) Freeze(threshold uint64) error {
if frdb.AncientStore.(*freezer).readonly {
return errReadOnly
}
// Set the freezer threshold to a temporary value
defer func(old uint64) {
atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, old)
@ -68,6 +71,7 @@ func (frdb *freezerdb) Freeze(threshold uint64) {
trigger := make(chan struct{}, 1)
frdb.AncientStore.(*freezer).trigger <- trigger
<-trigger
return nil
}
// nofreezedb is a database wrapper that disables freezer data retrievals.
@ -121,9 +125,9 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
// NewDatabaseWithFreezer creates a high level database on top of a given key-
// value data store with a freezer moving immutable chain segments into cold
// storage.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string) (ethdb.Database, error) {
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace)
frdb, err := newFreezer(freezer, namespace, readonly)
if err != nil {
return nil, err
}
@ -192,8 +196,9 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
}
}
// Freezer is consistent with the key-value database, permit combining the two
go frdb.freeze(db)
if !frdb.readonly {
go frdb.freeze(db)
}
return &freezerdb{
KeyValueStore: db,
AncientStore: frdb,
@ -215,8 +220,8 @@ func NewMemoryDatabaseWithCap(size int) ethdb.Database {
// NewLevelDBDatabase creates a persistent key-value database without a freezer
// moving immutable chain segments into cold storage.
func NewLevelDBDatabase(file string, cache int, handles int, namespace string) (ethdb.Database, error) {
db, err := leveldb.New(file, cache, handles, namespace)
func NewLevelDBDatabase(file string, cache int, handles int, namespace string, readonly bool) (ethdb.Database, error) {
db, err := leveldb.New(file, cache, handles, namespace, readonly)
if err != nil {
return nil, err
}
@ -225,12 +230,12 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string) (
// NewLevelDBDatabaseWithFreezer creates a persistent key-value database with a
// freezer moving immutable chain segments into cold storage.
func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string) (ethdb.Database, error) {
kvdb, err := leveldb.New(file, cache, handles, namespace)
func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
kvdb, err := leveldb.New(file, cache, handles, namespace, readonly)
if err != nil {
return nil, err
}
frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace)
frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly)
if err != nil {
kvdb.Close()
return nil, err

View File

@ -35,6 +35,10 @@ import (
)
var (
// errReadOnly is returned if the freezer is opened in read only mode. All the
// mutations are disallowed.
errReadOnly = errors.New("read only")
// errUnknownTable is returned if the user attempts to read from a table that is
// not tracked by the freezer.
errUnknownTable = errors.New("unknown table")
@ -73,6 +77,7 @@ type freezer struct {
frozen uint64 // Number of blocks already frozen
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
readonly bool
tables map[string]*freezerTable // Data tables for storing everything
instanceLock fileutil.Releaser // File-system lock to prevent double opens
@ -84,7 +89,7 @@ type freezer struct {
// newFreezer creates a chain freezer that moves ancient chain data into
// append-only flat file containers.
func newFreezer(datadir string, namespace string) (*freezer, error) {
func newFreezer(datadir string, namespace string, readonly bool) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
@ -106,6 +111,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
}
// Open all the supported data tables
freezer := &freezer{
readonly: readonly,
threshold: params.FullImmutabilityThreshold,
tables: make(map[string]*freezerTable),
instanceLock: lock,
@ -130,7 +136,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
lock.Release()
return nil, err
}
log.Info("Opened ancient database", "database", datadir)
log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
return freezer, nil
}
@ -138,7 +144,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
func (f *freezer) Close() error {
var errs []error
f.closeOnce.Do(func() {
f.quit <- struct{}{}
close(f.quit)
for _, table := range f.tables {
if err := table.Close(); err != nil {
errs = append(errs, err)
@ -191,6 +197,9 @@ func (f *freezer) AncientSize(kind string) (uint64, error) {
// injection will be rejected. But if two injections with same number happen at
// the same time, we can get into the trouble.
func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) {
if f.readonly {
return errReadOnly
}
// Ensure the binary blobs we are appending is continuous with freezer.
if atomic.LoadUint64(&f.frozen) != number {
return errOutOrderInsertion
@ -233,6 +242,9 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td
// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
if f.readonly {
return errReadOnly
}
if atomic.LoadUint64(&f.frozen) <= items {
return nil
}

View File

@ -85,8 +85,12 @@ type Pruner struct {
}
// NewPruner creates the pruner instance.
func NewPruner(db ethdb.Database, headHeader *types.Header, datadir, trieCachePath string, bloomSize uint64) (*Pruner, error) {
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headHeader.Root, false, false, false)
func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize uint64) (*Pruner, error) {
headBlock := rawdb.ReadHeadBlock(db)
if headBlock == nil {
return nil, errors.New("Failed to load head block")
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, false)
if err != nil {
return nil, err // The relevant snapshot(s) might not exist
}
@ -104,7 +108,7 @@ func NewPruner(db ethdb.Database, headHeader *types.Header, datadir, trieCachePa
stateBloom: stateBloom,
datadir: datadir,
trieCachePath: trieCachePath,
headHeader: headHeader,
headHeader: headBlock.Header(),
snaptree: snaptree,
}, nil
}
@ -350,9 +354,9 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
if stateBloomPath == "" {
return nil // nothing to recover
}
headHeader, err := getHeadHeader(db)
if err != nil {
return err
headBlock := rawdb.ReadHeadBlock(db)
if headBlock == nil {
return errors.New("Failed to load head block")
}
// Initialize the snapshot tree in recovery mode to handle this special case:
// - Users run the `prune-state` command multiple times
@ -362,7 +366,7 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
// - The state HEAD is rewound already because of multiple incomplete `prune-state`
// In this case, even the state HEAD is not exactly matched with snapshot, it
// still feasible to recover the pruning correctly.
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headHeader.Root, false, false, true)
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, true)
if err != nil {
return err // The relevant snapshot(s) might not exist
}
@ -382,7 +386,7 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
// otherwise the dangling state will be left.
var (
found bool
layers = snaptree.Snapshots(headHeader.Root, 128, true)
layers = snaptree.Snapshots(headBlock.Root(), 128, true)
middleRoots = make(map[common.Hash]struct{})
)
for _, layer := range layers {
@ -506,22 +510,6 @@ func findBloomFilter(datadir string) (string, common.Hash, error) {
return stateBloomPath, stateBloomRoot, nil
}
func getHeadHeader(db ethdb.Database) (*types.Header, error) {
headHeaderHash := rawdb.ReadHeadBlockHash(db)
if headHeaderHash == (common.Hash{}) {
return nil, errors.New("empty head block hash")
}
headHeaderNumber := rawdb.ReadHeaderNumber(db, headHeaderHash)
if headHeaderNumber == nil {
return nil, errors.New("empty head block number")
}
headHeader := rawdb.ReadHeader(db, headHeaderHash, *headHeaderNumber)
if headHeader == nil {
return nil, errors.New("empty head header")
}
return headHeader, nil
}
const warningLog = `
WARNING!

View File

@ -524,7 +524,7 @@ func TestDiskSeek(t *testing.T) {
t.Fatal(err)
} else {
defer os.RemoveAll(dir)
diskdb, err := leveldb.New(dir, 256, 0, "")
diskdb, err := leveldb.New(dir, 256, 0, "", false)
if err != nil {
t.Fatal(err)
}