| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | // Copyright 2018 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package stream | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2018-09-24 17:40:22 +02:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/metrics" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/p2p/protocols" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/log" | 
					
						
							|  |  |  | 	pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/network/stream/intervals" | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/swarm/spancontext" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/swarm/state" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/storage" | 
					
						
							| 
									
										
										
										
											2019-02-20 14:50:37 +01:00
										 |  |  | 	"github.com/ethereum/go-ethereum/swarm/tracing" | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	opentracing "github.com/opentracing/opentracing-go" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type notFoundError struct { | 
					
						
							|  |  |  | 	t string | 
					
						
							|  |  |  | 	s Stream | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newNotFoundError(t string, s Stream) *notFoundError { | 
					
						
							|  |  |  | 	return ¬FoundError{t: t, s: s} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (e *notFoundError) Error() string { | 
					
						
							|  |  |  | 	return fmt.Sprintf("%s not found for stream %q", e.t, e.s) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-24 17:40:22 +02:00
										 |  |  | // ErrMaxPeerServers will be returned if peer server limit is reached. | 
					
						
							|  |  |  | // It will be sent in the SubscribeErrorMsg. | 
					
						
							|  |  |  | var ErrMaxPeerServers = errors.New("max peer servers") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | // Peer is the Peer extension for the streaming protocol | 
					
						
							|  |  |  | type Peer struct { | 
					
						
							|  |  |  | 	*protocols.Peer | 
					
						
							|  |  |  | 	streamer *Registry | 
					
						
							|  |  |  | 	pq       *pq.PriorityQueue | 
					
						
							|  |  |  | 	serverMu sync.RWMutex | 
					
						
							|  |  |  | 	clientMu sync.RWMutex // protects both clients and clientParams | 
					
						
							|  |  |  | 	servers  map[Stream]*server | 
					
						
							|  |  |  | 	clients  map[Stream]*client | 
					
						
							|  |  |  | 	// clientParams map keeps required client arguments | 
					
						
							|  |  |  | 	// that are set on Registry.Subscribe and used | 
					
						
							|  |  |  | 	// on creating a new client in offered hashes handler. | 
					
						
							|  |  |  | 	clientParams map[Stream]*clientParams | 
					
						
							|  |  |  | 	quit         chan struct{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | type WrappedPriorityMsg struct { | 
					
						
							|  |  |  | 	Context context.Context | 
					
						
							|  |  |  | 	Msg     interface{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | // NewPeer is the constructor for Peer | 
					
						
							|  |  |  | func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { | 
					
						
							|  |  |  | 	p := &Peer{ | 
					
						
							|  |  |  | 		Peer:         peer, | 
					
						
							|  |  |  | 		pq:           pq.New(int(PriorityQueue), PriorityQueueCap), | 
					
						
							|  |  |  | 		streamer:     streamer, | 
					
						
							|  |  |  | 		servers:      make(map[Stream]*server), | 
					
						
							|  |  |  | 		clients:      make(map[Stream]*client), | 
					
						
							|  |  |  | 		clientParams: make(map[Stream]*clientParams), | 
					
						
							|  |  |  | 		quit:         make(chan struct{}), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	ctx, cancel := context.WithCancel(context.Background()) | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	go p.pq.Run(ctx, func(i interface{}) { | 
					
						
							|  |  |  | 		wmsg := i.(WrappedPriorityMsg) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		err := p.Send(wmsg.Context, wmsg.Msg) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) | 
					
						
							|  |  |  | 			p.Drop(err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// basic monitoring for pq contention | 
					
						
							|  |  |  | 	go func(pq *pq.PriorityQueue) { | 
					
						
							|  |  |  | 		ticker := time.NewTicker(5 * time.Second) | 
					
						
							|  |  |  | 		defer ticker.Stop() | 
					
						
							|  |  |  | 		for { | 
					
						
							|  |  |  | 			select { | 
					
						
							|  |  |  | 			case <-ticker.C: | 
					
						
							| 
									
										
										
										
											2019-02-24 03:39:23 -08:00
										 |  |  | 				var lenMaxi int | 
					
						
							|  |  |  | 				var capMaxi int | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 				for k := range pq.Queues { | 
					
						
							| 
									
										
										
										
											2019-02-24 03:39:23 -08:00
										 |  |  | 					if lenMaxi < len(pq.Queues[k]) { | 
					
						
							|  |  |  | 						lenMaxi = len(pq.Queues[k]) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 					} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-24 03:39:23 -08:00
										 |  |  | 					if capMaxi < cap(pq.Queues[k]) { | 
					
						
							|  |  |  | 						capMaxi = cap(pq.Queues[k]) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-24 03:39:23 -08:00
										 |  |  | 				metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(lenMaxi)) | 
					
						
							|  |  |  | 				metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(capMaxi)) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			case <-p.quit: | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}(p.pq) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	go func() { | 
					
						
							|  |  |  | 		<-p.quit | 
					
						
							| 
									
										
										
										
											2019-02-20 14:50:37 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		cancel() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 	return p | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Deliver sends a storeRequestMsg protocol message to the peer | 
					
						
							| 
									
										
										
										
											2018-10-21 02:30:41 -05:00
										 |  |  | // Depending on the `syncing` parameter we send different message types | 
					
						
							|  |  |  | func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { | 
					
						
							|  |  |  | 	var msg interface{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	spanName := "send.chunk.delivery" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	//we send different types of messages if delivery is for syncing or retrievals, | 
					
						
							|  |  |  | 	//even if handling and content of the message are the same, | 
					
						
							|  |  |  | 	//because swap accounting decides which messages need accounting based on the message type | 
					
						
							|  |  |  | 	if syncing { | 
					
						
							|  |  |  | 		msg = &ChunkDeliveryMsgSyncing{ | 
					
						
							|  |  |  | 			Addr:  chunk.Address(), | 
					
						
							|  |  |  | 			SData: chunk.Data(), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		spanName += ".syncing" | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		msg = &ChunkDeliveryMsgRetrieval{ | 
					
						
							|  |  |  | 			Addr:  chunk.Address(), | 
					
						
							|  |  |  | 			SData: chunk.Data(), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		spanName += ".retrieval" | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-20 14:50:37 +01:00
										 |  |  | 	ctx = context.WithValue(ctx, "stream_send_tag", nil) | 
					
						
							|  |  |  | 	return p.SendPriority(ctx, msg, priority) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SendPriority sends message to the peer using the outgoing priority queue | 
					
						
							| 
									
										
										
										
											2019-02-20 14:50:37 +01:00
										 |  |  | func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) | 
					
						
							| 
									
										
										
										
											2019-03-08 08:52:25 +01:00
										 |  |  | 	ctx = tracing.StartSaveSpan(ctx) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	wmsg := WrappedPriorityMsg{ | 
					
						
							|  |  |  | 		Context: ctx, | 
					
						
							|  |  |  | 		Msg:     msg, | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	err := p.pq.Push(wmsg, int(priority)) | 
					
						
							|  |  |  | 	if err == pq.ErrContention { | 
					
						
							|  |  |  | 		log.Warn("dropping peer on priority queue contention", "peer", p.ID()) | 
					
						
							|  |  |  | 		p.Drop(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return err | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SendOfferedHashes sends OfferedHashesMsg protocol msg | 
					
						
							|  |  |  | func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	var sp opentracing.Span | 
					
						
							|  |  |  | 	ctx, sp := spancontext.StartSpan( | 
					
						
							|  |  |  | 		context.TODO(), | 
					
						
							| 
									
										
										
										
											2019-02-20 14:50:37 +01:00
										 |  |  | 		"send.offered.hashes", | 
					
						
							|  |  |  | 	) | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	defer sp.Finish() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-10-12 16:26:16 +02:00
										 |  |  | 	hashes, from, to, proof, err := s.setNextBatch(f, t) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-20 01:10:40 +09:00
										 |  |  | 	// true only when quitting | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	if len(hashes) == 0 { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if proof == nil { | 
					
						
							|  |  |  | 		proof = &HandoverProof{ | 
					
						
							|  |  |  | 			Handover: &Handover{}, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	s.currentBatch = hashes | 
					
						
							|  |  |  | 	msg := &OfferedHashesMsg{ | 
					
						
							|  |  |  | 		HandoverProof: proof, | 
					
						
							|  |  |  | 		Hashes:        hashes, | 
					
						
							|  |  |  | 		From:          from, | 
					
						
							|  |  |  | 		To:            to, | 
					
						
							|  |  |  | 		Stream:        s.stream, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) | 
					
						
							| 
									
										
										
										
											2019-02-20 14:50:37 +01:00
										 |  |  | 	ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes") | 
					
						
							|  |  |  | 	return p.SendPriority(ctx, msg, s.priority) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) getServer(s Stream) (*server, error) { | 
					
						
							|  |  |  | 	p.serverMu.RLock() | 
					
						
							|  |  |  | 	defer p.serverMu.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	server := p.servers[s] | 
					
						
							|  |  |  | 	if server == nil { | 
					
						
							|  |  |  | 		return nil, newNotFoundError("server", s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return server, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) { | 
					
						
							|  |  |  | 	p.serverMu.Lock() | 
					
						
							|  |  |  | 	defer p.serverMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if p.servers[s] != nil { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("server %s already registered", s) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-24 17:40:22 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers { | 
					
						
							|  |  |  | 		return nil, ErrMaxPeerServers | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-10-12 16:26:16 +02:00
										 |  |  | 	sessionIndex, err := o.SessionIndex() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	os := &server{ | 
					
						
							| 
									
										
										
										
											2018-10-12 16:26:16 +02:00
										 |  |  | 		Server:       o, | 
					
						
							|  |  |  | 		stream:       s, | 
					
						
							|  |  |  | 		priority:     priority, | 
					
						
							|  |  |  | 		sessionIndex: sessionIndex, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	p.servers[s] = os | 
					
						
							|  |  |  | 	return os, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) removeServer(s Stream) error { | 
					
						
							|  |  |  | 	p.serverMu.Lock() | 
					
						
							|  |  |  | 	defer p.serverMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	server, ok := p.servers[s] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return newNotFoundError("server", s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	server.Close() | 
					
						
							|  |  |  | 	delete(p.servers, s) | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) getClient(ctx context.Context, s Stream) (c *client, err error) { | 
					
						
							|  |  |  | 	var params *clientParams | 
					
						
							|  |  |  | 	func() { | 
					
						
							|  |  |  | 		p.clientMu.RLock() | 
					
						
							|  |  |  | 		defer p.clientMu.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		c = p.clients[s] | 
					
						
							|  |  |  | 		if c != nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		params = p.clientParams[s] | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 	if c != nil { | 
					
						
							|  |  |  | 		return c, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if params != nil { | 
					
						
							|  |  |  | 		//debug.PrintStack() | 
					
						
							|  |  |  | 		if err := params.waitClient(ctx); err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	p.clientMu.RLock() | 
					
						
							|  |  |  | 	defer p.clientMu.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	c = p.clients[s] | 
					
						
							|  |  |  | 	if c != nil { | 
					
						
							|  |  |  | 		return c, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil, newNotFoundError("client", s) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) getOrSetClient(s Stream, from, to uint64) (c *client, created bool, err error) { | 
					
						
							|  |  |  | 	p.clientMu.Lock() | 
					
						
							|  |  |  | 	defer p.clientMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	c = p.clients[s] | 
					
						
							|  |  |  | 	if c != nil { | 
					
						
							|  |  |  | 		return c, false, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	f, err := p.streamer.GetClientFunc(s.Name) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, false, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	is, err := f(p, s.Key, s.Live) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, false, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	cp, err := p.getClientParams(s) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, false, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	defer func() { | 
					
						
							|  |  |  | 		if err == nil { | 
					
						
							|  |  |  | 			if err := p.removeClientParams(s); err != nil { | 
					
						
							|  |  |  | 				log.Error("stream set client: remove client params", "stream", s, "peer", p, "err", err) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	intervalsKey := peerStreamIntervalsKey(p, s) | 
					
						
							|  |  |  | 	if s.Live { | 
					
						
							|  |  |  | 		// try to find previous history and live intervals and merge live into history | 
					
						
							|  |  |  | 		historyKey := peerStreamIntervalsKey(p, NewStream(s.Name, s.Key, false)) | 
					
						
							|  |  |  | 		historyIntervals := &intervals.Intervals{} | 
					
						
							|  |  |  | 		err := p.streamer.intervalsStore.Get(historyKey, historyIntervals) | 
					
						
							|  |  |  | 		switch err { | 
					
						
							|  |  |  | 		case nil: | 
					
						
							|  |  |  | 			liveIntervals := &intervals.Intervals{} | 
					
						
							|  |  |  | 			err := p.streamer.intervalsStore.Get(intervalsKey, liveIntervals) | 
					
						
							|  |  |  | 			switch err { | 
					
						
							|  |  |  | 			case nil: | 
					
						
							|  |  |  | 				historyIntervals.Merge(liveIntervals) | 
					
						
							|  |  |  | 				if err := p.streamer.intervalsStore.Put(historyKey, historyIntervals); err != nil { | 
					
						
							|  |  |  | 					log.Error("stream set client: put history intervals", "stream", s, "peer", p, "err", err) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			case state.ErrNotFound: | 
					
						
							|  |  |  | 			default: | 
					
						
							|  |  |  | 				log.Error("stream set client: get live intervals", "stream", s, "peer", p, "err", err) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		case state.ErrNotFound: | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			log.Error("stream set client: get history intervals", "stream", s, "peer", p, "err", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := p.streamer.intervalsStore.Put(intervalsKey, intervals.NewIntervals(from)); err != nil { | 
					
						
							|  |  |  | 		return nil, false, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	next := make(chan error, 1) | 
					
						
							|  |  |  | 	c = &client{ | 
					
						
							|  |  |  | 		Client:         is, | 
					
						
							|  |  |  | 		stream:         s, | 
					
						
							|  |  |  | 		priority:       cp.priority, | 
					
						
							|  |  |  | 		to:             cp.to, | 
					
						
							|  |  |  | 		next:           next, | 
					
						
							|  |  |  | 		quit:           make(chan struct{}), | 
					
						
							|  |  |  | 		intervalsStore: p.streamer.intervalsStore, | 
					
						
							|  |  |  | 		intervalsKey:   intervalsKey, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	p.clients[s] = c | 
					
						
							|  |  |  | 	cp.clientCreated() // unblock all possible getClient calls that are waiting | 
					
						
							|  |  |  | 	next <- nil        // this is to allow wantedKeysMsg before first batch arrives | 
					
						
							|  |  |  | 	return c, true, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) removeClient(s Stream) error { | 
					
						
							|  |  |  | 	p.clientMu.Lock() | 
					
						
							|  |  |  | 	defer p.clientMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	client, ok := p.clients[s] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return newNotFoundError("client", s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	client.close() | 
					
						
							| 
									
										
										
										
											2018-09-24 17:40:22 +02:00
										 |  |  | 	delete(p.clients, s) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) setClientParams(s Stream, params *clientParams) error { | 
					
						
							|  |  |  | 	p.clientMu.Lock() | 
					
						
							|  |  |  | 	defer p.clientMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if p.clients[s] != nil { | 
					
						
							|  |  |  | 		return fmt.Errorf("client %s already exists", s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if p.clientParams[s] != nil { | 
					
						
							|  |  |  | 		return fmt.Errorf("client params %s already set", s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	p.clientParams[s] = params | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) getClientParams(s Stream) (*clientParams, error) { | 
					
						
							|  |  |  | 	params := p.clientParams[s] | 
					
						
							|  |  |  | 	if params == nil { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID()) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return params, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) removeClientParams(s Stream) error { | 
					
						
							|  |  |  | 	_, ok := p.clientParams[s] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return newNotFoundError("client params", s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	delete(p.clientParams, s) | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *Peer) close() { | 
					
						
							|  |  |  | 	for _, s := range p.servers { | 
					
						
							|  |  |  | 		s.Close() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |