| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | // Copyright 2018 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package stream | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"errors" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/metrics" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/p2p/discover" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/log" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/network" | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/swarm/spancontext" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	"github.com/ethereum/go-ethereum/swarm/storage" | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	opentracing "github.com/opentracing/opentracing-go" | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	swarmChunkServerStreamName = "RETRIEVE_REQUEST" | 
					
						
							|  |  |  | 	deliveryCap                = 32 | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							|  |  |  | 	processReceivedChunksCount    = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil) | 
					
						
							|  |  |  | 	handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	requestFromPeersCount     = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil) | 
					
						
							|  |  |  | 	requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type Delivery struct { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	chunkStore storage.SyncChunkStore | 
					
						
							|  |  |  | 	kad        *network.Kademlia | 
					
						
							|  |  |  | 	getPeer    func(discover.NodeID) *Peer | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { | 
					
						
							|  |  |  | 	return &Delivery{ | 
					
						
							|  |  |  | 		chunkStore: chunkStore, | 
					
						
							|  |  |  | 		kad:        kad, | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SwarmChunkServer implements Server | 
					
						
							|  |  |  | type SwarmChunkServer struct { | 
					
						
							|  |  |  | 	deliveryC  chan []byte | 
					
						
							|  |  |  | 	batchC     chan []byte | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	chunkStore storage.ChunkStore | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	currentLen uint64 | 
					
						
							|  |  |  | 	quit       chan struct{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewSwarmChunkServer is SwarmChunkServer constructor | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	s := &SwarmChunkServer{ | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		deliveryC:  make(chan []byte, deliveryCap), | 
					
						
							|  |  |  | 		batchC:     make(chan []byte), | 
					
						
							|  |  |  | 		chunkStore: chunkStore, | 
					
						
							|  |  |  | 		quit:       make(chan struct{}), | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	go s.processDeliveries() | 
					
						
							|  |  |  | 	return s | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // processDeliveries handles delivered chunk hashes | 
					
						
							|  |  |  | func (s *SwarmChunkServer) processDeliveries() { | 
					
						
							|  |  |  | 	var hashes []byte | 
					
						
							|  |  |  | 	var batchC chan []byte | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-s.quit: | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		case hash := <-s.deliveryC: | 
					
						
							|  |  |  | 			hashes = append(hashes, hash...) | 
					
						
							|  |  |  | 			batchC = s.batchC | 
					
						
							|  |  |  | 		case batchC <- hashes: | 
					
						
							|  |  |  | 			hashes = nil | 
					
						
							|  |  |  | 			batchC = nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SetNextBatch | 
					
						
							|  |  |  | func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case hashes = <-s.batchC: | 
					
						
							|  |  |  | 	case <-s.quit: | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	from = s.currentLen | 
					
						
							|  |  |  | 	s.currentLen += uint64(len(hashes)) | 
					
						
							|  |  |  | 	to = s.currentLen | 
					
						
							|  |  |  | 	return | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Close needs to be called on a stream server | 
					
						
							|  |  |  | func (s *SwarmChunkServer) Close() { | 
					
						
							|  |  |  | 	close(s.quit) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // GetData retrives chunk data from db store | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	return chunk.Data(), nil | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RetrieveRequestMsg is the protocol msg for chunk retrieve requests | 
					
						
							|  |  |  | type RetrieveRequestMsg struct { | 
					
						
							|  |  |  | 	Addr      storage.Address | 
					
						
							|  |  |  | 	SkipCheck bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	log.Trace("received request", "peer", sp.ID(), "hash", req.Addr) | 
					
						
							|  |  |  | 	handleRetrieveRequestMsgCount.Inc(1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 	var osp opentracing.Span | 
					
						
							|  |  |  | 	ctx, osp = spancontext.StartSpan( | 
					
						
							|  |  |  | 		ctx, | 
					
						
							|  |  |  | 		"retrieve.request") | 
					
						
							|  |  |  | 	defer osp.Finish() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false)) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	streamer := s.Server.(*SwarmChunkServer) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	var cancel func() | 
					
						
							|  |  |  | 	// TODO: do something with this hardcoded timeout, maybe use TTL in the future | 
					
						
							|  |  |  | 	ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 		case <-streamer.quit: | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		cancel() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		chunk, err := d.chunkStore.Get(ctx, req.Addr) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if req.SkipCheck { | 
					
						
							|  |  |  | 			err = sp.Deliver(ctx, chunk, s.priority) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			return | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		select { | 
					
						
							|  |  |  | 		case streamer.deliveryC <- chunk.Address()[:]: | 
					
						
							|  |  |  | 		case <-streamer.quit: | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type ChunkDeliveryMsg struct { | 
					
						
							|  |  |  | 	Addr  storage.Address | 
					
						
							|  |  |  | 	SData []byte // the stored chunk Data (incl size) | 
					
						
							|  |  |  | 	peer  *Peer  // set in handleChunkDeliveryMsg | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | // TODO: Fix context SNAFU | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { | 
					
						
							|  |  |  | 	var osp opentracing.Span | 
					
						
							|  |  |  | 	ctx, osp = spancontext.StartSpan( | 
					
						
							|  |  |  | 		ctx, | 
					
						
							|  |  |  | 		"chunk.delivery") | 
					
						
							|  |  |  | 	defer osp.Finish() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	processReceivedChunksCount.Inc(1) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	go func() { | 
					
						
							|  |  |  | 		req.peer = sp | 
					
						
							|  |  |  | 		err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 			if err == storage.ErrChunkInvalid { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 				// we removed this log because it spams the logs | 
					
						
							|  |  |  | 				// TODO: Enable this log line | 
					
						
							|  |  |  | 				// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				req.peer.Drop(err) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RequestFromPeers sends a chunk retrieve request to | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 	requestFromPeersCount.Inc(1) | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	var sp *Peer | 
					
						
							|  |  |  | 	spID := req.Source | 
					
						
							| 
									
										
										
										
											2018-07-13 17:40:28 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 	if spID != nil { | 
					
						
							|  |  |  | 		sp = d.getPeer(*spID) | 
					
						
							|  |  |  | 		if sp == nil { | 
					
						
							|  |  |  | 			return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { | 
					
						
							|  |  |  | 			id := p.ID() | 
					
						
							|  |  |  | 			// TODO: skip light nodes that do not accept retrieve requests | 
					
						
							|  |  |  | 			if req.SkipPeer(id.String()) { | 
					
						
							|  |  |  | 				log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 				return true | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			sp = d.getPeer(id) | 
					
						
							|  |  |  | 			if sp == nil { | 
					
						
							|  |  |  | 				log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) | 
					
						
							|  |  |  | 				return true | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			spID = &id | 
					
						
							|  |  |  | 			return false | 
					
						
							|  |  |  | 		}) | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		if sp == nil { | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 			return nil, nil, errors.New("no peer found") | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-09-13 11:42:19 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	err := sp.SendPriority(ctx, &RetrieveRequestMsg{ | 
					
						
							|  |  |  | 		Addr:      req.Addr, | 
					
						
							|  |  |  | 		SkipCheck: req.SkipCheck, | 
					
						
							|  |  |  | 	}, Top) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	requestFromPeersEachCount.Inc(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return spID, sp.quit, nil | 
					
						
							| 
									
										
										
										
											2018-06-20 14:06:27 +02:00
										 |  |  | } |