core: remove redundant storage of transactions and receipts (#14801)

* core: remove redundant storage of transactions and receipts

* core, eth, internal: new transaction schema usage polishes

* eth: implement upgrade mechanism for db deduplication

* core, eth: drop old sequential key db upgrader

* eth: close last iterator on successful db upgrage

* core: prefix the lookup entries to make their purpose clearer
This commit is contained in:
Péter Szilágyi
2017-07-14 19:39:53 +03:00
committed by GitHub
parent 8d6a5a3581
commit 0ff35e170d
12 changed files with 285 additions and 639 deletions

View File

@ -59,8 +59,8 @@ type LesServer interface {
type Ethereum struct {
chainConfig *params.ChainConfig
// Channel for shutting down the service
shutdownChan chan bool // Channel for shutting down the ethereum
stopDbUpgrade func() // stop chain db sequential key upgrade
shutdownChan chan bool // Channel for shutting down the ethereum
stopDbUpgrade func() error // stop chain db sequential key upgrade
// Handlers
txPool *core.TxPool
blockchain *core.BlockChain
@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
stopDbUpgrade := upgradeSequentialKeys(chainDb)
stopDbUpgrade := upgradeDeduplicateData(chainDb)
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr

View File

@ -33,24 +33,15 @@ func TestMipmapUpgrade(t *testing.T) {
genesis := new(core.Genesis).MustCommit(db)
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.Logs = []*types.Log{{Address: addr}}
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
case 2:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.Logs = []*types.Log{{Address: addr}}
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
}
// store the receipts
err := core.WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
})
for i, block := range chain {

View File

@ -19,237 +19,120 @@ package eth
import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys")
var deduplicateData = []byte("dbUpgrade_20170714deduplicateData")
// upgradeSequentialKeys checks the chain database version and
// upgradeDeduplicateData checks the chain database version and
// starts a background process to make upgrades if necessary.
// Returns a stop function that blocks until the process has
// been safely stopped.
func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) {
data, _ := db.Get(useSequentialKeys)
func upgradeDeduplicateData(db ethdb.Database) func() error {
// If the database is already converted or empty, bail out
data, _ := db.Get(deduplicateData)
if len(data) > 0 && data[0] == 42 {
return nil // already converted
}
if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 {
db.Put(useSequentialKeys, []byte{42})
return nil // empty database, nothing to do
}
log.Warn("Upgrading chain database to use sequential keys")
stopChn := make(chan struct{})
stoppedChn := make(chan struct{})
go func() {
stopFn := func() bool {
select {
case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved
case <-stopChn:
return true
}
return false
}
err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn)
if err == nil && !stopped {
err, stopped = upgradeSequentialBlocks(db, stopFn)
}
if err == nil && !stopped {
err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn)
}
if err == nil && !stopped {
log.Info("Database conversion successful")
db.Put(useSequentialKeys, []byte{42})
}
if err != nil {
log.Error("Database conversion failed", "err", err)
}
close(stoppedChn)
}()
return func() {
close(stopChn)
<-stoppedChn
}
}
// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from
// the database, writes them in new format and deletes the old ones if successful.
func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) {
prefix := []byte("block-num-")
it := db.(*ethdb.LDBDatabase).NewIterator()
defer func() {
it.Release()
}()
it.Seek(prefix)
cnt := 0
for bytes.HasPrefix(it.Key(), prefix) {
keyPtr := it.Key()
if len(keyPtr) < 20 {
cnt++
if cnt%100000 == 0 {
it.Release()
it = db.(*ethdb.LDBDatabase).NewIterator()
it.Seek(keyPtr)
log.Info("Converting canonical numbers", "count", cnt)
}
number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64()
newKey := []byte("h12345678n")
binary.BigEndian.PutUint64(newKey[1:9], number)
if err := db.Put(newKey, it.Value()); err != nil {
return err, false
}
if err := db.Delete(keyPtr); err != nil {
return err, false
}
}
if stopFn() {
return nil, true
}
it.Next()
}
if cnt > 0 {
log.Info("converted canonical numbers", "count", cnt)
}
return nil, false
}
// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block
// receipts from the database, writes them in new format and deletes the old ones
// if successful.
func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) {
prefix := []byte("block-")
it := db.(*ethdb.LDBDatabase).NewIterator()
defer func() {
it.Release()
}()
it.Seek(prefix)
cnt := 0
for bytes.HasPrefix(it.Key(), prefix) {
keyPtr := it.Key()
if len(keyPtr) >= 38 {
cnt++
if cnt%10000 == 0 {
it.Release()
it = db.(*ethdb.LDBDatabase).NewIterator()
it.Seek(keyPtr)
log.Info("Converting blocks", "count", cnt)
}
// convert header, body, td and block receipts
var keyPrefix [38]byte
copy(keyPrefix[:], keyPtr[0:38])
hash := keyPrefix[6:38]
if err := upgradeSequentialBlockData(db, hash); err != nil {
return err, false
}
// delete old db entries belonging to this hash
for bytes.HasPrefix(it.Key(), keyPrefix[:]) {
if err := db.Delete(it.Key()); err != nil {
return err, false
}
it.Next()
}
if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil {
return err, false
}
} else {
it.Next()
}
if stopFn() {
return nil, true
}
}
if cnt > 0 {
log.Info("Converted blocks", "count", cnt)
}
return nil, false
}
// upgradeSequentialOrphanedReceipts removes any old format block receipts from the
// database that did not have a corresponding block
func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) {
prefix := []byte("receipts-block-")
it := db.(*ethdb.LDBDatabase).NewIterator()
defer it.Release()
it.Seek(prefix)
cnt := 0
for bytes.HasPrefix(it.Key(), prefix) {
// phase 2 already converted receipts belonging to existing
// blocks, just remove if there's anything left
cnt++
if err := db.Delete(it.Key()); err != nil {
return err, false
}
if stopFn() {
return nil, true
}
it.Next()
}
if cnt > 0 {
log.Info("Removed orphaned block receipts", "count", cnt)
}
return nil, false
}
// upgradeSequentialBlockData upgrades the header, body, td and block receipts
// database entries belonging to a single hash (doesn't delete old data).
func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error {
// get old chain data and block number
headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...))
if len(headerRLP) == 0 {
return nil
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil {
return err
if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 {
db.Put(deduplicateData, []byte{42})
return nil
}
number := header.Number.Uint64()
bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...))
tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...))
receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...))
// store new hash -> number association
encNum := make([]byte, 8)
binary.BigEndian.PutUint64(encNum, number)
if err := db.Put(append([]byte("H"), hash...), encNum); err != nil {
return err
}
// store new chain data
if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil {
return err
}
if len(tdRLP) != 0 {
if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil {
return err
// Start the deduplication upgrade on a new goroutine
log.Warn("Upgrading database to use lookup entries")
stop := make(chan chan error)
go func() {
// Create an iterator to read the entire database and covert old lookup entires
it := db.(*ethdb.LDBDatabase).NewIterator()
defer func() {
if it != nil {
it.Release()
}
}()
var (
converted uint64
failed error
)
for failed == nil && it.Next() {
// Skip any entries that don't look like old transaction meta entires (<hash>0x01)
key := it.Key()
if len(key) != common.HashLength+1 || key[common.HashLength] != 0x01 {
continue
}
// Skip any entries that don't contain metadata (name clash between <hash>0x01 and <some-prefix><hash>)
var meta struct {
BlockHash common.Hash
BlockIndex uint64
Index uint64
}
if err := rlp.DecodeBytes(it.Value(), &meta); err != nil {
continue
}
// Skip any already upgraded entries (clash due to <hash> ending with 0x01 (old suffix))
hash := key[:common.HashLength]
if hash[0] == byte('l') {
// Potential clash, the "old" `hash` must point to a live transaction.
if tx, _, _, _ := core.GetTransaction(db, common.BytesToHash(hash)); tx == nil || !bytes.Equal(tx.Hash().Bytes(), hash) {
continue
}
}
// Convert the old metadata to a new lookup entry, delete duplicate data
if failed = db.Put(append([]byte("l"), hash...), it.Value()); failed == nil { // Write the new looku entry
if failed = db.Delete(hash); failed == nil { // Delete the duplicate transaction data
if failed = db.Delete(append([]byte("receipts-"), hash...)); failed == nil { // Delete the duplicate receipt data
if failed = db.Delete(key); failed != nil { // Delete the old transaction metadata
break
}
}
}
}
// Bump the conversion counter, and recreate the iterator occasionally to
// avoid too high memory consumption.
converted++
if converted%100000 == 0 {
it.Release()
it = db.(*ethdb.LDBDatabase).NewIterator()
it.Seek(key)
log.Info("Deduplicating database entries", "deduped", converted)
}
// Check for termination, or continue after a bit of a timeout
select {
case errc := <-stop:
errc <- nil
return
case <-time.After(time.Microsecond * 100):
}
}
}
if len(bodyRLP) != 0 {
if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil {
return err
// Upgrade finished, mark a such and terminate
if failed == nil {
log.Info("Database deduplication successful", "deduped", converted)
db.Put(deduplicateData, []byte{42})
} else {
log.Error("Database deduplication failed", "deduped", converted, "err", failed)
}
it.Release()
it = nil
errc := <-stop
errc <- failed
}()
// Assembly the cancellation callback
return func() error {
errc := make(chan error)
stop <- errc
return <-errc
}
if len(receiptsRLP) != 0 {
if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil {
return err
}
}
return nil
}
func addMipmapBloomBins(db ethdb.Database) (err error) {

View File

@ -82,12 +82,6 @@ func BenchmarkMipmaps(b *testing.B) {
gen.AddUncheckedReceipt(receipt)
}
// store the receipts
err := core.WriteReceipts(db, receipts)
if err != nil {
b.Fatal(err)
}
core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
@ -183,12 +177,6 @@ func TestFilters(t *testing.T) {
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
}
// store the receipts
err := core.WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
// i is used as block number for the writes but since the i
// starts at 0 and block 0 (genesis) is already present increment
// by one