* swarm: initial instrumentation with go-metrics * swarm: initialise metrics collection and add ResettingTimer to HTTP requests * swarm: update metrics flags names. remove redundant Timer. * swarm: rename method for periodically updating gauges * swarm: finalise metrics after feedback * swarm/network: always init kad metrics containers * swarm/network: off-by-one index in metrics containers * swarm, metrics: resolved conflicts
		
			
				
	
	
		
			404 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			404 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2016 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package network
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"path/filepath"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/common"
 | |
| 	"github.com/ethereum/go-ethereum/log"
 | |
| 	"github.com/ethereum/go-ethereum/metrics"
 | |
| 	"github.com/ethereum/go-ethereum/p2p/discover"
 | |
| 	"github.com/ethereum/go-ethereum/p2p/netutil"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/network/kademlia"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/storage"
 | |
| )
 | |
| 
 | |
| // Hive is the logistic manager of the swarm
 | |
| // it uses a generic kademlia nodetable to find best peer list
 | |
| // for any target
 | |
| // this is used by the netstore to search for content in the swarm
 | |
| // the bzz protocol peersMsgData exchange is relayed to Kademlia
 | |
| // for db storage and filtering
 | |
| // connections and disconnections are reported and relayed
 | |
| // to keep the nodetable up to date
 | |
| 
 | |
// Metrics reported by the hive:
//   - peersNumGauge is set to kad.Count() on every iteration of keepAlive.
//   - addPeerCounter is incremented once per addPeer call.
//   - removePeerCounter is incremented once per removePeer call.
var (
	peersNumGauge     = metrics.NewRegisteredGauge("network.peers.num", nil)
	addPeerCounter    = metrics.NewRegisteredCounter("network.addpeer.count", nil)
	removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil)
)
 | |
| 
 | |
// Hive couples a kademlia node table with the channels and settings used by
// the connection-maintenance goroutines launched in Start.
type Hive struct {
	// listenAddr is the callback handed to Start for retrieving the
	// listening address to advertise to peers.
	listenAddr   func() string
	// callInterval is the wake-up period of keepAlive; it is converted with
	// time.Duration, i.e. interpreted as nanoseconds.
	callInterval uint64
	// id is the node ID handed to Start.
	id           discover.NodeID
	// addr is the base address of the kademlia table (kad.Addr()).
	addr         kademlia.Address
	// kad is the kademlia node table all peer bookkeeping is relayed to.
	kad          *kademlia.Kademlia
	// path is the file the kademlia db is loaded from in Start and saved to
	// in Stop.
	path         string
	// quit is closed by Stop to terminate the background loops.
	quit         chan bool
	// toggle carries the "need more peers" flag from the suggestion loop to
	// keepAlive, which arms or disarms its ticker accordingly.
	toggle       chan bool
	// more receives a value whenever another connection attempt should be
	// considered (timer tick, peer added, peer removed).
	more         chan bool

	// for testing only
	swapEnabled bool
	syncEnabled bool
	blockRead   bool
	blockWrite  bool
}
 | |
| 
 | |
const (
	// callInterval is the default value of HiveParams.CallInterval.
	// keepAlive converts it with time.Duration, so the unit is nanoseconds
	// (3000000000ns = 3s).
	callInterval = 3000000000
	// bucketSize   = 3
	// maxProx      = 8
	// proxBinSize  = 4
)
 | |
| 
 | |
// HiveParams bundles the configuration consumed by NewHive.
type HiveParams struct {
	// CallInterval is the keepAlive wake-up period (used as a time.Duration,
	// i.e. nanoseconds).
	CallInterval uint64
	// KadDbPath is the location of the persisted kademlia db; it is set by Init.
	KadDbPath    string
	// KadParams are the embedded kademlia table parameters passed to kademlia.New.
	*kademlia.KadParams
}
 | |
| 
 | |
| //create default params
 | |
