diff --git a/network/stream/delivery_test.go b/network/stream/delivery_test.go index 88f88650fa..3f124522ca 100644 --- a/network/stream/delivery_test.go +++ b/network/stream/delivery_test.go @@ -72,10 +72,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { //to which the peer responds with offered hashes Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: nil, - Hashes: nil, - From: 0, - To: 0, + Hashes: nil, + From: 0, + To: 0, }, Peer: node.ID(), }, diff --git a/network/stream/intervals_test.go b/network/stream/intervals_test.go index b3244ecc9b..2479adbf66 100644 --- a/network/stream/intervals_test.go +++ b/network/stream/intervals_test.go @@ -314,10 +314,6 @@ func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(con return wait } -func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { - return nil -} - func (c *testExternalClient) Close() {} type testExternalServer struct { @@ -343,7 +339,7 @@ func (s *testExternalServer) SessionIndex() (uint64, error) { return s.sessionAt, nil } -func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { +func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) { if to > s.maxKeys { to = s.maxKeys } @@ -351,7 +347,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6 for i := from; i <= to; i++ { s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) } - return b, from, to, nil, nil + return b, from, to, nil } func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { diff --git a/network/stream/messages.go b/network/stream/messages.go index 550ca0a997..cb51229b80 100644 --- a/network/stream/messages.go +++ b/network/stream/messages.go @@ -183,10 +183,9 @@ func (p *Peer) handleQuitMsg(req *QuitMsg) error { // OfferedHashesMsg is the protocol msg for offering to hand over a // stream section type OfferedHashesMsg struct { - Stream Stream // name of Stream - From, To uint64 // peer and db-specific entry count - Hashes []byte // stream of hashes (128) - *HandoverProof // HandoverProof + Stream Stream // name of Stream + From, To uint64 // peer and db-specific entry count + Hashes []byte // stream of hashes (128) } // String pretty prints OfferedHashesMsg @@ -265,7 +264,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg } } select { - case c.next <- c.batchDone(p, req, hashes): + case c.next <- c.AddInterval(req.From, req.To): case <-c.quit: log.Debug("client.handleOfferedHashesMsg() quit") case <-ctx.Done(): @@ -385,12 +384,6 @@ type Handover struct { Root []byte // Root hash for indexed segment inclusion proofs } -// HandoverProof represents a signed statement that the upstream peer handed over the stream section -type HandoverProof struct { - Sig []byte // Sign(Hash(Serialisation(Handover))) - *Handover -} - // Takeover represents a statement that downstream peer took over (stored all data) // handed over type Takeover Handover diff --git a/network/stream/peer.go b/network/stream/peer.go index ab14829d4b..28aa0717ad 100644 --- a/network/stream/peer.go +++ b/network/stream/peer.go @@ -183,7 +183,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now()) - hashes, from, to, proof, err := s.setNextBatch(f, t) + hashes, from, to, err := s.setNextBatch(f, t) if err != nil { return err } @@ -191,18 +191,12 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { if len(hashes) == 0 { return nil } - if proof == nil { - proof = &HandoverProof{ - Handover: &Handover{}, - } - } s.currentBatch = hashes msg := &OfferedHashesMsg{ - HandoverProof: proof, - Hashes: hashes, - From: from, - To: to, - Stream: s.stream, + Hashes: hashes, + From: from, + To: to, + Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes") diff --git a/network/stream/stream.go b/network/stream/stream.go index bab134f718..3da8f6c876 100644 --- a/network/stream/stream.go +++ b/network/stream/stream.go @@ -474,7 +474,7 @@ type server struct { // setNextBatch adjusts passed interval based on session index and whether // stream is live or history. It calls Server SetNextBatch with adjusted // interval and returns batch hashes and their interval. -func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { +func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, error) { if s.stream.Live { if from == 0 { from = s.sessionIndex @@ -484,7 +484,7 @@ func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *Handove } } else { if (to < from && to != 0) || from > s.sessionIndex { - return nil, 0, 0, nil, nil + return nil, 0, 0, nil } if to == 0 || to > s.sessionIndex { to = s.sessionIndex @@ -500,7 +500,7 @@ type Server interface { // Based on this index, live and history stream intervals // will be adjusted before calling SetNextBatch. SessionIndex() (uint64, error) - SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) + SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error) GetData(context.Context, []byte) ([]byte, error) Close() } @@ -544,7 +544,6 @@ func (c *client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { NeedData(context.Context, []byte) func(context.Context) error - BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } @@ -574,24 +573,6 @@ func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) { return } -func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error { - if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil { - tp, err := tf() - if err != nil { - return err - } - - if err := p.Send(context.TODO(), tp); err != nil { - return err - } - if c.to > 0 && tp.Takeover.End >= c.to { - return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream) - } - return nil - } - return c.AddInterval(req.From, req.To) -} - func (c *client) close() { select { case <-c.quit: diff --git a/network/stream/streamer_test.go b/network/stream/streamer_test.go index bb88a6470c..c631900539 100644 --- a/network/stream/streamer_test.go +++ b/network/stream/streamer_test.go @@ -81,7 +81,6 @@ type testClient struct { t string wait0 chan bool wait2 chan bool - batchDone chan bool receivedHashes map[string][]byte } @@ -90,7 +89,6 @@ func newTestClient(t string) *testClient { t: t, wait0: make(chan bool), wait2: make(chan bool), - batchDone: make(chan bool), receivedHashes: make(map[string][]byte), } } @@ -111,11 +109,6 @@ func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context. return nil } -func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { - close(self.batchDone) - return nil -} - func (self *testClient) Close() {} type testServer struct { @@ -134,8 +127,8 @@ func (s *testServer) SessionIndex() (uint64, error) { return s.sessionIndex, nil } -func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - return make([]byte, HashSize), from + 1, to + 1, nil, nil +func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) { + return make([]byte, HashSize), from + 1, to + 1, nil } func (self *testServer) GetData(context.Context, []byte) ([]byte, error) { @@ -186,9 +179,6 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { { Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: hashes, From: 5, To: 8, @@ -271,9 +261,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 6, To: 9, @@ -337,9 +324,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 1, To: 0, @@ -448,9 +432,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: NewStream("foo", "", false), - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 6, To: 9, @@ -461,9 +442,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, From: 11, To: 0, Hashes: make([]byte, HashSize), @@ -521,9 +499,6 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { { Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: corruptHashes, From: 5, To: 8, @@ -586,9 +561,6 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { { Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: hashes, From: 5, To: 8, @@ -620,26 +592,9 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { close(tc.wait0) - timeout := time.NewTimer(100 * time.Millisecond) - defer timeout.Stop() - - select { - case <-tc.batchDone: - t.Fatal("batch done early") - case <-timeout.C: - } + time.Sleep(100 * time.Millisecond) close(tc.wait2) - - timeout2 := time.NewTimer(10000 * time.Millisecond) - defer timeout2.Stop() - - select { - case <-tc.batchDone: - case <-timeout2.C: - t.Fatal("timeout waiting batchdone call") - } - } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { @@ -694,9 +649,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: NewStream("foo", "", false), - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 6, To: 9, @@ -707,9 +659,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, From: 11, To: 0, Hashes: make([]byte, HashSize), @@ -811,9 +760,6 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 1, To: 0, @@ -915,9 +861,6 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 1, To: 0, diff --git a/network/stream/syncer.go b/network/stream/syncer.go index 4b18d1008a..f73d959ce8 100644 --- a/network/stream/syncer.go +++ b/network/stream/syncer.go @@ -92,7 +92,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // chunk addresses. If at least one chunk is added to the batch and no new chunks // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. -func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { +func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error) { batchStart := time.Now() descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() @@ -131,7 +131,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 if err != nil { metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1) log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err) - return nil, 0, 0, nil, err + return nil, 0, 0, err } batchSize++ if batchStartID == nil { @@ -171,7 +171,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // if batch start id is not set, return 0 batchStartID = new(uint64) } - return batch, *batchStartID, batchEndID, nil, nil + return batch, *batchStartID, batchEndID, nil } // SwarmSyncerClient @@ -203,15 +203,6 @@ func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func return s.netStore.FetchFunc(ctx, key) } -// BatchDone -func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) { - // TODO: reenable this with putter/getter refactored code - // if s.chunker != nil { - // return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) } - // } - return nil -} - func (s *SwarmSyncerClient) Close() {} // base for parsing and formating sync bin key