swarm/network/stream: generalise setting of next batch (#17818)
* swarm/network/stream: generalize SetNextBatch and add Server SessionIndex * swarm/network/stream: fix a typo in comment * swarm/network/stream: remove live argument from NewSwarmSyncerServer
This commit is contained in:
		@@ -96,6 +96,11 @@ func (s *SwarmChunkServer) processDeliveries() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SessionIndex returns zero in all cases for SwarmChunkServer.
 | 
			
		||||
func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
 | 
			
		||||
	return 0, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetNextBatch
 | 
			
		||||
func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
 | 
			
		||||
	select {
 | 
			
		||||
@@ -141,7 +146,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 | 
			
		||||
		"retrieve.request")
 | 
			
		||||
	defer osp.Finish()
 | 
			
		||||
 | 
			
		||||
	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false))
 | 
			
		||||
	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -88,7 +88,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
 | 
			
		||||
	peer := streamer.getPeer(node.ID())
 | 
			
		||||
 | 
			
		||||
	peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
 | 
			
		||||
		Stream:   NewStream(swarmChunkServerStreamName, "", false),
 | 
			
		||||
		Stream:   NewStream(swarmChunkServerStreamName, "", true),
 | 
			
		||||
		History:  nil,
 | 
			
		||||
		Priority: Top,
 | 
			
		||||
	})
 | 
			
		||||
@@ -136,7 +136,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
	peer := streamer.getPeer(node.ID())
 | 
			
		||||
 | 
			
		||||
	stream := NewStream(swarmChunkServerStreamName, "", false)
 | 
			
		||||
	stream := NewStream(swarmChunkServerStreamName, "", true)
 | 
			
		||||
 | 
			
		||||
	peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
 | 
			
		||||
		Stream:   stream,
 | 
			
		||||
@@ -409,7 +409,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
 | 
			
		||||
				return fmt.Errorf("No registry")
 | 
			
		||||
			}
 | 
			
		||||
			registry := item.(*Registry)
 | 
			
		||||
			err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
 | 
			
		||||
			err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), NewRange(0, 0), Top)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
@@ -345,8 +345,6 @@ func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*
 | 
			
		||||
 | 
			
		||||
func (c *testExternalClient) Close() {}
 | 
			
		||||
 | 
			
		||||
const testExternalServerBatchSize = 10
 | 
			
		||||
 | 
			
		||||