| func NewDefaultHiveParams() *HiveParams {
 | |
| 	kad := kademlia.NewDefaultKadParams()
 | |
| 	// kad.BucketSize = bucketSize
 | |
| 	// kad.MaxProx = maxProx
 | |
| 	// kad.ProxBinSize = proxBinSize
 | |
| 
 | |
| 	return &HiveParams{
 | |
| 		CallInterval: callInterval,
 | |
| 		KadParams:    kad,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| //this can only finally be set after all config options (file, cmd line, env vars)
 | |
| //have been evaluated
 | |
| func (self *HiveParams) Init(path string) {
 | |
| 	self.KadDbPath = filepath.Join(path, "bzz-peers.json")
 | |
| }
 | |
| 
 | |
| func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive {
 | |
| 	kad := kademlia.New(kademlia.Address(addr), params.KadParams)
 | |
| 	return &Hive{
 | |
| 		callInterval: params.CallInterval,
 | |
| 		kad:          kad,
 | |
| 		addr:         kad.Addr(),
 | |
| 		path:         params.KadDbPath,
 | |
| 		swapEnabled:  swapEnabled,
 | |
| 		syncEnabled:  syncEnabled,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (self *Hive) SyncEnabled(on bool) {
 | |
| 	self.syncEnabled = on
 | |
| }
 | |
| 
 | |
| func (self *Hive) SwapEnabled(on bool) {
 | |
| 	self.swapEnabled = on
 | |
| }
 | |
| 
 | |
| func (self *Hive) BlockNetworkRead(on bool) {
 | |
| 	self.blockRead = on
 | |
| }
 | |
| 
 | |
| func (self *Hive) BlockNetworkWrite(on bool) {
 | |
| 	self.blockWrite = on
 | |
| }
 | |
| 
 | |
| // public accessor to the hive base address
 | |
| func (self *Hive) Addr() kademlia.Address {
 | |
| 	return self.addr
 | |
| }
 | |
| 
 | |
| // Start receives network info only at startup
 | |
| // listedAddr is a function to retrieve listening address to advertise to peers
 | |
| // connectPeer is a function to connect to a peer based on its NodeID or enode URL
 | |
| // there are called on the p2p.Server which runs on the node
 | |
| func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) {
 | |
| 	self.toggle = make(chan bool)
 | |
| 	self.more = make(chan bool)
 | |
| 	self.quit = make(chan bool)
 | |
| 	self.id = id
 | |
| 	self.listenAddr = listenAddr
 | |
| 	err = self.kad.Load(self.path, nil)
 | |
| 	if err != nil {
 | |
| 		log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err))
 | |
| 		err = nil
 | |
| 	}
 | |
| 	// this loop is doing bootstrapping and maintains a healthy table
 | |
| 	go self.keepAlive()
 | |
| 	go func() {
 | |
| 		// whenever toggled ask kademlia about most preferred peer
 | |
| 		for alive := range self.more {
 | |
| 			if !alive {
 | |
| 				// receiving false closes the loop while allowing parallel routines
 | |
| 				// to attempt to write to more (remove Peer when shutting down)
 | |
| 				return
 | |
| 			}
 | |
| 			node, need, proxLimit := self.kad.Suggest()
 | |
| 
 | |
| 			if node != nil && len(node.Url) > 0 {
 | |
| 				log.Trace(fmt.Sprintf("call known bee %v", node.Url))
 | |
| 				// enode or any lower level connection address is unnecessary in future
 | |
| 				// discovery table is used to look it up.
 | |
| 				connectPeer(node.Url)
 | |
| 			}
 | |
| 			if need {
 | |
| 				// a random peer is taken from the table
 | |
| 				peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1)
 | |
| 				if len(peers) > 0 {
 | |
| 					// a random address at prox bin 0 is sent for lookup
 | |
| 					randAddr := kademlia.RandomAddressAt(self.addr, proxLimit)
 | |
| 					req := &retrieveRequestMsgData{
 | |
| 						Key: storage.Key(randAddr[:]),
 | |
| 					}
 | |
| 					log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]))
 | |
| 					peers[0].(*peer).retrieve(req)
 | |
| 				} else {
 | |
| 					log.Warn(fmt.Sprintf("no peer"))
 | |
| 				}
 | |
| 				log.Trace(fmt.Sprintf("buzz kept alive"))
 | |
| 			} else {
 | |
| 				log.Info(fmt.Sprintf("no need for more bees"))
 | |
| 			}
 | |
