cmd/geth: implement data import and export (#22931)
This PR offers two more database sub commands for exporting and importing data. Two exporters are implemented: preimage and snapshot data respectively. The import command is generic, it can take any data export and import into leveldb. The data format has a 'magic' for disambiguation, and a version field for future compatibility.
This commit is contained in:
212
cmd/utils/cmd.go
212
cmd/utils/cmd.go
@ -18,7 +18,9 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"compress/gzip"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@ -270,6 +272,7 @@ func ExportAppendChain(blockchain *core.BlockChain, fn string, first uint64, las
|
||||
}
|
||||
|
||||
// ImportPreimages imports a batch of exported hash preimages into the database.
|
||||
// It's a part of the deprecated functionality, should be removed in the future.
|
||||
func ImportPreimages(db ethdb.Database, fn string) error {
|
||||
log.Info("Importing preimages", "file", fn)
|
||||
|
||||
@ -280,7 +283,7 @@ func ImportPreimages(db ethdb.Database, fn string) error {
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
var reader io.Reader = fh
|
||||
var reader io.Reader = bufio.NewReader(fh)
|
||||
if strings.HasSuffix(fn, ".gz") {
|
||||
if reader, err = gzip.NewReader(reader); err != nil {
|
||||
return err
|
||||
@ -288,7 +291,7 @@ func ImportPreimages(db ethdb.Database, fn string) error {
|
||||
}
|
||||
stream := rlp.NewStream(reader, 0)
|
||||
|
||||
// Import the preimages in batches to prevent disk trashing
|
||||
// Import the preimages in batches to prevent disk thrashing
|
||||
preimages := make(map[common.Hash][]byte)
|
||||
|
||||
for {
|
||||
@ -317,6 +320,7 @@ func ImportPreimages(db ethdb.Database, fn string) error {
|
||||
|
||||
// ExportPreimages exports all known hash preimages into the specified file,
|
||||
// truncating any data already present in the file.
|
||||
// It's a part of the deprecated functionality, should be removed in the future.
|
||||
func ExportPreimages(db ethdb.Database, fn string) error {
|
||||
log.Info("Exporting preimages", "file", fn)
|
||||
|
||||
@ -344,3 +348,207 @@ func ExportPreimages(db ethdb.Database, fn string) error {
|
||||
log.Info("Exported preimages", "file", fn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// exportHeader is used in the export/import flow. When we do an export,
|
||||
// the first element we output is the exportHeader.
|
||||
// Whenever a backwards-incompatible change is made, the Version header
|
||||
// should be bumped.
|
||||
// If the importer sees a higher version, it should reject the import.
|
||||
type exportHeader struct {
|
||||
Magic string // Always set to 'gethdbdump' for disambiguation
|
||||
Version uint64
|
||||
Kind string
|
||||
UnixTime uint64
|
||||
}
|
||||
|
||||
const exportMagic = "gethdbdump"
|
||||
const (
|
||||
OpBatchAdd = 0
|
||||
OpBatchDel = 1
|
||||
)
|
||||
|
||||
// ImportLDBData imports a batch of snapshot data into the database
|
||||
func ImportLDBData(db ethdb.Database, f string, startIndex int64, interrupt chan struct{}) error {
|
||||
log.Info("Importing leveldb data", "file", f)
|
||||
|
||||
// Open the file handle and potentially unwrap the gzip stream
|
||||
fh, err := os.Open(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
var reader io.Reader = bufio.NewReader(fh)
|
||||
if strings.HasSuffix(f, ".gz") {
|
||||
if reader, err = gzip.NewReader(reader); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
stream := rlp.NewStream(reader, 0)
|
||||
|
||||
// Read the header
|
||||
var header exportHeader
|
||||
if err := stream.Decode(&header); err != nil {
|
||||
return fmt.Errorf("could not decode header: %v", err)
|
||||
}
|
||||
if header.Magic != exportMagic {
|
||||
return errors.New("incompatible data, wrong magic")
|
||||
}
|
||||
if header.Version != 0 {
|
||||
return fmt.Errorf("incompatible version %d, (support only 0)", header.Version)
|
||||
}
|
||||
log.Info("Importing data", "file", f, "type", header.Kind, "data age",
|
||||
common.PrettyDuration(time.Since(time.Unix(int64(header.UnixTime), 0))))
|
||||
|
||||
// Import the snapshot in batches to prevent disk thrashing
|
||||
var (
|
||||
count int64
|
||||
start = time.Now()
|
||||
logged = time.Now()
|
||||
batch = db.NewBatch()
|
||||
)
|
||||
for {
|
||||
// Read the next entry
|
||||
var (
|
||||
op byte
|
||||
key, val []byte
|
||||
)
|
||||
if err := stream.Decode(&op); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := stream.Decode(&key); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := stream.Decode(&val); err != nil {
|
||||
return err
|
||||
}
|
||||
if count < startIndex {
|
||||
count++
|
||||
continue
|
||||
}
|
||||
switch op {
|
||||
case OpBatchDel:
|
||||
batch.Delete(key)
|
||||
case OpBatchAdd:
|
||||
batch.Put(key, val)
|
||||
default:
|
||||
return fmt.Errorf("unknown op %d\n", op)
|
||||
}
|
||||
if batch.ValueSize() > ethdb.IdealBatchSize {
|
||||
if err := batch.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
batch.Reset()
|
||||
}
|
||||
// Check interruption emitted by ctrl+c
|
||||
if count%1000 == 0 {
|
||||
select {
|
||||
case <-interrupt:
|
||||
if err := batch.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("External data import interrupted", "file", f, "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
if count%1000 == 0 && time.Since(logged) > 8*time.Second {
|
||||
log.Info("Importing external data", "file", f, "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
logged = time.Now()
|
||||
}
|
||||
count += 1
|
||||
}
|
||||
// Flush the last batch snapshot data
|
||||
if batch.ValueSize() > 0 {
|
||||
if err := batch.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Info("Imported chain data", "file", f, "count", count,
|
||||
"elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// ChainDataIterator is an interface wraps all necessary functions to iterate
|
||||
// the exporting chain data.
|
||||
type ChainDataIterator interface {
|
||||
// Next returns the key-value pair for next exporting entry in the iterator.
|
||||
// When the end is reached, it will return (0, nil, nil, false).
|
||||
Next() (byte, []byte, []byte, bool)
|
||||
|
||||
// Release releases associated resources. Release should always succeed and can
|
||||
// be called multiple times without causing error.
|
||||
Release()
|
||||
}
|
||||
|
||||
// ExportChaindata exports the given data type (truncating any data already present)
|
||||
// in the file. If the suffix is 'gz', gzip compression is used.
|
||||
func ExportChaindata(fn string, kind string, iter ChainDataIterator, interrupt chan struct{}) error {
|
||||
log.Info("Exporting chain data", "file", fn, "kind", kind)
|
||||
defer iter.Release()
|
||||
|
||||
// Open the file handle and potentially wrap with a gzip stream
|
||||
fh, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
var writer io.Writer = fh
|
||||
if strings.HasSuffix(fn, ".gz") {
|
||||
writer = gzip.NewWriter(writer)
|
||||
defer writer.(*gzip.Writer).Close()
|
||||
}
|
||||
// Write the header
|
||||
if err := rlp.Encode(writer, &exportHeader{
|
||||
Magic: exportMagic,
|
||||
Version: 0,
|
||||
Kind: kind,
|
||||
UnixTime: uint64(time.Now().Unix()),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
// Extract data from source iterator and dump them out to file
|
||||
var (
|
||||
count int64
|
||||
start = time.Now()
|
||||
logged = time.Now()
|
||||
)
|
||||
for {
|
||||
op, key, val, ok := iter.Next()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if err := rlp.Encode(writer, op); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := rlp.Encode(writer, key); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := rlp.Encode(writer, val); err != nil {
|
||||
return err
|
||||
}
|
||||
if count%1000 == 0 {
|
||||
// Check interruption emitted by ctrl+c
|
||||
select {
|
||||
case <-interrupt:
|
||||
log.Info("Chain data exporting interrupted", "file", fn,
|
||||
"kind", kind, "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
if time.Since(logged) > 8*time.Second {
|
||||
log.Info("Exporting chain data", "file", fn, "kind", kind,
|
||||
"count", count, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
logged = time.Now()
|
||||
}
|
||||
}
|
||||
count++
|
||||
}
|
||||
log.Info("Exported chain data", "file", fn, "kind", kind, "count", count,
|
||||
"elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user