type testExternalServer struct {
 | 
			
		||||
	t         string
 | 
			
		||||
	keyFunc   func(key []byte, index uint64)
 | 
			
		||||
@@ -366,17 +364,11 @@ func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *testExternalServer) SessionIndex() (uint64, error) {
 | 
			
		||||
	return s.sessionAt, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
 | 
			
		||||
	if from == 0 && to == 0 {
 | 
			
		||||
		from = s.sessionAt
 | 
			
		||||
		to = s.sessionAt + testExternalServerBatchSize
 | 
			
		||||
	}
 | 
			
		||||
	if to-from > testExternalServerBatchSize {
 | 
			
		||||
		to = from + testExternalServerBatchSize - 1
 | 
			
		||||
	}
 | 
			
		||||
	if from >= s.maxKeys && to > s.maxKeys {
 | 
			
		||||
		return nil, 0, 0, nil, io.EOF
 | 
			
		||||
	}
 | 
			
		||||
	if to > s.maxKeys {
 | 
			
		||||
		to = s.maxKeys
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -166,7 +166,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
 | 
			
		||||
		"send.offered.hashes")
 | 
			
		||||
	defer sp.Finish()
 | 
			
		||||
 | 
			
		||||
	hashes, from, to, proof, err := s.SetNextBatch(f, t)
 | 
			
		||||
	hashes, from, to, proof, err := s.setNextBatch(f, t)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
@@ -214,10 +214,15 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
 | 
			
		||||
		return nil, ErrMaxPeerServers
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sessionIndex, err := o.SessionIndex()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	os := &server{
 | 
			
		||||
		Server:   o,
 | 
			
		||||
		stream:   s,
 | 
			
		||||
		priority: priority,
 | 
			
		||||
		Server:       o,
 | 
			
		||||
		stream:       s,
 | 
			
		||||
		priority:     priority,
 | 
			
		||||
		sessionIndex: sessionIndex,
 | 
			
		||||
	}
 | 
			
		||||
	p.servers[s] = os
 | 
			
		||||
	return os, nil
 | 
			
		||||
 
 | 
			
		||||
@@ -375,7 +375,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
 | 
			
		||||
	defer sp.close()
 | 
			
		||||
 | 
			
		||||
	if r.doRetrieve {
 | 
			
		||||
		err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top)
 | 
			
		||||
		err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
@@ -500,10 +500,38 @@ type server struct {
 | 
			
		||||
	stream       Stream
 | 
			
		||||
	priority     uint8
 | 
			
		||||
	currentBatch []byte
 | 
			
		||||
	sessionIndex uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// setNextBatch adjusts passed interval based on session index and whether
 | 
			
		||||
// stream is live or history. It calls Server SetNextBatch with adjusted
 | 
			
		||||
// interval and returns batch hashes and their interval.
 | 
			
		||||
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
 | 
			
		||||
	if s.stream.Live {
 | 
			
		||||
		if from == 0 {
 | 
			
		||||
			from = s.sessionIndex
 | 
			
		||||
		}
 | 
			
		||||
		if to <= from || from >= s.sessionIndex {
 | 
			
		||||
			to = math.MaxUint64
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		if (to < from && to != 0) || from > s.sessionIndex {
 | 
			
		||||
			return nil, 0, 0, nil, nil
 | 
			
		||||
		}
 | 
			
		||||
		if to == 0 || to > s.sessionIndex {
 | 
			
		||||
			to = s.sessionIndex
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return s.SetNextBatch(from, to)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Server interface for outgoing peer Streamer
 | 
			
		||||
type Server interface {
 | 
			
		||||
	// SessionIndex is called when a server is initialized
 | 
			
		||||
	// to get the current cursor state of the stream data.
 | 
			
		||||
	// Based on this index, live and history stream intervals
 | 
			
		||||
	// will be adjusted before calling SetNextBatch.
 | 
			
		||||
	SessionIndex() (uint64, error)
 | 
			
		||||
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
 | 
			
		||||
	GetData(context.Context, []byte) ([]byte, error)
 | 
			
		||||
	Close()
 | 
			
		||||
 
 | 
			
		||||
@@ -107,15 +107,21 @@ func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*Takeo
 | 
			
		||||
func (self *testClient) Close() {}
 | 
			
		||||
 | 
			
		||||
type testServer struct {
 | 
			
		||||
	t string
 | 
			
		||||
	t            string
 | 
			
		||||
	sessionIndex uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestServer(t string) *testServer {
 | 
			
		||||
func newTestServer(t string, sessionIndex uint64) *testServer {
 | 
			
		||||
	return &testServer{
 | 
			
		||||
		t: t,
 | 
			
		||||
		t:            t,
 | 
			
		||||
		sessionIndex: sessionIndex,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *testServer) SessionIndex() (uint64, error) {
 | 
			
		||||
	return s.sessionIndex, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
 | 
			
		||||
	return make([]byte, HashSize), from + 1, to + 1, nil, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -230,7 +236,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
 | 
			
		||||
	stream := NewStream("foo", "", false)
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return newTestServer(t), nil
 | 
			
		||||
		return newTestServer(t, 10), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
@@ -297,7 +303,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
 | 
			
		||||
	stream := NewStream("foo", "", true)
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return newTestServer(t), nil
 | 
			
		||||
		return newTestServer(t, 0), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
@@ -324,7 +330,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
 | 
			
		||||
					},
 | 
			
		||||
					Hashes: make([]byte, HashSize),
 | 
			
		||||
					From:   1,
 | 
			
		||||
					To:     1,
 | 
			
		||||
					To:     0,
 | 
			
		||||
				},
 | 
			
		||||
				Peer: node.ID(),
 | 
			
		||||
			},
 | 
			
		||||
@@ -361,7 +367,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return newTestServer(t), nil
 | 
			
		||||
		return newTestServer(t, 0), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	stream := NewStream("bar", "", true)
 | 
			
		||||
@@ -407,9 +413,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
 | 
			
		||||
	stream := NewStream("foo", "", true)
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return &testServer{
 | 
			
		||||
			t: t,
 | 
			
		||||
		}, nil
 | 
			
		||||
		return newTestServer(t, 10), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
@@ -448,8 +452,8 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
 | 
			
		||||
					HandoverProof: &HandoverProof{
 | 
			
		||||
						Handover: &Handover{},
 | 
			
		||||
					},
 | 
			
		||||
					From:   1,
 | 
			
		||||
					To:     1,
 | 
			
		||||
					From:   11,
 | 
			
		||||
					To:     0,
 | 
			
		||||
					Hashes: make([]byte, HashSize),
 | 
			
		||||
				},
 | 
			
		||||
				Peer: node.ID(),
 | 
			
		||||
@@ -634,7 +638,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return newTestServer(t), nil
 | 
			
		||||
		return newTestServer(t, 10), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
@@ -694,8 +698,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
 | 
			
		||||
						HandoverProof: &HandoverProof{
 | 
			
		||||
							Handover: &Handover{},
 | 
			
		||||
						},
 | 
			
		||||
						From:   1,
 | 
			
		||||
						To:     1,
 | 
			
		||||
						From:   11,
 | 
			
		||||
						To:     0,
 | 
			
		||||
						Hashes: make([]byte, HashSize),
 | 
			
		||||
					},
 | 
			
		||||
					Peer: node.ID(),
 | 
			
		||||
@@ -769,7 +773,7 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return newTestServer(t), nil
 | 
			
		||||
		return newTestServer(t, 0), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
@@ -799,7 +803,7 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
						Hashes: make([]byte, HashSize),
 | 
			
		||||
						From:   1,
 | 
			
		||||
						To:     1,
 | 
			
		||||
						To:     0,
 | 
			
		||||
					},
 | 
			
		||||
					Peer: node.ID(),
 | 
			
		||||
				},
 | 
			
		||||
@@ -843,7 +847,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
		return newTestServer(t), nil
 | 
			
		||||
		return newTestServer(t, 0), nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	node := tester.Nodes[0]
 | 
			
		||||
@@ -903,7 +907,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
						Hashes: make([]byte, HashSize),
 | 
			
		||||
						From:   1,
 | 
			
		||||
						To:     1,
 | 
			
		||||
						To:     0,
 | 
			
		||||
					},
 | 
			
		||||
					Peer: node.ID(),
 | 
			
		||||
				},
 | 
			
		||||
 
 | 
			
		||||
@@ -18,7 +18,6 @@ package stream
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"math"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@@ -36,38 +35,27 @@ const (
 | 
			
		||||
// * live request delivery with or without checkback
 | 
			
		||||
// * (live/non-live historical) chunk syncing per proximity bin
 | 
			
		||||
type SwarmSyncerServer struct {
 | 
			
		||||
	po        uint8
 | 
			
		||||
	store     storage.SyncChunkStore
 | 
			
		||||
	sessionAt uint64
 | 
			
		||||
	start     uint64
 | 
			
		||||
	live      bool
 | 
			
		||||
	quit      chan struct{}
 | 
			
		||||
	po    uint8
 | 
			
		||||
	store storage.SyncChunkStore
 | 
			
		||||
	quit  chan struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewSwarmSyncerServer is contructor for SwarmSyncerServer
 | 
			
		||||
func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
 | 
			
		||||
	sessionAt := syncChunkStore.BinIndex(po)
 | 
			
		||||
	var start uint64
 | 
			
		||||
	if live {
 | 
			
		||||
		start = sessionAt
 | 
			
		||||
	}
 | 
			
		||||
// NewSwarmSyncerServer is constructor for SwarmSyncerServer
 | 
			
		||||
func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
 | 
			
		||||
	return &SwarmSyncerServer{
 | 
			
		||||
		po:        po,
 | 
			
		||||
		store:     syncChunkStore,
 | 
			
		||||
		sessionAt: sessionAt,
 | 
			
		||||
		start:     start,
 | 
			
		||||
		live:      live,
 | 
			
		||||
		quit:      make(chan struct{}),
 | 
			
		||||
		po:    po,
 | 
			
		||||
		store: syncChunkStore,
 | 
			
		||||
		quit:  make(chan struct{}),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
 | 
			
		||||
	streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
 | 
			
		||||
	streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
 | 
			
		||||
		po, err := ParseSyncBinKey(t)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		return NewSwarmSyncerServer(live, po, syncChunkStore)
 | 
			
		||||
		return NewSwarmSyncerServer(po, syncChunkStore)
 | 
			
		||||
	})
 | 
			
		||||
	// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
 | 
			
		||||
	// 	return NewOutgoingProvableSwarmSyncer(po, db)
 | 
			
		||||
@@ -88,25 +76,15 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er
 | 
			
		||||
	return chunk.Data(), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SessionIndex returns current storage bin (po) index.
 | 
			
		||||
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
 | 
			
		||||
	return s.store.BinIndex(s.po), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetBatch retrieves the next batch of hashes from the dbstore
 | 
			
		||||
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
 | 
			
		||||
	var batch []byte
 | 
			
		||||
	i := 0
 | 
			
		||||
	if s.live {
 | 
			
		||||
		if from == 0 {
 | 
			
		||||
			from = s.start
 | 
			
		||||
		}
 | 
			
		||||
		if to <= from || from >= s.sessionAt {
 | 
			
		||||
			to = math.MaxUint64
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		if (to < from && to != 0) || from > s.sessionAt {
 | 
			
		||||
			return nil, 0, 0, nil, nil
 | 
			
		||||
		}
 | 
			
		||||
		if to == 0 || to > s.sessionAt {
 | 
			
		||||
			to = s.sessionAt
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var ticker *time.Ticker
 | 
			
		||||
	defer func() {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user