Revert "eth: drop eth/65, the last non-reqid protocol version" (#23426)

This commit is contained in:
Péter Szilágyi
2021-08-20 15:14:21 +03:00
committed by GitHub
parent 5566e5d152
commit c368f728c1
15 changed files with 602 additions and 172 deletions

View File

@ -171,21 +171,39 @@ type Decoder interface {
Time() time.Time
}
var eth65 = map[uint64]msgHandler{
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetNodeDataMsg: handleGetNodeData,
NodeDataMsg: handleNodeData,
GetReceiptsMsg: handleGetReceipts,
ReceiptsMsg: handleReceipts,
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
}
var eth66 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetNodeDataMsg: handleGetNodeData66,
NodeDataMsg: handleNodeData66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
// eth66 messages with request-id
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetNodeDataMsg: handleGetNodeData66,
NodeDataMsg: handleNodeData66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}
// handleMessage is invoked whenever an inbound message is received from a remote
@ -201,11 +219,10 @@ func handleMessage(backend Backend, peer *Peer) error {
}
defer msg.Discard()
var handlers = eth66
//if peer.Version() >= ETH67 { // Left in as a sample when new protocol is added
// handlers = eth67
//}
var handlers = eth65
if peer.Version() >= ETH66 {
handlers = eth66
}
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)

View File

