swarm/storage: remove traces for put/get/set (#1389) * swarm/storage: remove traces for put/get/set * swarm/storage: remove Has traces
		
			
				
	
	
		
			812 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			812 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2018 The go-ethereum Authors
 | 
						|
// This file is part of the go-ethereum library.
 | 
						|
//
 | 
						|
// The go-ethereum library is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Lesser General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// The go-ethereum library is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
// GNU Lesser General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Lesser General Public License
 | 
						|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package stream
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"math"
 | 
						|
	"reflect"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/metrics"
 | 
						|
	"github.com/ethereum/go-ethereum/p2p"
 | 
						|
	"github.com/ethereum/go-ethereum/p2p/enode"
 | 
						|
	"github.com/ethereum/go-ethereum/p2p/protocols"
 | 
						|
	"github.com/ethereum/go-ethereum/rpc"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/log"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/network"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/state"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/storage"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	Low uint8 = iota
 | 
						|
	Mid
 | 
						|
	High
 | 
						|
	Top
 | 
						|
	PriorityQueue    = 4    // number of priority queues - Low, Mid, High, Top
 | 
						|
	PriorityQueueCap = 4096 // queue capacity
 | 
						|
	HashSize         = 32
 | 
						|
)
 | 
						|
 | 
						|
// Enumerate options for syncing and retrieval
 | 
						|
type SyncingOption int
 | 
						|
 | 
						|
// Syncing options
 | 
						|
const (
 | 
						|
	// Syncing disabled
 | 
						|
	SyncingDisabled SyncingOption = iota
 | 
						|
	// Register the client and the server but not subscribe
 | 
						|
	SyncingRegisterOnly
 | 
						|
	// Both client and server funcs are registered, subscribe sent automatically
 | 
						|
	SyncingAutoSubscribe
 | 
						|
)
 | 
						|
 | 
						|
// subscriptionFunc is used to determine what to do in order to perform subscriptions
 | 
						|
// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
 | 
						|
// (see TestRequestPeerSubscriptions in streamer_test.go)
 | 
						|
var subscriptionFunc = doRequestSubscription
 | 
						|
 | 
						|
// Registry registry for outgoing and incoming streamer constructors
 | 
						|
type Registry struct {
 | 
						|
	addr            enode.ID
 | 
						|
	api             *API
 | 
						|
	skipCheck       bool
 | 
						|
	clientMu        sync.RWMutex
 | 
						|
	serverMu        sync.RWMutex
 | 
						|
	peersMu         sync.RWMutex
 | 
						|
	serverFuncs     map[string]func(*Peer, string, bool) (Server, error)
 | 
						|
	clientFuncs     map[string]func(*Peer, string, bool) (Client, error)
 | 
						|
	peers           map[enode.ID]*Peer
 | 
						|
	delivery        *Delivery
 | 
						|
	intervalsStore  state.Store
 | 
						|
	maxPeerServers  int
 | 
						|
	spec            *protocols.Spec   //this protocol's spec
 | 
						|
	balance         protocols.Balance //implements protocols.Balance, for accounting
 | 
						|
	prices          protocols.Prices  //implements protocols.Prices, provides prices to accounting
 | 
						|
	quit            chan struct{}     // terminates registry goroutines
 | 
						|
	syncMode        SyncingOption
 | 
						|
	syncUpdateDelay time.Duration
 | 
						|
}
 | 
						|
 | 
						|
// RegistryOptions holds optional values for NewRegistry constructor.
 | 
						|
type RegistryOptions struct {
 | 
						|
	SkipCheck       bool
 | 
						|
	Syncing         SyncingOption // Defines syncing behavior
 | 
						|
	SyncUpdateDelay time.Duration
 | 
						|
	MaxPeerServers  int // The limit of servers for each peer in registry
 | 
						|
}
 | 
						|
 | 
						|
// NewRegistry is Streamer constructor
 | 
						|
