core, eth: improve delivery speed on header requests (#23105)

This PR reduces the amount of work we do when answering header queries, e.g. when a peer
is syncing from us.

For some items, e.g block bodies, when we read the rlp-data from database, we plug it
directly into the response package. We didn't do that for headers, but instead read
headers-rlp, decode to types.Header, and re-encode to rlp. This PR changes that to keep it
in RLP-form as much as possible. When a node is syncing from us, it typically requests 192
contiguous headers. On master it has the following effect:

- For headers not in ancient: 2 db lookups. One for translating hash->number (even though
  the request is by number), and another for reading by hash (this latter one is sometimes
  cached).
  
- For headers in ancient: 1 file lookup/syscall for translating hash->number (even though
  the request is by number), and another for reading the header itself. After this, it
  also performes a hashing of the header, to ensure that the hash is what it expected. In
  this PR, I instead move the logic for "give me a sequence of blocks" into the lower
  layers, where the database can determine how and what to read from leveldb and/or
  ancients.

There are basically four types of requests; three of them are improved this way. The
fourth, by hash going backwards, is more tricky to optimize. However, since we know that
the gap is 0, we can look up by the parentHash, and stlil shave off all the number->hash
lookups.

The gapped collection can be optimized similarly, as a follow-up, at least in three out of
four cases.

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Martin Holst Swende
2021-12-07 17:50:58 +01:00
committed by GitHub
parent 7f7877a023
commit db03faa10d
13 changed files with 368 additions and 24 deletions

View File

@ -35,9 +35,6 @@ const (
// softResponseLimit is the target maximum size of replies to data retrievals.
softResponseLimit = 2 * 1024 * 1024
// estHeaderSize is the approximate size of an RLP encoded block header.
estHeaderSize = 500
// maxHeadersServe is the maximum number of block headers to serve. This number
// is there to limit the number of disk lookups.
maxHeadersServe = 1024

View File

@ -136,11 +136,13 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
query *GetBlockHeadersPacket // The query to execute for header retrieval
expect []common.Hash // The hashes of the block whose headers are expected
}{
// A single random block should be retrievable by hash and number too
// A single random block should be retrievable by hash
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Hash: backend.chain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1},
[]common.Hash{backend.chain.GetBlockByNumber(limit / 2).Hash()},
}, {
},
// A single random block should be retrievable by number
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: limit / 2}, Amount: 1},
[]common.Hash{backend.chain.GetBlockByNumber(limit / 2).Hash()},
},
@ -180,10 +182,15 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: 0}, Amount: 1},
[]common.Hash{backend.chain.GetBlockByNumber(0).Hash()},
}, {
},
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: backend.chain.CurrentBlock().NumberU64()}, Amount: 1},
[]common.Hash{backend.chain.CurrentBlock().Hash()},
},
{ // If the peer requests a bit into the future, we deliver what we have
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: backend.chain.CurrentBlock().NumberU64()}, Amount: 10},
[]common.Hash{backend.chain.CurrentBlock().Hash()},
},
// Ensure protocol limits are honored
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: backend.chain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true},
@ -280,7 +287,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
RequestId: 456,
BlockHeadersPacket: headers,
}); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
t.Errorf("test %d by hash: headers mismatch: %v", i, err)
}
}
}

View File

