les: duplicate downloader and fetcher to allow progressive refactoring

This commit is contained in:
Péter Szilágyi
2021-09-10 10:55:48 +03:00
parent 90987db733
commit 9e17648d8c
27 changed files with 8746 additions and 22 deletions

166
les/downloader/api.go Normal file
View File

@ -0,0 +1,166 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"context"
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
d *Downloader
mux *event.TypeMux
installSyncSubscription chan chan interface{}
uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
// listens for events from the downloader through the global event mux. In case it receives one of
// these events it broadcasts it to all syncing subscriptions that are installed through the
// installSyncSubscription channel.
func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
api := &PublicDownloaderAPI{
d: d,
mux: m,
installSyncSubscription: make(chan chan interface{}),
uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
}
go api.eventLoop()
return api
}
// eventLoop runs a loop until the event mux closes. It will install and uninstall new
// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
func (api *PublicDownloaderAPI) eventLoop() {
var (
sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
syncSubscriptions = make(map[chan interface{}]struct{})
)
for {
select {
case i := <-api.installSyncSubscription:
syncSubscriptions[i] = struct{}{}
case u := <-api.uninstallSyncSubscription:
delete(syncSubscriptions, u.c)
close(u.uninstalled)
case event := <-sub.Chan():
if event == nil {
return
}
var notification interface{}
switch event.Data.(type) {
case StartEvent:
notification = &SyncingResult{
Syncing: true,
Status: api.d.Progress(),
}
case DoneEvent, FailedEvent:
notification = false
}
// broadcast
for c := range syncSubscriptions {
c <- notification
}
}
}
}
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
statuses := make(chan interface{})
sub := api.SubscribeSyncStatus(statuses)
for {
select {
case status := <-statuses:
notifier.Notify(rpcSub.ID, status)
case <-rpcSub.Err():
sub.Unsubscribe()
return
case <-notifier.Closed():
sub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// SyncingResult provides information about the current synchronisation status for this node.
type SyncingResult struct {
Syncing bool `json:"syncing"`
Status ethereum.SyncProgress `json:"status"`
}
// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
type uninstallSyncSubscriptionRequest struct {
c chan interface{}
uninstalled chan interface{}
}
// SyncStatusSubscription represents a syncing subscription.
type SyncStatusSubscription struct {
api *PublicDownloaderAPI // register subscription in event loop of this api instance
c chan interface{} // channel where events are broadcasted to
unsubOnce sync.Once // make sure unsubscribe logic is executed once
}
// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
// The status channel that was passed to subscribeSyncStatus isn't used anymore
// after this method returns.
func (s *SyncStatusSubscription) Unsubscribe() {
s.unsubOnce.Do(func() {
req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
s.api.uninstallSyncSubscription <- &req
for {
select {
case <-s.c:
// drop new status events until uninstall confirmation
continue
case <-req.uninstalled:
return
}
}
})
}
// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
// The given channel must receive interface values, the result can either
func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
api.installSyncSubscription <- status
return &SyncStatusSubscription{api: api, c: status}
}

2014
les/downloader/downloader.go Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

25
les/downloader/events.go Normal file
View File

@ -0,0 +1,25 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import "github.com/ethereum/go-ethereum/core/types"
type DoneEvent struct {
Latest *types.Header
}
type StartEvent struct{}
type FailedEvent struct{ Err error }

45
les/downloader/metrics.go Normal file
View File

@ -0,0 +1,45 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains the metrics collected by the downloader.
package downloader
import (
"github.com/ethereum/go-ethereum/metrics"
)
var (
headerInMeter = metrics.NewRegisteredMeter("eth/downloader/headers/in", nil)
headerReqTimer = metrics.NewRegisteredTimer("eth/downloader/headers/req", nil)
headerDropMeter = metrics.NewRegisteredMeter("eth/downloader/headers/drop", nil)
headerTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/headers/timeout", nil)
bodyInMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/in", nil)
bodyReqTimer = metrics.NewRegisteredTimer("eth/downloader/bodies/req", nil)
bodyDropMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/drop", nil)
bodyTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/timeout", nil)
receiptInMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/in", nil)
receiptReqTimer = metrics.NewRegisteredTimer("eth/downloader/receipts/req", nil)
receiptDropMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/drop", nil)
receiptTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/timeout", nil)
stateInMeter = metrics.NewRegisteredMeter("eth/downloader/states/in", nil)
stateDropMeter = metrics.NewRegisteredMeter("eth/downloader/states/drop", nil)
throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
)

81
les/downloader/modes.go Normal file
View File

@ -0,0 +1,81 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import "fmt"
// SyncMode represents the synchronisation mode of the downloader.
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32
const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
FastSync // Quickly download the headers, full sync only at the chain
SnapSync // Download the chain and the state via compact snapshots
LightSync // Download only the headers and terminate afterwards
)
func (mode SyncMode) IsValid() bool {
return mode >= FullSync && mode <= LightSync
}
// String implements the stringer interface.
func (mode SyncMode) String() string {
switch mode {
case FullSync:
return "full"
case FastSync:
return "fast"
case SnapSync:
return "snap"
case LightSync:
return "light"
default:
return "unknown"
}
}
func (mode SyncMode) MarshalText() ([]byte, error) {
switch mode {
case FullSync:
return []byte("full"), nil
case FastSync:
return []byte("fast"), nil
case SnapSync:
return []byte("snap"), nil
case LightSync:
return []byte("light"), nil
default:
return nil, fmt.Errorf("unknown sync mode %d", mode)
}
}
func (mode *SyncMode) UnmarshalText(text []byte) error {
switch string(text) {
case "full":
*mode = FullSync
case "fast":
*mode = FastSync
case "snap":
*mode = SnapSync
case "light":
*mode = LightSync
default:
return fmt.Errorf(`unknown sync mode %q, want "full", "fast" or "light"`, text)
}
return nil
}

501
les/downloader/peer.go Normal file
View File

