Merge branch 'master' into max-stream-peer-servers
swarm/network/stream/delivery.go

@@ -128,6 +128,7 @@ func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, err
 type RetrieveRequestMsg struct {
 	Addr      storage.Address
 	SkipCheck bool
+	HopCount  uint8
 }

 func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
@@ -148,7 +149,9 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *

 	var cancel func()
 	// TODO: do something with this hardcoded timeout, maybe use TTL in the future
-	ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
+	ctx = context.WithValue(ctx, "peer", sp.ID().String())
+	ctx = context.WithValue(ctx, "hopcount", req.HopCount)
+	ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout)

 	go func() {
 		select {
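Note on the change above: the old one-liner attached only the peer ID, and did so inside the WithTimeout call; the rewrite annotates the request context with both the peer ID and the message's hop count before deriving the timeout context. A minimal standalone sketch of the same pattern, with placeholder values (string keys mirror the diff; go vet would prefer typed keys):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx := context.Background()

	// Same ordering as the handler: attach request-scoped values first,
	// then derive the cancellable timeout context from the annotated parent.
	ctx = context.WithValue(ctx, "peer", "node-id")    // placeholder peer ID
	ctx = context.WithValue(ctx, "hopcount", uint8(3)) // placeholder hop count
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// A downstream consumer recovers the values with checked type assertions.
	if hop, ok := ctx.Value("hopcount").(uint8); ok {
		fmt.Printf("peer=%v hopcount=%d\n", ctx.Value("peer"), hop)
	}
}
```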
@@ -247,6 +250,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
 		err := sp.SendPriority(ctx, &RetrieveRequestMsg{
 			Addr:      req.Addr,
 			SkipCheck: req.SkipCheck,
+			HopCount:  req.HopCount,
 		}, Top)
 		if err != nil {
 			return nil, nil, err
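HopCount now travels with every forwarded retrieve request, which is what would make the TTL idea from the TODO above possible. A hypothetical guard along those lines; the name shouldForward and the limit of 20 are illustrative assumptions, not taken from the source:

```go
package main

import "fmt"

// maxHopCount is an assumed limit for illustration, not from the source.
const maxHopCount uint8 = 20

// shouldForward sketches a TTL-style check: stop re-forwarding a retrieve
// request once its hop count reaches the limit.
func shouldForward(hopCount uint8) bool {
	return hopCount < maxHopCount
}

func main() {
	fmt.Println(shouldForward(3))  // true: keep forwarding
	fmt.Println(shouldForward(20)) // false: drop instead of forwarding
}
```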
swarm/network/stream/messages.go

@@ -26,7 +26,7 @@ import (
 	bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
 	"github.com/ethereum/go-ethereum/swarm/spancontext"
 	"github.com/ethereum/go-ethereum/swarm/storage"
-	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go"
 )

 var syncBatchTimeout = 30 * time.Second
@@ -197,10 +197,16 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
 	if err != nil {
 		return err
 	}

 	hashes := req.Hashes
-	want, err := bv.New(len(hashes) / HashSize)
+	lenHashes := len(hashes)
+	if lenHashes%HashSize != 0 {
+		return fmt.Errorf("error invalid hashes length (len: %v)", lenHashes)
+	}
+
+	want, err := bv.New(lenHashes / HashSize)
 	if err != nil {
-		return fmt.Errorf("error initialising bitvector of length %v: %v", len(hashes)/HashSize, err)
+		return fmt.Errorf("error initialising bitvector of length %v: %v", lenHashes/HashSize, err)
 	}
+
 	ctr := 0
@@ -208,7 +214,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
 	ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)

 	ctx = context.WithValue(ctx, "source", p.ID().String())
-	for i := 0; i < len(hashes); i += HashSize {
+	for i := 0; i < lenHashes; i += HashSize {
 		hash := hashes[i : i+HashSize]

 		if wait := c.NeedData(ctx, hash); wait != nil {
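The guard added above matters because handleOfferedHashesMsg slices the payload in fixed HashSize steps: for a length that is not a multiple of HashSize, hashes[i : i+HashSize] would either slice past the end of the buffer and panic the handler or silently read bytes beyond the last complete hash, so a malformed message from a peer could take down the node. A standalone sketch of the check, assuming the package's 32-byte HashSize:

```go
package main

import "fmt"

// HashSize mirrors the 32-byte hash length used by the stream package.
const HashSize = 32

// validateHashes reproduces the new guard: a batch of concatenated hashes
// must be an exact multiple of HashSize before any slicing happens.
func validateHashes(hashes []byte) error {
	if len(hashes)%HashSize != 0 {
		return fmt.Errorf("error invalid hashes length (len: %v)", len(hashes))
	}
	return nil
}

func main() {
	fmt.Println(validateHashes(make([]byte, 96))) // <nil>: exactly 3 hashes
	fmt.Println(validateHashes(make([]byte, 40))) // error invalid hashes length (len: 40)
}
```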
swarm/network/stream/streamer.go

@@ -642,7 +642,7 @@ func (c *clientParams) clientCreated() {
 // Spec is the spec of the streamer protocol
 var Spec = &protocols.Spec{
 	Name:       "stream",
-	Version:    6,
+	Version:    7,
 	MaxMsgSize: 10 * 1024 * 1024,
 	Messages: []interface{}{
 		UnsubscribeMsg{},
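The bump from version 6 to 7 is needed because adding HopCount changes the RLP encoding of RetrieveRequestMsg; devp2p subprotocols are matched by name and version at handshake, so v6 and v7 nodes will simply not run the stream protocol with each other rather than mis-decode messages. A sketch of the wire-level difference; the struct names are illustrative, encoding via go-ethereum's rlp package:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// Illustrative shapes of the message before and after the change.
type retrieveV6 struct {
	Addr      []byte
	SkipCheck bool
}

type retrieveV7 struct {
	Addr      []byte
	SkipCheck bool
	HopCount  uint8
}

func main() {
	addr := make([]byte, 32)
	oldEnc, _ := rlp.EncodeToBytes(retrieveV6{Addr: addr})
	newEnc, _ := rlp.EncodeToBytes(retrieveV7{Addr: addr, HopCount: 1})

	// The encodings differ, so decoders from the two versions are not
	// interchangeable; hence the protocol version bump.
	fmt.Println(len(oldEnc), len(newEnc))
}
```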
swarm/network/stream/streamer_test.go

@@ -19,6 +19,7 @@ package stream
 import (
 	"bytes"
 	"context"
+	"errors"
 	"strconv"
 	"testing"
 	"time"
@@ -56,11 +57,12 @@ func TestStreamerRequestSubscription(t *testing.T) {
 }

 var (
-	hash0     = sha3.Sum256([]byte{0})
-	hash1     = sha3.Sum256([]byte{1})
-	hash2     = sha3.Sum256([]byte{2})
-	hashesTmp = append(hash0[:], hash1[:]...)
-	hashes    = append(hashesTmp, hash2[:]...)
+	hash0         = sha3.Sum256([]byte{0})
+	hash1         = sha3.Sum256([]byte{1})
+	hash2         = sha3.Sum256([]byte{2})
+	hashesTmp     = append(hash0[:], hash1[:]...)
+	hashes        = append(hashesTmp, hash2[:]...)
+	corruptHashes = append(hashes[:40])
 )

 type testClient struct {
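For context on corruptHashes: hashes concatenates three 32-byte sha3 hashes (96 bytes), so hashes[:40] is one full hash plus 8 stray bytes, deliberately not a multiple of HashSize (the single-argument append is a no-op wrapper here). A sketch of the arithmetic, using golang.org/x/crypto/sha3 in place of the vendored package:

```go
package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

func main() {
	hash0 := sha3.Sum256([]byte{0})
	hash1 := sha3.Sum256([]byte{1})
	hash2 := sha3.Sum256([]byte{2})

	hashes := append(append(hash0[:], hash1[:]...), hash2[:]...)
	corrupt := hashes[:40] // 40 % 32 != 0, so the new length guard fires

	fmt.Println(len(hashes), len(corrupt), len(corrupt)%32) // 96 40 8
}
```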
@@ -460,6 +462,71 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
 	}
 }

+func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
+	tester, streamer, _, teardown, err := newStreamerTester(t, nil)
+	defer teardown()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	stream := NewStream("foo", "", true)
+
+	var tc *testClient
+
+	streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
+		tc = newTestClient(t)
+		return tc, nil
+	})
+
+	node := tester.Nodes[0]
+
+	err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
+	if err != nil {
+		t.Fatalf("Expected no error, got %v", err)
+	}
+
+	err = tester.TestExchanges(p2ptest.Exchange{
+		Label: "Subscribe message",
+		Expects: []p2ptest.Expect{
+			{
+				Code: 4,
+				Msg: &SubscribeMsg{
+					Stream:   stream,
+					History:  NewRange(5, 8),
+					Priority: Top,
+				},
+				Peer: node.ID(),
+			},
+		},
+	},
+		p2ptest.Exchange{
+			Label: "Corrupt offered hash message",
+			Triggers: []p2ptest.Trigger{
+				{
+					Code: 1,
+					Msg: &OfferedHashesMsg{
+						HandoverProof: &HandoverProof{
+							Handover: &Handover{},
+						},
+						Hashes: corruptHashes,
+						From:   5,
+						To:     8,
+						Stream: stream,
+					},
+					Peer: node.ID(),
+				},
+			},
+		})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
+	if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[0].ID(), Error: expectedError}); err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(t, nil)
 	defer teardown()
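The expected disconnect string in the test above is the handler's error wrapped by the p2p/protocols layer, which prefixes the failing message's code (OfferedHashesMsg is code 1, its index in Spec.Messages, just as SubscribeMsg is code 4). A quick re-derivation of the exact string the tester asserts:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// Handler error as produced by the new length check for a 40-byte payload.
	handlerErr := fmt.Errorf("error invalid hashes length (len: %v)", 40)

	// Recompose the wrapped form used as the disconnect reason.
	wrapped := fmt.Errorf("Message handler error: (msg code %v): %v", 1, handlerErr)
	expected := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")

	fmt.Println(wrapped.Error() == expected.Error()) // true
}
```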