@ -36,12 +36,21 @@ func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := ServiceGetBlockHeadersQuery(backend.Chain(), query.GetBlockHeadersPacket, peer)
return peer.ReplyBlockHeaders(query.RequestId, response)
return peer.ReplyBlockHeadersRLP(query.RequestId, response)
}
// ServiceGetBlockHeadersQuery assembles the response to a header query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *GetBlockHeadersPacket, peer *Peer) []*types.Header {
func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *GetBlockHeadersPacket, peer *Peer) []rlp.RawValue {
if query.Skip == 0 {
// The fast path: when the request is for a contiguous segment of headers.
return serviceContiguousBlockHeaderQuery(chain, query)
} else {
return serviceNonContiguousBlockHeaderQuery(chain, query, peer)
}
}
func serviceNonContiguousBlockHeaderQuery(chain *core.BlockChain, query *GetBlockHeadersPacket, peer *Peer) []rlp.RawValue {
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)
@ -49,7 +58,7 @@ func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *GetBlockHeadersP
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []*types.Header
headers []rlp.RawValue
unknown bool
lookups int
)
@ -74,9 +83,12 @@ func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *GetBlockHeadersP
if origin == nil {
break
}
headers = append(headers, origin)
bytes += estHeaderSize
if rlpData, err := rlp.EncodeToBytes(origin); err != nil {
log.Crit("Unable to decode our own headers", "err", err)
} else {
headers = append(headers, rlp.RawValue(rlpData))
bytes += common.StorageSize(len(rlpData))
}
// Advance to the next header of the query
switch {
case hashMode && query.Reverse:
@ -127,6 +139,69 @@ func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *GetBlockHeadersP
return headers
}
func serviceContiguousBlockHeaderQuery(chain *core.BlockChain, query *GetBlockHeadersPacket) []rlp.RawValue {
count := query.Amount
if count > maxHeadersServe {
count = maxHeadersServe
}
if query.Origin.Hash == (common.Hash{}) {
// Number mode, just return the canon chain segment. The backend
// delivers in [N, N-1, N-2..] descending order, so we need to
// accommodate for that.
from := query.Origin.Number
if !query.Reverse {
from = from + count - 1
}
headers := chain.GetHeadersFrom(from, count)
if !query.Reverse {
for i, j := 0, len(headers)-1; i < j; i, j = i+1, j-1 {
headers[i], headers[j] = headers[j], headers[i]
}
}
return headers
}
// Hash mode.
var (
headers []rlp.RawValue
hash = query.Origin.Hash
header = chain.GetHeaderByHash(hash)
)
if header != nil {
rlpData, _ := rlp.EncodeToBytes(header)
headers = append(headers, rlpData)
} else {
// We don't even have the origin header
return headers
}
num := header.Number.Uint64()
if !query.Reverse {
// Theoretically, we are tasked to deliver header by hash H, and onwards.
// However, if H is not canon, we will be unable to deliver any descendants of
// H.
if canonHash := chain.GetCanonicalHash(num); canonHash != hash {
// Not canon, we can't deliver descendants
return headers
}
descendants := chain.GetHeadersFrom(num+count-1, count-1)
for i, j := 0, len(descendants)-1; i < j; i, j = i+1, j-1 {
descendants[i], descendants[j] = descendants[j], descendants[i]
}
headers = append(headers, descendants...)
return headers
}
{ // Last mode: deliver ancestors of H
for i := uint64(1); header != nil && i < count; i++ {
header = chain.GetHeaderByHash(header.ParentHash)
if header == nil {
break
}
rlpData, _ := rlp.EncodeToBytes(header)
headers = append(headers, rlpData)
}
return headers
}
}
func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message
var query GetBlockBodiesPacket66

View File

@ -297,10 +297,10 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{
RequestId: id,
BlockHeadersPacket: headers,
func (p *Peer) ReplyBlockHeadersRLP(id uint64, headers []rlp.RawValue) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersRLPPacket66{
RequestId: id,
BlockHeadersRLPPacket: headers,
})
}

View File

@ -175,6 +175,16 @@ type BlockHeadersPacket66 struct {
BlockHeadersPacket
}
// BlockHeadersRLPPacket represents a block header response, to use when we already
// have the headers rlp encoded.
type BlockHeadersRLPPacket []rlp.RawValue
// BlockHeadersPacket represents a block header response over eth/66.
type BlockHeadersRLPPacket66 struct {
RequestId uint64
BlockHeadersRLPPacket
}
// NewBlockPacket is the network packet for the block propagation message.
type NewBlockPacket struct {
Block *types.Block