@ -0,0 +1,501 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains the active peer-set of the downloader, maintaining both failures
// as well as reputation metrics to prioritize the block retrievals.
package downloader
import (
"errors"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/msgrate"
)
const (
maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
)
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
)
// peerConnection represents an active peer from which hashes and blocks are retrieved.
type peerConnection struct {
id string // Unique identifier of the peer
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
headerStarted time.Time // Time instance when the last header fetch was started
blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
rates *msgrate.Tracker // Tracker to hone in on the number of items retrievable per second
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
peer Peer
version uint // Eth protocol version number to switch strategies
log log.Logger // Contextual logger to add extra infos to peer logs
lock sync.RWMutex
}
// LightPeer encapsulates the methods required to synchronise with a remote light peer.
type LightPeer interface {
Head() (common.Hash, *big.Int)
RequestHeadersByHash(common.Hash, int, int, bool) error
RequestHeadersByNumber(uint64, int, int, bool) error
}
// Peer encapsulates the methods required to synchronise with a remote full peer.
type Peer interface {
LightPeer
RequestBodies([]common.Hash) error
RequestReceipts([]common.Hash) error
RequestNodeData([]common.Hash) error
}
// lightPeerWrapper wraps a LightPeer struct, stubbing out the Peer-only methods.
type lightPeerWrapper struct {
peer LightPeer
}
func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error {
return w.peer.RequestHeadersByHash(h, amount, skip, reverse)
}
func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error {
return w.peer.RequestHeadersByNumber(i, amount, skip, reverse)
}
func (w *lightPeerWrapper) RequestBodies([]common.Hash) error {
panic("RequestBodies not supported in light client mode sync")
}
func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error {
panic("RequestReceipts not supported in light client mode sync")
}
func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error {
panic("RequestNodeData not supported in light client mode sync")
}
// newPeerConnection creates a new downloader peer.
func newPeerConnection(id string, version uint, peer Peer, logger log.Logger) *peerConnection {
return &peerConnection{
id: id,
lacking: make(map[common.Hash]struct{}),
peer: peer,
version: version,
log: logger,
}
}
// Reset clears the internal state of a peer entity.
func (p *peerConnection) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
atomic.StoreInt32(&p.headerIdle, 0)
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.stateIdle, 0)
p.lacking = make(map[common.Hash]struct{})
}
// FetchHeaders sends a header retrieval request to the remote peer.
func (p *peerConnection) FetchHeaders(from uint64, count int) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
return errAlreadyFetching
}
p.headerStarted = time.Now()
// Issue the header retrieval request (absolute upwards without gaps)
go p.peer.RequestHeadersByNumber(from, count, 0, false)
return nil
}
// FetchBodies sends a block body retrieval request to the remote peer.
func (p *peerConnection) FetchBodies(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
return errAlreadyFetching
}
p.blockStarted = time.Now()
go func() {
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
p.peer.RequestBodies(hashes)
}()
return nil
}
// FetchReceipts sends a receipt retrieval request to the remote peer.
func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
return errAlreadyFetching
}
p.receiptStarted = time.Now()
go func() {
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
p.peer.RequestReceipts(hashes)
}()
return nil
}
// FetchNodeData sends a node state data retrieval request to the remote peer.
func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
return errAlreadyFetching
}
p.stateStarted = time.Now()
go p.peer.RequestNodeData(hashes)
return nil
}
// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
// requests. Its estimated header retrieval throughput is updated with that measured
// just now.
func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) {
p.rates.Update(eth.BlockHeadersMsg, deliveryTime.Sub(p.headerStarted), delivered)
atomic.StoreInt32(&p.headerIdle, 0)
}
// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
// requests. Its estimated body retrieval throughput is updated with that measured
// just now.
func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) {
p.rates.Update(eth.BlockBodiesMsg, deliveryTime.Sub(p.blockStarted), delivered)
atomic.StoreInt32(&p.blockIdle, 0)
}
// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
// retrieval requests. Its estimated receipt retrieval throughput is updated
// with that measured just now.
func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) {
p.rates.Update(eth.ReceiptsMsg, deliveryTime.Sub(p.receiptStarted), delivered)
atomic.StoreInt32(&p.receiptIdle, 0)
}
// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
// data retrieval requests. Its estimated state retrieval throughput is updated
// with that measured just now.
func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) {
p.rates.Update(eth.NodeDataMsg, deliveryTime.Sub(p.stateStarted), delivered)
atomic.StoreInt32(&p.stateIdle, 0)
}
// HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput.
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
cap := p.rates.Capacity(eth.BlockHeadersMsg, targetRTT)
if cap > MaxHeaderFetch {
cap = MaxHeaderFetch
}
return cap
}
// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput.
func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
cap := p.rates.Capacity(eth.BlockBodiesMsg, targetRTT)
if cap > MaxBlockFetch {
cap = MaxBlockFetch
}
return cap
}
// ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered throughput.
func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
cap := p.rates.Capacity(eth.ReceiptsMsg, targetRTT)
if cap > MaxReceiptFetch {
cap = MaxReceiptFetch
}
return cap
}
// NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered throughput.
func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
cap := p.rates.Capacity(eth.NodeDataMsg, targetRTT)
if cap > MaxStateFetch {
cap = MaxStateFetch
}
return cap
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off.
func (p *peerConnection) MarkLacking(hash common.Hash) {
p.lock.Lock()
defer p.lock.Unlock()
for len(p.lacking) >= maxLackingHashes {
for drop := range p.lacking {
delete(p.lacking, drop)
break
}
}
p.lacking[hash] = struct{}{}
}
// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
// list (i.e. whether we know that the peer does not have it).
func (p *peerConnection) Lacks(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()
_, ok := p.lacking[hash]
return ok
}
// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
peers map[string]*peerConnection
rates *msgrate.Trackers // Set of rate trackers to give the sync a common beat
newPeerFeed event.Feed
peerDropFeed event.Feed
lock sync.RWMutex
}
// newPeerSet creates a new peer set top track the active download sources.
func newPeerSet() *peerSet {
return &peerSet{
peers: make(map[string]*peerConnection),
rates: msgrate.NewTrackers(log.New("proto", "eth")),
}
}
// SubscribeNewPeers subscribes to peer arrival events.
func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
return ps.newPeerFeed.Subscribe(ch)
}
// SubscribePeerDrops subscribes to peer departure events.
func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
return ps.peerDropFeed.Subscribe(ch)
}
// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
ps.lock.RLock()
defer ps.lock.RUnlock()
for _, peer := range ps.peers {
peer.Reset()
}
}
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
//
// The method also sets the starting throughput values of the new peer to the
// average of all existing peers, to give it a realistic chance of being used
// for data retrievals.
func (ps *peerSet) Register(p *peerConnection) error {
// Register the new peer with some meaningful defaults
ps.lock.Lock()
if _, ok := ps.peers[p.id]; ok {
ps.lock.Unlock()
return errAlreadyRegistered
}
p.rates = msgrate.NewTracker(ps.rates.MeanCapacities(), ps.rates.MedianRoundTrip())
if err := ps.rates.Track(p.id, p.rates); err != nil {
return err
}
ps.peers[p.id] = p
ps.lock.Unlock()
ps.newPeerFeed.Send(p)
return nil
}
// Unregister removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
p, ok := ps.peers[id]
if !ok {
ps.lock.Unlock()
return errNotRegistered
}
delete(ps.peers, id)
ps.rates.Untrack(id)
ps.lock.Unlock()
ps.peerDropFeed.Send(p)
return nil
}
// Peer retrieves the registered peer with the given id.
func (ps *peerSet) Peer(id string) *peerConnection {
ps.lock.RLock()
defer ps.lock.RUnlock()
return ps.peers[id]
}
// Len returns if the current number of peers in the set.
func (ps *peerSet) Len() int {
ps.lock.RLock()
defer ps.lock.RUnlock()
return len(ps.peers)
}
// AllPeers retrieves a flat list of all the peers within the set.
func (ps *peerSet) AllPeers() []*peerConnection {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peerConnection, 0, len(ps.peers))
for _, p := range ps.peers {
list = append(list, p)
}
return list
}
// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
// within the active peer set, ordered by their reputation.
func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.headerIdle) == 0
}
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockHeadersMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// the active peer set, ordered by their reputation.
func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockBodiesMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
// within the active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.ReceiptsMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
// peers within the active peer set, ordered by their reputation.
func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.NodeDataMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
// The resulting set of peers are sorted by their capacity.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, capacity func(*peerConnection) int) ([]*peerConnection, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
var (
total = 0
idle = make([]*peerConnection, 0, len(ps.peers))
tps = make([]int, 0, len(ps.peers))
)
for _, p := range ps.peers {
if p.version >= minProtocol && p.version <= maxProtocol {
if idleCheck(p) {
idle = append(idle, p)
tps = append(tps, capacity(p))
}
total++
}
}
// And sort them
sortPeers := &peerCapacitySort{idle, tps}
sort.Sort(sortPeers)
return sortPeers.p, total
}
// peerCapacitySort implements sort.Interface.
// It sorts peer connections by capacity (descending).
type peerCapacitySort struct {
p []*peerConnection
tp []int
}
func (ps *peerCapacitySort) Len() int {
return len(ps.p)
}
func (ps *peerCapacitySort) Less(i, j int) bool {
return ps.tp[i] > ps.tp[j]
}
func (ps *peerCapacitySort) Swap(i, j int) {
ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
}

913
les/downloader/queue.go Normal file
View File

@ -0,0 +1,913 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered, and throttled way.
package downloader
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/trie"
)
const (
bodyType = uint(0)
receiptType = uint(1)
)
var (
blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download
blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks
blockCacheMemory = 256 * 1024 * 1024 // Maximum amount of memory to use for block caching
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
)
var (
errNoFetchesPending = errors.New("no fetches pending")
errStaleDelivery = errors.New("stale delivery")
)
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
Peer *peerConnection // Peer to which the request was sent
From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made
}
// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding
Header *types.Header
Uncles []*types.Header
Transactions types.Transactions
Receipts types.Receipts
}
func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
item := &fetchResult{
Header: header,
}
if !header.EmptyBody() {
item.pending |= (1 << bodyType)
}
if fastSync && !header.EmptyReceipts() {
item.pending |= (1 << receiptType)
}
return item
}
// SetBodyDone flags the body as finished.
func (f *fetchResult) SetBodyDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 {
atomic.AddInt32(&f.pending, -1)
}
}
// AllDone checks if item is done.
func (f *fetchResult) AllDone() bool {
return atomic.LoadInt32(&f.pending) == 0
}
// SetReceiptsDone flags the receipts as finished.
func (f *fetchResult) SetReceiptsDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 {
atomic.AddInt32(&f.pending, -2)
}
}
// Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool {
v := atomic.LoadInt32(&f.pending)
return v&(1<<kind) == 0
}
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
headerHead common.Hash // Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
headerResults []*types.Header // Result cache accumulating the completed headers
headerProced int // Number of headers already processed from the results
headerOffset uint64 // Number of the first header in the result cache
headerContCh chan bool // Channel to notify when header download finishes
// All data retrievals below are based on an already assembles header chain
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
resultCache *resultStore // Downloaded but not yet delivered fetch results
resultSize common.StorageSize // Approximate size of a block (exponential moving average)
lock *sync.RWMutex
active *sync.Cond
closed bool
lastStatLog time.Time
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
lock := new(sync.RWMutex)
q := &queue{
headerContCh: make(chan bool),
blockTaskQueue: prque.New(nil),
receiptTaskQueue: prque.New(nil),
active: sync.NewCond(lock),
lock: lock,
}
q.Reset(blockCacheLimit, thresholdInitialSize)
return q
}
// Reset clears out the queue contents.
func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.lock.Lock()
defer q.lock.Unlock()
q.closed = false
q.mode = FullSync
q.headerHead = common.Hash{}
q.headerPendPool = make(map[string]*fetchRequest)
q.blockTaskPool = make(map[common.Hash]*types.Header)
q.blockTaskQueue.Reset()
q.blockPendPool = make(map[string]*fetchRequest)
q.receiptTaskPool = make(map[common.Hash]*types.Header)
q.receiptTaskQueue.Reset()
q.receiptPendPool = make(map[string]*fetchRequest)
q.resultCache = newResultStore(blockCacheLimit)
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
}
// Close marks the end of the sync, unblocking Results.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
q.lock.Lock()
q.closed = true
q.active.Signal()
q.lock.Unlock()
}
// PendingHeaders retrieves the number of header requests pending for retrieval.
func (q *queue) PendingHeaders() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.headerTaskQueue.Size()
}
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.receiptTaskQueue.Size()
}
// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.headerPendPool) > 0
}
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.blockPendPool) > 0
}
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.receiptPendPool) > 0
}
// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool)
return (queued + pending) == 0
}
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
q.lock.Lock()
defer q.lock.Unlock()
// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
if q.headerResults != nil {
panic("skeleton assembly already in progress")
}
// Schedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
q.headerOffset = from
q.headerContCh = make(chan bool, 1)
for i, header := range skeleton {
index := from + uint64(i*MaxHeaderFetch)
q.headerTaskPool[index] = header
q.headerTaskQueue.Push(index, -int64(index))
}
}
// RetrieveHeaders retrieves the header chain assemble based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
q.lock.Lock()
defer q.lock.Unlock()
headers, proced := q.headerResults, q.headerProced
q.headerResults, q.headerProced = nil, 0
return headers, proced
}
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the headers prioritised by the contained block number
inserts := make([]*types.Header, 0, len(headers))
for _, header := range headers {
// Make sure chain order is honoured and preserved throughout
hash := header.Hash()
if header.Number == nil || header.Number.Uint64() != from {
log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
break
}
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
break
}
// Make sure no duplicate requests are executed
// We cannot skip this, even if the block is empty, since this is
// what triggers the fetchResult creation.
if _, ok := q.blockTaskPool[hash]; ok {
log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
} else {
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Queue for receipt retrieval
if q.mode == FastSync && !header.EmptyReceipts() {
if _, ok := q.receiptTaskPool[hash]; ok {
log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
} else {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
inserts = append(inserts, header)
q.headerHead = hash
from++
}
return inserts
}
// Results retrieves and permanently removes a batch of fetch results from
// the cache. the result slice will be empty if the queue has been closed.
// Results can be called concurrently with Deliver and Schedule,
// but assumes that there are not two simultaneous callers to Results
func (q *queue) Results(block bool) []*fetchResult {
// Abort early if there are no items and non-blocking requested
if !block && !q.resultCache.HasCompletedItems() {
return nil
}
closed := false
for !closed && !q.resultCache.HasCompletedItems() {
// In order to wait on 'active', we need to obtain the lock.
// That may take a while, if someone is delivering at the same
// time, so after obtaining the lock, we check again if there
// are any results to fetch.
// Also, in-between we ask for the lock and the lock is obtained,
// someone can have closed the queue. In that case, we should
// return the available results and stop blocking
q.lock.Lock()
if q.resultCache.HasCompletedItems() || q.closed {
q.lock.Unlock()
break
}
// No items available, and not closed
q.active.Wait()
closed = q.closed
q.lock.Unlock()
}
// Regardless if closed or not, we can still deliver whatever we have
results := q.resultCache.GetCompleted(maxResultsProcess)
for _, result := range results {
// Recalculate the result item weights to prevent memory exhaustion
size := result.Header.Size()
for _, uncle := range result.Uncles {
size += uncle.Size()
}
for _, receipt := range result.Receipts {
size += receipt.Size()
}
for _, tx := range result.Transactions {
size += tx.Size()
}
q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
}
// Using the newly calibrated resultsize, figure out the new throttle limit
// on the result cache
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
info := q.Stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
}
return results
}
func (q *queue) Stats() []interface{} {
q.lock.RLock()
defer q.lock.RUnlock()
return q.stats()
}
func (q *queue) stats() []interface{} {
return []interface{}{
"receiptTasks", q.receiptTaskQueue.Size(),
"blockTasks", q.blockTaskQueue.Size(),
"itemSize", q.resultSize,
}
}
// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the peer's already downloading something (sanity check to
// not corrupt state)
if _, ok := q.headerPendPool[p.id]; ok {
return nil
}
// Retrieve a batch of hashes, skipping previously failed ones
send, skip := uint64(0), []uint64{}
for send == 0 && !q.headerTaskQueue.Empty() {
from, _ := q.headerTaskQueue.Pop()
if q.headerPeerMiss[p.id] != nil {
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
skip = append(skip, from.(uint64))
continue
}
}
send = from.(uint64)
}
// Merge all the skipped batches back
for _, from := range skip {
q.headerTaskQueue.Push(from, -int64(from))
}
// Assemble and return the block download request
if send == 0 {
return nil
}
request := &fetchRequest{
Peer: p,
From: send,
Time: time.Now(),
}
q.headerPendPool[p.id] = request
return request
}
// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, bool) {
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, bodyType)
}
// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Beside the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, bool) {
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType)
}
// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
//
// Returns:
// item - the fetchRequest
// progress - whether any progress was made
// throttle - if the caller should throttle for a while
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
if taskQueue.Empty() {
return nil, false, true
}
if _, ok := pendPool[p.id]; ok {
return nil, false, false
}
// Retrieve a batch of tasks, skipping previously failed ones
send := make([]*types.Header, 0, count)
skip := make([]*types.Header, 0)
progress := false
throttled := false
for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
// the task queue will pop items in order, so the highest prio block
// is also the lowest block number.
h, _ := taskQueue.Peek()
header := h.(*types.Header)
// we can ask the resultcache if this header is within the
// "prioritized" segment of blocks. If it is not, we need to throttle
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
if stale {
// Don't put back in the task queue, this item has already been
// delivered upstream
taskQueue.PopItem()
progress = true
delete(taskPool, header.Hash())
proc = proc - 1
log.Error("Fetch reservation already delivered", "number", header.Number.Uint64())
continue
}
if throttle {
// There are no resultslots available. Leave it in the task queue
// However, if there are any left as 'skipped', we should not tell
// the caller to throttle, since we still want some other
// peer to fetch those for us
throttled = len(skip) == 0
break
}
if err != nil {
// this most definitely should _not_ happen
log.Warn("Failed to reserve headers", "err", err)
// There are no resultslots available. Leave it in the task queue
break
}
if item.Done(kind) {
// If it's a noop, we can skip this task
delete(taskPool, header.Hash())
taskQueue.PopItem()
proc = proc - 1
progress = true
continue
}
// Remove it from the task queue
taskQueue.PopItem()
// Otherwise unless the peer is known not to have the data, add to the retrieve list
if p.Lacks(header.Hash()) {
skip = append(skip, header)
} else {
send = append(send, header)
}
}
// Merge all the skipped headers back
for _, header := range skip {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if q.resultCache.HasCompletedItems() {
// Wake Results, resultCache was modified
q.active.Signal()
}
// Assemble and return the block download request
if len(send) == 0 {
return nil, progress, throttled
}
request := &fetchRequest{
Peer: p,
Headers: send,
Time: time.Now(),
}
pendPool[p.id] = request
return request, progress, throttled
}
// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
func (q *queue) CancelHeaders(request *fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}
// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
func (q *queue) CancelBodies(request *fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}
// CancelReceipts aborts a body fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
// Revoke cancels all pending requests belonging to a given peer. This method is
// meant to be called during a peer drop to quickly reassign owned data fetches
// to remaining nodes.
func (q *queue) Revoke(peerID string) {
q.lock.Lock()
defer q.lock.Unlock()
if request, ok := q.blockPendPool[peerID]; ok {
for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.blockPendPool, peerID)
}
if request, ok := q.receiptPendPool[peerID]; ok {
for _, header := range request.Headers {
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.receiptPendPool, peerID)
}
}
// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
timeoutMeter.Mark(1)
// Return any non satisfied requests to the pool
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Add the peer to the expiry report along the number of failed requests
expiries[id] = len(request.Headers)
// Remove the expired requests from the pending pool directly
delete(pendPool, id)
}
}
return expiries
}
// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
var logger log.Logger
if len(id) < 16 {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
}
// Short circuit if the data was never requested
request := q.headerPendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
headerReqTimer.UpdateSince(request.Time)
delete(q.headerPendPool, id)
// Ensure headers can be mapped onto the skeleton chain
target := q.headerTaskPool[request.From].Hash()
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From)
accepted = false
} else if headers[len(headers)-1].Hash() != target {
logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
accepted = false
}
}
if accepted {
parentHash := headers[0].Hash()
for i, header := range headers[1:] {
hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want)
accepted = false
break
}
if parentHash != header.ParentHash {
logger.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
accepted = false
break
}
// Set-up parent hash for next round
parentHash = hash
}
}
// If the batch of headers wasn't accepted, mark as unavailable
if !accepted {
logger.Trace("Skeleton filling not accepted", "from", request.From)
miss := q.headerPeerMiss[id]
if miss == nil {
q.headerPeerMiss[id] = make(map[uint64]struct{})
miss = q.headerPeerMiss[id]
}
miss[request.From] = struct{}{}
q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
copy(q.headerResults[request.From-q.headerOffset:], headers)
delete(q.headerTaskPool, request.From)
ready := 0
for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
ready += MaxHeaderFetch
}
if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking)
process := make([]*types.Header, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
select {
case headerProcCh <- process:
logger.Trace("Pre-scheduled new headers", "count", len(process), "from", process[0].Number)
q.headerProced += len(process)
default:
}
}
// Check for termination and return
if len(q.headerTaskPool) == 0 {
q.headerContCh <- false
}
return len(headers), nil
}
// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of blocks bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
trieHasher := trie.NewStackTrie(nil)
validate := func(index int, header *types.Header) error {
if types.DeriveSha(types.Transactions(txLists[index]), trieHasher) != header.TxHash {
return errInvalidBody
}
if types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
return errInvalidBody
}
return nil
}
reconstruct := func(index int, result *fetchResult) {
result.Transactions = txLists[index]
result.Uncles = uncleLists[index]
result.SetBodyDone()
}
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
bodyReqTimer, len(txLists), validate, reconstruct)
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
trieHasher := trie.NewStackTrie(nil)
validate := func(index int, header *types.Header) error {
if types.DeriveSha(types.Receipts(receiptList[index]), trieHasher) != header.ReceiptHash {
return errInvalidReceipt
}
return nil
}
reconstruct := func(index int, result *fetchResult) {
result.Receipts = receiptList[index]
result.SetReceiptsDone()
}
return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
receiptReqTimer, len(receiptList), validate, reconstruct)
}
// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason this lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer,
results int, validate func(index int, header *types.Header) error,
reconstruct func(index int, result *fetchResult)) (int, error) {
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
// If no data items were retrieved, mark them as unavailable for the origin peer
if results == 0 {
for _, header := range request.Headers {
request.Peer.MarkLacking(header.Hash())
}
}
// Assemble each of the results with their headers and retrieved data parts
var (
accepted int
failure error
i int
hashes []common.Hash
)
for _, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
if i >= results {
break
}
// Validate the fields
if err := validate(i, header); err != nil {
failure = err
break
}
hashes = append(hashes, header.Hash())
i++
}
for _, header := range request.Headers[:i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
reconstruct(accepted, res)
} else {
// else: betweeen here and above, some other peer filled this result,
// or it was indeed a no-op. This should not happen, but if it does it's
// not something to panic about
log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err)
failure = errStaleDelivery
}
// Clean up a successful fetch
delete(taskPool, hashes[accepted])
accepted++
}
// Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Wake up Results
if accepted > 0 {
q.active.Signal()
}
if failure == nil {
return accepted, nil
}
// If none of the data was good, it's a stale delivery
if accepted > 0 {
return accepted, fmt.Errorf("partial failure: %v", failure)
}
return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
}
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode) {
q.lock.Lock()
defer q.lock.Unlock()
// Prepare the queue for sync results
q.resultCache.Prepare(offset)
q.mode = mode
}