func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
 | 
						|
	if options == nil {
 | 
						|
		options = &RegistryOptions{}
 | 
						|
	}
 | 
						|
	if options.SyncUpdateDelay <= 0 {
 | 
						|
		options.SyncUpdateDelay = 15 * time.Second
 | 
						|
	}
 | 
						|
 | 
						|
	quit := make(chan struct{})
 | 
						|
 | 
						|
	streamer := &Registry{
 | 
						|
		addr:            localID,
 | 
						|
		skipCheck:       options.SkipCheck,
 | 
						|
		serverFuncs:     make(map[string]func(*Peer, string, bool) (Server, error)),
 | 
						|
		clientFuncs:     make(map[string]func(*Peer, string, bool) (Client, error)),
 | 
						|
		peers:           make(map[enode.ID]*Peer),
 | 
						|
		delivery:        delivery,
 | 
						|
		intervalsStore:  intervalsStore,
 | 
						|
		maxPeerServers:  options.MaxPeerServers,
 | 
						|
		balance:         balance,
 | 
						|
		quit:            quit,
 | 
						|
		syncUpdateDelay: options.SyncUpdateDelay,
 | 
						|
		syncMode:        options.Syncing,
 | 
						|
	}
 | 
						|
 | 
						|
	streamer.setupSpec()
 | 
						|
 | 
						|
	streamer.api = NewAPI(streamer)
 | 
						|
	delivery.getPeer = streamer.getPeer
 | 
						|
 | 
						|
	// If syncing is not disabled, the syncing functions are registered (both client and server)
 | 
						|
	if options.Syncing != SyncingDisabled {
 | 
						|
		RegisterSwarmSyncerServer(streamer, netStore)
 | 
						|
		RegisterSwarmSyncerClient(streamer, netStore)
 | 
						|
	}
 | 
						|
 | 
						|
	return streamer
 | 
						|
}
 | 
						|
 | 
						|
// This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
 | 
						|
// For simulations to be able to run multiple nodes and not override the hook's balance,
 | 
						|
// we need to construct a spec instance per node instance
 | 
						|
func (r *Registry) setupSpec() {
 | 
						|
	// first create the "bare" spec
 | 
						|
	r.createSpec()
 | 
						|
	// now create the pricing object
 | 
						|
	r.createPriceOracle()
 | 
						|
	// if balance is nil, this node has been started without swap support (swapEnabled flag is false)
 | 
						|
	if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
 | 
						|
		// swap is enabled, so setup the hook
 | 
						|
		r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// RegisterClient registers an incoming streamer constructor
 | 
						|
func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
 | 
						|
	r.clientMu.Lock()
 | 
						|
	defer r.clientMu.Unlock()
 | 
						|
 | 
						|
	r.clientFuncs[stream] = f
 | 
						|
}
 | 
						|
 | 
						|
// RegisterServer registers an outgoing streamer constructor
 | 
						|
func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error)) {
 | 
						|
	r.serverMu.Lock()
 | 
						|
	defer r.serverMu.Unlock()
 | 
						|
 | 
						|
	r.serverFuncs[stream] = f
 | 
						|
}
 | 
						|
 | 
						|
// GetClient accessor for incoming streamer constructors
 | 
						|
func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error) {
 | 
						|
	r.clientMu.RLock()
 | 
						|
	defer r.clientMu.RUnlock()
 | 
						|
 | 
						|
	f := r.clientFuncs[stream]
 | 
						|
	if f == nil {
 | 
						|
		return nil, fmt.Errorf("stream %v not registered", stream)
 | 
						|
	}
 | 
						|
	return f, nil
 | 
						|
}
 | 
						|
 | 
						|
// GetServer accessor for incoming streamer constructors
 | 
						|
