Files
.circleci
.github
accounts
build
cmd
common
consensus
console
contracts
core
crypto
docs
eth
ethclient
ethdb
ethstats
event
graphql
internal
les
light
log
metrics
miner
mobile
node
p2p
discover
dnsdisc
enode
enr
msgrate
nat
netutil
nodestate
rlpx
simulations
tracker
tracker.go
dial.go
dial_test.go
message.go
message_test.go
metrics.go
peer.go
peer_error.go
peer_test.go
protocol.go
server.go
server_test.go
transport.go
transport_test.go
util.go
util_test.go
params
rlp
rpc
signer
swarm
tests
trie
.dockerignore
.gitattributes
.gitignore
.gitmodules
.golangci.yml
.mailmap
.travis.yml
AUTHORS
COPYING
COPYING.LESSER
Dockerfile
Dockerfile.alltools
Makefile
README.md
SECURITY.md
appveyor.yml
circle.yml
go.mod
go.sum
interfaces.go
oss-fuzz.sh

206 lines
6.9 KiB
Go
Raw Permalink Normal View History

// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tracker
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const (
// trackedGaugeName is the prefix of the per-packet request tracking.
trackedGaugeName = "p2p/tracked"
// lostMeterName is the prefix of the per-packet request expirations.
lostMeterName = "p2p/lost"
// staleMeterName is the prefix of the per-packet stale responses.
staleMeterName = "p2p/stale"
// waitHistName is the prefix of the per-packet (req only) waiting time histograms.
waitHistName = "p2p/wait"
// maxTrackedPackets is a huge number to act as a failsafe on the number of
// pending requests the node will track. It should never be hit unless an
// attacker figures out a way to spin requests.
maxTrackedPackets = 100000
)
// request tracks sent network requests which have not yet received a response.
type request struct {
peer string
version uint // Protocol version
reqCode uint64 // Protocol message code of the request
resCode uint64 // Protocol message code of the expected response
time time.Time // Timestamp when the request was made
expire *list.Element // Expiration marker to untrack it
}
// Tracker is a pending network request tracker to measure how much time it takes
// a remote peer to respond.
type Tracker struct {
protocol string // Protocol capability identifier for the metrics
timeout time.Duration // Global timeout after which to drop a tracked packet
pending map[uint64]*request // Currently pending requests
expire *list.List // Linked list tracking the expiration order
wake *time.Timer // Timer tracking the expiration of the next item
lock sync.Mutex // Lock protecting from concurrent updates
}
// New creates a new network request tracker to monitor how much time it takes to
// fill certain requests and how individual peers perform.
func New(protocol string, timeout time.Duration) *Tracker {
return &Tracker{
protocol: protocol,
timeout: timeout,
pending: make(map[uint64]*request),
expire: list.New(),
}
}
// Track adds a network request to the tracker to wait for a response to arrive
// or until the request it cancelled or times out.
func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) {
if !metrics.Enabled {
return
}
t.lock.Lock()
defer t.lock.Unlock()
// If there's a duplicate request, we've just random-collided (or more probably,
// we have a bug), report it. We could also add a metric, but we're not really
// expecting ourselves to be buggy, so a noisy warning should be enough.
if _, ok := t.pending[id]; ok {
log.Error("Network request id collision", "protocol", t.protocol, "version", version, "code", reqCode, "id", id)
return
}
// If we have too many pending requests, bail out instead of leaking memory
if pending := len(t.pending); pending >= maxTrackedPackets {
log.Error("Request tracker exceeded allowance", "pending", pending, "peer", peer, "protocol", t.protocol, "version", version, "code", reqCode)
return
}
// Id doesn't exist yet, start tracking it
t.pending[id] = &request{
peer: peer,
version: version,
reqCode: reqCode,
resCode: resCode,
time: time.Now(),
expire: t.expire.PushBack(id),
}
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, version, reqCode)
metrics.GetOrRegisterGauge(g, nil).Inc(1)
// If we've just inserted the first item, start the expiration timer
if t.wake == nil {
t.wake = time.AfterFunc(t.timeout, t.clean)
}
}
// clean is called automatically when a preset time passes without a response
// being dleivered for the first network request.
func (t *Tracker) clean() {
t.lock.Lock()
defer t.lock.Unlock()
// Expire anything within a certain threshold (might be no items at all if
// we raced with the delivery)
for t.expire.Len() > 0 {
// Stop iterating if the next pending request is still alive
var (
head = t.expire.Front()
id = head.Value.(uint64)
req = t.pending[id]
)
if time.Since(req.time) < t.timeout+5*time.Millisecond {
break
}
// Nope, dead, drop it
t.expire.Remove(head)
delete(t.pending, id)
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterGauge(g, nil).Dec(1)
m := fmt.Sprintf("%s/%s/%d/%#02x", lostMeterName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterMeter(m, nil).Mark(1)
}
t.schedule()
}
// schedule starts a timer to trigger on the expiration of the first network
// packet.
func (t *Tracker) schedule() {
if t.expire.Len() == 0 {
t.wake = nil
return
}
t.wake = time.AfterFunc(time.Until(t.pending[t.expire.Front().Value.(uint64)].time.Add(t.timeout)), t.clean)
}
// Fulfil fills a pending request, if any is available, reporting on various metrics.
func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) {
if !metrics.Enabled {
return
}
t.lock.Lock()
defer t.lock.Unlock()
// If it's a non existing request, track as stale response
req, ok := t.pending[id]
if !ok {
m := fmt.Sprintf("%s/%s/%d/%#02x", staleMeterName, t.protocol, version, code)
metrics.GetOrRegisterMeter(m, nil).Mark(1)
return
}
// If the response is funky, it might be some active attack
if req.peer != peer || req.version != version || req.resCode != code {
log.Warn("Network response id collision",
"have", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, version, code),
"want", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, req.version, req.resCode),
)
return
}
// Everything matches, mark the request serviced and meter it
t.expire.Remove(req.expire)
delete(t.pending, id)
if req.expire.Prev() == nil {
if t.wake.Stop() {
t.schedule()
}
}
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterGauge(g, nil).Dec(1)
h := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.protocol, req.version, req.reqCode)
sampler := func() metrics.Sample {
return metrics.ResettingSample(
metrics.NewExpDecaySample(1028, 0.015),
)
}
metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(req.time).Microseconds())
}