View File

@ -0,0 +1,452 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"math/big"
"math/rand"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)
var (
testdb = rawdb.NewMemoryDatabase()
genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000000000))
)
// makeChain creates a chain of n blocks starting at and including parent.
// the returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// Add one tx to every secondblock
if !empty && i%2 == 0 {
signer := types.MakeSigner(params.TestChainConfig, block.Number())
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
if err != nil {
panic(err)
}
block.AddTx(tx)
}
})
return blocks, receipts
}
type chainData struct {
blocks []*types.Block
offset int
}
var chain *chainData
var emptyChain *chainData
func init() {
// Create a chain of blocks to import
targetBlocks := 128
blocks, _ := makeChain(targetBlocks, 0, genesis, false)
chain = &chainData{blocks, 0}
blocks, _ = makeChain(targetBlocks, 0, genesis, true)
emptyChain = &chainData{blocks, 0}
}
func (chain *chainData) headers() []*types.Header {
hdrs := make([]*types.Header, len(chain.blocks))
for i, b := range chain.blocks {
hdrs[i] = b.Header()
}
return hdrs
}
func (chain *chainData) Len() int {
return len(chain.blocks)
}
func dummyPeer(id string) *peerConnection {
p := &peerConnection{
id: id,
lacking: make(map[common.Hash]struct{}),
}
return p
}
func TestBasics(t *testing.T) {
numOfBlocks := len(emptyChain.blocks)
numOfReceipts := len(emptyChain.blocks) / 2
q := newQueue(10, 10)
if !q.Idle() {
t.Errorf("new queue should be idle")
}
q.Prepare(1, FastSync)
if res := q.Results(false); len(res) != 0 {
t.Fatal("new queue should have 0 results")
}
// Schedule a batch of headers
q.Schedule(chain.headers(), 1)
if q.Idle() {
t.Errorf("queue should not be idle")
}
if got, exp := q.PendingBlocks(), chain.Len(); got != exp {
t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
}
// Only non-empty receipts get added to task-queue
if got, exp := q.PendingReceipts(), 64; got != exp {
t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
}
// Items are now queued for downloading, next step is that we tell the
// queue that a certain peer will deliver them for us
{
peer := dummyPeer("peer-1")
fetchReq, _, throttle := q.ReserveBodies(peer, 50)
if !throttle {
// queue size is only 10, so throttling should occur
t.Fatal("should throttle")
}
// But we should still get the first things to fetch
if got, exp := len(fetchReq.Headers), 5; got != exp {
t.Fatalf("expected %d requests, got %d", exp, got)
}
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
t.Fatalf("expected header %d, got %d", exp, got)
}
}
if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
t.Errorf("expected block task queue to be %d, got %d", exp, got)
}
if exp, got := q.receiptTaskQueue.Size(), numOfReceipts; exp != got {
t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
}
{
peer := dummyPeer("peer-2")
fetchReq, _, throttle := q.ReserveBodies(peer, 50)
// The second peer should hit throttling
if !throttle {
t.Fatalf("should not throttle")
}
// And not get any fetches at all, since it was throttled to begin with
if fetchReq != nil {
t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers))
}
}
if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
t.Errorf("expected block task queue to be %d, got %d", exp, got)
}
if exp, got := q.receiptTaskQueue.Size(), numOfReceipts; exp != got {
t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
}
{
// The receipt delivering peer should not be affected
// by the throttling of body deliveries
peer := dummyPeer("peer-3")
fetchReq, _, throttle := q.ReserveReceipts(peer, 50)
if !throttle {
// queue size is only 10, so throttling should occur
t.Fatal("should throttle")
}
// But we should still get the first things to fetch
if got, exp := len(fetchReq.Headers), 5; got != exp {
t.Fatalf("expected %d requests, got %d", exp, got)
}
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
t.Fatalf("expected header %d, got %d", exp, got)
}
}
if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
t.Errorf("expected block task queue to be %d, got %d", exp, got)
}
if exp, got := q.receiptTaskQueue.Size(), numOfReceipts-5; exp != got {
t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
}
if got, exp := q.resultCache.countCompleted(), 0; got != exp {
t.Errorf("wrong processable count, got %d, exp %d", got, exp)
}
}
func TestEmptyBlocks(t *testing.T) {
numOfBlocks := len(emptyChain.blocks)
q := newQueue(10, 10)
q.Prepare(1, FastSync)
// Schedule a batch of headers
q.Schedule(emptyChain.headers(), 1)
if q.Idle() {
t.Errorf("queue should not be idle")
}
if got, exp := q.PendingBlocks(), len(emptyChain.blocks); got != exp {
t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
}
if got, exp := q.PendingReceipts(), 0; got != exp {
t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
}
// They won't be processable, because the fetchresults haven't been
// created yet
if got, exp := q.resultCache.countCompleted(), 0; got != exp {
t.Errorf("wrong processable count, got %d, exp %d", got, exp)
}
// Items are now queued for downloading, next step is that we tell the
// queue that a certain peer will deliver them for us
// That should trigger all of them to suddenly become 'done'
{
// Reserve blocks
peer := dummyPeer("peer-1")
fetchReq, _, _ := q.ReserveBodies(peer, 50)
// there should be nothing to fetch, blocks are empty
if fetchReq != nil {
t.Fatal("there should be no body fetch tasks remaining")
}
}
if q.blockTaskQueue.Size() != numOfBlocks-10 {
t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
}
if q.receiptTaskQueue.Size() != 0 {
t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
}
{
peer := dummyPeer("peer-3")
fetchReq, _, _ := q.ReserveReceipts(peer, 50)
// there should be nothing to fetch, blocks are empty
if fetchReq != nil {
t.Fatal("there should be no body fetch tasks remaining")
}
}
if q.blockTaskQueue.Size() != numOfBlocks-10 {
t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
}
if q.receiptTaskQueue.Size() != 0 {
t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
}
if got, exp := q.resultCache.countCompleted(), 10; got != exp {
t.Errorf("wrong processable count, got %d, exp %d", got, exp)
}
}
// XTestDelivery does some more extensive testing of events that happen,
// blocks that become known and peers that make reservations and deliveries.
// disabled since it's not really a unit-test, but can be executed to test
// some more advanced scenarios
func XTestDelivery(t *testing.T) {
// the outside network, holding blocks
blo, rec := makeChain(128, 0, genesis, false)
world := newNetwork()
world.receipts = rec
world.chain = blo
world.progress(10)
if false {
log.Root().SetHandler(log.StdoutHandler)
}
q := newQueue(10, 10)
var wg sync.WaitGroup
q.Prepare(1, FastSync)
wg.Add(1)
go func() {
// deliver headers
defer wg.Done()
c := 1
for {
//fmt.Printf("getting headers from %d\n", c)
hdrs := world.headers(c)
l := len(hdrs)
//fmt.Printf("scheduling %d headers, first %d last %d\n",
// l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64())
q.Schedule(hdrs, uint64(c))
c += l
}
}()
wg.Add(1)
go func() {
// collect results
defer wg.Done()
tot := 0
for {
res := q.Results(true)
tot += len(res)
fmt.Printf("got %d results, %d tot\n", len(res), tot)
// Now we can forget about these
world.forget(res[len(res)-1].Header.Number.Uint64())
}
}()
wg.Add(1)
go func() {
defer wg.Done()
// reserve body fetch
i := 4
for {
peer := dummyPeer(fmt.Sprintf("peer-%d", i))
f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
if f != nil {
var emptyList []*types.Header
var txs [][]*types.Transaction
var uncles [][]*types.Header
numToSkip := rand.Intn(len(f.Headers))
for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] {
txs = append(txs, world.getTransactions(hdr.Number.Uint64()))
uncles = append(uncles, emptyList)
}
time.Sleep(100 * time.Millisecond)
_, err := q.DeliverBodies(peer.id, txs, uncles)
if err != nil {
fmt.Printf("delivered %d bodies %v\n", len(txs), err)
}
} else {
i++
time.Sleep(200 * time.Millisecond)
}
}
}()
go func() {
defer wg.Done()
// reserve receiptfetch
peer := dummyPeer("peer-3")
for {
f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
if f != nil {
var rcs [][]*types.Receipt
for _, hdr := range f.Headers {
rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
}
_, err := q.DeliverReceipts(peer.id, rcs)
if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
}
time.Sleep(100 * time.Millisecond)
} else {
time.Sleep(200 * time.Millisecond)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
time.Sleep(300 * time.Millisecond)
//world.tick()
//fmt.Printf("trying to progress\n")
world.progress(rand.Intn(100))
}
for i := 0; i < 50; i++ {
time.Sleep(2990 * time.Millisecond)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
time.Sleep(990 * time.Millisecond)
fmt.Printf("world block tip is %d\n",
world.chain[len(world.chain)-1].Header().Number.Uint64())
fmt.Println(q.Stats())
}
}()
wg.Wait()
}
func newNetwork() *network {
var l sync.RWMutex
return &network{
cond: sync.NewCond(&l),
offset: 1, // block 1 is at blocks[0]
}
}
// represents the network
type network struct {
offset int
chain []*types.Block
receipts []types.Receipts
lock sync.RWMutex
cond *sync.Cond
}
func (n *network) getTransactions(blocknum uint64) types.Transactions {
index := blocknum - uint64(n.offset)
return n.chain[index].Transactions()
}
func (n *network) getReceipts(blocknum uint64) types.Receipts {
index := blocknum - uint64(n.offset)
if got := n.chain[index].Header().Number.Uint64(); got != blocknum {
fmt.Printf("Err, got %d exp %d\n", got, blocknum)
panic("sd")
}
return n.receipts[index]
}
func (n *network) forget(blocknum uint64) {
index := blocknum - uint64(n.offset)
n.chain = n.chain[index:]
n.receipts = n.receipts[index:]
n.offset = int(blocknum)
}
func (n *network) progress(numBlocks int) {
n.lock.Lock()
defer n.lock.Unlock()
//fmt.Printf("progressing...\n")
newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
n.chain = append(n.chain, newBlocks...)
n.receipts = append(n.receipts, newR...)
n.cond.Broadcast()
}
func (n *network) headers(from int) []*types.Header {
numHeaders := 128
var hdrs []*types.Header
index := from - n.offset
for index >= len(n.chain) {
// wait for progress
n.cond.L.Lock()
//fmt.Printf("header going into wait\n")
n.cond.Wait()
index = from - n.offset
n.cond.L.Unlock()
}
n.lock.RLock()
defer n.lock.RUnlock()
for i, b := range n.chain[index:] {
hdrs = append(hdrs, b.Header())
if i >= numHeaders {
break
}
}
return hdrs
}

