all: integrate the freezer with fast sync

* all: freezer style syncing

core, eth, les, light: clean up freezer relative APIs

core, eth, les, trie, ethdb, light: clean a bit

core, eth, les, light: add unit tests

core, light: rewrite setHead function

core, eth: fix downloader unit tests

core: add receipt chain insertion test

core: use constant instead of hardcoding table name

core: fix rollback

core: fix setHead

core/rawdb: remove canonical block first and then iterate side chain

core/rawdb, ethdb: add hasAncient interface

eth/downloader: calculate ancient limit via cht first

core, eth, ethdb: lots of fixes

* eth/downloader: print ancient disable log only for fast sync
This commit is contained in:
gary rong
2019-04-25 22:59:48 +08:00
committed by Péter Szilágyi
parent b6cac42e9f
commit 80469bea0c
26 changed files with 1076 additions and 326 deletions

View File

@ -129,6 +129,7 @@ type Downloader struct {
synchronising int32
notified int32
committed int32
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
// Channels
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
@ -206,7 +207,7 @@ type BlockChain interface {
InsertChain(types.Blocks) (int, error)
// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
@ -475,12 +476,49 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
if d.mode == FastSync {
// Set the ancient data limitation.
// If we are running fast sync, all block data not greater than ancientLimit will
// be written to the ancient store. Otherwise, block data will be written to active
// database and then wait freezer to migrate.
//
// If there is checkpoint available, then calculate the ancientLimit through
// checkpoint. Otherwise calculate the ancient limit through the advertised
// height by remote peer.
//
// The reason for picking checkpoint first is: there exists an attack vector
// for height that: a malicious peer can give us a fake(very high) height,
// so that the ancient limit is also very high. And then the peer start to
// feed us valid blocks until head. All of these blocks might be written into
// the ancient store, the safe region for freezer is not enough.
if d.checkpoint != 0 && d.checkpoint > MaxForkAncestry+1 {
d.ancientLimit = height - MaxForkAncestry - 1
} else if height > MaxForkAncestry+1 {
d.ancientLimit = height - MaxForkAncestry - 1
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
// If a part of blockchain data has already been written into active store,
// disable the ancient style insertion explicitly.
if origin >= frozen && frozen != 0 {
d.ancientLimit = 0
log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
} else if d.ancientLimit > 0 {
log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
}
// Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen {
var hashes []common.Hash
for i := origin + 1; i < d.lightchain.CurrentHeader().Number.Uint64(); i++ {
hashes = append(hashes, rawdb.ReadCanonicalHash(d.stateDB, i))
}
d.lightchain.Rollback(hashes)
}
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
@ -544,6 +582,9 @@ func (d *Downloader) cancel() {
func (d *Downloader) Cancel() {
d.cancel()
d.cancelWg.Wait()
d.ancientLimit = 0
log.Debug("Reset ancient limit to zero")
}
// Terminate interrupts the downloader, canceling all pending operations.
@ -1315,7 +1356,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
rollback := []*types.Header{}
var rollback []*types.Header
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
@ -1409,11 +1450,10 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
limit = len(headers)
}
chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
unknown := make([]*types.Header, 0, len(chunk))
for _, header := range chunk {
if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
unknown = append(unknown, header)
@ -1663,7 +1703,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
receipts[i] = result.Receipts
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
@ -1675,7 +1715,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
// Commit the pivot block as the new head, will require full sync from here on
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil {
return err
}
if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {

View File

@ -57,6 +57,11 @@ type downloadTester struct {
ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
ancientHeaders map[common.Hash]*types.Header // Ancient headers belonging to the tester
ancientBlocks map[common.Hash]*types.Block // Ancient blocks belonging to the tester
ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester
ancientChainTd map[common.Hash]*big.Int // Ancient total difficulties of the blocks in the local chain
lock sync.RWMutex
}
@ -71,6 +76,12 @@ func newTester() *downloadTester {
ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
// Initialize ancient store with test genesis block
ancientHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
ancientBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
ancientChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
}
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
@ -122,6 +133,9 @@ func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
dl.lock.RLock()
defer dl.lock.RUnlock()
if _, ok := dl.ancientReceipts[hash]; ok {
return true
}
_, ok := dl.ownReceipts[hash]
return ok
}
@ -131,6 +145,10 @@ func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
dl.lock.RLock()
defer dl.lock.RUnlock()
header := dl.ancientHeaders[hash]
if header != nil {
return header
}
return dl.ownHeaders[hash]
}
@ -139,6 +157,10 @@ func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
dl.lock.RLock()
defer dl.lock.RUnlock()
block := dl.ancientBlocks[hash]
if block != nil {
return block
}
return dl.ownBlocks[hash]
}
@ -148,6 +170,9 @@ func (dl *downloadTester) CurrentHeader() *types.Header {
defer dl.lock.RUnlock()
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil {
return header
}
if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
return header
}
@ -161,6 +186,12 @@ func (dl *downloadTester) CurrentBlock() *types.Block {
defer dl.lock.RUnlock()
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
return block
}
return block
}
if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
return block
@ -176,6 +207,9 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block {
defer dl.lock.RUnlock()
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
return block
}
if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
return block
}
@ -198,6 +232,9 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
dl.lock.RLock()
defer dl.lock.RUnlock()
if td := dl.ancientChainTd[hash]; td != nil {
return td
}
return dl.ownChainTd[hash]
}
@ -254,7 +291,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
}
// InsertReceiptChain injects a new batch of receipts into the simulated chain.
func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) {
func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@ -262,11 +299,25 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
return i, errors.New("unknown owner")
}
if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
return i, errors.New("unknown parent")
if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
return i, errors.New("unknown parent")
}
}
if blocks[i].NumberU64() <= ancientLimit {
dl.ancientBlocks[blocks[i].Hash()] = blocks[i]
dl.ancientReceipts[blocks[i].Hash()] = receipts[i]
// Migrate from active db to ancient db
dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header()
dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty())
delete(dl.ownHeaders, blocks[i].Hash())
delete(dl.ownChainTd, blocks[i].Hash())
} else {
dl.ownBlocks[blocks[i].Hash()] = blocks[i]
dl.ownReceipts[blocks[i].Hash()] = receipts[i]
}
dl.ownBlocks[blocks[i].Hash()] = blocks[i]
dl.ownReceipts[blocks[i].Hash()] = receipts[i]
}
return len(blocks), nil
}
@ -284,6 +335,11 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) {
delete(dl.ownHeaders, hashes[i])
delete(dl.ownReceipts, hashes[i])
delete(dl.ownBlocks, hashes[i])
delete(dl.ancientChainTd, hashes[i])
delete(dl.ancientHeaders, hashes[i])
delete(dl.ancientReceipts, hashes[i])
delete(dl.ancientBlocks, hashes[i])
}
}
@ -411,13 +467,13 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
if tester.downloader.mode == LightSync {
blocks, receipts = 1, 1
}
if hs := len(tester.ownHeaders); hs != headers {
if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
}
if bs := len(tester.ownBlocks); bs != blocks {
if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks {
t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
}
if rs := len(tester.ownReceipts); rs != receipts {
if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts {
t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
}
}