306 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			306 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | // Copyright 2017 The go-ethereum Authors | ||
|  | // This file is part of the go-ethereum library. | ||
|  | // | ||
|  | // The go-ethereum library is free software: you can redistribute it and/or modify | ||
|  | // it under the terms of the GNU Lesser General Public License as published by | ||
|  | // the Free Software Foundation, either version 3 of the License, or | ||
|  | // (at your option) any later version. | ||
|  | // | ||
|  | // The go-ethereum library is distributed in the hope that it will be useful, | ||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
|  | // GNU Lesser General Public License for more details. | ||
|  | // | ||
|  | // You should have received a copy of the GNU Lesser General Public License | ||
|  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|  | 
 | ||
|  | package dashboard | ||
|  | 
 | ||
|  | //go:generate go-bindata -nometadata -o assets.go -prefix assets -pkg dashboard assets/public/... | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"fmt" | ||
|  | 	"io/ioutil" | ||
|  | 	"net" | ||
|  | 	"net/http" | ||
|  | 	"path/filepath" | ||
|  | 	"sync" | ||
|  | 	"sync/atomic" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/log" | ||
|  | 	"github.com/ethereum/go-ethereum/p2p" | ||
|  | 	"github.com/ethereum/go-ethereum/rpc" | ||
|  | 	"github.com/rcrowley/go-metrics" | ||
|  | 	"golang.org/x/net/websocket" | ||
|  | ) | ||
|  | 
 | ||
|  | const ( | ||
|  | 	memorySampleLimit  = 200 // Maximum number of memory data samples | ||
|  | 	trafficSampleLimit = 200 // Maximum number of traffic data samples | ||
|  | ) | ||
|  | 
 | ||
|  | var nextId uint32 // Next connection id | ||
|  | 
 | ||
|  | // Dashboard contains the dashboard internals. | ||
|  | type Dashboard struct { | ||
|  | 	config *Config | ||
|  | 
 | ||
|  | 	listener net.Listener | ||
|  | 	conns    map[uint32]*client // Currently live websocket connections | ||
|  | 	charts   charts             // The collected data samples to plot | ||
|  | 	lock     sync.RWMutex       // Lock protecting the dashboard's internals | ||
|  | 
 | ||
|  | 	quit chan chan error // Channel used for graceful exit | ||
|  | 	wg   sync.WaitGroup | ||
|  | } | ||
|  | 
 | ||
|  | // message embraces the data samples of a client message. | ||
|  | type message struct { | ||
|  | 	History *charts     `json:"history,omitempty"` // Past data samples | ||
|  | 	Memory  *chartEntry `json:"memory,omitempty"`  // One memory sample | ||
|  | 	Traffic *chartEntry `json:"traffic,omitempty"` // One traffic sample | ||
|  | 	Log     string      `json:"log,omitempty"`     // One log | ||
|  | } | ||
|  | 
 | ||
|  | // client represents active websocket connection with a remote browser. | ||
|  | type client struct { | ||
|  | 	conn   *websocket.Conn // Particular live websocket connection | ||
|  | 	msg    chan message    // Message queue for the update messages | ||
|  | 	logger log.Logger      // Logger for the particular live websocket connection | ||
|  | } | ||
|  | 
 | ||
|  | // charts contains the collected data samples. | ||
|  | type charts struct { | ||
|  | 	Memory  []*chartEntry `json:"memorySamples,omitempty"` | ||
|  | 	Traffic []*chartEntry `json:"trafficSamples,omitempty"` | ||
|  | } | ||
|  | 
 | ||
|  | // chartEntry represents one data sample | ||
|  | type chartEntry struct { | ||
|  | 	Time  time.Time `json:"time,omitempty"` | ||
|  | 	Value float64   `json:"value,omitempty"` | ||
|  | } | ||
|  | 
 | ||
|  | // New creates a new dashboard instance with the given configuration. | ||
|  | func New(config *Config) (*Dashboard, error) { | ||
|  | 	return &Dashboard{ | ||
|  | 		conns:  make(map[uint32]*client), | ||
|  | 		config: config, | ||
|  | 		quit:   make(chan chan error), | ||
|  | 	}, nil | ||
|  | } | ||
|  | 
 | ||
|  | // Protocols is a meaningless implementation of node.Service. | ||
|  | func (db *Dashboard) Protocols() []p2p.Protocol { return nil } | ||
|  | 
 | ||