func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error) {
 | 
						|
	r.serverMu.RLock()
 | 
						|
	defer r.serverMu.RUnlock()
 | 
						|
 | 
						|
	f := r.serverFuncs[stream]
 | 
						|
	if f == nil {
 | 
						|
		return nil, fmt.Errorf("stream %v not registered", stream)
 | 
						|
	}
 | 
						|
	return f, nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error {
 | 
						|
	// check if the stream is registered
 | 
						|
	if _, err := r.GetServerFunc(s.Name); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	peer := r.getPeer(peerId)
 | 
						|
	if peer == nil {
 | 
						|
		return fmt.Errorf("peer not found %v", peerId)
 | 
						|
	}
 | 
						|
 | 
						|
	if _, err := peer.getServer(s); err != nil {
 | 
						|
		if e, ok := err.(*notFoundError); ok && e.t == "server" {
 | 
						|
			// request subscription only if the server for this stream is not created
 | 
						|
			log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h)
 | 
						|
			return peer.Send(context.TODO(), &RequestSubscriptionMsg{
 | 
						|
				Stream:   s,
 | 
						|
				History:  h,
 | 
						|
				Priority: prio,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	log.Trace("RequestSubscription: already subscribed", "peer", peerId, "stream", s, "history", h)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Subscribe initiates the streamer
 | 
						|
func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error {
 | 
						|
	// check if the stream is registered
 | 
						|
	if _, err := r.GetClientFunc(s.Name); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	peer := r.getPeer(peerId)
 | 
						|
	if peer == nil {
 | 
						|
		return fmt.Errorf("peer not found %v", peerId)
 | 
						|
	}
 | 
						|
 | 
						|
	var to uint64
 | 
						|
	if !s.Live && h != nil {
 | 
						|
		to = h.To
 | 
						|
	}
 | 
						|
 | 
						|
	err := peer.setClientParams(s, newClientParams(priority, to))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if s.Live && h != nil {
 | 
						|
		if err := peer.setClientParams(
 | 
						|
			getHistoryStream(s),
 | 
						|
			newClientParams(getHistoryPriority(priority), h.To),
 | 
						|
		); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	msg := &SubscribeMsg{
 | 
						|
		Stream:   s,
 | 
						|
		History:  h,
 | 
						|
		Priority: priority,
 | 
						|
	}
 | 
						|
	log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
 | 
						|
 | 
						|
	return peer.Send(context.TODO(), msg)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
 | 
						|
	peer := r.getPeer(peerId)
 | 
						|
	if peer == nil {
 | 
						|
		return fmt.Errorf("peer not found %v", peerId)
 | 
						|
	}
 | 
						|
 | 
						|
	msg := &UnsubscribeMsg{
 | 
						|
		Stream: s,
 | 
						|
	}
 | 
						|
	log.Debug("Unsubscribe ", "peer", peerId, "stream", s)
 | 
						|
 | 
						|
	if err := peer.Send(context.TODO(), msg); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return peer.removeClient(s)
 | 
						|
}
 | 
						|
 | 
						|
// Quit sends the QuitMsg to the peer to remove the
 | 
						|
// stream peer client and terminate the streaming.
 | 
						|
func (r *Registry) Quit(peerId enode.ID, s Stream) error {
 | 
						|
	peer := r.getPeer(peerId)
 | 
						|
	if peer == nil {
 | 
						|
		log.Debug("stream quit: peer not found", "peer", peerId, "stream", s)
 | 
						|
		// if the peer is not found, abort the request
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	msg := &QuitMsg{
 | 
						|
		Stream: s,
 | 
						|
	}
 | 
						|
	log.Debug("Quit ", "peer", peerId, "stream", s)
 | 
						|
 | 
						|
	return peer.Send(context.TODO(), msg)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) Close() error {
 | 
						|
	// Stop sending neighborhood depth change and address count
 | 
						|
	// change from Kademlia that were initiated in NewRegistry constructor.
 | 
						|
	r.delivery.Close()
 | 
						|
	close(r.quit)
 | 
						|
	return r.intervalsStore.Close()
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) getPeer(peerId enode.ID) *Peer {
 | 
						|
	r.peersMu.RLock()
 | 
						|
	defer r.peersMu.RUnlock()
 | 
						|
 | 
						|
	return r.peers[peerId]
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) setPeer(peer *Peer) {
 | 
						|
	r.peersMu.Lock()
 | 
						|
	r.peers[peer.ID()] = peer
 | 
						|
	metrics.GetOrRegisterCounter("registry.setpeer", nil).Inc(1)
 | 
						|
	metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
 | 
						|
	r.peersMu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) deletePeer(peer *Peer) {
 | 
						|
	r.peersMu.Lock()
 | 
						|
	delete(r.peers, peer.ID())
 | 
						|
	metrics.GetOrRegisterCounter("registry.deletepeer", nil).Inc(1)
 | 
						|
	metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
 | 
						|
	r.peersMu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) peersCount() (c int) {
 | 
						|
	r.peersMu.Lock()
 | 
						|
	c = len(r.peers)
 | 
						|
	r.peersMu.Unlock()
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Run protocol run function
 | 
						|
func (r *Registry) Run(p *network.BzzPeer) error {
 | 
						|
	sp := NewPeer(p, r)
 | 
						|
	r.setPeer(sp)
 | 
						|
 | 
						|
	if r.syncMode == SyncingAutoSubscribe {
 | 
						|
		go sp.runUpdateSyncing()
 | 
						|
	}
 | 
						|
 | 
						|
	defer r.deletePeer(sp)
 | 
						|
	defer close(sp.quit)
 | 
						|
	defer sp.close()
 | 
						|
 | 
						|
	return sp.Run(sp.HandleMsg)
 | 
						|
}
 | 
						|
 | 
						|
// doRequestSubscription sends the actual RequestSubscription to the peer
 | 
						|
func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error {
 | 
						|
	log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin)
 | 
						|
	// bin is always less then 256 and it is safe to convert it to type uint8
 | 
						|
	stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
 | 
						|
	err := r.RequestSubscription(id, stream, NewRange(0, 0), High)
 | 
						|
	if err != nil {
 | 
						|
		log.Debug("Request subscription", "err", err, "peer", id, "stream", stream)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
 | 
						|
	peer := protocols.NewPeer(p, rw, r.spec)
 | 
						|
	bp := network.NewBzzPeer(peer)
 | 
						|
	np := network.NewPeer(bp, r.delivery.kad)
 | 
						|
	r.delivery.kad.On(np)
 | 
						|
	defer r.delivery.kad.Off(np)
 | 
						|
	return r.Run(bp)
 | 
						|
}
 | 
						|
 | 
						|
// HandleMsg is the message handler that delegates incoming messages
 | 
						|
func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
 | 
						|
	select {
 | 
						|
	case <-p.streamer.quit:
 | 
						|
		log.Trace("message received after the streamer is closed", "peer", p.ID())
 | 
						|
		// return without an error since streamer is closed and
 | 
						|
		// no messages should be handled as other subcomponents like
 | 
						|
		// storage leveldb may be closed
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
	switch msg := msg.(type) {
 | 
						|
 | 
						|
	case *SubscribeMsg:
 | 
						|
		return p.handleSubscribeMsg(ctx, msg)
 | 
						|
 | 
						|
	case *SubscribeErrorMsg:
 | 
						|
		return p.handleSubscribeErrorMsg(msg)
 | 
						|
 | 
						|
	case *UnsubscribeMsg:
 | 
						|
		return p.handleUnsubscribeMsg(msg)
 | 
						|
 | 
						|
	case *OfferedHashesMsg:
 | 
						|
		go func() {
 | 
						|
			err := p.handleOfferedHashesMsg(ctx, msg)
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err.Error())
 | 
						|
				p.Drop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		return nil
 | 
						|
 | 
						|
	case *TakeoverProofMsg:
 | 
						|
		go func() {
 | 
						|
			err := p.handleTakeoverProofMsg(ctx, msg)
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err.Error())
 | 
						|
				p.Drop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		return nil
 | 
						|
 | 
						|
	case *WantedHashesMsg:
 | 
						|
		go func() {
 | 
						|
			err := p.handleWantedHashesMsg(ctx, msg)
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err.Error())
 | 
						|
				p.Drop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		return nil
 | 
						|
 | 
						|
	case *ChunkDeliveryMsgRetrieval:
 | 
						|
		// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
 | 
						|
		go func() {
 | 
						|
			err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err.Error())
 | 
						|
				p.Drop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		return nil
 | 
						|
 | 
						|
	case *ChunkDeliveryMsgSyncing:
 | 
						|
		// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
 | 
						|
		go func() {
 | 
						|
			err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err.Error())
 | 
						|
				p.Drop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		return nil
 | 
						|
 | 
						|
	case *RetrieveRequestMsg:
 | 
						|
		go func() {
 | 
						|
			err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err.Error())
 | 
						|
				p.Drop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		return nil
 | 
						|
 | 
						|
	case *RequestSubscriptionMsg:
 | 
						|
		return p.handleRequestSubscription(ctx, msg)
 | 
						|
 | 
						|
	case *QuitMsg:
 | 
						|
		return p.handleQuitMsg(msg)
 | 
						|
 | 
						|
	default:
 | 
						|
		return fmt.Errorf("unknown message type: %T", msg)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type server struct {
 | 
						|
	Server
 | 
						|
	stream       Stream
 | 
						|
	priority     uint8
 | 
						|
	currentBatch []byte
 | 
						|
	sessionIndex uint64
 | 
						|
}
 | 
						|
 | 
						|
// setNextBatch adjusts passed interval based on session index and whether
 | 
						|
// stream is live or history. It calls Server SetNextBatch with adjusted
 | 
						|
// interval and returns batch hashes and their interval.
 | 
						|
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
 | 
						|
	if s.stream.Live {
 | 
						|
		if from == 0 {
 | 
						|
			from = s.sessionIndex
 | 
						|
		}
 | 
						|
		if to <= from || from >= s.sessionIndex {
 | 
						|
			to = math.MaxUint64
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		if (to < from && to != 0) || from > s.sessionIndex {
 | 
						|
			return nil, 0, 0, nil, nil
 | 
						|
		}
 | 
						|
		if to == 0 || to > s.sessionIndex {
 | 
						|
			to = s.sessionIndex
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return s.SetNextBatch(from, to)
 | 
						|
}
 | 
						|
 | 
						|
// Server interface for outgoing peer Streamer
 | 
						|
type Server interface {
 | 
						|
	// SessionIndex is called when a server is initialized
 | 
						|
	// to get the current cursor state of the stream data.
 | 
						|
	// Based on this index, live and history stream intervals
 | 
						|
	// will be adjusted before calling SetNextBatch.
 | 
						|
	SessionIndex() (uint64, error)
 | 
						|
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
 | 
						|
	GetData(context.Context, []byte) ([]byte, error)
 | 
						|
	Close()
 | 
						|
}
 | 
						|
 | 
						|
type client struct {
 | 
						|
	Client
 | 
						|
	stream    Stream
 | 
						|
	priority  uint8
 | 
						|
	sessionAt uint64
 | 
						|
	to        uint64
 | 
						|
	next      chan error
 | 
						|
	quit      chan struct{}
 | 
						|
 | 
						|
	intervalsKey   string
 | 
						|
	intervalsStore state.Store
 | 
						|
}
 | 
						|
 | 
						|
func peerStreamIntervalsKey(p *Peer, s Stream) string {
 | 
						|
	return p.ID().String() + s.String()
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) AddInterval(start, end uint64) (err error) {
 | 
						|
	i := &intervals.Intervals{}
 | 
						|
	if err = c.intervalsStore.Get(c.intervalsKey, i); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	i.Add(start, end)
 | 
						|
	return c.intervalsStore.Put(c.intervalsKey, i)
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) NextInterval() (start, end uint64, err error) {
 | 
						|
	i := &intervals.Intervals{}
 | 
						|
	err = c.intervalsStore.Get(c.intervalsKey, i)
 | 
						|
	if err != nil {
 | 
						|
		return 0, 0, err
 | 
						|
	}
 | 
						|
	start, end = i.Next()
 | 
						|
	return start, end, nil
 | 
						|
}
 | 
						|
 | 
						|
// Client interface for incoming peer Streamer
 | 
						|
type Client interface {
 | 
						|
	NeedData(context.Context, []byte) func(context.Context) error
 | 
						|
	BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
 | 
						|
	Close()
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
 | 
						|
	if c.to > 0 && from >= c.to {
 | 
						|
		return 0, 0
 | 
						|
	}
 | 
						|
	if c.stream.Live {
 | 
						|
		return from, 0
 | 
						|
	} else if from >= c.sessionAt {
 | 
						|
		if c.to > 0 {
 | 
						|
			return from, c.to
 | 
						|
		}
 | 
						|
		return from, math.MaxUint64
 | 
						|
	}
 | 
						|
	nextFrom, nextTo, err := c.NextInterval()
 | 
						|
	if err != nil {
 | 
						|
		log.Error("next intervals", "stream", c.stream)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if nextTo > c.to {
 | 
						|
		nextTo = c.to
 | 
						|
	}
 | 
						|
	if nextTo == 0 {
 | 
						|
		nextTo = c.sessionAt
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
 | 
						|
	if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
 | 
						|
		tp, err := tf()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		if err := p.Send(context.TODO(), tp); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if c.to > 0 && tp.Takeover.End >= c.to {
 | 
						|
			return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return c.AddInterval(req.From, req.To)
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) close() {
 | 
						|
	select {
 | 
						|
	case <-c.quit:
 | 
						|
	default:
 | 
						|
		close(c.quit)
 | 
						|
	}
 | 
						|
	c.Close()
 | 
						|
}
 | 
						|
 | 
						|
// clientParams store parameters for the new client
 | 
						|
// between a subscription and initial offered hashes request handling.
 | 
						|
type clientParams struct {
 | 
						|
	priority uint8
 | 
						|
	to       uint64
 | 
						|
	// signal when the client is created
 | 
						|
	clientCreatedC chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
func newClientParams(priority uint8, to uint64) *clientParams {
 | 
						|
	return &clientParams{
 | 
						|
		priority:       priority,
 | 
						|
		to:             to,
 | 
						|
		clientCreatedC: make(chan struct{}),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *clientParams) waitClient(ctx context.Context) error {
 | 
						|
	select {
 | 
						|
	case <-ctx.Done():
 | 
						|
		return ctx.Err()
 | 
						|
	case <-c.clientCreatedC:
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *clientParams) clientCreated() {
 | 
						|
	close(c.clientCreatedC)
 | 
						|
}
 | 
						|
 | 
						|
// GetSpec returns the streamer spec to callers
 | 
						|
// This used to be a global variable but for simulations with
 | 
						|
// multiple nodes its fields (notably the Hook) would be overwritten
 | 
						|
func (r *Registry) GetSpec() *protocols.Spec {
 | 
						|
	return r.spec
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) createSpec() {
 | 
						|
	// Spec is the spec of the streamer protocol
 | 
						|
	var spec = &protocols.Spec{
 | 
						|
		Name:       "stream",
 | 
						|
		Version:    8,
 | 
						|
		MaxMsgSize: 10 * 1024 * 1024,
 | 
						|
		Messages: []interface{}{
 | 
						|
			UnsubscribeMsg{},
 | 
						|
			OfferedHashesMsg{},
 | 
						|
			WantedHashesMsg{},
 | 
						|
			TakeoverProofMsg{},
 | 
						|
			SubscribeMsg{},
 | 
						|
			RetrieveRequestMsg{},
 | 
						|
			ChunkDeliveryMsgRetrieval{},
 | 
						|
			SubscribeErrorMsg{},
 | 
						|
			RequestSubscriptionMsg{},
 | 
						|
			QuitMsg{},
 | 
						|
			ChunkDeliveryMsgSyncing{},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	r.spec = spec
 | 
						|
}
 | 
						|
 | 
						|
// An accountable message needs some meta information attached to it
 | 
						|
// in order to evaluate the correct price
 | 
						|
type StreamerPrices struct {
 | 
						|
	priceMatrix map[reflect.Type]*protocols.Price
 | 
						|
	registry    *Registry
 | 
						|
}
 | 
						|
 | 
						|
// Price implements the accounting interface and returns the price for a specific message
 | 
						|
func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
 | 
						|
	t := reflect.TypeOf(msg).Elem()
 | 
						|
	return sp.priceMatrix[t]
 | 
						|
}
 | 
						|
 | 
						|
// Instead of hardcoding the price, get it
 | 
						|
// through a function - it could be quite complex in the future
 | 
						|
func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
 | 
						|
	return uint64(1)
 | 
						|
}
 | 
						|
 | 
						|
// Instead of hardcoding the price, get it
 | 
						|
// through a function - it could be quite complex in the future
 | 
						|
func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
 | 
						|
	return uint64(1)
 | 
						|
}
 | 
						|
 | 
						|
// createPriceOracle sets up a matrix which can be queried to get
 | 
						|
// the price for a message via the Price method
 | 
						|
func (r *Registry) createPriceOracle() {
 | 
						|
	sp := &StreamerPrices{
 | 
						|
		registry: r,
 | 
						|
	}
 | 
						|
	sp.priceMatrix = map[reflect.Type]*protocols.Price{
 | 
						|
		reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
 | 
						|
			Value:   sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
 | 
						|
			PerByte: true,
 | 
						|
			Payer:   protocols.Receiver,
 | 
						|
		},
 | 
						|
		reflect.TypeOf(RetrieveRequestMsg{}): {
 | 
						|
			Value:   sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
 | 
						|
			PerByte: false,
 | 
						|
			Payer:   protocols.Sender,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	r.prices = sp
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) Protocols() []p2p.Protocol {
 | 
						|
	return []p2p.Protocol{
 | 
						|
		{
 | 
						|
			Name:    r.spec.Name,
 | 
						|
			Version: r.spec.Version,
 | 
						|
			Length:  r.spec.Length(),
 | 
						|
			Run:     r.runProtocol,
 | 
						|
		},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) APIs() []rpc.API {
 | 
						|
	return []rpc.API{
 | 
						|
		{
 | 
						|
			Namespace: "stream",
 | 
						|
			Version:   "3.0",
 | 
						|
			Service:   r.api,
 | 
						|
			Public:    false,
 | 
						|
		},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) Start(server *p2p.Server) error {
 | 
						|
	log.Info("Streamer started")
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *Registry) Stop() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
type Range struct {
 | 
						|
	From, To uint64
 | 
						|
}
 | 
						|
 | 
						|
func NewRange(from, to uint64) *Range {
 | 
						|
	return &Range{
 | 
						|
		From: from,
 | 
						|
		To:   to,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Range) String() string {
 | 
						|
	return fmt.Sprintf("%v-%v", r.From, r.To)
 | 
						|
}
 | 
						|
 | 
						|
func getHistoryPriority(priority uint8) uint8 {
 | 
						|
	if priority == 0 {
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	return priority - 1
 | 
						|
}
 | 
						|
 | 
						|
func getHistoryStream(s Stream) Stream {
 | 
						|
	return NewStream(s.Name, s.Key, false)
 | 
						|
}
 | 
						|
 | 
						|
type API struct {
 | 
						|
	streamer *Registry
 | 
						|
}
 | 
						|
 | 
						|
func NewAPI(r *Registry) *API {
 | 
						|
	return &API{
 | 
						|
		streamer: r,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error {
 | 
						|
	return api.streamer.Subscribe(peerId, s, history, priority)
 | 
						|
}
 | 
						|
 | 
						|
func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
 | 
						|
	return api.streamer.Unsubscribe(peerId, s)
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
 | 
						|
It can be called via RPC.
 | 
						|
It returns a map of node IDs with an array of string representations of Stream objects.
 | 
						|
*/
 | 
						|
func (api *API) GetPeerServerSubscriptions() map[string][]string {
 | 
						|
	pstreams := make(map[string][]string)
 | 
						|
 | 
						|
	api.streamer.peersMu.RLock()
 | 
						|
	defer api.streamer.peersMu.RUnlock()
 | 
						|
 | 
						|
	for id, p := range api.streamer.peers {
 | 
						|
		var streams []string
 | 
						|
		//every peer has a map of stream servers
 | 
						|
		//every stream server represents a subscription
 | 
						|
		p.serverMu.RLock()
 | 
						|
		for s := range p.servers {
 | 
						|
			//append the string representation of the stream
 | 
						|
			//to the list for this peer
 | 
						|
			streams = append(streams, s.String())
 | 
						|
		}
 | 
						|
		p.serverMu.RUnlock()
 | 
						|
		//set the array of stream servers to the map
 | 
						|
		pstreams[id.String()] = streams
 | 
						|
	}
 | 
						|
	return pstreams
 | 
						|
}
 |