| 			select {
 | |
| 			case self.toggle <- need:
 | |
| 			case <-self.quit:
 | |
| 				return
 | |
| 			}
 | |
| 			log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()))
 | |
| 		}
 | |
| 	}()
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // keepAlive is a forever loop
 | |
| // in its awake state it periodically triggers connection attempts
 | |
| // by writing to self.more until Kademlia Table is saturated
 | |
| // wake state is toggled by writing to self.toggle
 | |
| // it restarts if the table becomes non-full again due to disconnections
 | |
| func (self *Hive) keepAlive() {
 | |
| 	alarm := time.NewTicker(time.Duration(self.callInterval)).C
 | |
| 	for {
 | |
| 		peersNumGauge.Update(int64(self.kad.Count()))
 | |
| 		select {
 | |
| 		case <-alarm:
 | |
| 			if self.kad.DBCount() > 0 {
 | |
| 				select {
 | |
| 				case self.more <- true:
 | |
| 					log.Debug(fmt.Sprintf("buzz wakeup"))
 | |
| 				default:
 | |
| 				}
 | |
| 			}
 | |
| 		case need := <-self.toggle:
 | |
| 			if alarm == nil && need {
 | |
| 				alarm = time.NewTicker(time.Duration(self.callInterval)).C
 | |
| 			}
 | |
| 			if alarm != nil && !need {
 | |
| 				alarm = nil
 | |
| 
 | |
| 			}
 | |
| 		case <-self.quit:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (self *Hive) Stop() error {
 | |
| 	// closing toggle channel quits the updateloop
 | |
| 	close(self.quit)
 | |
| 	return self.kad.Save(self.path, saveSync)
 | |
| }
 | |
| 
 | |
| // called at the end of a successful protocol handshake
 | |
| func (self *Hive) addPeer(p *peer) error {
 | |
| 	addPeerCounter.Inc(1)
 | |
| 	defer func() {
 | |
| 		select {
 | |
| 		case self.more <- true:
 | |
| 		default:
 | |
| 		}
 | |
| 	}()
 | |
| 	log.Trace(fmt.Sprintf("hi new bee %v", p))
 | |
| 	err := self.kad.On(p, loadSync)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// self lookup (can be encoded as nil/zero key since peers addr known) + no id ()
 | |
| 	// the most common way of saying hi in bzz is initiation of gossip
 | |
| 	// let me know about anyone new from my hood , here is the storageradius
 | |
| 	// to send the 6 byte self lookup
 | |
| 	// we do not record as request or forward it, just reply with peers
 | |
| 	p.retrieve(&retrieveRequestMsgData{})
 | |
| 	log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p))
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // called after peer disconnected
 | |
| func (self *Hive) removePeer(p *peer) {
 | |
| 	removePeerCounter.Inc(1)
 | |
| 	log.Debug(fmt.Sprintf("bee %v removed", p))
 | |
| 	self.kad.Off(p, saveSync)
 | |
| 	select {
 | |
| 	case self.more <- true:
 | |
| 	default:
 | |
| 	}
 | |
| 	if self.kad.Count() == 0 {
 | |
| 		log.Debug(fmt.Sprintf("empty, all bees gone"))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Retrieve a list of live peers that are closer to target than us
 | |
| func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {
 | |
| 	var addr kademlia.Address
 | |
| 	copy(addr[:], target[:])
 | |
| 	for _, node := range self.kad.FindClosest(addr, max) {
 | |
| 		peers = append(peers, node.(*peer))
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // disconnects all the peers
 | |
| func (self *Hive) DropAll() {
 | |
| 	log.Info(fmt.Sprintf("dropping all bees"))
 | |
| 	for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
 | |
| 		node.Drop()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // contructor for kademlia.NodeRecord based on peer address alone
 | |
| // TODO: should go away and only addr passed to kademlia
 | |
| func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord {
 | |
| 	now := time.Now()
 | |
| 	return &kademlia.NodeRecord{
 | |
| 		Addr:  addr.Addr,
 | |
| 		Url:   addr.String(),
 | |
| 		Seen:  now,
 | |
| 		After: now,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // called by the protocol when receiving peerset (for target address)
 | |
| // peersMsgData is converted to a slice of NodeRecords for Kademlia
 | |
| // this is to store all thats needed
 | |
| func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
 | |
| 	var nrs []*kademlia.NodeRecord
 | |
| 	for _, p := range req.Peers {
 | |
| 		if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil {
 | |
| 			log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err))
 | |
| 			continue
 | |
| 		}
 | |
| 		nrs = append(nrs, newNodeRecord(p))
 | |
| 	}
 | |
| 	self.kad.Add(nrs)
 | |
| }
 | |
| 
 | |
// peer wraps the protocol instance to represent a connected peer.
// It implements the kademlia.Node interface; the fields and methods of the
// embedded *bzz (e.g. remoteAddr, lastActive, syncState, retrieve) are used
// through it throughout this file.
type peer struct {
	*bzz // protocol instance running on peer connection
}
 | |
| 
 | |
| // protocol instance implements kademlia.Node interface (embedded peer)
 | |
| func (self *peer) Addr() kademlia.Address {
 | |
| 	return self.remoteAddr.Addr
 | |
| }
 | |
| 
 | |
| func (self *peer) Url() string {
 | |
| 	return self.remoteAddr.String()
 | |
| }
 | |
| 
 | |
| // TODO take into account traffic
 | |
| func (self *peer) LastActive() time.Time {
 | |
| 	return self.lastActive
 | |
| }
 | |
| 
 | |
| // reads the serialised form of sync state persisted as the 'Meta' attribute
 | |
| // and sets the decoded syncState on the online node
 | |
| func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
 | |
| 	p, ok := node.(*peer)
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("invalid type")
 | |
| 	}
 | |
| 	if record.Meta == nil {
 | |
| 		log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record))
 | |
| 		p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
 | |
| 		return nil
 | |
| 	}
 | |
| 	state, err := decodeSync(record.Meta)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err)
 | |
| 	}
 | |
| 	log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))))
 | |