|  | // APIs is a meaningless implementation of node.Service. | ||
|  | func (db *Dashboard) APIs() []rpc.API { return nil } | ||
|  | 
 | ||
|  | // Start implements node.Service, starting the data collection thread and the listening server of the dashboard. | ||
|  | func (db *Dashboard) Start(server *p2p.Server) error { | ||
|  | 	db.wg.Add(2) | ||
|  | 	go db.collectData() | ||
|  | 	go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add. | ||
|  | 
 | ||
|  | 	http.HandleFunc("/", db.webHandler) | ||
|  | 	http.Handle("/api", websocket.Handler(db.apiHandler)) | ||
|  | 
 | ||
|  | 	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", db.config.Host, db.config.Port)) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	db.listener = listener | ||
|  | 
 | ||
|  | 	go http.Serve(listener, nil) | ||
|  | 
 | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | // Stop implements node.Service, stopping the data collection thread and the connection listener of the dashboard. | ||
|  | func (db *Dashboard) Stop() error { | ||
|  | 	// Close the connection listener. | ||
|  | 	var errs []error | ||
|  | 	if err := db.listener.Close(); err != nil { | ||
|  | 		errs = append(errs, err) | ||
|  | 	} | ||
|  | 	// Close the collectors. | ||
|  | 	errc := make(chan error, 1) | ||
|  | 	for i := 0; i < 2; i++ { | ||
|  | 		db.quit <- errc | ||
|  | 		if err := <-errc; err != nil { | ||
|  | 			errs = append(errs, err) | ||
|  | 		} | ||
|  | 	} | ||
|  | 	// Close the connections. | ||
|  | 	db.lock.Lock() | ||
|  | 	for _, c := range db.conns { | ||
|  | 		if err := c.conn.Close(); err != nil { | ||
|  | 			c.logger.Warn("Failed to close connection", "err", err) | ||
|  | 		} | ||
|  | 	} | ||
|  | 	db.lock.Unlock() | ||
|  | 
 | ||
|  | 	// Wait until every goroutine terminates. | ||
|  | 	db.wg.Wait() | ||
|  | 	log.Info("Dashboard stopped") | ||
|  | 
 | ||
|  | 	var err error | ||
|  | 	if len(errs) > 0 { | ||
|  | 		err = fmt.Errorf("%v", errs) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return err | ||
|  | } | ||
|  | 
 | ||
|  | // webHandler handles all non-api requests, simply flattening and returning the dashboard website. | ||
|  | func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) { | ||
|  | 	log.Debug("Request", "URL", r.URL) | ||
|  | 
 | ||
|  | 	path := r.URL.String() | ||
|  | 	if path == "/" { | ||
|  | 		path = "/dashboard.html" | ||
|  | 	} | ||
|  | 	// If the path of the assets is manually set | ||
|  | 	if db.config.Assets != "" { | ||
|  | 		blob, err := ioutil.ReadFile(filepath.Join(db.config.Assets, path)) | ||
|  | 		if err != nil { | ||
|  | 			log.Warn("Failed to read file", "path", path, "err", err) | ||
|  | 			http.Error(w, "not found", http.StatusNotFound) | ||
|  | 			return | ||
|  | 		} | ||
|  | 		w.Write(blob) | ||
|  | 		return | ||
|  | 	} | ||
|  | 	blob, err := Asset(filepath.Join("public", path)) | ||
|  | 	if err != nil { | ||
|  | 		log.Warn("Failed to load the asset", "path", path, "err", err) | ||
|  | 		http.Error(w, "not found", http.StatusNotFound) | ||
|  | 		return | ||
|  | 	} | ||
|  | 	w.Write(blob) | ||
|  | } | ||
|  | 
 | ||
|  | // apiHandler handles requests for the dashboard. | ||
|  | func (db *Dashboard) apiHandler(conn *websocket.Conn) { | ||
|  | 	id := atomic.AddUint32(&nextId, 1) | ||
|  | 	client := &client{ | ||
|  | 		conn:   conn, | ||
|  | 		msg:    make(chan message, 128), | ||
|  | 		logger: log.New("id", id), | ||
|  | 	} | ||
|  | 	done := make(chan struct{}) // Buffered channel as sender may exit early | ||
|  | 
 | ||
|  | 	// Start listening for messages to send. | ||
|  | 	db.wg.Add(1) | ||
|  | 	go func() { | ||
|  | 		defer db.wg.Done() | ||
|  | 
 | ||
|  | 		for { | ||
|  | 			select { | ||
|  | 			case <-done: | ||
|  | 				return | ||
|  | 			case msg := <-client.msg: | ||
|  | 				if err := websocket.JSON.Send(client.conn, msg); err != nil { | ||
|  | 					client.logger.Warn("Failed to send the message", "msg", msg, "err", err) | ||
|  | 					client.conn.Close() | ||
|  | 					return | ||
|  | 				} | ||
|  | 			} | ||
|  | 		} | ||
|  | 	}() | ||
|  | 	// Send the past data. | ||
|  | 	client.msg <- message{ | ||
|  | 		History: &db.charts, | ||
|  | 	} | ||
|  | 	// Start tracking the connection and drop at connection loss. | ||
|  | 	db.lock.Lock() | ||
|  | 	db.conns[id] = client | ||
|  | 	db.lock.Unlock() | ||
|  | 	defer func() { | ||
|  | 		db.lock.Lock() | ||
|  | 		delete(db.conns, id) | ||
|  | 		db.lock.Unlock() | ||
|  | 	}() | ||
|  | 	for { | ||
|  | 		fail := []byte{} | ||
|  | 		if _, err := conn.Read(fail); err != nil { | ||
|  | 			close(done) | ||
|  | 			return | ||
|  | 		} | ||
|  | 		// Ignore all messages | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // collectData collects the required data to plot on the dashboard. | ||
|  | func (db *Dashboard) collectData() { | ||
|  | 	defer db.wg.Done() | ||
|  | 
 | ||
|  | 	for { | ||
|  | 		select { | ||
|  | 		case errc := <-db.quit: | ||
|  | 			errc <- nil | ||
|  | 			return | ||
|  | 		case <-time.After(db.config.Refresh): | ||
|  | 			inboundTraffic := metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Rate1() | ||
|  | 			memoryInUse := metrics.DefaultRegistry.Get("system/memory/inuse").(metrics.Meter).Rate1() | ||
|  | 			now := time.Now() | ||
|  | 			memory := &chartEntry{ | ||
|  | 				Time:  now, | ||
|  | 				Value: memoryInUse, | ||
|  | 			} | ||
|  | 			traffic := &chartEntry{ | ||
|  | 				Time:  now, | ||
|  | 				Value: inboundTraffic, | ||
|  | 			} | ||
|  | 			// Remove the first elements in case the samples' amount exceeds the limit. | ||
|  | 			first := 0 | ||
|  | 			if len(db.charts.Memory) == memorySampleLimit { | ||
|  | 				first = 1 | ||
|  | 			} | ||
|  | 			db.charts.Memory = append(db.charts.Memory[first:], memory) | ||
|  | 			first = 0 | ||
|  | 			if len(db.charts.Traffic) == trafficSampleLimit { | ||
|  | 				first = 1 | ||
|  | 			} | ||
|  | 			db.charts.Traffic = append(db.charts.Traffic[first:], traffic) | ||
|  | 
 | ||
|  | 			db.sendToAll(&message{ | ||
|  | 				Memory:  memory, | ||
|  | 				Traffic: traffic, | ||
|  | 			}) | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // collectLogs collects and sends the logs to the active dashboards. | ||
|  | func (db *Dashboard) collectLogs() { | ||
|  | 	defer db.wg.Done() | ||
|  | 
 | ||
|  | 	// TODO (kurkomisi): log collection comes here. | ||
|  | 	for { | ||
|  | 		select { | ||
|  | 		case errc := <-db.quit: | ||
|  | 			errc <- nil | ||
|  | 			return | ||
|  | 		case <-time.After(db.config.Refresh / 2): | ||
|  | 			db.sendToAll(&message{ | ||
|  | 				Log: "This is a fake log.", | ||
|  | 			}) | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // sendToAll sends the given message to the active dashboards. | ||
|  | func (db *Dashboard) sendToAll(msg *message) { | ||
|  | 	db.lock.Lock() | ||
|  | 	for _, c := range db.conns { | ||
|  | 		select { | ||
|  | 		case c.msg <- *msg: | ||
|  | 		default: | ||
|  | 			c.conn.Close() | ||
|  | 		} | ||
|  | 	} | ||
|  | 	db.lock.Unlock() | ||
|  | } |