@ -110,6 +110,7 @@ func (b *testBackend) Handle(*Peer, Packet) error {
}
// Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) }
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
func testGetBlockHeaders(t *testing.T, protocol uint) {
@ -253,30 +254,44 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
headers = append(headers, backend.chain.GetBlockByHash(hash).Header())
}
// Send the hash request and verify the response
p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{
RequestId: 123,
GetBlockHeadersPacket: tt.query,
})
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{
RequestId: 123,
BlockHeadersPacket: headers,
}); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
if protocol <= ETH65 {
p2p.Send(peer.app, GetBlockHeadersMsg, tt.query)
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
} else {
p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{
RequestId: 123,
GetBlockHeadersPacket: tt.query,
})
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{
RequestId: 123,
BlockHeadersPacket: headers,
}); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
}
// If the test used number origins, repeat with hashes as the too
if tt.query.Origin.Hash == (common.Hash{}) {
if origin := backend.chain.GetBlockByNumber(tt.query.Origin.Number); origin != nil {
tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0
p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{
RequestId: 456,
GetBlockHeadersPacket: tt.query,
})
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{
RequestId: 456,
BlockHeadersPacket: headers,
}); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
if protocol <= ETH65 {
p2p.Send(peer.app, GetBlockHeadersMsg, tt.query)
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
} else {
p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{
RequestId: 456,
GetBlockHeadersPacket: tt.query,
})
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{
RequestId: 456,
BlockHeadersPacket: headers,
}); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
}
}
}
@ -284,6 +299,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
}
// Tests that block contents can be retrieved from a remote chain based on their hashes.
func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) }
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
func testGetBlockBodies(t *testing.T, protocol uint) {
@ -353,20 +369,28 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}
}
// Send the hash request and verify the response
p2p.Send(peer.app, GetBlockBodiesMsg, GetBlockBodiesPacket66{
RequestId: 123,
GetBlockBodiesPacket: hashes,
})
if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, BlockBodiesPacket66{
RequestId: 123,
BlockBodiesPacket: bodies,
}); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
if protocol <= ETH65 {
p2p.Send(peer.app, GetBlockBodiesMsg, hashes)
if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, bodies); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
}
} else {
p2p.Send(peer.app, GetBlockBodiesMsg, GetBlockBodiesPacket66{
RequestId: 123,
GetBlockBodiesPacket: hashes,
})
if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, BlockBodiesPacket66{
RequestId: 123,
BlockBodiesPacket: bodies,
}); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
}
}
}
}
// Tests that the state trie nodes can be retrieved based on hashes.
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
func testGetNodeData(t *testing.T, protocol uint) {
@ -425,10 +449,14 @@ func testGetNodeData(t *testing.T, protocol uint) {
}
it.Release()
p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{
RequestId: 123,
GetNodeDataPacket: hashes,
})
if protocol <= ETH65 {
p2p.Send(peer.app, GetNodeDataMsg, hashes)
} else {
p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{
RequestId: 123,
GetNodeDataPacket: hashes,
})
}
msg, err := peer.app.ReadMsg()
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
@ -436,14 +464,18 @@ func testGetNodeData(t *testing.T, protocol uint) {
if msg.Code != NodeDataMsg {
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
}
var (
data [][]byte
res NodeDataPacket66
)
if err := msg.Decode(&res); err != nil {
t.Fatalf("failed to decode response node data: %v", err)
var data [][]byte
if protocol <= ETH65 {
if err := msg.Decode(&data); err != nil {
t.Fatalf("failed to decode response node data: %v", err)
}
} else {
var res NodeDataPacket66
if err := msg.Decode(&res); err != nil {
t.Fatalf("failed to decode response node data: %v", err)
}
data = res.NodeDataPacket
}
data = res.NodeDataPacket
// Verify that all hashes correspond to the requested data, and reconstruct a state tree
for i, want := range hashes {
if hash := crypto.Keccak256Hash(data[i]); hash != want {
@ -474,6 +506,7 @@ func testGetNodeData(t *testing.T, protocol uint) {
}
// Tests that the transaction receipts can be retrieved based on hashes.
func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) }
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
func testGetBlockReceipts(t *testing.T, protocol uint) {
@ -533,14 +566,21 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
receipts = append(receipts, backend.chain.GetReceiptsByHash(block.Hash()))
}
// Send the hash request and verify the response
p2p.Send(peer.app, GetReceiptsMsg, GetReceiptsPacket66{
RequestId: 123,
GetReceiptsPacket: hashes,
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, ReceiptsPacket66{
RequestId: 123,
ReceiptsPacket: receipts,
}); err != nil {
t.Errorf("receipts mismatch: %v", err)
if protocol <= ETH65 {
p2p.Send(peer.app, GetReceiptsMsg, hashes)
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, receipts); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
} else {
p2p.Send(peer.app, GetReceiptsMsg, GetReceiptsPacket66{
RequestId: 123,
GetReceiptsPacket: hashes,
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, ReceiptsPacket66{
RequestId: 123,
ReceiptsPacket: receipts,
}); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
}
}

View File

@ -27,6 +27,17 @@ import (
"github.com/ethereum/go-ethereum/trie"
)
// handleGetBlockHeaders handles Block header query, collect the requested headers and reply
func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query
var query GetBlockHeadersPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetBlockHeadersQuery(backend, &query, peer)
return peer.SendBlockHeaders(response)
}
// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query
@ -124,6 +135,16 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p
return headers
}
func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message
var query GetBlockBodiesPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetBlockBodiesQuery(backend, query, peer)
return peer.SendBlockBodiesRLP(response)
}
func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message
var query GetBlockBodiesPacket66
@ -153,6 +174,16 @@ func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer
return bodies
}
func handleGetNodeData(backend Backend, msg Decoder, peer *Peer) error {
// Decode the trie node data retrieval message
var query GetNodeDataPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetNodeDataQuery(backend, query, peer)
return peer.SendNodeData(response)
}
func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the trie node data retrieval message
var query GetNodeDataPacket66
@ -192,6 +223,16 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer
return nodes
}
func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetReceiptsQuery(backend, query, peer)
return peer.SendReceiptsRLP(response)
}
func handleGetReceipts66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket66
@ -271,6 +312,15 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, ann)
}
func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket66)
@ -282,6 +332,15 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.BlockHeadersPacket)
}
func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// A batch of block bodies arrived to one of our previous requests
res := new(BlockBodiesPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of block bodies arrived to one of our previous requests
res := new(BlockBodiesPacket66)
@ -293,6 +352,15 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.BlockBodiesPacket)
}
func handleNodeData(backend Backend, msg Decoder, peer *Peer) error {
// A batch of node state data arrived to one of our previous requests
res := new(NodeDataPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of node state data arrived to one of our previous requests
res := new(NodeDataPacket66)
@ -304,6 +372,15 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.NodeDataPacket)
}
func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket66)
@ -332,6 +409,16 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return backend.Handle(peer, ann)
}
func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
hashes, txs := answerGetPooledTransactions(backend, query, peer)
return peer.SendPooledTransactionsRLP(hashes, txs)
}
func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket66
@ -390,6 +477,26 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &txs)
}
func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
// Transactions can be processed, parse all of them and deliver to the pool
var txs PooledTransactionsPacket
if err := msg.Decode(&txs); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
}
return backend.Handle(peer, &txs)
}
func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {

View File

@ -27,6 +27,7 @@ import (
)
// Tests that handshake failures are detected and reported correctly.
func TestHandshake65(t *testing.T) { testHandshake(t, ETH65) }
func TestHandshake66(t *testing.T) { testHandshake(t, ETH66) }
func testHandshake(t *testing.T, protocol uint) {

View File

@ -108,8 +108,9 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
// Start up all the broadcasters
go peer.broadcastBlocks()
go peer.broadcastTransactions()
go peer.announceTransactions()
if version >= ETH65 {
go peer.announceTransactions()
}
return peer
}
@ -251,6 +252,22 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
}
}
// SendPooledTransactionsRLP sends requested transactions to the peer and adds the
// hashes in its transaction hash set for future reference.
//
// Note, the method assumes the hashes are correct and correspond to the list of
// transactions being sent.
func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
return p2p.Send(p.rw, PooledTransactionsMsg, txs) // Not packed into PooledTransactionsPacket to avoid RLP decoding
}
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
@ -329,6 +346,11 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
}
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *Peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers))
}
// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{
@ -337,6 +359,12 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
})
}
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
// an already RLP encoded format.
func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding
}
// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
// Not packed into BlockBodiesPacket to avoid RLP decoding
@ -346,6 +374,12 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
})
}
// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *Peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data))
}
// ReplyNodeData is the eth/66 response to GetNodeData.
func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{
@ -354,6 +388,12 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
})
}
// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding
}
// ReplyReceiptsRLP is the eth/66 response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{
@ -366,102 +406,138 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash) error {
p.Log().Debug("Fetching single header", "hash", hash)
id := rand.Uint64()
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: hash},
Amount: uint64(1),
Skip: uint64(0),
Reverse: false,
}
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id,
GetBlockHeadersPacket: &GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: hash},
Amount: uint64(1),
Skip: uint64(0),
Reverse: false,
},
})
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id,
GetBlockHeadersPacket: &query,
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
id := rand.Uint64()
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id,
GetBlockHeadersPacket: &GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
},
})
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id,
GetBlockHeadersPacket: &query,
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
id := rand.Uint64()
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Number: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id,
GetBlockHeadersPacket: &GetBlockHeadersPacket{
Origin: HashOrNumber{Number: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
},
})
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id,
GetBlockHeadersPacket: &query,
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// ExpectRequestHeadersByNumber is a testing method to mirror the recipient side
// of the RequestHeadersByNumber operation.
func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
req := &GetBlockHeadersPacket{
Origin: HashOrNumber{Number: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}
return p2p.ExpectMsg(p.rw, GetBlockHeadersMsg, req)
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
func (p *Peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
id := rand.Uint64()
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id)
return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
RequestId: id,
GetBlockBodiesPacket: hashes,
})
requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id)
return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
RequestId: id,
GetBlockBodiesPacket: hashes,
})
}
return p2p.Send(p.rw, GetBlockBodiesMsg, GetBlockBodiesPacket(hashes))
}
// RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
func (p *Peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes))
id := rand.Uint64()
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id)
return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
RequestId: id,
GetNodeDataPacket: hashes,
})
requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id)
return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
RequestId: id,
GetNodeDataPacket: hashes,
})
}
return p2p.Send(p.rw, GetNodeDataMsg, GetNodeDataPacket(hashes))
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
id := rand.Uint64()
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id)
return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
RequestId: id,
GetReceiptsPacket: hashes,
})
requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id)
return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
RequestId: id,
GetReceiptsPacket: hashes,
})
}
return p2p.Send(p.rw, GetReceiptsMsg, GetReceiptsPacket(hashes))
}
// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
id := rand.Uint64()
if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: id,
GetPooledTransactionsPacket: hashes,
})
requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: id,
GetPooledTransactionsPacket: hashes,
})
}
return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes))
}

