les, light: remove untrusted header retrieval in ODR (#21907)

* les, light: remove untrusted header retrieval in ODR
* les: polish
* light: check the hash equality in odr
les/client_handler.go

@@ -17,6 +17,7 @@
 package les
 
 import (
+	"context"
 	"math/big"
 	"sync"
 	"sync/atomic"
@@ -200,14 +201,23 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
 		p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
 		p.answeredRequest(resp.ReqID)
 
-		// Filter out any explicitly requested headers, deliver the rest to the downloader
-		filter := len(headers) == 1
-		if filter {
-			headers = h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
-		}
-		if len(headers) != 0 || !filter {
-			if err := h.downloader.DeliverHeaders(p.id, headers); err != nil {
-				log.Debug("Failed to deliver headers", "err", err)
+		// Filter out the explicitly requested header by the retriever
+		if h.backend.retriever.requested(resp.ReqID) {
+			deliverMsg = &Msg{
+				MsgType: MsgBlockHeaders,
+				ReqID:   resp.ReqID,
+				Obj:     resp.Headers,
+			}
+		} else {
+			// Filter out any explicitly requested headers, deliver the rest to the downloader
+			filter := len(headers) == 1
+			if filter {
+				headers = h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
+			}
+			if len(headers) != 0 || !filter {
+				if err := h.downloader.DeliverHeaders(p.id, headers); err != nil {
+					log.Debug("Failed to deliver headers", "err", err)
+				}
 			}
 		}
 	case BlockBodiesMsg:
@@ -394,6 +404,42 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
 	return nil
 }
 
+// RetrieveSingleHeaderByNumber requests a single header by the specified block
+// number. This function will wait the response until it's timeout or delivered.
+func (pc *peerConnection) RetrieveSingleHeaderByNumber(context context.Context, number uint64) (*types.Header, error) {
+	reqID := genReqID()
+	rq := &distReq{
+		getCost: func(dp distPeer) uint64 {
+			peer := dp.(*serverPeer)
+			return peer.getRequestCost(GetBlockHeadersMsg, 1)
+		},
+		canSend: func(dp distPeer) bool {
+			return dp.(*serverPeer) == pc.peer
+		},
+		request: func(dp distPeer) func() {
+			peer := dp.(*serverPeer)
+			cost := peer.getRequestCost(GetBlockHeadersMsg, 1)
+			peer.fcServer.QueuedRequest(reqID, cost)
+			return func() { peer.requestHeadersByNumber(reqID, number, 1, 0, false) }
+		},
+	}
+	var header *types.Header
+	if err := pc.handler.backend.retriever.retrieve(context, reqID, rq, func(peer distPeer, msg *Msg) error {
+		if msg.MsgType != MsgBlockHeaders {
+			return errInvalidMessageType
+		}
+		headers := msg.Obj.([]*types.Header)
+		if len(headers) != 1 {
+			return errInvalidEntryCount
+		}
+		header = headers[0]
+		return nil
+	}, nil); err != nil {
+		return nil, err
+	}
+	return header, nil
+}
+
 // downloaderPeerNotify implements peerSetNotify
 type downloaderPeerNotify clientHandler
 
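For orientation, here is a hypothetical helper (not part of this commit, and only compilable inside package les) showing how the new method is meant to be called; it condenses the validateCheckpoint change further down in this diff, and the ten-second timeout is an assumption made for the example.

package les

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
)

// fetchCheckpointHeader pins the request to the peer that advertised the
// checkpoint by wrapping it in a peerConnection, then asks that peer for the
// single header at the checkpoint height. Hash and signature verification is
// left to the caller, as in validateCheckpoint.
func (h *clientHandler) fetchCheckpointHeader(peer *serverPeer) (*types.Header, error) {
	// Timeout chosen arbitrarily for this sketch.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	pc := &peerConnection{handler: h, peer: peer}
	return pc.RetrieveSingleHeaderByNumber(ctx, peer.checkpointNumber)
}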
les/odr.go
@@ -24,7 +24,6 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/light"
-	"github.com/ethereum/go-ethereum/log"
 )
 
 // LesOdr implements light.OdrBackend
@@ -83,7 +82,8 @@ func (odr *LesOdr) IndexerConfig() *light.IndexerConfig {
 }
 
 const (
-	MsgBlockBodies = iota
+	MsgBlockHeaders = iota
+	MsgBlockBodies
 	MsgCode
 	MsgReceipts
 	MsgProofsV2
@@ -122,13 +122,17 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro
 			return func() { lreq.Request(reqID, p) }
 		},
 	}
-	sent := mclock.Now()
-	if err = odr.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(odr.db, msg) }, odr.stop); err == nil {
-		// retrieved from network, store in db
-		req.StoreResult(odr.db)
+
+	defer func(sent mclock.AbsTime) {
+		if err != nil {
+			return
+		}
 		requestRTT.Update(time.Duration(mclock.Now() - sent))
-	} else {
-		log.Debug("Failed to retrieve data from network", "err", err)
+	}(mclock.Now())
+
+	if err := odr.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(odr.db, msg) }, odr.stop); err != nil {
+		return err
 	}
-	return
+	req.StoreResult(odr.db)
+	return nil
 }
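The rewritten Retrieve only records the request round-trip time when the call succeeds, by passing the start time as an argument to a deferred closure and checking the named return value err. Below is a standalone sketch of that pattern, using time.Since and a print statement in place of mclock and the requestRTT meter; the function and helper names are invented for the example.

package main

import (
	"errors"
	"fmt"
	"time"
)

// recordRTT stands in for requestRTT.Update; it just prints the duration here.
func recordRTT(d time.Duration) { fmt.Println("request RTT:", d) }

// retrieve mimics the shape of LesOdr.Retrieve: the deferred closure captures
// the start time when the defer statement executes, but only records the RTT
// if the named return value err is nil when the function exits.
func retrieve(fail bool) (err error) {
	defer func(sent time.Time) {
		if err != nil {
			return // failed requests are not counted
		}
		recordRTT(time.Since(sent))
	}(time.Now())

	time.Sleep(10 * time.Millisecond) // pretend network round trip
	if fail {
		return errors.New("retrieval failed")
	}
	return nil
}

func main() {
	fmt.Println(retrieve(false)) // prints the RTT, then <nil>
	fmt.Println(retrieve(true))  // prints only the error
}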
les/odr_requests.go

@@ -327,9 +327,6 @@ func (r *ChtRequest) CanSend(peer *serverPeer) bool {
 	peer.lock.RLock()
 	defer peer.lock.RUnlock()
 
-	if r.Untrusted {
-		return peer.headInfo.Number >= r.BlockNum && peer.id == r.PeerId
-	}
 	return peer.headInfo.Number >= r.Config.ChtConfirms && r.ChtNum <= (peer.headInfo.Number-r.Config.ChtConfirms)/r.Config.ChtSize
 }
 
@@ -369,39 +366,34 @@ func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error {
 	if err := rlp.DecodeBytes(headerEnc, header); err != nil {
 		return errHeaderUnavailable
 	}
 
 	// Verify the CHT
-	// Note: For untrusted CHT request, there is no proof response but
-	// header data.
-	var node light.ChtNode
-	if !r.Untrusted {
-		var encNumber [8]byte
-		binary.BigEndian.PutUint64(encNumber[:], r.BlockNum)
+	var (
+		node      light.ChtNode
+		encNumber [8]byte
+	)
+	binary.BigEndian.PutUint64(encNumber[:], r.BlockNum)
 
-		reads := &readTraceDB{db: nodeSet}
-		value, err := trie.VerifyProof(r.ChtRoot, encNumber[:], reads)
-		if err != nil {
-			return fmt.Errorf("merkle proof verification failed: %v", err)
-		}
-		if len(reads.reads) != nodeSet.KeyCount() {
-			return errUselessNodes
-		}
-
-		if err := rlp.DecodeBytes(value, &node); err != nil {
-			return err
-		}
-		if node.Hash != header.Hash() {
-			return errCHTHashMismatch
-		}
-		if r.BlockNum != header.Number.Uint64() {
-			return errCHTNumberMismatch
-		}
-	}
+	reads := &readTraceDB{db: nodeSet}
+	value, err := trie.VerifyProof(r.ChtRoot, encNumber[:], reads)
+	if err != nil {
+		return fmt.Errorf("merkle proof verification failed: %v", err)
+	}
+	if len(reads.reads) != nodeSet.KeyCount() {
+		return errUselessNodes
+	}
+	if err := rlp.DecodeBytes(value, &node); err != nil {
+		return err
+	}
+	if node.Hash != header.Hash() {
+		return errCHTHashMismatch
+	}
+	if r.BlockNum != header.Number.Uint64() {
+		return errCHTNumberMismatch
+	}
 	// Verifications passed, store and return
 	r.Header = header
 	r.Proof = nodeSet
-	r.Td = node.Td // For untrusted request, td here is nil, todo improve the les/2 protocol
-
+	r.Td = node.Td
 	return nil
 }
les/retrieve.go

@@ -155,6 +155,15 @@ func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc
 	return r
 }
 
+// requested reports whether the request with given reqid is sent by the retriever.
+func (rm *retrieveManager) requested(reqId uint64) bool {
+	rm.lock.RLock()
+	defer rm.lock.RUnlock()
+
+	_, ok := rm.sentReqs[reqId]
+	return ok
+}
+
 // deliver is called by the LES protocol manager to deliver reply messages to waiting requests
 func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
 	rm.lock.RLock()
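The new requested helper is what lets handleMsg (first hunk above) tell replies to the retriever's own header requests apart from headers meant for the downloader. Below is a self-contained sketch of the same bookkeeping, with a plain map and RWMutex standing in for retrieveManager and its sentReqs field; all names here are invented for the illustration.

package main

import (
	"fmt"
	"sync"
)

// tracker is a stripped-down stand-in for retrieveManager: it only remembers
// which request IDs it has sent.
type tracker struct {
	lock     sync.RWMutex
	sentReqs map[uint64]struct{}
}

func newTracker() *tracker {
	return &tracker{sentReqs: make(map[uint64]struct{})}
}

// send registers a request ID as in flight.
func (t *tracker) send(reqID uint64) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.sentReqs[reqID] = struct{}{}
}

// requested reports whether a reply with the given ID answers one of our own
// requests, mirroring retrieveManager.requested.
func (t *tracker) requested(reqID uint64) bool {
	t.lock.RLock()
	defer t.lock.RUnlock()
	_, ok := t.sentReqs[reqID]
	return ok
}

func main() {
	t := newTracker()
	t.send(42)

	for _, id := range []uint64{42, 7} {
		if t.requested(id) {
			fmt.Printf("reply %d: deliver to the retriever\n", id)
		} else {
			fmt.Printf("reply %d: hand the headers to the downloader\n", id)
		}
	}
}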
les/sync.go

@@ -56,8 +56,8 @@ func (h *clientHandler) validateCheckpoint(peer *serverPeer) error {
 	defer cancel()
 
 	// Fetch the block header corresponding to the checkpoint registration.
-	cp := peer.checkpoint
-	header, err := light.GetUntrustedHeaderByNumber(ctx, h.backend.odr, peer.checkpointNumber, peer.id)
+	wrapPeer := &peerConnection{handler: h, peer: peer}
+	header, err := wrapPeer.RetrieveSingleHeaderByNumber(ctx, peer.checkpointNumber)
 	if err != nil {
 		return err
 	}
@@ -66,7 +66,7 @@ func (h *clientHandler) validateCheckpoint(peer *serverPeer) error {
 	if err != nil {
 		return err
 	}
-	events := h.backend.oracle.Contract().LookupCheckpointEvents(logs, cp.SectionIndex, cp.Hash())
+	events := h.backend.oracle.Contract().LookupCheckpointEvents(logs, peer.checkpoint.SectionIndex, peer.checkpoint.Hash())
 	if len(events) == 0 {
 		return errInvalidCheckpoint
 	}
@ -53,7 +53,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
// Generate 512+4 blocks (totally 1 CHT sections)
|
||||
// Generate 128+1 blocks (totally 1 CHT sections)
|
||||
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false, true)
|
||||
defer tearDown()
|
||||
|
||||
|