Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
4bcc0a37ab | ||
|
b5f92e66c6 | ||
|
d8787230fa | ||
|
cdae1c59ab | ||
|
0b00e19ed9 | ||
|
c8d8126bd0 | ||
|
0de9f32ae8 | ||
|
14ae1246b7 | ||
|
dc59af8622 | ||
|
45730cfab3 |
@@ -372,7 +372,7 @@ func copyDb(ctx *cli.Context) error {
|
||||
chain, chainDb := utils.MakeChain(ctx, stack)
|
||||
|
||||
syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode)
|
||||
dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil)
|
||||
dl := downloader.New(syncmode, 0, chainDb, new(event.TypeMux), chain, nil, nil)
|
||||
|
||||
// Create a source peer to satisfy downloader requests from
|
||||
db, err := ethdb.NewLDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256)
|
||||
|
@@ -125,6 +125,8 @@ var (
|
||||
utils.VMEnableDebugFlag,
|
||||
utils.NetworkIdFlag,
|
||||
utils.ConstantinopleOverrideFlag,
|
||||
utils.RPCCORSDomainFlag,
|
||||
utils.RPCVirtualHostsFlag,
|
||||
utils.EthStatsURLFlag,
|
||||
utils.MetricsEnabledFlag,
|
||||
utils.FakePoWFlag,
|
||||
|
@@ -75,6 +75,7 @@ var (
|
||||
errUnknownPeer = errors.New("peer is unknown or unhealthy")
|
||||
errBadPeer = errors.New("action from bad peer ignored")
|
||||
errStallingPeer = errors.New("peer is stalling")
|
||||
errUnsyncedPeer = errors.New("unsynced peer")
|
||||
errNoPeers = errors.New("no peers to keep download active")
|
||||
errTimeout = errors.New("timeout")
|
||||
errEmptyHeaderSet = errors.New("empty header set by peer")
|
||||
@@ -99,10 +100,11 @@ type Downloader struct {
|
||||
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
|
||||
mux *event.TypeMux // Event multiplexer to announce sync operation events
|
||||
|
||||
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
|
||||
queue *queue // Scheduler for selecting the hashes to download
|
||||
peers *peerSet // Set of active peers from which download can proceed
|
||||
stateDB ethdb.Database
|
||||
checkpoint uint64 // Checkpoint block number to enforce head against (e.g. fast sync)
|
||||
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
|
||||
queue *queue // Scheduler for selecting the hashes to download
|
||||
peers *peerSet // Set of active peers from which download can proceed
|
||||
stateDB ethdb.Database
|
||||
|
||||
rttEstimate uint64 // Round trip time to target for download requests
|
||||
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
|
||||
@@ -205,15 +207,15 @@ type BlockChain interface {
|
||||
}
|
||||
|
||||
// New creates a new downloader to fetch hashes and blocks from remote peers.
|
||||
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
|
||||
func New(mode SyncMode, checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
|
||||
if lightchain == nil {
|
||||
lightchain = chain
|
||||
}
|
||||
|
||||
dl := &Downloader{
|
||||
mode: mode,
|
||||
stateDB: stateDb,
|
||||
mux: mux,
|
||||
checkpoint: checkpoint,
|
||||
queue: newQueue(),
|
||||
peers: newPeerSet(),
|
||||
rttEstimate: uint64(rttMaxEstimate),
|
||||
@@ -326,7 +328,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
|
||||
case nil:
|
||||
case errBusy:
|
||||
|
||||
case errTimeout, errBadPeer, errStallingPeer,
|
||||
case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
|
||||
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
|
||||
errInvalidAncestor, errInvalidChain:
|
||||
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
|
||||
@@ -577,6 +579,10 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
|
||||
return nil, errBadPeer
|
||||
}
|
||||
head := headers[0]
|
||||
if d.mode == FastSync && head.Number.Uint64() < d.checkpoint {
|
||||
p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
|
||||
return nil, errUnsyncedPeer
|
||||
}
|
||||
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
|
||||
return head, nil
|
||||
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
ethereum "github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
@@ -73,7 +73,8 @@ func newTester() *downloadTester {
|
||||
}
|
||||
tester.stateDb = ethdb.NewMemDatabase()
|
||||
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
|
||||
tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
|
||||
|
||||
tester.downloader = New(FullSync, 0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
|
||||
return tester
|
||||
}
|
||||
|
||||
@@ -1049,6 +1050,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
|
||||
{errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
|
||||
{errBadPeer, true}, // Peer was deemed bad for some reason, drop it
|
||||
{errStallingPeer, true}, // Peer was detected to be stalling, drop it
|
||||
{errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it
|
||||
{errNoPeers, false}, // No peers to download from, soft race, no issue
|
||||
{errTimeout, true}, // No hashes received in due time, drop the peer
|
||||
{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
|
||||
@@ -1567,3 +1569,39 @@ func TestRemoteHeaderRequestSpan(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that peers below a pre-configured checkpoint block are prevented from
|
||||
// being fast-synced from, avoiding potential cheap eclipse attacks.
|
||||
func TestCheckpointEnforcement62(t *testing.T) { testCheckpointEnforcement(t, 62, FullSync) }
|
||||
func TestCheckpointEnforcement63Full(t *testing.T) { testCheckpointEnforcement(t, 63, FullSync) }
|
||||
func TestCheckpointEnforcement63Fast(t *testing.T) { testCheckpointEnforcement(t, 63, FastSync) }
|
||||
func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) }
|
||||
func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) }
|
||||
func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) }
|
||||
|
||||
func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
|
||||
t.Parallel()
|
||||
|
||||
// Create a new tester with a particular hard coded checkpoint block
|
||||
tester := newTester()
|
||||
defer tester.terminate()
|
||||
|
||||
tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
|
||||
chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)
|
||||
|
||||
// Attempt to sync with the peer and validate the result
|
||||
tester.newPeer("peer", protocol, chain)
|
||||
|
||||
var expect error
|
||||
if mode == FastSync {
|
||||
expect = errUnsyncedPeer
|
||||
}
|
||||
if err := tester.sync("peer", nil, mode); err != expect {
|
||||
t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
|
||||
}
|
||||
if mode == FastSync {
|
||||
assertOwnChain(t, tester, 1)
|
||||
} else {
|
||||
assertOwnChain(t, tester, chain.len())
|
||||
}
|
||||
}
|
||||
|
@@ -28,7 +28,6 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/consensus"
|
||||
"github.com/ethereum/go-ethereum/consensus/misc"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||
@@ -55,7 +54,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
daoChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the DAO handshake challenge
|
||||
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
|
||||
)
|
||||
|
||||
// errIncompatibleConfig is returned if the requested protocols and configs are
|
||||
@@ -72,6 +71,9 @@ type ProtocolManager struct {
|
||||
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
|
||||
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
|
||||
|
||||
checkpointNumber uint64 // Block number for the sync progress validator to cross reference
|
||||
checkpointHash common.Hash // Block hash for the sync progress validator to cross reference
|
||||
|
||||
txpool txPool
|
||||
blockchain *core.BlockChain
|
||||
chainconfig *params.ChainConfig
|
||||
@@ -126,6 +128,11 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
|
||||
if mode == downloader.FastSync {
|
||||
manager.fastSync = uint32(1)
|
||||
}
|
||||
// If we have trusted checkpoints, enforce them on the chain
|
||||
if checkpoint, ok := params.TrustedCheckpoints[blockchain.Genesis().Hash()]; ok {
|
||||
manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequencyClient - 1
|
||||
manager.checkpointHash = checkpoint.SectionHead
|
||||
}
|
||||
// Initiate a sub-protocol for every implemented version we can handle
|
||||
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
|
||||
for i, version := range ProtocolVersions {
|
||||
@@ -165,7 +172,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
|
||||
return nil, errIncompatibleConfig
|
||||
}
|
||||
// Construct the different synchronisation mechanisms
|
||||
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
|
||||
manager.downloader = downloader.New(mode, manager.checkpointNumber, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
|
||||
|
||||
validator := func(header *types.Header) error {
|
||||
return engine.VerifyHeader(blockchain, header, true)
|
||||
@@ -291,22 +298,22 @@ func (pm *ProtocolManager) handle(p *peer) error {
|
||||
// after this will be sent via broadcasts.
|
||||
pm.syncTransactions(p)
|
||||
|
||||
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
|
||||
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
|
||||
// Request the peer's DAO fork header for extra-data validation
|
||||
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
|
||||
// If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse)
|
||||
if pm.checkpointHash != (common.Hash{}) {
|
||||
// Request the peer's checkpoint header for chain height/weight validation
|
||||
if err := p.RequestHeadersByNumber(pm.checkpointNumber, 1, 0, false); err != nil {
|
||||
return err
|
||||
}
|
||||
// Start a timer to disconnect if the peer doesn't reply in time
|
||||
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
|
||||
p.Log().Debug("Timed out DAO fork-check, dropping")
|
||||
p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() {
|
||||
p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name())
|
||||
pm.removePeer(p.id)
|
||||
})
|
||||
// Make sure it's cleaned up if the peer dies off
|
||||
defer func() {
|
||||
if p.forkDrop != nil {
|
||||
p.forkDrop.Stop()
|
||||
p.forkDrop = nil
|
||||
if p.syncDrop != nil {
|
||||
p.syncDrop.Stop()
|
||||
p.syncDrop = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -438,41 +445,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
if err := msg.Decode(&headers); err != nil {
|
||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||
}
|
||||
// If no headers were received, but we're expending a DAO fork check, maybe it's that
|
||||
if len(headers) == 0 && p.forkDrop != nil {
|
||||
// Possibly an empty reply to the fork header checks, sanity check TDs
|
||||
verifyDAO := true
|
||||
// If no headers were received, but we're expencting a checkpoint header, consider it that
|
||||
if len(headers) == 0 && p.syncDrop != nil {
|
||||
// Stop the timer either way, decide later to drop or not
|
||||
p.syncDrop.Stop()
|
||||
p.syncDrop = nil
|
||||
|
||||
// If we already have a DAO header, we can check the peer's TD against it. If
|
||||
// the peer's ahead of this, it too must have a reply to the DAO check
|
||||
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
|
||||
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
|
||||
verifyDAO = false
|
||||
}
|
||||
}
|
||||
// If we're seemingly on the same chain, disable the drop timer
|
||||
if verifyDAO {
|
||||
p.Log().Debug("Seems to be on the same side of the DAO fork")
|
||||
p.forkDrop.Stop()
|
||||
p.forkDrop = nil
|
||||
return nil
|
||||
// If we're doing a fast sync, we must enforce the checkpoint block to avoid
|
||||
// eclipse attacks. Unsynced nodes are welcome to connect after we're done
|
||||
// joining the network
|
||||
if atomic.LoadUint32(&pm.fastSync) == 1 {
|
||||
p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name())
|
||||
return errors.New("unsynced node cannot serve fast sync")
|
||||
}
|
||||
}
|
||||
// Filter out any explicitly requested headers, deliver the rest to the downloader
|
||||
filter := len(headers) == 1
|
||||
if filter {
|
||||
// If it's a potential DAO fork check, validate against the rules
|
||||
if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
|
||||
// Disable the fork drop timer
|
||||
p.forkDrop.Stop()
|
||||
p.forkDrop = nil
|
||||
// If it's a potential sync progress check, validate the content and advertised chain weight
|
||||
if p.syncDrop != nil && headers[0].Number.Uint64() == pm.checkpointNumber {
|
||||
// Disable the sync drop timer
|
||||
p.syncDrop.Stop()
|
||||
p.syncDrop = nil
|
||||
|
||||
// Validate the header and either drop the peer or continue
|
||||
if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
|
||||
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
|
||||
return err
|
||||
if headers[0].Hash() != pm.checkpointHash {
|
||||
return errors.New("checkpoint hash mismatch")
|
||||
}
|
||||
p.Log().Debug("Verified to be on the same side of the DAO fork")
|
||||
return nil
|
||||
}
|
||||
// Otherwise if it's a whitelisted block, validate against the set
|
||||
|
@@ -449,79 +449,131 @@ func testGetReceipt(t *testing.T, protocol int) {
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that post eth protocol handshake, DAO fork-enabled clients also execute
|
||||
// a DAO "challenge" verifying each others' DAO fork headers to ensure they're on
|
||||
// compatible chains.
|
||||
func TestDAOChallengeNoVsNo(t *testing.T) { testDAOChallenge(t, false, false, false) }
|
||||
func TestDAOChallengeNoVsPro(t *testing.T) { testDAOChallenge(t, false, true, false) }
|
||||
func TestDAOChallengeProVsNo(t *testing.T) { testDAOChallenge(t, true, false, false) }
|
||||
func TestDAOChallengeProVsPro(t *testing.T) { testDAOChallenge(t, true, true, false) }
|
||||
func TestDAOChallengeNoVsTimeout(t *testing.T) { testDAOChallenge(t, false, false, true) }
|
||||
func TestDAOChallengeProVsTimeout(t *testing.T) { testDAOChallenge(t, true, true, true) }
|
||||
// Tests that post eth protocol handshake, clients perform a mutual checkpoint
|
||||
// challenge to validate each other's chains. Hash mismatches, or missing ones
|
||||
// during a fast sync should lead to the peer getting dropped.
|
||||
func TestCheckpointChallenge(t *testing.T) {
|
||||
tests := []struct {
|
||||
syncmode downloader.SyncMode
|
||||
checkpoint bool
|
||||
timeout bool
|
||||
empty bool
|
||||
match bool
|
||||
drop bool
|
||||
}{
|
||||
// If checkpointing is not enabled locally, don't challenge and don't drop
|
||||
{downloader.FullSync, false, false, false, false, false},
|
||||
{downloader.FastSync, false, false, false, false, false},
|
||||
{downloader.LightSync, false, false, false, false, false},
|
||||
|
||||
func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool) {
|
||||
// Reduce the DAO handshake challenge timeout
|
||||
if timeout {
|
||||
defer func(old time.Duration) { daoChallengeTimeout = old }(daoChallengeTimeout)
|
||||
daoChallengeTimeout = 500 * time.Millisecond
|
||||
// If checkpointing is enabled locally and remote response is empty, only drop during fast sync
|
||||
{downloader.FullSync, true, false, true, false, false},
|
||||
{downloader.FastSync, true, false, true, false, true}, // Special case, fast sync, unsynced peer
|
||||
{downloader.LightSync, true, false, true, false, false},
|
||||
|
||||
// If checkpointing is enabled locally and remote response mismatches, always drop
|
||||
{downloader.FullSync, true, false, false, false, true},
|
||||
{downloader.FastSync, true, false, false, false, true},
|
||||
{downloader.LightSync, true, false, false, false, true},
|
||||
|
||||
// If checkpointing is enabled locally and remote response matches, never drop
|
||||
{downloader.FullSync, true, false, false, true, false},
|
||||
{downloader.FastSync, true, false, false, true, false},
|
||||
{downloader.LightSync, true, false, false, true, false},
|
||||
|
||||
// If checkpointing is enabled locally and remote times out, always drop
|
||||
{downloader.FullSync, true, true, false, true, true},
|
||||
{downloader.FastSync, true, true, false, true, true},
|
||||
{downloader.LightSync, true, true, false, true, true},
|
||||
}
|
||||
// Create a DAO aware protocol manager
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("sync %v checkpoint %v timeout %v empty %v match %v", tt.syncmode, tt.checkpoint, tt.timeout, tt.empty, tt.match), func(t *testing.T) {
|
||||
testCheckpointChallenge(t, tt.syncmode, tt.checkpoint, tt.timeout, tt.empty, tt.match, tt.drop)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpoint bool, timeout bool, empty bool, match bool, drop bool) {
|
||||
// Reduce the checkpoint handshake challenge timeout
|
||||
defer func(old time.Duration) { syncChallengeTimeout = old }(syncChallengeTimeout)
|
||||
syncChallengeTimeout = 250 * time.Millisecond
|
||||
|
||||
// Initialize a chain and generate a fake CHT if checkpointing is enabled
|
||||
var (
|
||||
evmux = new(event.TypeMux)
|
||||
pow = ethash.NewFaker()
|
||||
db = ethdb.NewMemDatabase()
|
||||
config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
|
||||
gspec = &core.Genesis{Config: config}
|
||||
genesis = gspec.MustCommit(db)
|
||||
config = new(params.ChainConfig)
|
||||
genesis = (&core.Genesis{Config: config}).MustCommit(db)
|
||||
)
|
||||
blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
|
||||
// If checkpointing is enabled, create and inject a fake CHT and the corresponding
|
||||
// chllenge response.
|
||||
var response *types.Header
|
||||
if checkpoint {
|
||||
index := uint64(rand.Intn(500))
|
||||
number := (index+1)*params.CHTFrequencyClient - 1
|
||||
response = &types.Header{Number: big.NewInt(int64(number)), Extra: []byte("valid")}
|
||||
|
||||
cht := ¶ms.TrustedCheckpoint{
|
||||
SectionIndex: index,
|
||||
SectionHead: response.Hash(),
|
||||
}
|
||||
params.TrustedCheckpoints[genesis.Hash()] = cht
|
||||
defer delete(params.TrustedCheckpoints, genesis.Hash())
|
||||
}
|
||||
// Create a checkpoint aware protocol manager
|
||||
blockchain, err := core.NewBlockChain(db, nil, config, ethash.NewFaker(), vm.Config{}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new blockchain: %v", err)
|
||||
}
|
||||
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, nil)
|
||||
pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start test protocol manager: %v", err)
|
||||
}
|
||||
pm.Start(1000)
|
||||
defer pm.Stop()
|
||||
|
||||
// Connect a new peer and check that we receive the DAO challenge
|
||||
// Connect a new peer and check that we receive the checkpoint challenge
|
||||
peer, _ := newTestPeer("peer", eth63, pm, true)
|
||||
defer peer.close()
|
||||
|
||||
challenge := &getBlockHeadersData{
|
||||
Origin: hashOrNumber{Number: config.DAOForkBlock.Uint64()},
|
||||
Amount: 1,
|
||||
Skip: 0,
|
||||
Reverse: false,
|
||||
}
|
||||
if err := p2p.ExpectMsg(peer.app, GetBlockHeadersMsg, challenge); err != nil {
|
||||
t.Fatalf("challenge mismatch: %v", err)
|
||||
}
|
||||
// Create a block to reply to the challenge if no timeout is simulated
|
||||
if !timeout {
|
||||
blocks, _ := core.GenerateChain(¶ms.ChainConfig{}, genesis, ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) {
|
||||
if remoteForked {
|
||||
block.SetExtra(params.DAOForkBlockExtra)
|
||||
if checkpoint {
|
||||
challenge := &getBlockHeadersData{
|
||||
Origin: hashOrNumber{Number: response.Number.Uint64()},
|
||||
Amount: 1,
|
||||
Skip: 0,
|
||||
Reverse: false,
|
||||
}
|
||||
if err := p2p.ExpectMsg(peer.app, GetBlockHeadersMsg, challenge); err != nil {
|
||||
t.Fatalf("challenge mismatch: %v", err)
|
||||
}
|
||||
// Create a block to reply to the challenge if no timeout is simulated
|
||||
if !timeout {
|
||||
if empty {
|
||||
if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{}); err != nil {
|
||||
t.Fatalf("failed to answer challenge: %v", err)
|
||||
}
|
||||
} else if match {
|
||||
if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{response}); err != nil {
|
||||
t.Fatalf("failed to answer challenge: %v", err)
|
||||
}
|
||||
} else {
|
||||
if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{{Number: response.Number}}); err != nil {
|
||||
t.Fatalf("failed to answer challenge: %v", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{blocks[0].Header()}); err != nil {
|
||||
t.Fatalf("failed to answer challenge: %v", err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond) // Sleep to avoid the verification racing with the drops
|
||||
} else {
|
||||
// Otherwise wait until the test timeout passes
|
||||
time.Sleep(daoChallengeTimeout + 500*time.Millisecond)
|
||||
}
|
||||
// Verify that depending on fork side, the remote peer is maintained or dropped
|
||||
if localForked == remoteForked && !timeout {
|
||||
if peers := pm.peers.Len(); peers != 1 {
|
||||
t.Fatalf("peer count mismatch: have %d, want %d", peers, 1)
|
||||
}
|
||||
} else {
|
||||
// Wait until the test timeout passes to ensure proper cleanup
|
||||
time.Sleep(syncChallengeTimeout + 100*time.Millisecond)
|
||||
|
||||
// Verify that the remote peer is maintained or dropped
|
||||
if drop {
|
||||
if peers := pm.peers.Len(); peers != 0 {
|
||||
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
|
||||
}
|
||||
} else {
|
||||
if peers := pm.peers.Len(); peers != 1 {
|
||||
t.Fatalf("peer count mismatch: have %d, want %d", peers, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -79,7 +79,7 @@ type peer struct {
|
||||
rw p2p.MsgReadWriter
|
||||
|
||||
version int // Protocol version negotiated
|
||||
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
|
||||
syncDrop *time.Timer // Timed connection dropper if sync progress isn't validated in time
|
||||
|
||||
head common.Hash
|
||||
td *big.Int
|
||||
|
@@ -188,14 +188,12 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
|
||||
atomic.StoreUint32(&pm.fastSync, 1)
|
||||
mode = downloader.FastSync
|
||||
}
|
||||
|
||||
if mode == downloader.FastSync {
|
||||
// Make sure the peer's total difficulty we are synchronizing is higher.
|
||||
if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Run the sync cycle, and disable fast sync if we've went past the pivot block
|
||||
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
|
||||
return
|
||||
|
@@ -153,9 +153,12 @@ func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.In
|
||||
if disableClientRemovePeer {
|
||||
removePeer = func(id string) {}
|
||||
}
|
||||
|
||||
if lightSync {
|
||||
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
|
||||
var checkpoint uint64
|
||||
if cht, ok := params.TrustedCheckpoints[blockchain.Genesis().Hash()]; ok {
|
||||
checkpoint = (cht.SectionIndex+1)*params.CHTFrequencyClient - 1
|
||||
}
|
||||
manager.downloader = downloader.New(downloader.LightSync, checkpoint, chainDb, manager.eventMux, nil, blockchain, removePeer)
|
||||
manager.peers.notify((*downloaderPeerNotify)(manager))
|
||||
manager.fetcher = newLightFetcher(manager)
|
||||
}
|
||||
@@ -324,7 +327,11 @@ func (pm *ProtocolManager) handle(p *peer) error {
|
||||
}
|
||||
}
|
||||
|
||||
var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
|
||||
var (
|
||||
reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
|
||||
reqListV1 = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, GetHeaderProofsMsg}
|
||||
reqListV2 = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, SendTxV2Msg, GetTxStatusMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
|
||||
)
|
||||
|
||||
// handleMsg is invoked whenever an inbound message is received from a remote
|
||||
// peer. The remote connection is torn down upon returning any error.
|
||||
|
@@ -508,8 +508,9 @@ func TestTransactionStatusLes2(t *testing.T) {
|
||||
test := func(tx *types.Transaction, send bool, expStatus txStatus) {
|
||||
reqID++
|
||||
if send {
|
||||
cost := peer.GetRequestCost(SendTxV2Msg, 1)
|
||||
sendRequest(peer.app, SendTxV2Msg, reqID, cost, types.Transactions{tx})
|
||||
enc, _ := rlp.EncodeToBytes(types.Transactions{tx})
|
||||
cost := peer.GetTxRelayCost(1, len(enc))
|
||||
sendRequest(peer.app, SendTxV2Msg, reqID, cost, rlp.RawValue(enc))
|
||||
} else {
|
||||
cost := peer.GetRequestCost(GetTxStatusMsg, 1)
|
||||
sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})
|
||||
|
61
les/peer.go
61
les/peer.go
@@ -42,6 +42,11 @@ var (
|
||||
|
||||
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
|
||||
|
||||
// if the total encoded size of a sent transaction batch is over txSizeCostLimit
|
||||
// per transaction then the request cost is calculated as proportional to the
|
||||
// encoded size instead of the transaction count
|
||||
const txSizeCostLimit = 0x4000
|
||||
|
||||
const (
|
||||
announceTypeNone = iota
|
||||
announceTypeSimple
|
||||
@@ -163,7 +168,41 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
|
||||
costs := p.fcCosts[msgcode]
|
||||
if costs == nil {
|
||||
return 0
|
||||
}
|
||||
cost := costs.baseCost + costs.reqCost*uint64(amount)
|
||||
if cost > p.fcServerParams.BufLimit {
|
||||
cost = p.fcServerParams.BufLimit
|
||||
}
|
||||
return cost
|
||||
}
|
||||
|
||||
func (p *peer) GetTxRelayCost(amount, size int) uint64 {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
var msgcode uint64
|
||||
switch p.version {
|
||||
case lpv1:
|
||||
msgcode = SendTxMsg
|
||||
case lpv2:
|
||||
msgcode = SendTxV2Msg
|
||||
default:
|
||||
panic(nil)
|
||||
}
|
||||
|
||||
costs := p.fcCosts[msgcode]
|
||||
if costs == nil {
|
||||
return 0
|
||||
}
|
||||
cost := costs.baseCost + costs.reqCost*uint64(amount)
|
||||
sizeCost := costs.baseCost + costs.reqCost*uint64(size)/txSizeCostLimit
|
||||
if sizeCost > cost {
|
||||
cost = sizeCost
|
||||
}
|
||||
|
||||
if cost > p.fcServerParams.BufLimit {
|
||||
cost = p.fcServerParams.BufLimit
|
||||
}
|
||||
@@ -307,9 +346,9 @@ func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error
|
||||
return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
|
||||
}
|
||||
|
||||
// SendTxStatus sends a batch of transactions to be added to the remote transaction pool.
|
||||
func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
|
||||
p.Log().Debug("Fetching batch of transactions", "count", len(txs))
|
||||
// SendTxs sends a batch of transactions to be added to the remote transaction pool.
|
||||
func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error {
|
||||
p.Log().Debug("Fetching batch of transactions", "size", len(txs))
|
||||
switch p.version {
|
||||
case lpv1:
|
||||
return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID
|
||||
@@ -485,6 +524,20 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
|
||||
p.fcServerParams = params
|
||||
p.fcServer = flowcontrol.NewServerNode(params)
|
||||
p.fcCosts = MRC.decode()
|
||||
var checkList []uint64
|
||||
switch p.version {
|
||||
case lpv1:
|
||||
checkList = reqListV1
|
||||
case lpv2:
|
||||
checkList = reqListV2
|
||||
default:
|
||||
panic(nil)
|
||||
}
|
||||
for _, msgCode := range checkList {
|
||||
if p.fcCosts[msgCode] == nil {
|
||||
return errResp(ErrUselessPeer, "peer does not support message %d", msgCode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
type ltrInfo struct {
|
||||
@@ -113,21 +114,22 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
|
||||
for p, list := range sendTo {
|
||||
pp := p
|
||||
ll := list
|
||||
enc, _ := rlp.EncodeToBytes(ll)
|
||||
|
||||
reqID := genReqID()
|
||||
rq := &distReq{
|
||||
getCost: func(dp distPeer) uint64 {
|
||||
peer := dp.(*peer)
|
||||
return peer.GetRequestCost(SendTxMsg, len(ll))
|
||||
return peer.GetTxRelayCost(len(ll), len(enc))
|
||||
},
|
||||
canSend: func(dp distPeer) bool {
|
||||
return dp.(*peer) == pp
|
||||
},
|
||||
request: func(dp distPeer) func() {
|
||||
peer := dp.(*peer)
|
||||
cost := peer.GetRequestCost(SendTxMsg, len(ll))
|
||||
cost := peer.GetTxRelayCost(len(ll), len(enc))
|
||||
peer.fcServer.QueueRequest(reqID, cost)
|
||||
return func() { peer.SendTxs(reqID, cost, ll) }
|
||||
return func() { peer.SendTxs(reqID, cost, enc) }
|
||||
},
|
||||
}
|
||||
self.reqDist.queue(rq)
|
||||
|
@@ -100,7 +100,7 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
|
||||
if bc.genesisBlock == nil {
|
||||
return nil, core.ErrNoGenesis
|
||||
}
|
||||
if cp, ok := trustedCheckpoints[bc.genesisBlock.Hash()]; ok {
|
||||
if cp, ok := params.TrustedCheckpoints[bc.genesisBlock.Hash()]; ok {
|
||||
bc.addTrustedCheckpoint(cp)
|
||||
}
|
||||
if err := bc.loadLastState(); err != nil {
|
||||
|
@@ -104,14 +104,6 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// trustedCheckpoints associates each known checkpoint with the genesis hash of the chain it belongs to
|
||||
var trustedCheckpoints = map[common.Hash]*params.TrustedCheckpoint{
|
||||
params.MainnetGenesisHash: params.MainnetTrustedCheckpoint,
|
||||
params.TestnetGenesisHash: params.TestnetTrustedCheckpoint,
|
||||
params.RinkebyGenesisHash: params.RinkebyTrustedCheckpoint,
|
||||
params.GoerliGenesisHash: params.GoerliTrustedCheckpoint,
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNoTrustedCht = errors.New("no trusted canonical hash trie")
|
||||
ErrNoTrustedBloomTrie = errors.New("no trusted bloom trie")
|
||||
|
@@ -31,6 +31,15 @@ var (
|
||||
GoerliGenesisHash = common.HexToHash("0xbf7e331f7f7c1dd2e05159666b3bf8bc7a8a3a9eb1d518969eab529dd9b88c1a")
|
||||
)
|
||||
|
||||
// TrustedCheckpoints associates each known checkpoint with the genesis hash of
|
||||
// the chain it belongs to.
|
||||
var TrustedCheckpoints = map[common.Hash]*TrustedCheckpoint{
|
||||
MainnetGenesisHash: MainnetTrustedCheckpoint,
|
||||
TestnetGenesisHash: TestnetTrustedCheckpoint,
|
||||
RinkebyGenesisHash: RinkebyTrustedCheckpoint,
|
||||
GoerliGenesisHash: GoerliTrustedCheckpoint,
|
||||
}
|
||||
|
||||
var (
|
||||
// MainnetChainConfig is the chain parameters to run a node on the main network.
|
||||
MainnetChainConfig = &ChainConfig{
|
||||
|
@@ -23,7 +23,7 @@ import (
|
||||
const (
|
||||
VersionMajor = 1 // Major version component of the current release
|
||||
VersionMinor = 8 // Minor version component of the current release
|
||||
VersionPatch = 24 // Patch version component of the current release
|
||||
VersionPatch = 27 // Patch version component of the current release
|
||||
VersionMeta = "stable" // Version metadata to append to the version string
|
||||
)
|
||||
|
||||
|
@@ -23,7 +23,7 @@ import (
|
||||
const (
|
||||
VersionMajor = 0 // Major version component of the current release
|
||||
VersionMinor = 3 // Minor version component of the current release
|
||||
VersionPatch = 12 // Patch version component of the current release
|
||||
VersionPatch = 15 // Patch version component of the current release
|
||||
VersionMeta = "stable" // Version metadata to append to the version string
|
||||
)
|
||||
|
||||
|
Reference in New Issue
Block a user