Merge pull request #24032 from karalabe/downloader-response-preprocess
eth: pre-process downloader responses on the peer reader thread
This commit is contained in:
@ -102,6 +102,7 @@ type Response struct {
|
||||
|
||||
Req *Request // Original request to cross-reference with
|
||||
Res interface{} // Remote response for the request query
|
||||
Meta interface{} // Metadata generated locally on the receiver thread
|
||||
Time time.Duration // Time it took for the request to be served
|
||||
Done chan error // Channel to signal message handling to the reader
|
||||
}
|
||||
@ -137,7 +138,7 @@ func (p *Peer) dispatchRequest(req *Request) error {
|
||||
|
||||
// dispatchRequest fulfils a pending request and delivers it to the requested
|
||||
// sink.
|
||||
func (p *Peer) dispatchResponse(res *Response) error {
|
||||
func (p *Peer) dispatchResponse(res *Response, metadata func() interface{}) error {
|
||||
resOp := &response{
|
||||
res: res,
|
||||
fail: make(chan error),
|
||||
@ -151,6 +152,11 @@ func (p *Peer) dispatchResponse(res *Response) error {
|
||||
if err := <-resOp.fail; err != nil {
|
||||
return nil
|
||||
}
|
||||
// Request was accepted, run any postprocessing step to generate metadata
|
||||
// on the receiver thread, not the sink thread
|
||||
if metadata != nil {
|
||||
res.Meta = metadata()
|
||||
}
|
||||
// Deliver the filled out response and wait until it's handled. This
|
||||
// path is a bit funky as Go's select has no order, so if a response
|
||||
// arrives to an already cancelled request, there's a 50-50% changes
|
||||
|
@ -286,11 +286,18 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
if err := msg.Decode(res); err != nil {
|
||||
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
|
||||
}
|
||||
metadata := func() interface{} {
|
||||
hashes := make([]common.Hash, len(res.BlockHeadersPacket))
|
||||
for i, header := range res.BlockHeadersPacket {
|
||||
hashes[i] = header.Hash()
|
||||
}
|
||||
return hashes
|
||||
}
|
||||
return peer.dispatchResponse(&Response{
|
||||
id: res.RequestId,
|
||||
code: BlockHeadersMsg,
|
||||
Res: &res.BlockHeadersPacket,
|
||||
})
|
||||
}, metadata)
|
||||
}
|
||||
|
||||
func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
@ -299,11 +306,23 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
if err := msg.Decode(res); err != nil {
|
||||
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
|
||||
}
|
||||
metadata := func() interface{} {
|
||||
var (
|
||||
txsHashes = make([]common.Hash, len(res.BlockBodiesPacket))
|
||||
uncleHashes = make([]common.Hash, len(res.BlockBodiesPacket))
|
||||
)
|
||||
hasher := trie.NewStackTrie(nil)
|
||||
for i, body := range res.BlockBodiesPacket {
|
||||
txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher)
|
||||
uncleHashes[i] = types.CalcUncleHash(body.Uncles)
|
||||
}
|
||||
return [][]common.Hash{txsHashes, uncleHashes}
|
||||
}
|
||||
return peer.dispatchResponse(&Response{
|
||||
id: res.RequestId,
|
||||
code: BlockBodiesMsg,
|
||||
Res: &res.BlockBodiesPacket,
|
||||
})
|
||||
}, metadata)
|
||||
}
|
||||
|
||||
func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
@ -316,7 +335,7 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
id: res.RequestId,
|
||||
code: NodeDataMsg,
|
||||
Res: &res.NodeDataPacket,
|
||||
})
|
||||
}, nil) // No post-processing, we're not using this packet anymore
|
||||
}
|
||||
|
||||
func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
@ -325,11 +344,19 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
|
||||
if err := msg.Decode(res); err != nil {
|
||||
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
|
||||
}
|
||||
metadata := func() interface{} {
|
||||
hasher := trie.NewStackTrie(nil)
|
||||
hashes := make([]common.Hash, len(res.ReceiptsPacket))
|
||||
for i, receipt := range res.ReceiptsPacket {
|
||||
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
|
||||
}
|
||||
return hashes
|
||||
}
|
||||
return peer.dispatchResponse(&Response{
|
||||
id: res.RequestId,
|
||||
code: ReceiptsMsg,
|
||||
Res: &res.ReceiptsPacket,
|
||||
})
|
||||
}, metadata)
|
||||
}
|
||||
|
||||
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
|
||||
|
Reference in New Issue
Block a user