cmd, dashboard, log: log collection and exploration (#17097)

* cmd, dashboard, internal, log, node: logging feature

* cmd, dashboard, internal, log: requested changes

* dashboard, vendor: gofmt, govendor, use vendored file watcher

* dashboard, log: gofmt -s -w, goimports

* dashboard, log: gosimple
This commit is contained in:
Kurkó Mihály
2018-07-11 10:59:04 +03:00
committed by Péter Szilágyi
parent 2eedbe799f
commit a9835c1816
28 changed files with 11444 additions and 8211 deletions

View File

@ -32,12 +32,15 @@ import (
"sync/atomic"
"time"
"io"
"github.com/elastic/gosigar"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mohae/deepcopy"
"golang.org/x/net/websocket"
)
@ -60,10 +63,11 @@ type Dashboard struct {
listener net.Listener
conns map[uint32]*client // Currently live websocket connections
charts *SystemMessage
commit string
history *Message
lock sync.RWMutex // Lock protecting the dashboard's internals
logdir string
quit chan chan error // Channel used for graceful exit
wg sync.WaitGroup
}
@ -71,30 +75,39 @@ type Dashboard struct {
// client represents an active websocket connection with a remote browser.
//
// NOTE(review): msg is declared twice below, once as chan Message and once as
// chan *Message. This looks like both sides of a diff rendered together —
// confirm which declaration is the live one.
type client struct {
conn *websocket.Conn // Particular live websocket connection
msg chan Message // Message queue for the update messages
msg chan *Message // Message queue for the update messages
logger log.Logger // Logger for the particular live websocket connection
}
// New creates a new dashboard instance with the given configuration.
//
// The returned Dashboard starts with an empty connection map, a fresh quit
// channel, and chart series pre-filled by emptyChartEntries from the current
// time and config.Refresh. The version string is built from
// params.VersionMajor, params.VersionMinor and params.VersionPatch, with
// params.VersionMeta appended in parentheses when it is non-empty.
//
// NOTE(review): two signatures and two struct-literal openings appear below
// (with and without logdir / the error return). This looks like both sides of
// a diff rendered together — confirm which revision is the live one.
func New(config *Config, commit string) (*Dashboard, error) {
func New(config *Config, commit string, logdir string) *Dashboard {
now := time.Now()
db := &Dashboard{
versionMeta := ""
if len(params.VersionMeta) > 0 {
versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
}
return &Dashboard{
conns: make(map[uint32]*client),
config: config,
quit: make(chan chan error),
charts: &SystemMessage{
ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
history: &Message{
General: &GeneralMessage{
Commit: commit,
Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
},
System: &SystemMessage{
ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
},
},
commit: commit,
logdir: logdir,
}
return db, nil
}
// emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
@ -108,19 +121,20 @@ func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntri
return ce
}
// Protocols implements the node.Service interface. It always returns nil: the
// dashboard contributes no p2p protocols of its own.
func (db *Dashboard) Protocols() []p2p.Protocol {
	return nil
}
// APIs implements the node.Service interface. It always returns nil: the
// dashboard contributes no RPC APIs of its own.
func (db *Dashboard) APIs() []rpc.API {
	return nil
}
// Start implements node.Service, starting the data collection thread and the listening server of the dashboard.
// Start starts the data collection thread and the listening server of the dashboard.
// Implements the node.Service interface.
func (db *Dashboard) Start(server *p2p.Server) error {
log.Info("Starting dashboard")
db.wg.Add(2)
go db.collectData()
go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add.
go db.streamLogs()
http.HandleFunc("/", db.webHandler)
http.Handle("/api", websocket.Handler(db.apiHandler))
@ -136,7 +150,8 @@ func (db *Dashboard) Start(server *p2p.Server) error {
return nil
}
// Stop implements node.Service, stopping the data collection thread and the connection listener of the dashboard.
// Stop stops the data collection thread and the connection listener of the dashboard.
// Implements the node.Service interface.
func (db *Dashboard) Stop() error {
// Close the connection listener.
var errs []error
@ -194,7 +209,7 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
id := atomic.AddUint32(&nextID, 1)
client := &client{
conn: conn,
msg: make(chan Message, 128),
msg: make(chan *Message, 128),
logger: log.New("id", id),
}
done := make(chan struct{})
@ -218,29 +233,10 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
}
}()
versionMeta := ""
if len(params.VersionMeta) > 0 {
versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
}
// Send the past data.
client.msg <- Message{
General: &GeneralMessage{
Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
Commit: db.commit,
},
System: &SystemMessage{
ActiveMemory: db.charts.ActiveMemory,
VirtualMemory: db.charts.VirtualMemory,
NetworkIngress: db.charts.NetworkIngress,
NetworkEgress: db.charts.NetworkEgress,
ProcessCPU: db.charts.ProcessCPU,
SystemCPU: db.charts.SystemCPU,
DiskRead: db.charts.DiskRead,
DiskWrite: db.charts.DiskWrite,
},
}
// Start tracking the connection and drop at connection loss.
db.lock.Lock()
// Send the past data.
client.msg <- deepcopy.Copy(db.history).(*Message)
// Start tracking the connection and drop at connection loss.
db.conns[id] = client
db.lock.Unlock()
defer func() {
@ -249,29 +245,53 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
db.lock.Unlock()
}()
for {
fail := []byte{}
if _, err := conn.Read(fail); err != nil {
r := new(Request)
if err := websocket.JSON.Receive(conn, r); err != nil {
if err != io.EOF {
client.logger.Warn("Failed to receive request", "err", err)
}
close(done)
return
}
// Ignore all messages
if r.Logs != nil {
db.handleLogRequest(r.Logs, client)
}
}
}
// meterCollector returns a function that reports the current count of the
// meter registered under name in the default metrics registry.
//
// If nothing is registered under name, or the registered metric is not a
// metrics.Meter, the returned function always reports 0. The comma-ok
// assertion covers both cases (a nil interface never satisfies the assertion),
// so a metric of an unexpected type can no longer panic the collector.
func meterCollector(name string) func() int64 {
	if m, ok := metrics.DefaultRegistry.Get(name).(metrics.Meter); ok {
		return m.Count
	}
	return func() int64 { return 0 }
}
// collectData collects the required data to plot on the dashboard.
func (db *Dashboard) collectData() {
defer db.wg.Done()
systemCPUUsage := gosigar.Cpu{}
systemCPUUsage.Get()
var (
mem runtime.MemStats
prevNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
prevNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
collectNetworkIngress = meterCollector("p2p/InboundTraffic")
collectNetworkEgress = meterCollector("p2p/OutboundTraffic")
collectDiskRead = meterCollector("eth/db/chaindata/disk/read")
collectDiskWrite = meterCollector("eth/db/chaindata/disk/write")
prevNetworkIngress = collectNetworkIngress()
prevNetworkEgress = collectNetworkEgress()
prevProcessCPUTime = getProcessCPUTime()
prevSystemCPUUsage = systemCPUUsage
prevDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count()
prevDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count()
prevDiskRead = collectDiskRead()
prevDiskWrite = collectDiskWrite()
frequency = float64(db.config.Refresh / time.Second)
numCPU = float64(runtime.NumCPU())
@ -285,12 +305,12 @@ func (db *Dashboard) collectData() {
case <-time.After(db.config.Refresh):
systemCPUUsage.Get()
var (
curNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
curNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
curNetworkIngress = collectNetworkIngress()
curNetworkEgress = collectNetworkEgress()
curProcessCPUTime = getProcessCPUTime()
curSystemCPUUsage = systemCPUUsage
curDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count()
curDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count()
curDiskRead = collectDiskRead()
curDiskWrite = collectDiskWrite()
deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress)
@ -341,14 +361,17 @@ func (db *Dashboard) collectData() {
Time: now,
Value: float64(deltaDiskWrite) / frequency,
}
db.charts.ActiveMemory = append(db.charts.ActiveMemory[1:], activeMemory)
db.charts.VirtualMemory = append(db.charts.VirtualMemory[1:], virtualMemory)
db.charts.NetworkIngress = append(db.charts.NetworkIngress[1:], networkIngress)
db.charts.NetworkEgress = append(db.charts.NetworkEgress[1:], networkEgress)
db.charts.ProcessCPU = append(db.charts.ProcessCPU[1:], processCPU)
db.charts.SystemCPU = append(db.charts.SystemCPU[1:], systemCPU)
db.charts.DiskRead = append(db.charts.DiskRead[1:], diskRead)
db.charts.DiskWrite = append(db.charts.DiskRead[1:], diskWrite)
sys := db.history.System
// Slide every chart window forward by one sample. The write lock is held
// because the history is deep-copied for newly connected clients under the
// same lock.
db.lock.Lock()
sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory)
sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory)
sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress)
sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress)
sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU)
sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU)
sys.DiskRead = append(sys.DiskRead[1:], diskRead)
// DiskWrite must extend its own series. Building it from DiskRead[1:] would
// replace the stored write history with read samples.
sys.DiskWrite = append(sys.DiskWrite[1:], diskWrite)
db.lock.Unlock()
db.sendToAll(&Message{
System: &SystemMessage{
@ -366,34 +389,12 @@ func (db *Dashboard) collectData() {
}
}
// collectLogs periodically broadcasts a placeholder log line to every active
// dashboard until a quit request arrives. Each line carries a sequence number
// that starts at 1 and grows by one per broadcast.
func (db *Dashboard) collectLogs() {
	defer db.wg.Done()

	// TODO (kurkomisi): log collection comes here.
	for seq := 1; ; seq++ {
		select {
		case errc := <-db.quit:
			// Acknowledge the shutdown request and stop the loop.
			errc <- nil
			return
		case <-time.After(db.config.Refresh / 2):
			line := fmt.Sprintf("%-4d: This is a fake log.", seq)
			db.sendToAll(&Message{
				Logs: &LogsMessage{Log: []string{line}},
			})
		}
	}
}
// sendToAll sends the given message to the active dashboards.
func (db *Dashboard) sendToAll(msg *Message) {
db.lock.Lock()
for _, c := range db.conns {
select {
case c.msg <- *msg:
case c.msg <- msg:
default:
c.conn.Close()
}