swarm/network/stream: remove dead code (#1422)

* swarm/network/stream: remove dead code

* swarm/network/stream: remove HandoverProof
Anton Evangelatov authored 2019-06-05 12:59:46 +02:00, committed by GitHub
parent 6d0902da3c
commit bd1887189c
7 changed files with 23 additions and 126 deletions
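
For quick reference, the exported streaming API after this change looks as follows. This is reassembled from the interface and message hunks further down; only the formatting (alignment, ordering) is illustrative.

// Server and Client after this commit: SetNextBatch no longer returns a
// *HandoverProof, and Client loses BatchDone.
type Server interface {
    SessionIndex() (uint64, error)
    SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error)
    GetData(context.Context, []byte) ([]byte, error)
    Close()
}

type Client interface {
    NeedData(context.Context, []byte) func(context.Context) error
    Close()
}

// OfferedHashesMsg no longer embeds *HandoverProof.
type OfferedHashesMsg struct {
    Stream   Stream // name of Stream
    From, To uint64 // peer and db-specific entry count
    Hashes   []byte // stream of hashes (128)
}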

View File

@@ -72,10 +72,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
     { //to which the peer responds with offered hashes
         Code: 1,
         Msg: &OfferedHashesMsg{
-            HandoverProof: nil,
-            Hashes:        nil,
-            From:          0,
-            To:            0,
+            Hashes: nil,
+            From:   0,
+            To:     0,
         },
         Peer: node.ID(),
     },

View File

@@ -314,10 +314,6 @@ func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(con
     return wait
 }
 
-func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
-    return nil
-}
-
 func (c *testExternalClient) Close() {}
 
 type testExternalServer struct {
@@ -343,7 +339,7 @@ func (s *testExternalServer) SessionIndex() (uint64, error) {
     return s.sessionAt, nil
 }
 
-func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
     if to > s.maxKeys {
         to = s.maxKeys
     }
@@ -351,7 +347,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6
     for i := from; i <= to; i++ {
         s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
     }
-    return b, from, to, nil, nil
+    return b, from, to, nil
 }
 
 func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {

View File

@@ -183,10 +183,9 @@ func (p *Peer) handleQuitMsg(req *QuitMsg) error {
 // OfferedHashesMsg is the protocol msg for offering to hand over a
 // stream section
 type OfferedHashesMsg struct {
-    Stream         Stream // name of Stream
-    From, To       uint64 // peer and db-specific entry count
-    Hashes         []byte // stream of hashes (128)
-    *HandoverProof        // HandoverProof
+    Stream   Stream // name of Stream
+    From, To uint64 // peer and db-specific entry count
+    Hashes   []byte // stream of hashes (128)
 }
 
 // String pretty prints OfferedHashesMsg
@@ -265,7 +264,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMs
         }
     }
     select {
-    case c.next <- c.batchDone(p, req, hashes):
+    case c.next <- c.AddInterval(req.From, req.To):
     case <-c.quit:
         log.Debug("client.handleOfferedHashesMsg() quit")
     case <-ctx.Done():
@@ -385,12 +384,6 @@ type Handover struct {
     Root []byte // Root hash for indexed segment inclusion proofs
 }
 
-// HandoverProof represents a signed statement that the upstream peer handed over the stream section
-type HandoverProof struct {
-    Sig []byte // Sign(Hash(Serialisation(Handover)))
-    *Handover
-}
-
 // Takeover represents a statement that downstream peer took over (stored all data)
 // handed over
 type Takeover Handover

View File

@@ -183,7 +183,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
     defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now())
-    hashes, from, to, proof, err := s.setNextBatch(f, t)
+    hashes, from, to, err := s.setNextBatch(f, t)
     if err != nil {
         return err
     }
@@ -191,18 +191,12 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
     if len(hashes) == 0 {
         return nil
     }
-    if proof == nil {
-        proof = &HandoverProof{
-            Handover: &Handover{},
-        }
-    }
     s.currentBatch = hashes
     msg := &OfferedHashesMsg{
-        HandoverProof: proof,
-        Hashes:        hashes,
-        From:          from,
-        To:            to,
-        Stream:        s.stream,
+        Hashes: hashes,
+        From:   from,
+        To:     to,
+        Stream: s.stream,
     }
     log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
     ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")

View File

@@ -474,7 +474,7 @@ type server struct {
 // setNextBatch adjusts passed interval based on session index and whether
 // stream is live or history. It calls Server SetNextBatch with adjusted
 // interval and returns batch hashes and their interval.
-func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
     if s.stream.Live {
         if from == 0 {
             from = s.sessionIndex
@@ -484,7 +484,7 @@ func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *Handove
         }
     } else {
         if (to < from && to != 0) || from > s.sessionIndex {
-            return nil, 0, 0, nil, nil
+            return nil, 0, 0, nil
         }
         if to == 0 || to > s.sessionIndex {
             to = s.sessionIndex
@@ -500,7 +500,7 @@ type Server interface {
     // Based on this index, live and history stream intervals
     // will be adjusted before calling SetNextBatch.
     SessionIndex() (uint64, error)
-    SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
+    SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error)
     GetData(context.Context, []byte) ([]byte, error)
     Close()
 }
@@ -544,7 +544,6 @@ func (c *client) NextInterval() (start, end uint64, err error) {
 // Client interface for incoming peer Streamer
 type Client interface {
     NeedData(context.Context, []byte) func(context.Context) error
-    BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
     Close()
 }
@@ -574,24 +573,6 @@ func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
     return
 }
 
-func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
-    if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
-        tp, err := tf()
-        if err != nil {
-            return err
-        }
-        if err := p.Send(context.TODO(), tp); err != nil {
-            return err
-        }
-        if c.to > 0 && tp.Takeover.End >= c.to {
-            return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
-        }
-        return nil
-    }
-    return c.AddInterval(req.From, req.To)
-}
-
 func (c *client) close() {
     select {
     case <-c.quit:
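
To make the history-stream adjustment described in the setNextBatch comment above concrete, the clamping in the -484 hunk can be read in isolation. The helper below is only an illustration of that logic; the function name and the example session index are not part of the change.

// clampHistory mirrors the non-live branch of (*server).setNextBatch shown above.
// sessionIndex stands in for s.sessionIndex; ok == false corresponds to the
// empty-batch return (nil, 0, 0, nil).
func clampHistory(from, to, sessionIndex uint64) (adjFrom, adjTo uint64, ok bool) {
    if (to < from && to != 0) || from > sessionIndex {
        return 0, 0, false // nothing to offer for this request
    }
    if to == 0 || to > sessionIndex {
        to = sessionIndex // open-ended or too-far-ahead requests are capped at the session index
    }
    return from, to, true
}

With sessionIndex = 100, a request for (from=5, to=0) is adjusted to (5, 100), while (from=101, to=0) yields an empty batch.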

View File

@@ -81,7 +81,6 @@ type testClient struct {
     t              string
     wait0          chan bool
     wait2          chan bool
-    batchDone      chan bool
     receivedHashes map[string][]byte
 }
@@ -90,7 +89,6 @@ func newTestClient(t string) *testClient {
         t:              t,
         wait0:          make(chan bool),
         wait2:          make(chan bool),
-        batchDone:      make(chan bool),
         receivedHashes: make(map[string][]byte),
     }
 }
@@ -111,11 +109,6 @@ func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.
     return nil
 }
 
-func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
-    close(self.batchDone)
-    return nil
-}
-
 func (self *testClient) Close() {}
 
 type testServer struct {
@@ -134,8 +127,8 @@ func (s *testServer) SessionIndex() (uint64, error) {
     return s.sessionIndex, nil
 }
 
-func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
-    return make([]byte, HashSize), from + 1, to + 1, nil, nil
+func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
+    return make([]byte, HashSize), from + 1, to + 1, nil
 }
 
 func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
@@ -186,9 +179,6 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
     {
         Code: 1,
         Msg: &OfferedHashesMsg{
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: hashes,
             From:   5,
             To:     8,
@@ -271,9 +261,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: stream,
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: make([]byte, HashSize),
             From:   6,
             To:     9,
@@ -337,9 +324,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: stream,
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: make([]byte, HashSize),
             From:   1,
             To:     0,
@@ -448,9 +432,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: NewStream("foo", "", false),
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: make([]byte, HashSize),
             From:   6,
             To:     9,
@@ -461,9 +442,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: stream,
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             From:   11,
             To:     0,
             Hashes: make([]byte, HashSize),
@@ -521,9 +499,6 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
     {
         Code: 1,
         Msg: &OfferedHashesMsg{
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: corruptHashes,
             From:   5,
             To:     8,
@@ -586,9 +561,6 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
     {
         Code: 1,
         Msg: &OfferedHashesMsg{
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: hashes,
             From:   5,
             To:     8,
@@ -620,26 +592,9 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
     close(tc.wait0)
-    timeout := time.NewTimer(100 * time.Millisecond)
-    defer timeout.Stop()
-    select {
-    case <-tc.batchDone:
-        t.Fatal("batch done early")
-    case <-timeout.C:
-    }
+    time.Sleep(100 * time.Millisecond)
     close(tc.wait2)
-    timeout2 := time.NewTimer(10000 * time.Millisecond)
-    defer timeout2.Stop()
-    select {
-    case <-tc.batchDone:
-    case <-timeout2.C:
-        t.Fatal("timeout waiting batchdone call")
-    }
 }
 
 func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
@@ -694,9 +649,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: NewStream("foo", "", false),
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: make([]byte, HashSize),
             From:   6,
             To:     9,
@@ -707,9 +659,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: stream,
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             From:   11,
             To:     0,
             Hashes: make([]byte, HashSize),
@@ -811,9 +760,6 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: stream,
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: make([]byte, HashSize),
             From:   1,
             To:     0,
@@ -915,9 +861,6 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
         Code: 1,
         Msg: &OfferedHashesMsg{
             Stream: stream,
-            HandoverProof: &HandoverProof{
-                Handover: &Handover{},
-            },
             Hashes: make([]byte, HashSize),
             From:   1,
             To:     0,

View File

@@ -92,7 +92,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
 // chunk addresses. If at least one chunk is added to the batch and no new chunks
 // are added in batchTimeout period, the batch will be returned. This function
 // will block until new chunks are received from localstore pull subscription.
-func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
     batchStart := time.Now()
     descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
     defer stop()
@@ -131,7 +131,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
             if err != nil {
                 metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
                 log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
-                return nil, 0, 0, nil, err
+                return nil, 0, 0, err
             }
             batchSize++
             if batchStartID == nil {
@@ -171,7 +171,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
         // if batch start id is not set, return 0
         batchStartID = new(uint64)
     }
-    return batch, *batchStartID, batchEndID, nil, nil
+    return batch, *batchStartID, batchEndID, nil
 }
 
 // SwarmSyncerClient
@@ -203,15 +203,6 @@ func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func
     return s.netStore.FetchFunc(ctx, key)
 }
 
-// BatchDone
-func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
-    // TODO: reenable this with putter/getter refactored code
-    // if s.chunker != nil {
-    //     return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
-    // }
-    return nil
-}
-
 func (s *SwarmSyncerClient) Close() {}
 
 // base for parsing and formating sync bin key
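
The SetNextBatch doc comment above describes a collect-until-quiet batching behaviour. Below is a generic sketch of that pattern using only the standard library time package; the function name, the channel-based subscription and the byte-slice item type are stand-ins, not the actual SwarmSyncerServer code.

// collectBatch accumulates items from a subscription channel and returns the batch
// once at least one item has been received and no new item has arrived for
// batchTimeout. While the batch is still empty it blocks indefinitely, matching
// the "will block until new chunks are received" behaviour described above.
func collectBatch(items <-chan []byte, batchTimeout time.Duration) [][]byte {
    var (
        batch   [][]byte
        timer   *time.Timer
        timeout <-chan time.Time // stays nil until the first item arrives
    )
    for {
        select {
        case item, ok := <-items:
            if !ok {
                return batch // subscription closed
            }
            batch = append(batch, item)
            // (re)arm the quiet-period timer now that the batch is non-empty
            if timer == nil {
                timer = time.NewTimer(batchTimeout)
                timeout = timer.C
            } else {
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(batchTimeout)
            }
        case <-timeout:
            return batch // no new item within batchTimeout: hand the batch over
        }
    }
}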