View File

@ -30,6 +30,7 @@ import (
// Constants to match up protocol versions and messages
const (
ETH65 = 65
ETH66 = 66
)
@ -39,28 +40,31 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH66}
var ProtocolVersions = []uint{ETH66, ETH65}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH66: 17}
var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
const (
StatusMsg = 0x00
NewBlockHashesMsg = 0x01
TransactionsMsg = 0x02
GetBlockHeadersMsg = 0x03
BlockHeadersMsg = 0x04
GetBlockBodiesMsg = 0x05
BlockBodiesMsg = 0x06
NewBlockMsg = 0x07
GetNodeDataMsg = 0x0d
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
// Protocol messages in eth/64
StatusMsg = 0x00
NewBlockHashesMsg = 0x01
TransactionsMsg = 0x02
GetBlockHeadersMsg = 0x03
BlockHeadersMsg = 0x04
GetBlockBodiesMsg = 0x05
BlockBodiesMsg = 0x06
NewBlockMsg = 0x07
GetNodeDataMsg = 0x0d
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
// Protocol messages overloaded in eth/65
NewPooledTransactionHashesMsg = 0x08
GetPooledTransactionsMsg = 0x09
PooledTransactionsMsg = 0x0a
@ -124,7 +128,7 @@ type GetBlockHeadersPacket struct {
Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
}
// GetBlockHeadersPacket66 represents a block header query over eth/66
// GetBlockHeadersPacket represents a block header query over eth/66
type GetBlockHeadersPacket66 struct {
RequestId uint64
*GetBlockHeadersPacket