dashboard, p2p, vendor: visualize peers (#19247)
* dashboard, p2p: visualize peers * dashboard: change scale to green to red
This commit is contained in:
committed by
Péter Szilágyi
parent
1591b63306
commit
1a29bf0ee2
@ -18,8 +18,10 @@ package dashboard
|
||||
|
||||
//go:generate yarn --cwd ./assets install
|
||||
//go:generate yarn --cwd ./assets build
|
||||
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js
|
||||
//go:generate yarn --cwd ./assets js-beautify -f bundle.js.map -r -w 1
|
||||
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js assets/bundle.js.map
|
||||
//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
|
||||
//go:generate sh -c "sed 's#var _bundleJsMap#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
|
||||
//go:generate sh -c "sed 's#var _indexHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
|
||||
//go:generate gofmt -w -s assets.go
|
||||
|
||||
@ -27,16 +29,13 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"io"
|
||||
|
||||
"github.com/elastic/gosigar"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
@ -45,31 +44,29 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
activeMemorySampleLimit = 200 // Maximum number of active memory data samples
|
||||
virtualMemorySampleLimit = 200 // Maximum number of virtual memory data samples
|
||||
networkIngressSampleLimit = 200 // Maximum number of network ingress data samples
|
||||
networkEgressSampleLimit = 200 // Maximum number of network egress data samples
|
||||
processCPUSampleLimit = 200 // Maximum number of process cpu data samples
|
||||
systemCPUSampleLimit = 200 // Maximum number of system cpu data samples
|
||||
diskReadSampleLimit = 200 // Maximum number of disk read data samples
|
||||
diskWriteSampleLimit = 200 // Maximum number of disk write data samples
|
||||
sampleLimit = 200 // Maximum number of data samples
|
||||
)
|
||||
|
||||
var nextID uint32 // Next connection id
|
||||
|
||||
// Dashboard contains the dashboard internals.
|
||||
type Dashboard struct {
|
||||
config *Config
|
||||
config *Config // Configuration values for the dashboard
|
||||
|
||||
listener net.Listener
|
||||
conns map[uint32]*client // Currently live websocket connections
|
||||
history *Message
|
||||
lock sync.RWMutex // Lock protecting the dashboard's internals
|
||||
listener net.Listener // Network listener listening for dashboard clients
|
||||
conns map[uint32]*client // Currently live websocket connections
|
||||
nextConnID uint32 // Next connection id
|
||||
|
||||
logdir string
|
||||
history *Message // Stored historical data
|
||||
|
||||
lock sync.Mutex // Lock protecting the dashboard's internals
|
||||
sysLock sync.RWMutex // Lock protecting the stored system data
|
||||
peerLock sync.RWMutex // Lock protecting the stored peer data
|
||||
logLock sync.RWMutex // Lock protecting the stored log data
|
||||
|
||||
geodb *geoDB // geoip database instance for IP to geographical information conversions
|
||||
logdir string // Directory containing the log files
|
||||
|
||||
quit chan chan error // Channel used for graceful exit
|
||||
wg sync.WaitGroup
|
||||
wg sync.WaitGroup // Wait group used to close the data collector threads
|
||||
}
|
||||
|
||||
// client represents active websocket connection with a remote browser.
|
||||
@ -96,14 +93,14 @@ func New(config *Config, commit string, logdir string) *Dashboard {
|
||||
Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
|
||||
},
|
||||
System: &SystemMessage{
|
||||
ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
|
||||
VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
|
||||
NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
|
||||
NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
|
||||
ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
|
||||
SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
|
||||
DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
|
||||
DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
|
||||
ActiveMemory: emptyChartEntries(now, sampleLimit),
|
||||
VirtualMemory: emptyChartEntries(now, sampleLimit),
|
||||
NetworkIngress: emptyChartEntries(now, sampleLimit),
|
||||
NetworkEgress: emptyChartEntries(now, sampleLimit),
|
||||
ProcessCPU: emptyChartEntries(now, sampleLimit),
|
||||
SystemCPU: emptyChartEntries(now, sampleLimit),
|
||||
DiskRead: emptyChartEntries(now, sampleLimit),
|
||||
DiskWrite: emptyChartEntries(now, sampleLimit),
|
||||
},
|
||||
},
|
||||
logdir: logdir,
|
||||
@ -111,12 +108,10 @@ func New(config *Config, commit string, logdir string) *Dashboard {
|
||||
}
|
||||
|
||||
// emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
|
||||
func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries {
|
||||
func emptyChartEntries(t time.Time, limit int) ChartEntries {
|
||||
ce := make(ChartEntries, limit)
|
||||
for i := 0; i < limit; i++ {
|
||||
ce[i] = &ChartEntry{
|
||||
Time: t.Add(-time.Duration(i) * refresh),
|
||||
}
|
||||
ce[i] = new(ChartEntry)
|
||||
}
|
||||
return ce
|
||||
}
|
||||
@ -132,9 +127,10 @@ func (db *Dashboard) APIs() []rpc.API { return nil }
|
||||
func (db *Dashboard) Start(server *p2p.Server) error {
|
||||
log.Info("Starting dashboard")
|
||||
|
||||
db.wg.Add(2)
|
||||
go db.collectData()
|
||||
db.wg.Add(3)
|
||||
go db.collectSystemData()
|
||||
go db.streamLogs()
|
||||
go db.collectPeerData()
|
||||
|
||||
http.HandleFunc("/", db.webHandler)
|
||||
http.Handle("/api", websocket.Handler(db.apiHandler))
|
||||
@ -160,7 +156,7 @@ func (db *Dashboard) Stop() error {
|
||||
}
|
||||
// Close the collectors.
|
||||
errc := make(chan error, 1)
|
||||
for i := 0; i < 2; i++ {
|
||||
for i := 0; i < 3; i++ {
|
||||
db.quit <- errc
|
||||
if err := <-errc; err != nil {
|
||||
errs = append(errs, err)
|
||||
@ -206,7 +202,7 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// apiHandler handles requests for the dashboard.
|
||||
func (db *Dashboard) apiHandler(conn *websocket.Conn) {
|
||||
id := atomic.AddUint32(&nextID, 1)
|
||||
id := atomic.AddUint32(&db.nextConnID, 1)
|
||||
client := &client{
|
||||
conn: conn,
|
||||
msg: make(chan *Message, 128),
|
||||
@ -233,10 +229,21 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
|
||||
}
|
||||
}()
|
||||
|
||||
db.lock.Lock()
|
||||
// Send the past data.
|
||||
client.msg <- deepcopy.Copy(db.history).(*Message)
|
||||
db.sysLock.RLock()
|
||||
db.peerLock.RLock()
|
||||
db.logLock.RLock()
|
||||
|
||||
h := deepcopy.Copy(db.history).(*Message)
|
||||
|
||||
db.sysLock.RUnlock()
|
||||
db.peerLock.RUnlock()
|
||||
db.logLock.RUnlock()
|
||||
|
||||
client.msg <- h
|
||||
|
||||
// Start tracking the connection and drop at connection loss.
|
||||
db.lock.Lock()
|
||||
db.conns[id] = client
|
||||
db.lock.Unlock()
|
||||
defer func() {
|
||||
@ -259,136 +266,6 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
// meterCollector returns a function, which retrieves a specific meter.
|
||||
func meterCollector(name string) func() int64 {
|
||||
if metric := metrics.DefaultRegistry.Get(name); metric != nil {
|
||||
m := metric.(metrics.Meter)
|
||||
return func() int64 {
|
||||
return m.Count()
|
||||
}
|
||||
}
|
||||
return func() int64 {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// collectData collects the required data to plot on the dashboard.
|
||||
func (db *Dashboard) collectData() {
|
||||
defer db.wg.Done()
|
||||
|
||||
systemCPUUsage := gosigar.Cpu{}
|
||||
systemCPUUsage.Get()
|
||||
var (
|
||||
mem runtime.MemStats
|
||||
|
||||
collectNetworkIngress = meterCollector("p2p/InboundTraffic")
|
||||
collectNetworkEgress = meterCollector("p2p/OutboundTraffic")
|
||||
collectDiskRead = meterCollector("eth/db/chaindata/disk/read")
|
||||
collectDiskWrite = meterCollector("eth/db/chaindata/disk/write")
|
||||
|
||||
prevNetworkIngress = collectNetworkIngress()
|
||||
prevNetworkEgress = collectNetworkEgress()
|
||||
prevProcessCPUTime = getProcessCPUTime()
|
||||
prevSystemCPUUsage = systemCPUUsage
|
||||
prevDiskRead = collectDiskRead()
|
||||
prevDiskWrite = collectDiskWrite()
|
||||
|
||||
frequency = float64(db.config.Refresh / time.Second)
|
||||
numCPU = float64(runtime.NumCPU())
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case errc := <-db.quit:
|
||||
errc <- nil
|
||||
return
|
||||
case <-time.After(db.config.Refresh):
|
||||
systemCPUUsage.Get()
|
||||
var (
|
||||
curNetworkIngress = collectNetworkIngress()
|
||||
curNetworkEgress = collectNetworkEgress()
|
||||
curProcessCPUTime = getProcessCPUTime()
|
||||
curSystemCPUUsage = systemCPUUsage
|
||||
curDiskRead = collectDiskRead()
|
||||
curDiskWrite = collectDiskWrite()
|
||||
|
||||
deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
|
||||
deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress)
|
||||
deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime
|
||||
deltaSystemCPUUsage = curSystemCPUUsage.Delta(prevSystemCPUUsage)
|
||||
deltaDiskRead = curDiskRead - prevDiskRead
|
||||
deltaDiskWrite = curDiskWrite - prevDiskWrite
|
||||
)
|
||||
prevNetworkIngress = curNetworkIngress
|
||||
prevNetworkEgress = curNetworkEgress
|
||||
prevProcessCPUTime = curProcessCPUTime
|
||||
prevSystemCPUUsage = curSystemCPUUsage
|
||||
prevDiskRead = curDiskRead
|
||||
prevDiskWrite = curDiskWrite
|
||||
|
||||
now := time.Now()
|
||||
|
||||
runtime.ReadMemStats(&mem)
|
||||
activeMemory := &ChartEntry{
|
||||
Time: now,
|
||||
Value: float64(mem.Alloc) / frequency,
|
||||
}
|
||||
virtualMemory := &ChartEntry{
|
||||
Time: now,
|
||||
Value: float64(mem.Sys) / frequency,
|
||||
}
|
||||
networkIngress := &ChartEntry{
|
||||
Time: now,
|
||||
Value: deltaNetworkIngress / frequency,
|
||||
}
|
||||
networkEgress := &ChartEntry{
|
||||
Time: now,
|
||||
Value: deltaNetworkEgress / frequency,
|
||||
}
|
||||
processCPU := &ChartEntry{
|
||||
Time: now,
|
||||
Value: deltaProcessCPUTime / frequency / numCPU * 100,
|
||||
}
|
||||
systemCPU := &ChartEntry{
|
||||
Time: now,
|
||||
Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU,
|
||||
}
|
||||
diskRead := &ChartEntry{
|
||||
Time: now,
|
||||
Value: float64(deltaDiskRead) / frequency,
|
||||
}
|
||||
diskWrite := &ChartEntry{
|
||||
Time: now,
|
||||
Value: float64(deltaDiskWrite) / frequency,
|
||||
}
|
||||
sys := db.history.System
|
||||
db.lock.Lock()
|
||||
sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory)
|
||||
sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory)
|
||||
sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress)
|
||||
sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress)
|
||||
sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU)
|
||||
sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU)
|
||||
sys.DiskRead = append(sys.DiskRead[1:], diskRead)
|
||||
sys.DiskWrite = append(sys.DiskWrite[1:], diskWrite)
|
||||
db.lock.Unlock()
|
||||
|
||||
db.sendToAll(&Message{
|
||||
System: &SystemMessage{
|
||||
ActiveMemory: ChartEntries{activeMemory},
|
||||
VirtualMemory: ChartEntries{virtualMemory},
|
||||
NetworkIngress: ChartEntries{networkIngress},
|
||||
NetworkEgress: ChartEntries{networkEgress},
|
||||
ProcessCPU: ChartEntries{processCPU},
|
||||
SystemCPU: ChartEntries{systemCPU},
|
||||
DiskRead: ChartEntries{diskRead},
|
||||
DiskWrite: ChartEntries{diskWrite},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendToAll sends the given message to the active dashboards.
|
||||
func (db *Dashboard) sendToAll(msg *Message) {
|
||||
db.lock.Lock()
|
||||
|
Reference in New Issue
Block a user