diff --git a/network/retrieval/peer.go b/network/retrieval/peer.go
new file mode 100644
index 0000000000..3b3b1b4d8b
--- /dev/null
+++ b/network/retrieval/peer.go
@@ -0,0 +1,39 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package retrieval
+
+import (
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethersphere/swarm/network"
+)
+
+// Peer wraps BzzPeer with a contextual logger for this peer
+type Peer struct {
+ *network.BzzPeer
+ logger log.Logger
+}
+
+// NewPeer is the constructor for Peer
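+// The returned peer's contextual logger annotates every entry with the peer's
+// enode ID, so call sites like p.logger.Debug("...", "ref", addr) do not have
+// to pass the peer ID explicitly.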
+func NewPeer(peer *network.BzzPeer) *Peer {
+ return &Peer{
+ BzzPeer: peer,
+ logger: log.New("peer", peer.ID()),
+ }
+}
diff --git a/network/retrieval/retrieve.go b/network/retrieval/retrieve.go
new file mode 100644
index 0000000000..79f61f9bd6
--- /dev/null
+++ b/network/retrieval/retrieve.go
@@ -0,0 +1,443 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package retrieval
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/network"
+ "github.com/ethersphere/swarm/network/timeouts"
+ "github.com/ethersphere/swarm/p2p/protocols"
+ "github.com/ethersphere/swarm/spancontext"
+ "github.com/ethersphere/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
+)
+
+var (
+ // Compile time interface check
+ _ node.Service = &Retrieval{}
+
+ // Metrics
+ processReceivedChunksCount = metrics.NewRegisteredCounter("network.retrieve.received_chunks.count", nil)
+ handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.retrieve.handle_retrieve_request_msg.count", nil)
+ retrieveChunkFail = metrics.NewRegisteredCounter("network.retrieve.retrieve_chunks_fail.count", nil)
+
+ lastReceivedRetrieveChunksMsg = metrics.GetOrRegisterGauge("network.retrieve.received_chunks", nil)
+
+ // Protocol spec
+ spec = &protocols.Spec{
+ Name: "bzz-retrieve",
+ Version: 1,
+ MaxMsgSize: 10 * 1024 * 1024,
+ Messages: []interface{}{
+ ChunkDelivery{},
+ RetrieveRequest{},
+ },
+ }
+
+ ErrNoPeerFound = errors.New("no peer found")
+)
+
+// Retrieval holds state and handles protocol messages for the `bzz-retrieve` protocol
+type Retrieval struct {
+ mtx sync.Mutex
+ netStore *storage.NetStore
+ kad *network.Kademlia
+ peers map[enode.ID]*Peer
+ spec *protocols.Spec // this protocol's spec
+
+ quit chan struct{} // termination
+}
+
+// New returns a new instance of the retrieval protocol handler
+func New(kad *network.Kademlia, ns *storage.NetStore) *Retrieval {
+ return &Retrieval{
+ kad: kad,
+ peers: make(map[enode.ID]*Peer),
+ netStore: ns,
+ quit: make(chan struct{}),
+ spec: spec,
+ }
+}
+
+func (r *Retrieval) addPeer(p *Peer) {
+ r.mtx.Lock()
+ defer r.mtx.Unlock()
+ r.peers[p.ID()] = p
+}
+
+func (r *Retrieval) removePeer(p *Peer) {
+ r.mtx.Lock()
+ defer r.mtx.Unlock()
+ delete(r.peers, p.ID())
+}
+
+func (r *Retrieval) getPeer(id enode.ID) *Peer {
+ r.mtx.Lock()
+ defer r.mtx.Unlock()
+
+ return r.peers[id]
+}
+
+// Run is the p2p protocol run function, executed once for each peer that connects on this protocol
+func (r *Retrieval) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := protocols.NewPeer(p, rw, r.spec)
+ bp := network.NewBzzPeer(peer)
+ sp := NewPeer(bp)
+ r.addPeer(sp)
+ defer r.removePeer(sp)
+ return peer.Run(r.handleMsg(sp))
+}
+
+func (r *Retrieval) handleMsg(p *Peer) func(context.Context, interface{}) error {
+ return func(ctx context.Context, msg interface{}) error {
+ switch msg := msg.(type) {
+ case *RetrieveRequest:
+ go r.handleRetrieveRequest(ctx, p, msg)
+ case *ChunkDelivery:
+ go r.handleChunkDelivery(ctx, p, msg)
+ }
+ return nil
+ }
+}
+
+// getOriginPo returns the proximity order between the chunk address and the
+// node that originated the incoming request. If our node is the first node
+// to request this chunk, the request has no Origin and -1 is returned.
+// This is used only for tracing, and can probably be refactored so that we
+// don't have to iterate over Kademlia.
+func (r *Retrieval) getOriginPo(req *storage.Request) int {
+ log.Trace("retrieval.getOriginPo", "req.Addr", req.Addr)
+ originPo := -1
+
+ r.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
+ id := p.ID()
+
+ // get po between chunk and origin
+ if bytes.Equal(req.Origin.Bytes(), id.Bytes()) {
+ originPo = po
+ return false
+ }
+
+ return true
+ })
+
+ return originPo
+}
+
+// findPeer selects the peer to ask for a specific chunk, according to our kademlia
+func (r *Retrieval) findPeer(ctx context.Context, req *storage.Request) (retPeer *network.Peer, err error) {
+ log.Trace("retrieval.findPeer", "req.Addr", req.Addr)
+ osp, _ := ctx.Value("remote.fetch").(opentracing.Span)
+
+ // originPo - proximity of the requesting node to the chunk; -1 if the request originated at our node
+ // myPo - this node's proximity to the requested chunk
+ // selectedPeerPo - proximity of the kademlia-suggested peer to the requested chunk (computed further below)
+ originPo := r.getOriginPo(req)
+ myPo := chunk.Proximity(req.Addr, r.kad.BaseAddr())
+ selectedPeerPo := -1
+
+ depth := r.kad.NeighbourhoodDepth()
+
+ if osp != nil {
+ osp.LogFields(olog.Int("originPo", originPo))
+ osp.LogFields(olog.Int("depth", depth))
+ osp.LogFields(olog.Int("myPo", myPo))
+ }
+
+ // do not forward requests if origin proximity is bigger than our node's proximity
+ // this means that origin is closer to the chunk
+ if originPo > myPo {
+ return nil, errors.New("not forwarding request, origin node is closer to chunk than this node")
+ }
+
+ r.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
+ id := p.ID()
+
+ // skip light nodes
+ if p.LightNode {
+ return true
+ }
+
+ // do not send the request back to the peer who asked us. maybe merge with SkipPeer at some point
+ if bytes.Equal(req.Origin.Bytes(), id.Bytes()) {
+ return true
+ }
+
+ // skip peers that we have already tried
+ if req.SkipPeer(id.String()) {
+ log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String())
+ return true
+ }
+
+ if myPo < depth { // chunk is NOT within the neighbourhood
+ if po <= myPo { // always choose a peer strictly closer to chunk than us
+ log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ return false
+ } else {
+ log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ }
+ } else { // chunk IS WITHIN neighbourhood
+ if po < depth { // do not select a peer outside the neighbourhood, but allow peers further from the chunk than us
+ log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ return false
+ } else if po <= originPo { // avoid a loop in the neighbourhood: do not forward when the request comes from within the neighbourhood
+ log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ return false
+ } else {
+ log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ }
+ }
+
+ // if the selected peer is not within depth (2nd condition; if depth <= po, the peer is in the nearest neighbourhood)
+ // and is further from the chunk than we are, return an error
+ if po < myPo && depth > po {
+ log.Trace("findpeer4 skip peer because origin was closer", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+
+ err = fmt.Errorf("not asking peers further away from origin; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo)
+ return false
+ }
+
+ // if the chunk falls within our nearest neighbourhood (1st condition) but the suggested peer is not in
+ // the nearest neighbourhood (2nd condition), do not forward the request to the suggested peer
+ if depth <= myPo && depth > po {
+ log.Trace("findpeer5 skip peer because depth", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+
+ err = fmt.Errorf("not going outside of depth; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo)
+ return false
+ }
+
+ // we found a peer to forward the request to: remember it and its
+ // proximity to the chunk, and stop iterating
+ retPeer = p
+ selectedPeerPo = po
+
+ return false
+ })
+
+ if osp != nil {
+ osp.LogFields(olog.Int("selectedPeerPo", selectedPeerPo))
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ if retPeer == nil {
+ return nil, ErrNoPeerFound
+ }
+
+ return retPeer, nil
+}
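+
+// An illustrative walk-through of the forwarding rules above, with
+// hypothetical numbers (EachConn yields peers closest to the chunk first):
+//
+//	depth = 5, myPo = 3 (chunk outside our neighbourhood):
+//	a candidate with po = 4 is strictly closer to the chunk and is selected;
+//	a candidate with po <= 3 ends the search, since no remaining peer can be closer than we are.
+//
+//	depth = 5, myPo = 7, originPo = 5 (chunk within our neighbourhood):
+//	a candidate with po = 6 is selected;
+//	a candidate with po = 5 ends the search (po <= originPo would loop back towards the origin);
+//	a candidate with po = 4 ends the search (it falls outside the neighbourhood).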
+
+// handleRetrieveRequest handles an incoming retrieve request from a certain Peer
+// if the chunk is found in the localstore it is served immediately, otherwise
+// it results in a new retrieve request to candidate peers in our kademlia
+func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *RetrieveRequest) {
+ p.logger.Debug("retrieval.handleRetrieveRequest", "ref", msg.Addr)
+ handleRetrieveRequestMsgCount.Inc(1)
+
+ ctx, osp := spancontext.StartSpan(
+ ctx,
+ "handle.retrieve.request")
+
+ osp.LogFields(olog.String("ref", msg.Addr.String()))
+
+ defer osp.Finish()
+
+ ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout)
+ defer cancel()
+
+ req := &storage.Request{
+ Addr: msg.Addr,
+ Origin: p.ID(),
+ }
+ ch, err := r.netStore.Get(ctx, chunk.ModeGetRequest, req)
+ if err != nil {
+ retrieveChunkFail.Inc(1)
+ p.logger.Debug("netstore.Get can not retrieve chunk", "ref", msg.Addr, "err", err)
+ return
+ }
+
+ p.logger.Trace("retrieval.handleRetrieveRequest - delivery", "ref", msg.Addr)
+
+ deliveryMsg := &ChunkDelivery{
+ Addr: ch.Address(),
+ SData: ch.Data(),
+ }
+
+ err = p.Send(ctx, deliveryMsg)
+ if err != nil {
+ p.logger.Error("retrieval.handleRetrieveRequest - peer delivery failed", "ref", msg.Addr, "err", err)
+ osp.LogFields(olog.Bool("delivered", false))
+ return
+ }
+ osp.LogFields(olog.Bool("delivered", true))
+}
+
+// handleChunkDelivery handles a ChunkDelivery message from a certain peer
+// if the chunk proximity order in relation to our base address is within depth
+// we treat the chunk as a chunk received in syncing
+func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *ChunkDelivery) error {
+ p.logger.Debug("retrieval.handleChunkDelivery", "ref", msg.Addr)
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "handle.chunk.delivery")
+
+ processReceivedChunksCount.Inc(1)
+
+ // record the last time we received a chunk delivery message
+ lastReceivedRetrieveChunksMsg.Update(time.Now().UnixNano())
+
+ // count how many chunks we receive for retrieve requests per peer
+ peermetric := fmt.Sprintf("chunk.delivery.%x", p.BzzAddr.Over()[:16])
+ metrics.GetOrRegisterCounter(peermetric, nil).Inc(1)
+
+ peerPO := chunk.Proximity(p.BzzAddr.Over(), msg.Addr)
+ po := chunk.Proximity(r.kad.BaseAddr(), msg.Addr)
+ depth := r.kad.NeighbourhoodDepth()
+ var mode chunk.ModePut
+ // chunks within the area of responsibility should always sync
+ // https://github.com/ethersphere/go-ethereum/pull/1282#discussion_r269406125
+ if po >= depth || peerPO < po {
+ mode = chunk.ModePutSync
+ } else {
+ // do not sync if the peer that is sending us the chunk is closer to the chunk than we are
+ mode = chunk.ModePutRequest
+ }
+
+ p.logger.Trace("handle.chunk.delivery", "ref", msg.Addr)
+
+ go func() {
+ defer osp.Finish()
+ p.logger.Trace("handle.chunk.delivery", "put", msg.Addr)
+ _, err := r.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
+ if err != nil {
+ if err == storage.ErrChunkInvalid {
+ p.Drop()
+ }
+ }
+ p.logger.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
+ }()
+ return nil
+}
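+
+// The put-mode decision above, summarised (po = our proximity to the chunk,
+// peerPO = the sending peer's proximity to the chunk):
+//
+//	po >= depth               -> ModePutSync (the chunk is in our area of responsibility)
+//	po < depth, peerPO < po   -> ModePutSync (we are closer to the chunk than the sender)
+//	po < depth, peerPO >= po  -> ModePutRequest (the sender is at least as close, so do not sync)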
+
+// RequestFromPeers sends a chunk retrieve request to the next found peer
+func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
+ log.Debug("retrieval.requestFromPeers", "req.Addr", req.Addr)
+ metrics.GetOrRegisterCounter("network.retrieve.request_from_peers", nil).Inc(1)
+
+ const maxFindPeerRetries = 5
+ retries := 0
+
+FINDPEER:
+ sp, err := r.findPeer(ctx, req)
+ if err != nil {
+ log.Trace(err.Error())
+ return nil, err
+ }
+
+ protoPeer := r.getPeer(sp.ID())
+ if protoPeer == nil {
+ retries++
+ if retries == maxFindPeerRetries {
+ log.Error("max find peer retries reached", "max retries", maxFindPeerRetries)
+ return nil, ErrNoPeerFound
+ }
+
+ goto FINDPEER
+ }
+
+ ret := RetrieveRequest{
+ Addr: req.Addr,
+ }
+ protoPeer.logger.Trace("sending retrieve request", "ref", ret.Addr, "origin", localID)
+ err = protoPeer.Send(ctx, ret)
+ if err != nil {
+ protoPeer.logger.Error("error sending retrieve request to peer", "err", err)
+ return nil, err
+ }
+
+ spID := protoPeer.ID()
+ return &spID, nil
+}
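+
+// A minimal wiring sketch, mirroring the newBzzRetrieveWithLocalstore test
+// helper: hook RequestFromPeers into a NetStore so that chunks missing from
+// the local store are requested from the network:
+//
+//	r := New(kad, netStore)
+//	netStore.RemoteGet = r.RequestFromPeers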
+
+func (r *Retrieval) Start(server *p2p.Server) error {
+ log.Info("starting bzz-retrieve")
+ return nil
+}
+
+func (r *Retrieval) Stop() error {
+ log.Info("shutting down bzz-retrieve")
+ close(r.quit)
+ return nil
+}
+
+func (r *Retrieval) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{
+ {
+ Name: r.spec.Name,
+ Version: r.spec.Version,
+ Length: r.spec.Length(),
+ Run: r.Run,
+ },
+ }
+}
+
+func (r *Retrieval) APIs() []rpc.API {
+ return nil
+}
diff --git a/network/retrieval/retrieve_test.go b/network/retrieval/retrieve_test.go
new file mode 100644
index 0000000000..31b6ee1352
--- /dev/null
+++ b/network/retrieval/retrieve_test.go
@@ -0,0 +1,398 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package retrieval
+
+import (
+ "bytes"
+ "context"
+ "crypto/rand"
+ "encoding/hex"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/network"
+ "github.com/ethersphere/swarm/network/simulation"
+ "github.com/ethersphere/swarm/p2p/protocols"
+ "github.com/ethersphere/swarm/state"
+ "github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/storage/localstore"
+ "github.com/ethersphere/swarm/storage/mock"
+ "github.com/ethersphere/swarm/testutil"
+ "golang.org/x/crypto/sha3"
+)
+
+var (
+ loglevel = flag.Int("loglevel", 5, "verbosity of logs")
+ bucketKeyFileStore = simulation.BucketKey("filestore")
+ bucketKeyNetstore = simulation.BucketKey("netstore")
+
+ hash0 = sha3.Sum256([]byte{0})
+)
+
+func init() {
+ flag.Parse()
+
+ log.PrintOrigins(true)
+ log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
+}
+
+// TestChunkDelivery brings up two nodes, stores a few chunks on the first node, then tries to retrieve them through the second node
+func TestChunkDelivery(t *testing.T) {
+ chunkCount := 10
+ filesize := chunkCount * 4096
+
+ sim := simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
+ "bzz-retrieve": newBzzRetrieveWithLocalstore,
+ })
+ defer sim.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ _, err := sim.AddNode()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ log.Debug("uploader node", "enode", nodeIDs[0])
+
+ fs := sim.MustNodeItem(nodeIDs[0], bucketKeyFileStore).(*storage.FileStore)
+
+ // put some data into just the first node
+ data := make([]byte, filesize)
+ if _, err := io.ReadFull(rand.Reader, data); err != nil {
+ t.Fatalf("reading from crypto/rand failed: %v", err.Error())
+ }
+ refs, err := getAllRefs(data)
+ if err != nil {
+ return err
+ }
+ log.Trace("got all refs", "refs", refs)
+ _, wait, err := fs.Store(context.Background(), bytes.NewReader(data), int64(filesize), false)
+ if err != nil {
+ return err
+ }
+ if err := wait(context.Background()); err != nil {
+ return err
+ }
+
+ id, err := sim.AddNode()
+ if err != nil {
+ return err
+ }
+ err = sim.Net.Connect(id, nodeIDs[0])
+ if err != nil {
+ return err
+ }
+ nodeIDs = sim.UpNodeIDs()
+ if len(nodeIDs) != 2 {
+ return fmt.Errorf("wrong number of nodes, expected %d got %d", 2, len(nodeIDs))
+ }
+
+ // allow the two nodes time to set up the protocols otherwise kademlias will be empty when retrieve requests happen
+ time.Sleep(50 * time.Millisecond)
+ log.Debug("fetching through node", "enode", nodeIDs[1])
+ ns := sim.MustNodeItem(nodeIDs[1], bucketKeyNetstore).(*storage.NetStore)
+ ctr := 0
+ for _, ch := range refs {
+ ctr++
+ _, err := ns.Get(context.Background(), chunk.ModeGetRequest, storage.NewRequest(ch))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
+
+// TestDeliveryForwarding tests that chunk delivery requests are forwarded. It creates three nodes (fetching, forwarding and uploading)
+// where po(fetching,forwarding) = 1 and po(forwarding,uploading) = 1, uploads chunks to the uploading node, then
+// tries to retrieve the relevant chunks (those with po = 0 relative to the fetching node, i.e. no bits in common with it,
+// and po >= 1 relative to the uploading node, i.e. at least one bit in common with the uploader)
+func TestDeliveryForwarding(t *testing.T) {
+ chunkCount := 100
+ filesize := chunkCount * 4096
+ sim, uploader, forwarder, fetcher := setupTestDeliveryForwardingSimulation(t)
+ defer sim.Close()
+
+ log.Debug("test delivery forwarding", "uploader", uploader, "forwarder", forwarder, "fetcher", fetcher)
+
+ uploaderNodeStore := sim.MustNodeItem(uploader, bucketKeyFileStore).(*storage.FileStore)
+ fetcherBase := sim.MustNodeItem(fetcher, simulation.BucketKeyKademlia).(*network.Kademlia).BaseAddr()
+ uploaderBase := sim.MustNodeItem(uploader, simulation.BucketKeyKademlia).(*network.Kademlia).BaseAddr()
+ ctx := context.Background()
+ _, wait, err := uploaderNodeStore.Store(ctx, testutil.RandomReader(101010, filesize), int64(filesize), false)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = wait(ctx); err != nil {
+ t.Fatal(err)
+ }
+
+ chunks, err := getChunks(uploaderNodeStore.ChunkStore)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for c := range chunks {
+ addr, err := hex.DecodeString(c)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // try to retrieve all of the chunks which have no bits in common with the
+ // fetcher, but have more than one bit in common with the uploader node
+ if chunk.Proximity(addr, fetcherBase) == 0 && chunk.Proximity(addr, uploaderBase) >= 1 {
+ req := storage.NewRequest(chunk.Address(addr))
+ fetcherNetstore := sim.MustNodeItem(fetcher, bucketKeyNetstore).(*storage.NetStore)
+ _, err := fetcherNetstore.Get(ctx, chunk.ModeGetRequest, req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ }
+}
+
+func setupTestDeliveryForwardingSimulation(t *testing.T) (sim *simulation.Simulation, uploader, forwarder, fetching enode.ID) {
+ sim = simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
+ "bzz-retrieve": newBzzRetrieveWithLocalstore,
+ })
+
+ fetching, err := sim.AddNode()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ fetcherBase := sim.MustNodeItem(fetching, simulation.BucketKeyKademlia).(*network.Kademlia).BaseAddr()
+
+ override := func(o *adapters.NodeConfig) func(*adapters.NodeConfig) {
+ return func(c *adapters.NodeConfig) {
+ *o = *c
+ }
+ }
+
+ // create a node that will be in po 1 from fetcher
+ forwarderConfig := testutil.NodeConfigAtPo(t, fetcherBase, 1)
+ forwarder, err = sim.AddNode(override(forwarderConfig))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = sim.Net.Connect(fetching, forwarder)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ forwarderBase := sim.MustNodeItem(forwarder, simulation.BucketKeyKademlia).(*network.Kademlia).BaseAddr()
+
+ // create a node on which the files will be stored at po 1 in relation to the forwarding node
+ uploaderConfig := testutil.NodeConfigAtPo(t, forwarderBase, 1)
+ uploader, err = sim.AddNode(override(uploaderConfig))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = sim.Net.Connect(forwarder, uploader)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ return sim, uploader, forwarder, fetching
+}
+
+// if there is one peer in the Kademlia, findPeer should return it
+func TestRequestFromPeers(t *testing.T) {
+ dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ addr := network.RandomAddr()
+ to := network.NewKademlia(addr.OAddr, network.NewKadParams())
+ protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+ peer := network.NewPeer(&network.BzzPeer{
+ BzzAddr: network.RandomAddr(),
+ LightNode: false,
+ Peer: protocolsPeer,
+ }, to)
+
+ to.On(peer)
+
+ s := New(to, nil)
+
+ req := storage.NewRequest(storage.Address(hash0[:]))
+ id, err := s.findPeer(context.Background(), req)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if id.ID() != dummyPeerID {
+ t.Fatalf("Expected an id, got %v", id)
+ }
+}
+
+// findPeer should not return light nodes
+func TestRequestFromPeersWithLightNode(t *testing.T) {
+ dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ addr := network.RandomAddr()
+ to := network.NewKademlia(addr.OAddr, network.NewKadParams())
+
+ protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+
+ // setting up a lightnode
+ peer := network.NewPeer(&network.BzzPeer{
+ BzzAddr: network.RandomAddr(),
+ LightNode: true,
+ Peer: protocolsPeer,
+ }, to)
+
+ to.On(peer)
+
+ r := New(to, nil)
+ req := storage.NewRequest(storage.Address(hash0[:]))
+
+ // making a request which should return with "no peer found"
+ _, err := r.findPeer(context.Background(), req)
+
+ if err != ErrNoPeerFound {
+ t.Fatalf("expected '%v', got %v", ErrNoPeerFound, err)
+ }
+}
+
+func newBzzRetrieveWithLocalstore(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ n := ctx.Config.Node()
+ addr := network.NewAddr(n)
+
+ localStore, localStoreCleanup, err := newTestLocalStore(n.ID(), addr, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ var kad *network.Kademlia
+ if kv, ok := bucket.Load(simulation.BucketKeyKademlia); ok {
+ kad = kv.(*network.Kademlia)
+ } else {
+ kad = network.NewKademlia(addr.Over(), network.NewKadParams())
+ bucket.Store(simulation.BucketKeyKademlia, kad)
+ }
+
+ netStore := storage.NewNetStore(localStore, n.ID())
+ lnetStore := storage.NewLNetStore(netStore)
+ fileStore := storage.NewFileStore(lnetStore, storage.NewFileStoreParams(), chunk.NewTags())
+
+ var store *state.DBStore
+ // Use on-disk DBStore to reduce memory consumption in race tests.
+ dir, err := ioutil.TempDir("", "statestore-")
+ if err != nil {
+ return nil, nil, err
+ }
+ store, err = state.NewDBStore(dir)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ r := New(kad, netStore)
+ netStore.RemoteGet = r.RequestFromPeers
+ bucket.Store(bucketKeyFileStore, fileStore)
+ bucket.Store(bucketKeyNetstore, netStore)
+ bucket.Store(simulation.BucketKeyKademlia, kad)
+
+ cleanup = func() {
+ localStore.Close()
+ localStoreCleanup()
+ store.Close()
+ os.RemoveAll(dir)
+ }
+
+ return r, cleanup, nil
+}
+
+func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) {
+ dir, err := ioutil.TempDir("", "localstore-")
+ if err != nil {
+ return nil, nil, err
+ }
+ cleanup = func() {
+ os.RemoveAll(dir)
+ }
+
+ var mockStore *mock.NodeStore
+ if globalStore != nil {
+ mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes()))
+ }
+
+ localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{
+ MockStore: mockStore,
+ })
+ if err != nil {
+ cleanup()
+ return nil, nil, err
+ }
+ return localStore, cleanup, nil
+}
+
+func getAllRefs(testData []byte) (storage.AddressCollection, error) {
+ datadir, err := ioutil.TempDir("", "chunk-debug")
+ if err != nil {
+ return nil, err
+ }
+ defer os.RemoveAll(datadir)
+ fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
+ if err != nil {
+ return nil, err
+ }
+ defer cleanup()
+
+ reader := bytes.NewReader(testData)
+ return fileStore.GetAllReferences(context.Background(), reader, false)
+}
+
+func getChunks(store chunk.Store) (chunks map[string]struct{}, err error) {
+ chunks = make(map[string]struct{})
+ for po := uint8(0); po <= chunk.MaxPO; po++ {
+ last, err := store.LastPullSubscriptionBinID(po)
+ if err != nil {
+ return nil, err
+ }
+ if last == 0 {
+ continue
+ }
+ ch, _ := store.SubscribePull(context.Background(), po, 0, last)
+ for c := range ch {
+ addr := c.Address.Hex()
+ if _, ok := chunks[addr]; ok {
+ return nil, fmt.Errorf("duplicate chunk %s", addr)
+ }
+ chunks[addr] = struct{}{}
+ }
+ }
+ return chunks, nil
+}
diff --git a/network/retrieval/wire.go b/network/retrieval/wire.go
new file mode 100644
index 0000000000..d92db6250f
--- /dev/null
+++ b/network/retrieval/wire.go
@@ -0,0 +1,34 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package retrieval
+
+import "github.com/ethersphere/swarm/storage"
+
+// RetrieveRequest is the protocol msg for chunk retrieve requests
+type RetrieveRequest struct {
+ Addr storage.Address
+}
+
+// ChunkDelivery is the protocol msg for delivering a solicited chunk to a peer
+type ChunkDelivery struct {
+ Addr storage.Address
+ SData []byte // the stored chunk Data (incl size)
+}
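+
+// Note: message codes are assigned by the protocols package in registration
+// order of the spec in retrieve.go, so ChunkDelivery is code 0 and
+// RetrieveRequest is code 1 on the wire.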
diff --git a/network/simulation/bucket.go b/network/simulation/bucket.go
index 49a1f43091..fb300538eb 100644
--- a/network/simulation/bucket.go
+++ b/network/simulation/bucket.go
@@ -16,7 +16,11 @@
package simulation
-import "github.com/ethereum/go-ethereum/p2p/enode"
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/p2p/enode"
+)
// BucketKey is the type that should be used for keys in simulation buckets.
type BucketKey string
@@ -32,6 +36,27 @@ func (s *Simulation) NodeItem(id enode.ID, key interface{}) (value interface{},
return s.buckets[id].Load(key)
}
+// MustNodeItem returns the item set in ServiceFunc for a particular node,
+// panicking if the node or the item is not found.
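+// For example, the retrieval tests fetch typed items directly:
+//
+//	fs := sim.MustNodeItem(nodeIDs[0], bucketKeyFileStore).(*storage.FileStore)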
+func (s *Simulation) MustNodeItem(id enode.ID, key interface{}) (value interface{}) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if _, ok := s.buckets[id]; !ok {
+ e := fmt.Errorf("cannot find node id %s in bucket", id.String())
+ panic(e)
+ }
+ v, ok := s.buckets[id].Load(key)
+ if !ok {
+ e := fmt.Errorf("cannot find key %v on node bucket", key)
+ panic(e)
+ }
+ return v
+}
+
// SetNodeItem sets a new item associated with the node with provided NodeID.
// Buckets should be used to avoid managing separate simulation global state.
func (s *Simulation) SetNodeItem(id enode.ID, key interface{}, value interface{}) {
diff --git a/network/simulation/simulation.go b/network/simulation/simulation.go
index 64ee44c52b..18a866c825 100644
--- a/network/simulation/simulation.go
+++ b/network/simulation/simulation.go
@@ -99,6 +99,41 @@ func NewInProc(services map[string]ServiceFunc) (s *Simulation) {
return s
}
+// NewBzzInProc is the same as NewInProc but injects bzz as a default protocol
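+// A typical call, mirroring the retrieval tests, registers the caller's own
+// services and gets "bzz" injected alongside them:
+//
+//	sim := simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
+//		"bzz-retrieve": newBzzRetrieveWithLocalstore,
+//	})
+//	defer sim.Close()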
+func NewBzzInProc(services map[string]ServiceFunc) (s *Simulation) {
+ services["bzz"] = func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+ hp := network.NewHiveParams()
+ hp.KeepAliveInterval = time.Duration(200) * time.Millisecond
+ hp.Discovery = false
+ var kad *network.Kademlia
+
+ // check if another kademlia already exists and load it if necessary - we don't want two independent copies of it
+ if kv, ok := bucket.Load(BucketKeyKademlia); ok {
+ kad = kv.(*network.Kademlia)
+ } else {
+ kad = network.NewKademlia(addr.Over(), network.NewKadParams())
+ bucket.Store(BucketKeyKademlia, kad)
+ }
+
+ config := &network.BzzConfig{
+ OverlayAddr: addr.Over(),
+ UnderlayAddr: addr.Under(),
+ HiveParams: hp,
+ }
+ return network.NewBzz(config, kad, nil, nil, nil), nil, nil
+ }
+
+ return NewInProc(services)
+}
+
// NewExec does the same as New but lets the caller specify the adapter to use
func NewExec(services map[string]ServiceFunc) (s *Simulation, err error) {
s = &Simulation{
diff --git a/testutil/node.go b/testutil/node.go
new file mode 100644
index 0000000000..64ab682a58
--- /dev/null
+++ b/testutil/node.go
@@ -0,0 +1,64 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package testutil
+
+import (
+ "net"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/enr"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/network"
+)
+
+// NodeConfigAtPo brute-forces random node configs until it finds one whose overlay address is at the provided po in relation to the given baseaddr
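+// A usage sketch from the retrieval tests in this change: create a config at
+// proximity order 1 from a given base address and add the node to a simulation:
+//
+//	forwarderConfig := testutil.NodeConfigAtPo(t, fetcherBase, 1)
+//	forwarder, err = sim.AddNode(override(forwarderConfig))
+//	err = sim.Net.Connect(fetching, forwarder)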
+func NodeConfigAtPo(t *testing.T, baseaddr []byte, po int) *adapters.NodeConfig {
+ foundPo := -1
+ var conf *adapters.NodeConfig
+ for foundPo != po {
+ conf = adapters.RandomNodeConfig()
+ ip := net.IPv4(127, 0, 0, 1)
+ enrIP := enr.IP(ip)
+ conf.Record.Set(&enrIP)
+ enrTCPPort := enr.TCP(conf.Port)
+ conf.Record.Set(&enrTCPPort)
+ enrUDPPort := enr.UDP(0)
+ conf.Record.Set(&enrUDPPort)
+
+ err := enode.SignV4(&conf.Record, conf.PrivateKey)
+ if err != nil {
+ t.Fatalf("unable to generate ENR: %v", err)
+ }
+ nod, err := enode.New(enode.V4ID{}, &conf.Record)
+ if err != nil {
+ t.Fatalf("unable to create enode: %v", err)
+ }
+
+ n := network.NewAddr(nod)
+ foundPo = chunk.Proximity(baseaddr, n.Over())
+ }
+
+ return conf
+}