| 	p.syncState = state
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // callback when saving a sync state
 | |
| func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
 | |
| 	if p, ok := node.(*peer); ok {
 | |
| 		meta, err := encodeSync(p.syncState)
 | |
| 		if err != nil {
 | |
| 			log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err))
 | |
| 			return
 | |
| 		}
 | |
| 		log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta)))
 | |
| 		record.Meta = meta
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // the immediate response to a retrieve request,
 | |
| // sends relevant peer data given by the kademlia hive to the requester
 | |
| // TODO: remember peers sent for duration of the session, only new peers sent
 | |
| func (self *Hive) peers(req *retrieveRequestMsgData) {
 | |
| 	if req != nil {
 | |
| 		var addrs []*peerAddr
 | |
| 		if req.timeout == nil || time.Now().Before(*(req.timeout)) {
 | |
| 			key := req.Key
 | |
| 			// self lookup from remote peer
 | |
| 			if storage.IsZeroKey(key) {
 | |
| 				addr := req.from.Addr()
 | |
| 				key = storage.Key(addr[:])
 | |
| 				req.Key = nil
 | |
| 			}
 | |
| 			// get peer addresses from hive
 | |
| 			for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
 | |
| 				addrs = append(addrs, peer.remoteAddr)
 | |
| 			}
 | |
| 			log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()))
 | |
| 
 | |
| 			peersData := &peersMsgData{
 | |
| 				Peers: addrs,
 | |
| 				Key:   req.Key,
 | |
| 				Id:    req.Id,
 | |
| 			}
 | |
| 			peersData.setTimeout(req.timeout)
 | |
| 			req.from.peers(peersData)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (self *Hive) String() string {
 | |
| 	return self.kad.String()
 | |
| }
 |