dashboard: integrate Flow, sketch message API (#15713)

* dashboard: minor design change

* dashboard: Flow integration, message API

* dashboard: minor polishes, exclude misspell linter
This commit is contained in:
Kurkó Mihály
2017-12-21 17:54:38 +02:00
committed by Péter Szilágyi
parent 52f4d6dd78
commit 9dbb8ef4aa
23 changed files with 49950 additions and 610 deletions

View File

@ -16,7 +16,10 @@
package dashboard
//go:generate go-bindata -nometadata -o assets.go -prefix assets -pkg dashboard assets/public/...
//go:generate ./assets/node_modules/.bin/webpack --config ./assets/webpack.config.js --context ./assets
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/public/...
//go:generate gofmt -s -w assets.go
//go:generate sed -i "s#var _public#//nolint:misspell\\n&#" assets.go
import (
"fmt"
@ -40,7 +43,7 @@ const (
trafficSampleLimit = 200 // Maximum number of traffic data samples
)
var nextId uint32 // Next connection id
var nextID uint32 // Next connection id
// Dashboard contains the dashboard internals.
type Dashboard struct {
@ -48,46 +51,30 @@ type Dashboard struct {
listener net.Listener
conns map[uint32]*client // Currently live websocket connections
charts charts // The collected data samples to plot
lock sync.RWMutex // Lock protecting the dashboard's internals
charts *HomeMessage
lock sync.RWMutex // Lock protecting the dashboard's internals
quit chan chan error // Channel used for graceful exit
wg sync.WaitGroup
}
// message embraces the data samples of a client message.
type message struct {
History *charts `json:"history,omitempty"` // Past data samples
Memory *chartEntry `json:"memory,omitempty"` // One memory sample
Traffic *chartEntry `json:"traffic,omitempty"` // One traffic sample
Log string `json:"log,omitempty"` // One log
}
// client represents active websocket connection with a remote browser.
type client struct {
conn *websocket.Conn // Particular live websocket connection
msg chan message // Message queue for the update messages
msg chan Message // Message queue for the update messages
logger log.Logger // Logger for the particular live websocket connection
}
// charts contains the collected data samples.
type charts struct {
Memory []*chartEntry `json:"memorySamples,omitempty"`
Traffic []*chartEntry `json:"trafficSamples,omitempty"`
}
// chartEntry represents one data sample
type chartEntry struct {
Time time.Time `json:"time,omitempty"`
Value float64 `json:"value,omitempty"`
}
// New creates a new dashboard instance with the given configuration.
func New(config *Config) (*Dashboard, error) {
return &Dashboard{
conns: make(map[uint32]*client),
config: config,
quit: make(chan chan error),
charts: &HomeMessage{
Memory: &Chart{},
Traffic: &Chart{},
},
}, nil
}
@ -183,13 +170,13 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
// apiHandler handles requests for the dashboard.
func (db *Dashboard) apiHandler(conn *websocket.Conn) {
id := atomic.AddUint32(&nextId, 1)
id := atomic.AddUint32(&nextID, 1)
client := &client{
conn: conn,
msg: make(chan message, 128),
msg: make(chan Message, 128),
logger: log.New("id", id),
}
done := make(chan struct{}) // Buffered channel as sender may exit early
done := make(chan struct{})
// Start listening for messages to send.
db.wg.Add(1)
@ -210,8 +197,15 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
}
}()
// Send the past data.
client.msg <- message{
History: &db.charts,
client.msg <- Message{
Home: &HomeMessage{
Memory: &Chart{
History: db.charts.Memory.History,
},
Traffic: &Chart{
History: db.charts.Traffic.History,
},
},
}
// Start tracking the connection and drop at connection loss.
db.lock.Lock()
@ -245,29 +239,34 @@ func (db *Dashboard) collectData() {
inboundTraffic := metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Rate1()
memoryInUse := metrics.DefaultRegistry.Get("system/memory/inuse").(metrics.Meter).Rate1()
now := time.Now()
memory := &chartEntry{
memory := &ChartEntry{
Time: now,
Value: memoryInUse,
}
traffic := &chartEntry{
traffic := &ChartEntry{
Time: now,
Value: inboundTraffic,
}
// Remove the first elements in case the samples' amount exceeds the limit.
first := 0
if len(db.charts.Memory) == memorySampleLimit {
if len(db.charts.Memory.History) == memorySampleLimit {
first = 1
}
db.charts.Memory = append(db.charts.Memory[first:], memory)
db.charts.Memory.History = append(db.charts.Memory.History[first:], memory)
first = 0
if len(db.charts.Traffic) == trafficSampleLimit {
if len(db.charts.Traffic.History) == trafficSampleLimit {
first = 1
}
db.charts.Traffic = append(db.charts.Traffic[first:], traffic)
db.charts.Traffic.History = append(db.charts.Traffic.History[first:], traffic)
db.sendToAll(&message{
Memory: memory,
Traffic: traffic,
db.sendToAll(&Message{
Home: &HomeMessage{
Memory: &Chart{
New: memory,
},
Traffic: &Chart{
New: traffic,
},
},
})
}
}
@ -277,6 +276,7 @@ func (db *Dashboard) collectData() {
func (db *Dashboard) collectLogs() {
defer db.wg.Done()
id := 1
// TODO (kurkomisi): log collection comes here.
for {
select {
@ -284,15 +284,18 @@ func (db *Dashboard) collectLogs() {
errc <- nil
return
case <-time.After(db.config.Refresh / 2):
db.sendToAll(&message{
Log: "This is a fake log.",
db.sendToAll(&Message{
Logs: &LogsMessage{
Log: fmt.Sprintf("%-4d: This is a fake log.", id),
},
})
id++
}
}
}
// sendToAll sends the given message to the active dashboards.
func (db *Dashboard) sendToAll(msg *message) {
func (db *Dashboard) sendToAll(msg *Message) {
db.lock.Lock()
for _, c := range db.conns {
select {