View File

@ -0,0 +1,194 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/core/types"
)
// resultStore implements a structure for maintaining fetchResults, tracking their
// download-progress and delivering (finished) results.
type resultStore struct {
items []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
// Internal index of first non-completed entry, updated atomically when needed.
// If all items are complete, this will equal length(items), so
// *important* : is not safe to use for indexing without checking against length
indexIncomplete int32 // atomic access
// throttleThreshold is the limit up to which we _want_ to fill the
// results. If blocks are large, we want to limit the results to less
// than the number of available slots, and maybe only fill 1024 out of
// 8192 possible places. The queue will, at certain times, recalibrate
// this index.
throttleThreshold uint64
lock sync.RWMutex
}
func newResultStore(size int) *resultStore {
return &resultStore{
resultOffset: 0,
items: make([]*fetchResult, size),
throttleThreshold: uint64(size),
}
}
// SetThrottleThreshold updates the throttling threshold based on the requested
// limit and the total queue capacity. It returns the (possibly capped) threshold
func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
r.lock.Lock()
defer r.lock.Unlock()
limit := uint64(len(r.items))
if threshold >= limit {
threshold = limit
}
r.throttleThreshold = threshold
return r.throttleThreshold
}
// AddFetch adds a header for body/receipt fetching. This is used when the queue
// wants to reserve headers for fetching.
//
// It returns the following:
// stale - if true, this item is already passed, and should not be requested again
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()
var index int
item, index, stale, throttled, err = r.getFetchResult(header.Number.Uint64())
if err != nil || stale || throttled {
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, fastSync)
r.items[index] = item
}
return stale, throttled, item, err
}
// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag
// is true, that means the header has already been delivered 'upstream'. This method
// does not bubble up the 'throttle' flag, since it's moot at the point in time when
// the item is downloaded and ready for delivery
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) {
r.lock.RLock()
defer r.lock.RUnlock()
res, _, stale, _, err := r.getFetchResult(headerNumber)
return res, stale, err
}
// getFetchResult returns the fetchResult corresponding to the given item, and
// the index where the result is stored.
func (r *resultStore) getFetchResult(headerNumber uint64) (item *fetchResult, index int, stale, throttle bool, err error) {
index = int(int64(headerNumber) - int64(r.resultOffset))
throttle = index >= int(r.throttleThreshold)
stale = index < 0
if index >= len(r.items) {
err = fmt.Errorf("%w: index allocation went beyond available resultStore space "+
"(index [%d] = header [%d] - resultOffset [%d], len(resultStore) = %d", errInvalidChain,
index, headerNumber, r.resultOffset, len(r.items))
return nil, index, stale, throttle, err
}
if stale {
return nil, index, stale, throttle, nil
}
item = r.items[index]
return item, index, stale, throttle, nil
}
// hasCompletedItems returns true if there are processable items available
// this method is cheaper than countCompleted
func (r *resultStore) HasCompletedItems() bool {
r.lock.RLock()
defer r.lock.RUnlock()
if len(r.items) == 0 {
return false
}
if item := r.items[0]; item != nil && item.AllDone() {
return true
}
return false
}
// countCompleted returns the number of items ready for delivery, stopping at
// the first non-complete item.
//
// The mthod assumes (at least) rlock is held.
func (r *resultStore) countCompleted() int {
// We iterate from the already known complete point, and see
// if any more has completed since last count
index := atomic.LoadInt32(&r.indexIncomplete)
for ; ; index++ {
if index >= int32(len(r.items)) {
break
}
result := r.items[index]
if result == nil || !result.AllDone() {
break
}
}
atomic.StoreInt32(&r.indexIncomplete, index)
return int(index)
}
// GetCompleted returns the next batch of completed fetchResults
func (r *resultStore) GetCompleted(limit int) []*fetchResult {
r.lock.Lock()
defer r.lock.Unlock()
completed := r.countCompleted()
if limit > completed {
limit = completed
}
results := make([]*fetchResult, limit)
copy(results, r.items[:limit])
// Delete the results from the cache and clear the tail.
copy(r.items, r.items[limit:])
for i := len(r.items) - limit; i < len(r.items); i++ {
r.items[i] = nil
}
// Advance the expected block number of the first cache entry
r.resultOffset += uint64(limit)
atomic.AddInt32(&r.indexIncomplete, int32(-limit))
return results
}
// Prepare initialises the offset with the given block number
func (r *resultStore) Prepare(offset uint64) {
r.lock.Lock()
defer r.lock.Unlock()
if r.resultOffset < offset {
r.resultOffset = offset
}
}

