swarm: initial instrumentation (#15969)
* swarm: initial instrumentation with go-metrics * swarm: initialise metrics collection and add ResettingTimer to HTTP requests * swarm: update metrics flags names. remove redundant Timer. * swarm: rename method for periodically updating gauges * swarm: finalise metrics after feedback * swarm/network: always init kad metrics containers * swarm/network: off-by-one index in metrics containers * swarm, metrics: resolved conflicts
This commit is contained in:
committed by
Balint Gabor
parent
b677a07d36
commit
dcca613a0b
@@ -23,9 +23,19 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
)
|
||||
|
||||
//metrics variables
|
||||
var (
|
||||
syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil)
|
||||
syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil)
|
||||
syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil)
|
||||
syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil)
|
||||
syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil)
|
||||
)
|
||||
|
||||
// Handler for storage/retrieval related protocol requests
|
||||
// implements the StorageHandler interface used by the bzz protocol
|
||||
type Depo struct {
|
||||
@@ -107,6 +117,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
|
||||
log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
|
||||
// not found in memory cache, ie., a genuine store request
|
||||
// create chunk
|
||||
syncReceiveCount.Inc(1)
|
||||
chunk = storage.NewChunk(req.Key, nil)
|
||||
|
||||
case chunk.SData == nil:
|
||||
@@ -116,6 +127,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
|
||||
default:
|
||||
// data is found, store request ignored
|
||||
// this should update access count?
|
||||
syncReceiveIgnore.Inc(1)
|
||||
log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
|
||||
islocal = true
|
||||
//return
|
||||
@@ -172,11 +184,14 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
|
||||
SData: chunk.SData,
|
||||
requestTimeout: req.timeout, //
|
||||
}
|
||||
syncSendCount.Inc(1)
|
||||
p.syncer.addRequest(sreq, DeliverReq)
|
||||
} else {
|
||||
syncSendRefused.Inc(1)
|
||||
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
|
||||
}
|
||||
} else {
|
||||
syncSendNotFound.Inc(1)
|
||||
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
|
||||
}
|
||||
}
|
||||
|
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||
"github.com/ethereum/go-ethereum/swarm/network/kademlia"
|
||||
@@ -39,6 +40,12 @@ import (
|
||||
// connections and disconnections are reported and relayed
|
||||
// to keep the nodetable uptodate
|
||||
|
||||
var (
|
||||
peersNumGauge = metrics.NewRegisteredGauge("network.peers.num", nil)
|
||||
addPeerCounter = metrics.NewRegisteredCounter("network.addpeer.count", nil)
|
||||
removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil)
|
||||
)
|
||||
|
||||
type Hive struct {
|
||||
listenAddr func() string
|
||||
callInterval uint64
|
||||
@@ -192,6 +199,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee
|
||||
func (self *Hive) keepAlive() {
|
||||
alarm := time.NewTicker(time.Duration(self.callInterval)).C
|
||||
for {
|
||||
peersNumGauge.Update(int64(self.kad.Count()))
|
||||
select {
|
||||
case <-alarm:
|
||||
if self.kad.DBCount() > 0 {
|
||||
@@ -223,6 +231,7 @@ func (self *Hive) Stop() error {
|
||||
|
||||
// called at the end of a successful protocol handshake
|
||||
func (self *Hive) addPeer(p *peer) error {
|
||||
addPeerCounter.Inc(1)
|
||||
defer func() {
|
||||
select {
|
||||
case self.more <- true:
|
||||
@@ -247,6 +256,7 @@ func (self *Hive) addPeer(p *peer) error {
|
||||
|
||||
// called after peer disconnected
|
||||
func (self *Hive) removePeer(p *peer) {
|
||||
removePeerCounter.Inc(1)
|
||||
log.Debug(fmt.Sprintf("bee %v removed", p))
|
||||
self.kad.Off(p, saveSync)
|
||||
select {
|
||||
|
@@ -24,6 +24,16 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
//metrics variables
|
||||
//For metrics, we want to count how many times peers are added/removed
|
||||
//at a certain index. Thus we do that with an array of counters with
|
||||
//entry for each index
|
||||
var (
|
||||
bucketAddIndexCount []metrics.Counter
|
||||
bucketRmIndexCount []metrics.Counter
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -88,12 +98,14 @@ type Node interface {
|
||||
// params is KadParams configuration
|
||||
func New(addr Address, params *KadParams) *Kademlia {
|
||||
buckets := make([][]Node, params.MaxProx+1)
|
||||
return &Kademlia{
|
||||
kad := &Kademlia{
|
||||
addr: addr,
|
||||
KadParams: params,
|
||||
buckets: buckets,
|
||||
db: newKadDb(addr, params),
|
||||
}
|
||||
kad.initMetricsVariables()
|
||||
return kad
|
||||
}
|
||||
|
||||
// accessor for KAD base address
|
||||
@@ -138,6 +150,7 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error
|
||||
// TODO: give priority to peers with active traffic
|
||||
if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
|
||||
self.buckets[index] = append(bucket, node)
|
||||
bucketAddIndexCount[index].Inc(1)
|
||||
log.Debug(fmt.Sprintf("add node %v to table", node))
|
||||
self.setProxLimit(index, true)
|
||||
record.node = node
|
||||
@@ -178,6 +191,7 @@ func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
|
||||
defer self.lock.Unlock()
|
||||
|
||||
index := self.proximityBin(node.Addr())
|
||||
bucketRmIndexCount[index].Inc(1)
|
||||
bucket := self.buckets[index]
|
||||
for i := 0; i < len(bucket); i++ {
|
||||
if node.Addr() == bucket[i].Addr() {
|
||||
@@ -426,3 +440,15 @@ func (self *Kademlia) String() string {
|
||||
rows = append(rows, "=========================================================================")
|
||||
return strings.Join(rows, "\n")
|
||||
}
|
||||
|
||||
//We have to build up the array of counters for each index
|
||||
func (self *Kademlia) initMetricsVariables() {
|
||||
//create the arrays
|
||||
bucketAddIndexCount = make([]metrics.Counter, self.MaxProx+1)
|
||||
bucketRmIndexCount = make([]metrics.Counter, self.MaxProx+1)
|
||||
//at each index create a metrics counter
|
||||
for i := 0; i < (self.KadParams.MaxProx + 1); i++ {
|
||||
bucketAddIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.add.%d.index", i), nil)
|
||||
bucketRmIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.rm.%d.index", i), nil)
|
||||
}
|
||||
}
|
||||
|
@@ -39,12 +39,26 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/contracts/chequebook"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
|
||||
"github.com/ethereum/go-ethereum/swarm/services/swap/swap"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
)
|
||||
|
||||
//metrics variables
|
||||
var (
|
||||
storeRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.storerequest.count", nil)
|
||||
retrieveRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.retrieverequest.count", nil)
|
||||
peersMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.peers.count", nil)
|
||||
syncRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.syncrequest.count", nil)
|
||||
unsyncedKeysMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.unsyncedkeys.count", nil)
|
||||
deliverRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.deliverrequest.count", nil)
|
||||
paymentMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.payment.count", nil)
|
||||
invalidMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.invalid.count", nil)
|
||||
handleStatusMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.handlestatus.count", nil)
|
||||
)
|
||||
|
||||
const (
|
||||
Version = 0
|
||||
ProtocolLength = uint64(8)
|
||||
@@ -206,6 +220,7 @@ func (self *bzz) handle() error {
|
||||
|
||||
case storeRequestMsg:
|
||||
// store requests are dispatched to netStore
|
||||
storeRequestMsgCounter.Inc(1)
|
||||
var req storeRequestMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
return fmt.Errorf("<- %v: %v", msg, err)
|
||||
@@ -221,6 +236,7 @@ func (self *bzz) handle() error {
|
||||
|
||||
case retrieveRequestMsg:
|
||||
// retrieve Requests are dispatched to netStore
|
||||
retrieveRequestMsgCounter.Inc(1)
|
||||
var req retrieveRequestMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
return fmt.Errorf("<- %v: %v", msg, err)
|
||||
@@ -241,6 +257,7 @@ func (self *bzz) handle() error {
|
||||
case peersMsg:
|
||||
// response to lookups and immediate response to retrieve requests
|
||||
// dispatches new peer data to the hive that adds them to KADDB
|
||||
peersMsgCounter.Inc(1)
|
||||
var req peersMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
return fmt.Errorf("<- %v: %v", msg, err)
|
||||
@@ -250,6 +267,7 @@ func (self *bzz) handle() error {
|
||||
self.hive.HandlePeersMsg(&req, &peer{bzz: self})
|
||||
|
||||
case syncRequestMsg:
|
||||
syncRequestMsgCounter.Inc(1)
|
||||
var req syncRequestMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
return fmt.Errorf("<- %v: %v", msg, err)
|
||||
@@ -260,6 +278,7 @@ func (self *bzz) handle() error {
|
||||
|
||||
case unsyncedKeysMsg:
|
||||
// coming from parent node offering
|
||||
unsyncedKeysMsgCounter.Inc(1)
|
||||
var req unsyncedKeysMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
return fmt.Errorf("<- %v: %v", msg, err)
|
||||
@@ -274,6 +293,7 @@ func (self *bzz) handle() error {
|
||||
case deliveryRequestMsg:
|
||||
// response to syncKeysMsg hashes filtered not existing in db
|
||||
// also relays the last synced state to the source
|
||||
deliverRequestMsgCounter.Inc(1)
|
||||
var req deliveryRequestMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
return fmt.Errorf("<-msg %v: %v", msg, err)
|
||||
@@ -287,6 +307,7 @@ func (self *bzz) handle() error {
|
||||
|
||||
case paymentMsg:
|
||||
// swap protocol message for payment, Units paid for, Cheque paid with
|
||||
paymentMsgCounter.Inc(1)
|
||||
if self.swapEnabled {
|
||||
var req paymentMsgData
|
||||
if err := msg.Decode(&req); err != nil {
|
||||
@@ -298,6 +319,7 @@ func (self *bzz) handle() error {
|
||||
|
||||
default:
|
||||
// no other message is allowed
|
||||
invalidMsgCounter.Inc(1)
|
||||
return fmt.Errorf("invalid message code: %v", msg.Code)
|
||||
}
|
||||
return nil
|
||||
@@ -332,6 +354,8 @@ func (self *bzz) handleStatus() (err error) {
|
||||
return fmt.Errorf("first msg has code %x (!= %x)", msg.Code, statusMsg)
|
||||
}
|
||||
|
||||
handleStatusMsgCounter.Inc(1)
|
||||
|
||||
if msg.Size > ProtocolMaxMsgSize {
|
||||
return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize)
|
||||
}
|
||||
|
Reference in New Issue
Block a user