core, eth: split the db blocks into headers and bodies
This commit is contained in:
108
eth/backend.go
108
eth/backend.go
@ -18,6 +18,7 @@
|
||||
package eth
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -269,11 +270,7 @@ func New(config *Config) (*Ethereum, error) {
|
||||
newdb = func(path string) (common.Database, error) { return ethdb.NewLDBDatabase(path, config.DatabaseCache) }
|
||||
}
|
||||
|
||||
// attempt to merge database together, upgrading from an old version
|
||||
if err := mergeDatabases(config.DataDir, newdb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Open the chain database and perform any upgrades needed
|
||||
chainDb, err := newdb(filepath.Join(config.DataDir, "chaindata"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("blockchain db err: %v", err)
|
||||
@ -281,6 +278,10 @@ func New(config *Config) (*Ethereum, error) {
|
||||
if db, ok := chainDb.(*ethdb.LDBDatabase); ok {
|
||||
db.Meter("eth/db/chaindata/")
|
||||
}
|
||||
if err := upgradeChainDatabase(chainDb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dappDb, err := newdb(filepath.Join(config.DataDir, "dapp"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dapp db err: %v", err)
|
||||
@ -721,74 +722,55 @@ func saveBlockchainVersion(db common.Database, bcVersion int) {
|
||||
}
|
||||
}
|
||||
|
||||
// mergeDatabases when required merge old database layout to one single database
|
||||
func mergeDatabases(datadir string, newdb func(path string) (common.Database, error)) error {
|
||||
// Check if already upgraded
|
||||
data := filepath.Join(datadir, "chaindata")
|
||||
if _, err := os.Stat(data); !os.IsNotExist(err) {
|
||||
// upgradeChainDatabase ensures that the chain database stores block split into
|
||||
// separate header and body entries.
|
||||
func upgradeChainDatabase(db common.Database) error {
|
||||
// Short circuit if the head block is stored already as separate header and body
|
||||
data, err := db.Get([]byte("LastBlock"))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
// make sure it's not just a clean path
|
||||
chainPath := filepath.Join(datadir, "blockchain")
|
||||
if _, err := os.Stat(chainPath); os.IsNotExist(err) {
|
||||
head := common.BytesToHash(data)
|
||||
|
||||
if block := core.GetBlockByHashOld(db, head); block == nil {
|
||||
return nil
|
||||
}
|
||||
glog.Infoln("Database upgrade required. Upgrading...")
|
||||
// At least some of the database is still the old format, upgrade (skip the head block!)
|
||||
glog.V(logger.Info).Info("Old database detected, upgrading...")
|
||||
|
||||
database, err := newdb(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating data db err: %v", err)
|
||||
}
|
||||
defer database.Close()
|
||||
if db, ok := db.(*ethdb.LDBDatabase); ok {
|
||||
blockPrefix := []byte("block-hash-")
|
||||
for it := db.NewIterator(); it.Next(); {
|
||||
// Skip anything other than a combined block
|
||||
if !bytes.HasPrefix(it.Key(), blockPrefix) {
|
||||
continue
|
||||
}
|
||||
// Skip the head block (merge last to signal upgrade completion)
|
||||
if bytes.HasSuffix(it.Key(), head.Bytes()) {
|
||||
continue
|
||||
}
|
||||
// Load the block, split and serialize (order!)
|
||||
block := core.GetBlockByHashOld(db, common.BytesToHash(bytes.TrimPrefix(it.Key(), blockPrefix)))
|
||||
|
||||
// Migrate blocks
|
||||
chainDb, err := newdb(chainPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("state db err: %v", err)
|
||||
}
|
||||
defer chainDb.Close()
|
||||
|
||||
if chain, ok := chainDb.(*ethdb.LDBDatabase); ok {
|
||||
glog.Infoln("Merging blockchain database...")
|
||||
it := chain.NewIterator()
|
||||
for it.Next() {
|
||||
database.Put(it.Key(), it.Value())
|
||||
if err := core.WriteBody(db, block); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := core.WriteHeader(db, block.Header()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := db.Delete(it.Key()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
it.Release()
|
||||
}
|
||||
// Lastly, upgrade the head block, disabling the upgrade mechanism
|
||||
current := core.GetBlockByHashOld(db, head)
|
||||
|
||||
// Migrate state
|
||||
stateDb, err := newdb(filepath.Join(datadir, "state"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("state db err: %v", err)
|
||||
}
|
||||
defer stateDb.Close()
|
||||
|
||||
if state, ok := stateDb.(*ethdb.LDBDatabase); ok {
|
||||
glog.Infoln("Merging state database...")
|
||||
it := state.NewIterator()
|
||||
for it.Next() {
|
||||
database.Put(it.Key(), it.Value())
|
||||
if err := core.WriteBody(db, current); err != nil {
|
||||
return err
|
||||
}
|
||||
it.Release()
|
||||
}
|
||||
|
||||
// Migrate transaction / receipts
|
||||
extraDb, err := newdb(filepath.Join(datadir, "extra"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("state db err: %v", err)
|
||||
}
|
||||
defer extraDb.Close()
|
||||
|
||||
if extra, ok := extraDb.(*ethdb.LDBDatabase); ok {
|
||||
glog.Infoln("Merging transaction database...")
|
||||
|
||||
it := extra.NewIterator()
|
||||
for it.Next() {
|
||||
database.Put(it.Key(), it.Value())
|
||||
if err := core.WriteHeader(db, current.Header()); err != nil {
|
||||
return err
|
||||
}
|
||||
it.Release()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -345,33 +345,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
if err := msg.Decode(&query); err != nil {
|
||||
return errResp(ErrDecode, "%v: %v", msg, err)
|
||||
}
|
||||
// Gather blocks until the fetch or network limits is reached
|
||||
// Gather headers until the fetch or network limits is reached
|
||||
var (
|
||||
bytes common.StorageSize
|
||||
headers []*types.Header
|
||||
unknown bool
|
||||
)
|
||||
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
|
||||
// Retrieve the next block satisfying the query
|
||||
var origin *types.Block
|
||||
// Retrieve the next header satisfying the query
|
||||
var origin *types.Header
|
||||
if query.Origin.Hash != (common.Hash{}) {
|
||||
origin = pm.chainman.GetBlock(query.Origin.Hash)
|
||||
origin = pm.chainman.GetHeader(query.Origin.Hash)
|
||||
} else {
|
||||
origin = pm.chainman.GetBlockByNumber(query.Origin.Number)
|
||||
origin = pm.chainman.GetHeaderByNumber(query.Origin.Number)
|
||||
}
|
||||
if origin == nil {
|
||||
break
|
||||
}
|
||||
headers = append(headers, origin.Header())
|
||||
bytes += origin.Size()
|
||||
headers = append(headers, origin)
|
||||
bytes += 500 // Approximate, should be good enough estimate
|
||||
|
||||
// Advance to the next block of the query
|
||||
// Advance to the next header of the query
|
||||
switch {
|
||||
case query.Origin.Hash != (common.Hash{}) && query.Reverse:
|
||||
// Hash based traversal towards the genesis block
|
||||
for i := 0; i < int(query.Skip)+1; i++ {
|
||||
if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil {
|
||||
query.Origin.Hash = block.ParentHash()
|
||||
if header := pm.chainman.GetHeader(query.Origin.Hash); header != nil {
|
||||
query.Origin.Hash = header.ParentHash
|
||||
} else {
|
||||
unknown = true
|
||||
break
|
||||
@ -379,9 +379,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
}
|
||||
case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
|
||||
// Hash based traversal towards the leaf block
|
||||
if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil {
|
||||
if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
|
||||
query.Origin.Hash = block.Hash()
|
||||
if header := pm.chainman.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
|
||||
if pm.chainman.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
|
||||
query.Origin.Hash = header.Hash()
|
||||
} else {
|
||||
unknown = true
|
||||
}
|
||||
@ -452,23 +452,24 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
// Gather blocks until the fetch or network limits is reached
|
||||
var (
|
||||
hash common.Hash
|
||||
bytes common.StorageSize
|
||||
bodies []*blockBody
|
||||
bytes int
|
||||
bodies []*blockBodyRLP
|
||||
)
|
||||
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
|
||||
//Retrieve the hash of the next block
|
||||
// Retrieve the hash of the next block
|
||||
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
||||
break
|
||||
} else if err != nil {
|
||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||
}
|
||||
// Retrieve the requested block, stopping if enough was found
|
||||
if block := pm.chainman.GetBlock(hash); block != nil {
|
||||
bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
|
||||
bytes += block.Size()
|
||||
// Retrieve the requested block body, stopping if enough was found
|
||||
if data := pm.chainman.GetBodyRLP(hash); len(data) != 0 {
|
||||
body := blockBodyRLP(data)
|
||||
bodies = append(bodies, &body)
|
||||
bytes += len(body)
|
||||
}
|
||||
}
|
||||
return p.SendBlockBodies(bodies)
|
||||
return p.SendBlockBodiesRLP(bodies)
|
||||
|
||||
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
|
||||
// Decode the retrieval message
|
||||
|
@ -184,6 +184,12 @@ func (p *peer) SendBlockBodies(bodies []*blockBody) error {
|
||||
return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
|
||||
}
|
||||
|
||||
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
|
||||
// an already RLP encoded format.
|
||||
func (p *peer) SendBlockBodiesRLP(bodies []*blockBodyRLP) error {
|
||||
return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesRLPData(bodies))
|
||||
}
|
||||
|
||||
// SendNodeData sends a batch of arbitrary internal data, corresponding to the
|
||||
// hashes requested.
|
||||
func (p *peer) SendNodeData(data [][]byte) error {
|
||||
|
@ -213,6 +213,22 @@ type blockBody struct {
|
||||
// blockBodiesData is the network packet for block content distribution.
|
||||
type blockBodiesData []*blockBody
|
||||
|
||||
// blockBodyRLP represents the RLP encoded data content of a single block.
|
||||
type blockBodyRLP []byte
|
||||
|
||||
// EncodeRLP is a specialized encoder for a block body to pass the already
|
||||
// encoded body RLPs from the database on, without double encoding.
|
||||
func (b *blockBodyRLP) EncodeRLP(w io.Writer) error {
|
||||
if _, err := w.Write([]byte(*b)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// blockBodiesRLPData is the network packet for block content distribution
|
||||
// based on original RLP formatting (i.e. skip the db-decode/proto-encode).
|
||||
type blockBodiesRLPData []*blockBodyRLP
|
||||
|
||||
// nodeDataData is the network response packet for a node data retrieval.
|
||||
type nodeDataData []struct {
|
||||
Value []byte
|
||||
|
Reference in New Issue
Block a user