615
les/downloader/statesync.go Normal file
View File

@ -0,0 +1,615 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
"golang.org/x/crypto/sha3"
)
// stateReq represents a batch of state fetch requests grouped together into
// a single data retrieval network packet.
type stateReq struct {
nItems uint16 // Number of items requested for download (max is 384, so uint16 is sufficient)
trieTasks map[common.Hash]*trieTask // Trie node download tasks to track previous attempts
codeTasks map[common.Hash]*codeTask // Byte code download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires
peer *peerConnection // Peer that we're requesting from
delivered time.Time // Time when the packet was delivered (independent when we process it)
response [][]byte // Response data of the peer (nil for timeouts)
dropped bool // Flag whether the peer dropped off early
}
// timedOut returns if this request timed out.
func (req *stateReq) timedOut() bool {
return req.response == nil
}
// stateSyncStats is a collection of progress stats to report during a state trie
// sync to RPC requests as well as to display in user logs.
type stateSyncStats struct {
processed uint64 // Number of state entries processed
duplicate uint64 // Number of state entries downloaded twice
unexpected uint64 // Number of non-requested state entries received
pending uint64 // Number of still pending state entries
}
// syncState starts downloading state with the given root hash.
func (d *Downloader) syncState(root common.Hash) *stateSync {
// Create the state sync
s := newStateSync(d, root)
select {
case d.stateSyncStart <- s:
// If we tell the statesync to restart with a new root, we also need
// to wait for it to actually also start -- when old requests have timed
// out or been delivered
<-s.started
case <-d.quitCh:
s.err = errCancelStateFetch
close(s.done)
}
return s
}
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
for {
select {
case s := <-d.stateSyncStart:
for next := s; next != nil; {
next = d.runStateSync(next)
}
case <-d.stateCh:
// Ignore state responses while no sync is running.
case <-d.quitCh:
return
}
}
}
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
var (
active = make(map[string]*stateReq) // Currently in-flight requests
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
log.Trace("State sync starting", "root", s.root)
defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
}()
go s.run()
defer s.Cancel()
// Listen for peer departure events to cancel assigned tasks
peerDrop := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
defer peerSub.Unsubscribe()
for {
// Enable sending of the first buffered element if there is one.
var (
deliverReq *stateReq
deliverReqCh chan *stateReq
)
if len(finished) > 0 {
deliverReq = finished[0]
deliverReqCh = s.deliver
}
select {
// The stateSync lifecycle:
case next := <-d.stateSyncStart:
d.spindownStateSync(active, finished, timeout, peerDrop)
return next
case <-s.done:
d.spindownStateSync(active, finished, timeout, peerDrop)
return nil
// Send the next finished request to the current sync:
case deliverReqCh <- deliverReq:
// Shift out the first request, but also set the emptied slot to nil for GC
copy(finished, finished[1:])
finished[len(finished)-1] = nil
finished = finished[:len(finished)-1]
// Handle incoming state packs:
case pack := <-d.stateCh:
// Discard any data not requested (or previously timed out)
req := active[pack.PeerId()]
if req == nil {
log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.response = pack.(*statePack).states
req.delivered = time.Now()
finished = append(finished, req)
delete(active, pack.PeerId())
// Handle dropped peer connections:
case p := <-peerDrop:
// Skip if no request is currently pending
req := active[p.id]
if req == nil {
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.dropped = true
req.delivered = time.Now()
finished = append(finished, req)
delete(active, p.id)
// Handle timed-out requests:
case req := <-timeout:
// If the peer is already requesting something else, ignore the stale timeout.
// This can happen when the timeout and the delivery happens simultaneously,
// causing both pathways to trigger.
if active[req.peer.id] != req {
continue
}
req.delivered = time.Now()
// Move the timed out data back into the download queue
finished = append(finished, req)
delete(active, req.peer.id)
// Track outgoing state requests:
case req := <-d.trackStateReq:
// If an active request already exists for this peer, we have a problem. In
// theory the trie node schedule must never assign two requests to the same
// peer. In practice however, a peer might receive a request, disconnect and
// immediately reconnect before the previous times out. In this case the first
// request is never honored, alas we must not silently overwrite it, as that
// causes valid requests to go missing and sync to get stuck.
if old := active[req.peer.id]; old != nil {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Move the previous request to the finished set
old.timer.Stop()
old.dropped = true
old.delivered = time.Now()
finished = append(finished, old)
}
// Start a timer to notify the sync loop if the peer stalled.
req.timer = time.AfterFunc(req.timeout, func() {
timeout <- req
})
active[req.peer.id] = req
}
}
}
// spindownStateSync 'drains' the outstanding requests; some will be delivered and other
// will time out. This is to ensure that when the next stateSync starts working, all peers
// are marked as idle and de facto _are_ idle.
func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*stateReq, timeout chan *stateReq, peerDrop chan *peerConnection) {
log.Trace("State sync spinning down", "active", len(active), "finished", len(finished))
for len(active) > 0 {
var (
req *stateReq
reason string
)
select {
// Handle (drop) incoming state packs:
case pack := <-d.stateCh:
req = active[pack.PeerId()]
reason = "delivered"
// Handle dropped peer connections:
case p := <-peerDrop:
req = active[p.id]
reason = "peerdrop"
// Handle timed-out requests:
case req = <-timeout:
reason = "timeout"
}
if req == nil {
continue
}
req.peer.log.Trace("State peer marked idle (spindown)", "req.items", int(req.nItems), "reason", reason)
req.timer.Stop()
delete(active, req.peer.id)
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
// The 'finished' set contains deliveries that we were going to pass to processing.
// Those are now moot, but we still need to set those peers as idle, which would
// otherwise have been done after processing
for _, req := range finished {
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
}
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
root common.Hash // State root currently being synced
sched *trie.Sync // State trie sync scheduler defining the tasks
keccak crypto.KeccakState // Keccak256 hasher to verify deliveries with
trieTasks map[common.Hash]*trieTask // Set of trie node tasks currently queued for retrieval
codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval
numUncommitted int
bytesUncommitted int
started chan struct{} // Started is signalled once the sync loop starts
deliver chan *stateReq // Delivery channel multiplexing peer responses
cancel chan struct{} // Channel to signal a termination request
cancelOnce sync.Once // Ensures cancel only ever gets called once
done chan struct{} // Channel to signal termination completion
err error // Any error hit during sync (set before completion)
}
// trieTask represents a single trie node download task, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type trieTask struct {
path [][]byte
attempts map[string]struct{}
}
// codeTask represents a single byte code download task, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type codeTask struct {
attempts map[string]struct{}
}
// newStateSync creates a new state trie download scheduler. This method does not
// yet start the sync. The user needs to call run to initiate.
func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
root: root,
sched: state.NewStateSync(root, d.stateDB, d.stateBloom, nil),
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
trieTasks: make(map[common.Hash]*trieTask),
codeTasks: make(map[common.Hash]*codeTask),
deliver: make(chan *stateReq),
cancel: make(chan struct{}),
done: make(chan struct{}),
started: make(chan struct{}),
}
}
// run starts the task assignment and response processing loop, blocking until
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func (s *stateSync) run() {
close(s.started)
if s.d.snapSync {
s.err = s.d.SnapSyncer.Sync(s.root, s.cancel)
} else {
s.err = s.loop()
}
close(s.done)
}
// Wait blocks until the sync is done or canceled.
func (s *stateSync) Wait() error {
<-s.done
return s.err
}
// Cancel cancels the sync and waits until it has shut down.
func (s *stateSync) Cancel() error {
s.cancelOnce.Do(func() {
close(s.cancel)
})
return s.Wait()
}
// loop is the main event loop of a state trie sync. It it responsible for the
// assignment of new tasks to peers (including sending it to them) as well as
// for the processing of inbound data. Note, that the loop does not directly
// receive data from peers, rather those are buffered up in the downloader and
// pushed here async. The reason is to decouple processing from data receipt
// and timeouts.
func (s *stateSync) loop() (err error) {
// Listen for new peer events to assign tasks to them
newPeer := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer)
defer peerSub.Unsubscribe()
defer func() {
cerr := s.commit(true)
if err == nil {
err = cerr
}
}()
// Keep assigning new tasks until the sync completes or aborts
for s.sched.Pending() > 0 {
if err = s.commit(false); err != nil {
return err
}
s.assignTasks()
// Tasks assigned, wait for something to happen
select {
case <-newPeer:
// New peer arrived, try to assign it download tasks
case <-s.cancel:
return errCancelStateFetch
case <-s.d.cancelCh:
return errCanceled
case req := <-s.deliver:
// Response, disconnect or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
if req.nItems <= 2 && !req.dropped && req.timedOut() {
// 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment.
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
if s.d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id)
} else {
s.d.dropPeer(req.peer.id)
// If this peer was the master peer, abort sync immediately
s.d.cancelLock.RLock()
master := req.peer.id == s.d.cancelPeer
s.d.cancelLock.RUnlock()
if master {
s.d.cancel()
return errTimeout
}
}
}
// Process all the received blobs and check for stale delivery
delivered, err := s.process(req)
req.peer.SetNodeDataIdle(delivered, req.delivered)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
}
}
}
return nil
}
func (s *stateSync) commit(force bool) error {
if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
return nil
}
start := time.Now()
b := s.d.stateDB.NewBatch()
if err := s.sched.Commit(b); err != nil {
return err
}
if err := b.Write(); err != nil {
return fmt.Errorf("DB write error: %v", err)
}
s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
s.numUncommitted = 0
s.bytesUncommitted = 0
return nil
}
// assignTasks attempts to assign new tasks to all idle peers, either from the
// batch currently being retried, or fetching new data from the trie sync itself.
func (s *stateSync) assignTasks() {
// Iterate over all idle peers and try to assign them state fetches
peers, _ := s.d.peers.NodeDataIdlePeers()
for _, p := range peers {
// Assign a batch of fetches proportional to the estimated latency/bandwidth
cap := p.NodeDataCapacity(s.d.peers.rates.TargetRoundTrip())
req := &stateReq{peer: p, timeout: s.d.peers.rates.TargetTimeout()}
nodes, _, codes := s.fillTasks(cap, req)
// If the peer was assigned tasks to fetch, send the network request
if len(nodes)+len(codes) > 0 {
req.peer.log.Trace("Requesting batch of state data", "nodes", len(nodes), "codes", len(codes), "root", s.root)
select {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(append(nodes, codes...)) // Unified retrieval under eth/6x
case <-s.cancel:
case <-s.d.cancelCh:
}
}
}
}
// fillTasks fills the given request object with a maximum of n state download
// tasks to send to the remote peer.
func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
// Refill available tasks from the scheduler.
if fill := n - (len(s.trieTasks) + len(s.codeTasks)); fill > 0 {
nodes, paths, codes := s.sched.Missing(fill)
for i, hash := range nodes {
s.trieTasks[hash] = &trieTask{
path: paths[i],
attempts: make(map[string]struct{}),
}
}
for _, hash := range codes {
s.codeTasks[hash] = &codeTask{
attempts: make(map[string]struct{}),
}
}
}
// Find tasks that haven't been tried with the request's peer. Prefer code
// over trie nodes as those can be written to disk and forgotten about.
nodes = make([]common.Hash, 0, n)
paths = make([]trie.SyncPath, 0, n)
codes = make([]common.Hash, 0, n)
req.trieTasks = make(map[common.Hash]*trieTask, n)
req.codeTasks = make(map[common.Hash]*codeTask, n)
for hash, t := range s.codeTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
// Skip any requests we've already tried from this peer
if _, ok := t.attempts[req.peer.id]; ok {
continue
}
// Assign the request to this peer
t.attempts[req.peer.id] = struct{}{}
codes = append(codes, hash)
req.codeTasks[hash] = t
delete(s.codeTasks, hash)
}
for hash, t := range s.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
// Skip any requests we've already tried from this peer
if _, ok := t.attempts[req.peer.id]; ok {
continue
}
// Assign the request to this peer
t.attempts[req.peer.id] = struct{}{}
nodes = append(nodes, hash)
paths = append(paths, t.path)
req.trieTasks[hash] = t
delete(s.trieTasks, hash)
}
req.nItems = uint16(len(nodes) + len(codes))
return nodes, paths, codes
}
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered. Returns whether the peer actually managed to deliver anything of
// value, and any error that occurred.
func (s *stateSync) process(req *stateReq) (int, error) {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected, successful := 0, 0, 0
defer func(start time.Time) {
if duplicate > 0 || unexpected > 0 {
s.updateStats(0, duplicate, unexpected, time.Since(start))
}
}(time.Now())
// Iterate over all the delivered data and inject one-by-one into the trie
for _, blob := range req.response {
hash, err := s.processNodeData(blob)
switch err {
case nil:
s.numUncommitted++
s.bytesUncommitted += len(blob)
successful++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
// Delete from both queues (one delivery is enough for the syncer)
delete(req.trieTasks, hash)
delete(req.codeTasks, hash)
}
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.trieTasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(req.response) > 0 || req.timedOut() {
delete(task.attempts, req.peer.id)
}
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.trieTasks[hash] = task
}
for hash, task := range req.codeTasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(req.response) > 0 || req.timedOut() {
delete(task.attempts, req.peer.id)
}
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return successful, fmt.Errorf("byte code %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.codeTasks[hash] = task
}
return successful, nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
func (s *stateSync) processNodeData(blob []byte) (common.Hash, error) {
res := trie.SyncResult{Data: blob}
s.keccak.Reset()
s.keccak.Write(blob)
s.keccak.Read(res.Hash[:])
err := s.sched.Process(res)
return res.Hash, err
}
// updateStats bumps the various state sync progress counters and displays a log
// message for the user to see.
func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
s.d.syncStatsLock.Lock()
defer s.d.syncStatsLock.Unlock()
s.d.syncStatsState.pending = uint64(s.sched.Pending())
s.d.syncStatsState.processed += uint64(written)
s.d.syncStatsState.duplicate += uint64(duplicate)
s.d.syncStatsState.unexpected += uint64(unexpected)
if written > 0 || duplicate > 0 || unexpected > 0 {
log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "trieretry", len(s.trieTasks), "coderetry", len(s.codeTasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
}
if written > 0 {
rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
}
}

View File

@ -0,0 +1,230 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
)
// Test chain parameters.
var (
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
testDB = rawdb.NewMemoryDatabase()
testGenesis = core.GenesisBlockForTesting(testDB, testAddress, big.NewInt(1000000000000000))
)
// The common prefix of all test chains:
var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis)
// Different forks on top of the base chain:
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
func init() {
var forkLen = int(fullMaxForkAncestry + 50)
var wg sync.WaitGroup
wg.Add(3)
go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()
go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }()
go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }()
wg.Wait()
}
type testChain struct {
genesis *types.Block
chain []common.Hash
headerm map[common.Hash]*types.Header
blockm map[common.Hash]*types.Block
receiptm map[common.Hash][]*types.Receipt
tdm map[common.Hash]*big.Int
}
// newTestChain creates a blockchain of the given length.
func newTestChain(length int, genesis *types.Block) *testChain {
tc := new(testChain).copy(length)
tc.genesis = genesis
tc.chain = append(tc.chain, genesis.Hash())
tc.headerm[tc.genesis.Hash()] = tc.genesis.Header()
tc.tdm[tc.genesis.Hash()] = tc.genesis.Difficulty()
tc.blockm[tc.genesis.Hash()] = tc.genesis
tc.generate(length-1, 0, genesis, false)
return tc
}
// makeFork creates a fork on top of the test chain.
func (tc *testChain) makeFork(length int, heavy bool, seed byte) *testChain {
fork := tc.copy(tc.len() + length)
fork.generate(length, seed, tc.headBlock(), heavy)
return fork
}
// shorten creates a copy of the chain with the given length. It panics if the
// length is longer than the number of available blocks.
func (tc *testChain) shorten(length int) *testChain {
if length > tc.len() {
panic(fmt.Errorf("can't shorten test chain to %d blocks, it's only %d blocks long", length, tc.len()))
}
return tc.copy(length)
}
func (tc *testChain) copy(newlen int) *testChain {
cpy := &testChain{
genesis: tc.genesis,
headerm: make(map[common.Hash]*types.Header, newlen),
blockm: make(map[common.Hash]*types.Block, newlen),
receiptm: make(map[common.Hash][]*types.Receipt, newlen),
tdm: make(map[common.Hash]*big.Int, newlen),
}
for i := 0; i < len(tc.chain) && i < newlen; i++ {
hash := tc.chain[i]
cpy.chain = append(cpy.chain, tc.chain[i])
cpy.tdm[hash] = tc.tdm[hash]
cpy.blockm[hash] = tc.blockm[hash]
cpy.headerm[hash] = tc.headerm[hash]
cpy.receiptm[hash] = tc.receiptm[hash]
}
return cpy
}
// generate creates a chain of n blocks starting at and including parent.
// the returned hash chain is ordered head->parent. In addition, every 22th block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
func (tc *testChain) generate(n int, seed byte, parent *types.Block, heavy bool) {
// start := time.Now()
// defer func() { fmt.Printf("test chain generated in %v\n", time.Since(start)) }()
blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// If a heavy chain is requested, delay blocks to raise difficulty
if heavy {
block.OffsetTime(-1)
}
// Include transactions to the miner to make blocks more interesting.
if parent == tc.genesis && i%22 == 0 {
signer := types.MakeSigner(params.TestChainConfig, block.Number())
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
if err != nil {
panic(err)
}
block.AddTx(tx)
}
// if the block number is a multiple of 5, add a bonus uncle to the block
if i > 0 && i%5 == 0 {
block.AddUncle(&types.Header{
ParentHash: block.PrevBlock(i - 1).Hash(),
Number: big.NewInt(block.Number().Int64() - 1),
})
}
})
// Convert the block-chain into a hash-chain and header/block maps
td := new(big.Int).Set(tc.td(parent.Hash()))
for i, b := range blocks {
td := td.Add(td, b.Difficulty())
hash := b.Hash()
tc.chain = append(tc.chain, hash)
tc.blockm[hash] = b
tc.headerm[hash] = b.Header()
tc.receiptm[hash] = receipts[i]
tc.tdm[hash] = new(big.Int).Set(td)
}
}
// len returns the total number of blocks in the chain.
func (tc *testChain) len() int {
return len(tc.chain)
}
// headBlock returns the head of the chain.
func (tc *testChain) headBlock() *types.Block {
return tc.blockm[tc.chain[len(tc.chain)-1]]
}
// td returns the total difficulty of the given block.
func (tc *testChain) td(hash common.Hash) *big.Int {
return tc.tdm[hash]
}
// headersByHash returns headers in order from the given hash.
func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int, reverse bool) []*types.Header {
num, _ := tc.hashToNumber(origin)
return tc.headersByNumber(num, amount, skip, reverse)
}
// headersByNumber returns headers from the given number.
func (tc *testChain) headersByNumber(origin uint64, amount int, skip int, reverse bool) []*types.Header {
result := make([]*types.Header, 0, amount)
if !reverse {
for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 {
if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
result = append(result, header)
}
}
} else {
for num := int64(origin); num >= 0 && len(result) < amount; num -= int64(skip) + 1 {
if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
result = append(result, header)
}
}
}
return result
}
// receipts returns the receipts of the given block hashes.
func (tc *testChain) receipts(hashes []common.Hash) [][]*types.Receipt {
results := make([][]*types.Receipt, 0, len(hashes))
for _, hash := range hashes {
if receipt, ok := tc.receiptm[hash]; ok {
results = append(results, receipt)
}
}
return results
}
// bodies returns the block bodies of the given block hashes.
func (tc *testChain) bodies(hashes []common.Hash) ([][]*types.Transaction, [][]*types.Header) {
transactions := make([][]*types.Transaction, 0, len(hashes))
uncles := make([][]*types.Header, 0, len(hashes))
for _, hash := range hashes {
if block, ok := tc.blockm[hash]; ok {
transactions = append(transactions, block.Transactions())
uncles = append(uncles, block.Uncles())
}
}
return transactions, uncles
}
func (tc *testChain) hashToNumber(target common.Hash) (uint64, bool) {
for num, hash := range tc.chain {
if hash == target {
return uint64(num), true
}
}
return 0, false
}

