p2p/msgrate: return capacity as integer, clamp to max uint32 (#22943)

* p2p/msgrate: return capacity as integer

* eth/protocols/snap: remove conversions

* p2p/msgrate: add overflow test

* p2p/msgrate: make the capacity overflow test actually overflow

* p2p/msgrate: clamp capacity to max int32

* p2p/msgrate: fix min/max confusion
This commit is contained in:
Felix Lange
2021-05-27 18:43:55 +02:00
committed by GitHub
parent 0703ef62d3
commit 427175153c
4 changed files with 79 additions and 41 deletions

View File

@ -21,7 +21,6 @@ package downloader
import (
"errors"
"math"
"math/big"
"sort"
"sync"
@ -232,7 +231,7 @@ func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time)
// HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput.
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
cap := int(math.Ceil(p.rates.Capacity(eth.BlockHeadersMsg, targetRTT)))
cap := p.rates.Capacity(eth.BlockHeadersMsg, targetRTT)
if cap > MaxHeaderFetch {
cap = MaxHeaderFetch
}
@ -242,7 +241,7 @@ func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput.
func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
cap := int(math.Ceil(p.rates.Capacity(eth.BlockBodiesMsg, targetRTT)))
cap := p.rates.Capacity(eth.BlockBodiesMsg, targetRTT)
if cap > MaxBlockFetch {
cap = MaxBlockFetch
}
@ -252,7 +251,7 @@ func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
// ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered throughput.
func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
cap := int(math.Ceil(p.rates.Capacity(eth.ReceiptsMsg, targetRTT)))
cap := p.rates.Capacity(eth.ReceiptsMsg, targetRTT)
if cap > MaxReceiptFetch {
cap = MaxReceiptFetch
}
@ -262,7 +261,7 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
// NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered throughput.
func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
cap := int(math.Ceil(p.rates.Capacity(eth.NodeDataMsg, targetRTT)))
cap := p.rates.Capacity(eth.NodeDataMsg, targetRTT)
if cap > MaxStateFetch {
cap = MaxStateFetch
}
@ -411,7 +410,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.headerIdle) == 0
}
throughput := func(p *peerConnection) float64 {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockHeadersMsg, time.Second)
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@ -423,7 +422,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
throughput := func(p *peerConnection) float64 {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockBodiesMsg, time.Second)
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@ -435,7 +434,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
throughput := func(p *peerConnection) float64 {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.ReceiptsMsg, time.Second)
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@ -447,7 +446,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
throughput := func(p *peerConnection) float64 {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.NodeDataMsg, time.Second)
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@ -455,45 +454,48 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
// The resulting set of peers are sorted by their measure throughput.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
// The resulting set of peers are sorted by their capacity.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, capacity func(*peerConnection) int) ([]*peerConnection, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
tps := make([]float64, 0, len(ps.peers))
var (
total = 0
idle = make([]*peerConnection, 0, len(ps.peers))
tps = make([]int, 0, len(ps.peers))
)
for _, p := range ps.peers {
if p.version >= minProtocol && p.version <= maxProtocol {
if idleCheck(p) {
idle = append(idle, p)
tps = append(tps, throughput(p))
tps = append(tps, capacity(p))
}
total++
}
}
// And sort them
sortPeers := &peerThroughputSort{idle, tps}
sortPeers := &peerCapacitySort{idle, tps}
sort.Sort(sortPeers)
return sortPeers.p, total
}
// peerThroughputSort implements the Sort interface, and allows for
// sorting a set of peers by their throughput
// The sorted data is with the _highest_ throughput first
type peerThroughputSort struct {
// peerCapacitySort implements sort.Interface.
// It sorts peer connections by capacity (descending).
type peerCapacitySort struct {
p []*peerConnection
tp []float64
tp []int
}
func (ps *peerThroughputSort) Len() int {
func (ps *peerCapacitySort) Len() int {
return len(ps.p)
}
func (ps *peerThroughputSort) Less(i, j int) bool {
func (ps *peerCapacitySort) Less(i, j int) bool {
return ps.tp[i] > ps.tp[j]
}
func (ps *peerThroughputSort) Swap(i, j int) {
func (ps *peerCapacitySort) Swap(i, j int) {
ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
}

View File

@ -861,7 +861,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.accountIdlers)),
caps: make([]float64, 0, len(s.accountIdlers)),
caps: make([]int, 0, len(s.accountIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.accountIdlers {
@ -958,7 +958,7 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.bytecodeIdlers)),
caps: make([]float64, 0, len(s.bytecodeIdlers)),
caps: make([]int, 0, len(s.bytecodeIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.bytecodeIdlers {
@ -1012,11 +1012,11 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
if cap > maxCodeRequestCount {
cap = maxCodeRequestCount
}
hashes := make([]common.Hash, 0, int(cap))
hashes := make([]common.Hash, 0, cap)
for hash := range task.codeTasks {
delete(task.codeTasks, hash)
hashes = append(hashes, hash)
if len(hashes) >= int(cap) {
if len(hashes) >= cap {
break
}
}
@ -1061,7 +1061,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.storageIdlers)),
caps: make([]float64, 0, len(s.storageIdlers)),
caps: make([]int, 0, len(s.storageIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.storageIdlers {
@ -1120,7 +1120,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
cap = minRequestSize
}
storageSets := int(cap / 1024)
storageSets := cap / 1024
var (
accounts = make([]common.Hash, 0, storageSets)
@ -1217,7 +1217,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.trienodeHealIdlers)),
caps: make([]float64, 0, len(s.trienodeHealIdlers)),
caps: make([]int, 0, len(s.trienodeHealIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.trienodeHealIdlers {
@ -1284,9 +1284,9 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
cap = maxTrieRequestCount
}
var (
hashes = make([]common.Hash, 0, int(cap))
paths = make([]trie.SyncPath, 0, int(cap))
pathsets = make([]TrieNodePathSet, 0, int(cap))
hashes = make([]common.Hash, 0, cap)
paths = make([]trie.SyncPath, 0, cap)
pathsets = make([]TrieNodePathSet, 0, cap)
)
for hash, pathset := range s.healer.trieTasks {
delete(s.healer.trieTasks, hash)
@ -1295,7 +1295,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
paths = append(paths, pathset)
pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
if len(hashes) >= int(cap) {
if len(hashes) >= cap {
break
}
}
@ -1341,7 +1341,7 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.bytecodeHealIdlers)),
caps: make([]float64, 0, len(s.bytecodeHealIdlers)),
caps: make([]int, 0, len(s.bytecodeHealIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.bytecodeHealIdlers {
@ -1407,12 +1407,12 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
if cap > maxCodeRequestCount {
cap = maxCodeRequestCount
}
hashes := make([]common.Hash, 0, int(cap))
hashes := make([]common.Hash, 0, cap)
for hash := range s.healer.codeTasks {
delete(s.healer.codeTasks, hash)
hashes = append(hashes, hash)
if len(hashes) >= int(cap) {
if len(hashes) >= cap {
break
}
}
@ -2852,7 +2852,7 @@ func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) {
// of highest capacity being at the front.
type capacitySort struct {
ids []string
caps []float64
caps []int
}
func (s *capacitySort) Len() int {