Merge pull request #19437 from zsfelfoldi/fix-sendtx

les: fix SendTx cost calculation and verify cost table
This commit is contained in:
Péter Szilágyi 2019-04-10 15:45:13 +03:00 committed by GitHub
commit cdae1c59ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 72 additions and 12 deletions

View File

@ -324,7 +324,11 @@ func (pm *ProtocolManager) handle(p *peer) error {
} }
} }
var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg} var (
reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
reqListV1 = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, GetHeaderProofsMsg}
reqListV2 = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, SendTxV2Msg, GetTxStatusMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
)
// handleMsg is invoked whenever an inbound message is received from a remote // handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error. // peer. The remote connection is torn down upon returning any error.

View File

@ -508,8 +508,9 @@ func TestTransactionStatusLes2(t *testing.T) {
test := func(tx *types.Transaction, send bool, expStatus txStatus) { test := func(tx *types.Transaction, send bool, expStatus txStatus) {
reqID++ reqID++
if send { if send {
cost := peer.GetRequestCost(SendTxV2Msg, 1) enc, _ := rlp.EncodeToBytes(types.Transactions{tx})
sendRequest(peer.app, SendTxV2Msg, reqID, cost, types.Transactions{tx}) cost := peer.GetTxRelayCost(1, len(enc))
sendRequest(peer.app, SendTxV2Msg, reqID, cost, rlp.RawValue(enc))
} else { } else {
cost := peer.GetRequestCost(GetTxStatusMsg, 1) cost := peer.GetRequestCost(GetTxStatusMsg, 1)
sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()}) sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})

View File

@ -42,6 +42,11 @@ var (
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
// if the total encoded size of a sent transaction batch is over txSizeCostLimit
// per transaction then the request cost is calculated as proportional to the
// encoded size instead of the transaction count
const txSizeCostLimit = 0x4000
const ( const (
announceTypeNone = iota announceTypeNone = iota
announceTypeSimple announceTypeSimple
@ -163,7 +168,41 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) costs := p.fcCosts[msgcode]
if costs == nil {
return 0
}
cost := costs.baseCost + costs.reqCost*uint64(amount)
if cost > p.fcServerParams.BufLimit {
cost = p.fcServerParams.BufLimit
}
return cost
}
func (p *peer) GetTxRelayCost(amount, size int) uint64 {
p.lock.RLock()
defer p.lock.RUnlock()
var msgcode uint64
switch p.version {
case lpv1:
msgcode = SendTxMsg
case lpv2:
msgcode = SendTxV2Msg
default:
panic(nil)
}
costs := p.fcCosts[msgcode]
if costs == nil {
return 0
}
cost := costs.baseCost + costs.reqCost*uint64(amount)
sizeCost := costs.baseCost + costs.reqCost*uint64(size)/txSizeCostLimit
if sizeCost > cost {
cost = sizeCost
}
if cost > p.fcServerParams.BufLimit { if cost > p.fcServerParams.BufLimit {
cost = p.fcServerParams.BufLimit cost = p.fcServerParams.BufLimit
} }
@ -307,9 +346,9 @@ func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error
return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes) return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
} }
// SendTxStatus sends a batch of transactions to be added to the remote transaction pool. // SendTxs sends a batch of transactions to be added to the remote transaction pool.
func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error {
p.Log().Debug("Fetching batch of transactions", "count", len(txs)) p.Log().Debug("Fetching batch of transactions", "size", len(txs))
switch p.version { switch p.version {
case lpv1: case lpv1:
return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID
@ -485,6 +524,20 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
p.fcServerParams = params p.fcServerParams = params
p.fcServer = flowcontrol.NewServerNode(params) p.fcServer = flowcontrol.NewServerNode(params)
p.fcCosts = MRC.decode() p.fcCosts = MRC.decode()
var checkList []uint64
switch p.version {
case lpv1:
checkList = reqListV1
case lpv2:
checkList = reqListV2
default:
panic(nil)
}
for _, msgCode := range checkList {
if p.fcCosts[msgCode] == nil {
return errResp(ErrUselessPeer, "peer does not support message %d", msgCode)
}
}
} }
p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}

View File

@ -21,6 +21,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
) )
type ltrInfo struct { type ltrInfo struct {
@ -113,21 +114,22 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
for p, list := range sendTo { for p, list := range sendTo {
pp := p pp := p
ll := list ll := list
enc, _ := rlp.EncodeToBytes(ll)
reqID := genReqID() reqID := genReqID()
rq := &distReq{ rq := &distReq{
getCost: func(dp distPeer) uint64 { getCost: func(dp distPeer) uint64 {
peer := dp.(*peer) peer := dp.(*peer)
return peer.GetRequestCost(SendTxMsg, len(ll)) return peer.GetTxRelayCost(len(ll), len(enc))
}, },
canSend: func(dp distPeer) bool { canSend: func(dp distPeer) bool {
return dp.(*peer) == pp return dp.(*peer) == pp
}, },
request: func(dp distPeer) func() { request: func(dp distPeer) func() {
peer := dp.(*peer) peer := dp.(*peer)
cost := peer.GetRequestCost(SendTxMsg, len(ll)) cost := peer.GetTxRelayCost(len(ll), len(enc))
peer.fcServer.QueueRequest(reqID, cost) peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.SendTxs(reqID, cost, ll) } return func() { peer.SendTxs(reqID, cost, enc) }
}, },
} }
self.reqDist.queue(rq) self.reqDist.queue(rq)

View File

@ -23,7 +23,7 @@ import (
const ( const (
VersionMajor = 1 // Major version component of the current release VersionMajor = 1 // Major version component of the current release
VersionMinor = 8 // Minor version component of the current release VersionMinor = 8 // Minor version component of the current release
VersionPatch = 25 // Patch version component of the current release VersionPatch = 26 // Patch version component of the current release
VersionMeta = "stable" // Version metadata to append to the version string VersionMeta = "stable" // Version metadata to append to the version string
) )

View File

@ -23,7 +23,7 @@ import (
const ( const (
VersionMajor = 0 // Major version component of the current release VersionMajor = 0 // Major version component of the current release
VersionMinor = 3 // Minor version component of the current release VersionMinor = 3 // Minor version component of the current release
VersionPatch = 13 // Patch version component of the current release VersionPatch = 14 // Patch version component of the current release
VersionMeta = "stable" // Version metadata to append to the version string VersionMeta = "stable" // Version metadata to append to the version string
) )