79
les/downloader/types.go Normal file
View File

@ -0,0 +1,79 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"github.com/ethereum/go-ethereum/core/types"
)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
// dataPack is a data message returned by a peer for some query.
type dataPack interface {
PeerId() string
Items() int
Stats() string
}
// headerPack is a batch of block headers returned by a peer.
type headerPack struct {
peerID string
headers []*types.Header
}
func (p *headerPack) PeerId() string { return p.peerID }
func (p *headerPack) Items() int { return len(p.headers) }
func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
// bodyPack is a batch of block bodies returned by a peer.
type bodyPack struct {
peerID string
transactions [][]*types.Transaction
uncles [][]*types.Header
}
func (p *bodyPack) PeerId() string { return p.peerID }
func (p *bodyPack) Items() int {
if len(p.transactions) <= len(p.uncles) {
return len(p.transactions)
}
return len(p.uncles)
}
func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
// receiptPack is a batch of receipts returned by a peer.
type receiptPack struct {
peerID string
receipts [][]*types.Receipt
}
func (p *receiptPack) PeerId() string { return p.peerID }
func (p *receiptPack) Items() int { return len(p.receipts) }
func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
// statePack is a batch of states returned by a peer.
type statePack struct {
peerID string
states [][]byte
}
func (p *statePack) PeerId() string { return p.peerID }
func (p *statePack) Items() int { return len(p.states) }
func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) }